Source code for demessaging.backend.module

# SPDX-FileCopyrightText: 2019-2025 Helmholtz Centre Potsdam GFZ German Research Centre for Geosciences
# SPDX-FileCopyrightText: 2020-2021 Helmholtz-Zentrum Geesthacht GmbH
# SPDX-FileCopyrightText: 2021-2025 Helmholtz-Zentrum hereon GmbH
#
# SPDX-License-Identifier: Apache-2.0

"""Backend module to transform a python module into a pydantic model.

This module defines the main model in the demessaging framework. It takes a
list of members, or a module, and creates a new Model that can be used to
generate code, connect to the pulsar, and more. See :class:`BackendModule` for
details.
"""
from __future__ import annotations

import atexit
import base64
import datetime as dt
import hashlib
import inspect
import io
import logging
import subprocess as spr
import tempfile
import traceback
from importlib import import_module
from pathlib import Path
from typing import (
    IO,
    TYPE_CHECKING,
    Any,
    Callable,
    ClassVar,
    Dict,
    List,
    Optional,
    Type,
    Union,
    cast,
)

import docstring_parser
from deprogressapi import BaseReport
from pydantic import Field  # pylint: disable=no-name-in-module
from pydantic import BaseModel, RootModel, ValidationError, create_model
from pydantic.json_schema import JsonSchemaValue

from demessaging.backend import utils
from demessaging.backend.class_ import BackendClass, ClassAPIModel
from demessaging.backend.function import (
    BackendFunction,
    FunctionAPIModel,
    ReturnModel,
)
from demessaging.config import ModuleConfig
from demessaging.messaging.consumer import MessageConsumer
from demessaging.PulsarMessageConstants import PropertyKeys, Status
from demessaging.utils import append_parameter_docs, merge_config

logger = logging.getLogger(__name__)


@append_parameter_docs
class BackendModuleConfig(ModuleConfig):
    """Configuration class for a backend module."""

    # NOTE: the precise type of ``models`` is
    # List[Union[Type[BackendFunction], Type[BackendClass]]]. Pydantic does
    # not support this annotation, so static type checkers get the precise
    # type while the runtime field falls back to a list of ``Any``.
    if TYPE_CHECKING:
        models: List[Union[Type[BackendFunction], Type[BackendClass]]]
    models: List[Any] = Field(  # type: ignore
        default_factory=list,  # type: ignore[arg-type]
        description=(
            "a list of function or class models for the members of the "
            "backend module"
        ),
    )

    # the module object that the members have been taken from
    module: Any = Field(
        description="The imported backend module (or none, if there is none)"
    )

    # name of the generated BackendModule subclass
    class_name: str = Field(description="Name of the model class")
class ModuleAPIModel(BaseModel):
    """A model that represents the API of a backend module."""

    # API descriptions of all class-based members of the module
    classes: List[ClassAPIModel] = Field(
        description="The RPC-enabled classes that this module contains."
    )

    # API descriptions of all function-based members of the module
    functions: List[FunctionAPIModel] = Field(
        description="The RPC-enabled functions that this module contains."
    )

    # JSON schema of the request model of the entire module
    rpc_schema: JsonSchemaValue = Field(
        description="The aggregated JSON schema for an RPC call to this module."
    )
# Everything that is accepted as a member of a backend module: an already
# generated function or class model, a plain callable or class, or the name
# of an attribute in the backend module.
ModuleMember = Union[
    Type[BackendFunction],
    Type[BackendClass],
    Callable,
    str,
    Type[object],
]
@append_parameter_docs
class BackendModule(RootModel):
    """A base class for a backend module.

    Do not directly instantiate from this class, rather use the
    :meth:`create_model` method.
    """

    backend_config: ClassVar[BackendModuleConfig]

    pulsar: ClassVar[MessageConsumer]

    # type that is implemented by subclasses
    root: Union[BackendFunction, BackendClass]

    def __call__(self) -> ReturnModel:
        """Call the selected member of this backend module."""
        return self.root()  # type: ignore

    @classmethod
    def create_model(
        cls,
        module_name: Optional[str] = None,
        members: Optional[List[ModuleMember]] = None,
        config: Optional[ModuleConfig] = None,
        class_name: Optional[str] = None,
        **config_kws,
    ) -> Type[BackendModule]:
        """Generate a model for a backend module.

        Parameters
        ----------
        module_name: str
            The name of the module to import. If none is given, the `members`
            must be specified
        members: list of members
            The list of members that shall be added to this module. It can be
            a list of

            - :class:`~demessaging.backend.function.BackendFunction` classes
              (generated with
              :meth:`~demessaging.backend.function.BackendFunction.create_model`)
            - :class:`~demessaging.backend.class_.BackendClass` classes
              (generated with
              :meth:`~demessaging.backend.class_.BackendClass.create_model`)
            - functions (that will then be transformed using
              :meth:`~demessaging.backend.function.BackendFunction.create_model`)
            - classes (that will then be transformed using
              :meth:`~demessaging.backend.class_.BackendClass.create_model`)
            - strings, in which case they point to the member of the given
              `module_name`

            The given list is not modified. If no members are given, the
            ``members`` of the configuration are used. If these are empty,
            too, the ``__all__`` attribute of the module is used, or, if
            there is none, all public functions of the module and all public
            classes that are defined in the module.
        config: ModuleConfig, optional
            The configuration for the module. If this is not given, you must
            provide ``config_kws`` or define a ``backend_config`` variable
            within the module corresponding to `module_name`.
        class_name: str, optional
            The name for the generated subclass of :class:`BackendModule`.
            If not given, the `module_name` is used, or, if this is not
            given either, the topic of the messaging configuration.
        ``**config_kws``
            An alternative way to specify the configuration for the backend
            module.

        Returns
        -------
        Subclass of BackendModule
            The newly generated class that represents this module.

        Raises
        ------
        ValueError
            If neither `members` nor `module_name` are given, if both
            `config` and ``config_kws`` are given, if no configuration can be
            found, if a member is given as string without a `module_name`,
            if a member cannot be transformed into a model, or if no members
            can be found at all.
        """
        if module_name is not None:
            module: Any = import_module(module_name)
        else:
            module = None
        if members is None and module is None:
            raise ValueError("Either members or module need to be provided!")
        if config and config_kws:
            raise ValueError("Either config or config_kws can be used!")

        if config_kws:
            config = ModuleConfig(**config_kws)
        elif (
            config is None
            and module is not None
            and hasattr(module, "backend_config")
        ):
            config = module.backend_config

        # fail early with a clear message, before the config is accessed
        if config is None:
            raise ValueError(
                "No configuration found! Provide config, config_kws or "
                "define a backend_config variable in the module."
            )
        config = cast(ModuleConfig, config)

        # this should not be camelized
        class_name = class_name or module_name or config.messaging_config.topic

        config = BackendModuleConfig(
            module=module,
            class_name=class_name,
            **config.model_copy().model_dump(),
        )

        # always work on a copy such that the list of the caller (or of the
        # config) is never modified in place
        member_candidates: List[Any] = list(members or config.members)

        if not member_candidates and module is not None:
            if hasattr(module, "__all__"):
                member_candidates = list(module.__all__)
            else:
                functions = inspect.getmembers(
                    module, predicate=inspect.isfunction
                )
                classes = inspect.getmembers(
                    module, predicate=inspect.isclass
                )
                member_candidates = [
                    obj for name, obj in functions if not name.startswith("_")
                ]
                # only take classes that are defined in the module itself.
                # Otherwise every imported class (Path, BaseModel, ...)
                # would be exposed as a member of the backend module
                member_candidates += [
                    obj
                    for name, obj in classes
                    if not name.startswith("_")
                    and getattr(obj, "__module__", None) == module.__name__
                ]

        # finally check if we have any members
        if not member_candidates:
            raise ValueError(
                f"Found no members for the given module {module_name}!"
            )

        models: List[Union[Type[BackendFunction], Type[BackendClass]]] = []
        member_objects: List[ModuleMember] = []

        for member in member_candidates:
            member_obj: ModuleMember
            member_model: Union[Type[BackendFunction], Type[BackendClass]]
            if isinstance(member, str):
                if module is None:
                    raise ValueError(
                        f"Cannot resolve the member {member!r} without a "
                        "module_name!"
                    )
                member = getattr(module, member)
            if inspect.isclass(member) and issubclass(
                member, (BackendFunction, BackendClass)  # type: ignore
            ):
                # the member is already a model, so we take the wrapped
                # function or class from its config
                member_model = cast(
                    Union[Type[BackendFunction], Type[BackendClass]], member
                )
                member_obj = (
                    member_model.backend_config.Class  # type: ignore
                    if issubclass(member_model, BackendClass)
                    else member_model.backend_config.function  # type: ignore
                )
            elif inspect.isclass(member):
                member_model = BackendClass.create_model(member)
                member_obj = member
            elif callable(member):
                member_model = BackendFunction.create_model(member)
                member_obj = member
            else:
                raise ValueError(
                    f"Cannot transform {member} to a member model!"
                )
            member_objects.append(member_obj)
            models.append(member_model)

        config.members = member_objects
        config.models = models

        if not config.doc and module:
            docstring = docstring_parser.parse(module.__doc__)
            config.doc = utils.get_desc(docstring)

        # the root of the model is the union of all member models
        member_types = models[0]
        for model in models[1:]:
            member_types = Union[member_types, model]  # type: ignore

        kws = {"__module__": module_name} if module_name else {}

        Model: Type[BackendModule] = create_model(  # type: ignore
            class_name,
            __base__=cls,
            root=(member_types, Field(description="The member to call.")),
            **kws,  # type: ignore
        )

        Model.model_config["title"] = config.messaging_config.topic  # type: ignore

        Model.backend_config = config

        # configure logging
        config.log_config.configure_logging()

        if module is not None:
            config.imports += "\n" + utils.get_module_imports(module)

        Model.__doc__ = config.doc

        return Model

    @classmethod
    def _create_consumer(cls) -> MessageConsumer:
        """Create the message consumer and register it for this module.

        The consumer is stored as :attr:`pulsar` and its ``disconnect``
        method is registered to be called at interpreter exit.
        """
        consumer = MessageConsumer(
            pulsar_config=cls.backend_config.messaging_config,
            handle_request=cls.handle_message,
            module_info=cls.model_json_schema(),
            api_info=cls.get_api_info(),
        )
        cls.pulsar = consumer
        atexit.register(consumer.disconnect)
        return consumer

    @classmethod
    def test_connect(cls):
        """Connect to the message pulsar."""
        consumer = cls._create_consumer()
        consumer.setup_subscription()

    @classmethod
    def get_api_info(cls) -> ModuleAPIModel:
        """Get the API info on the module."""
        member_models = cls.backend_config.models
        return ModuleAPIModel(
            classes=[
                member.get_api_info()
                for member in member_models
                if issubclass(member, BackendClass)
            ],
            functions=[
                member.get_api_info()
                for member in member_models
                if issubclass(member, BackendFunction)
            ],
            rpc_schema=cls.model_json_schema(),
        )

    @classmethod
    def listen(cls):
        """Connect to the message pulsar and wait for incoming requests.

        This method takes no arguments. How an incoming request is handled
        (see :meth:`handle_message`) is controlled by the following
        attributes of the ``listen_config`` of the :attr:`backend_config`:

        ``dump_to``
            If set, the payload of each request is additionally written as
            JSON file into this directory.
        ``dump_tool``
            If set, the payload of each request is written to a temporary
            file and this command is run in a subprocess. Two curly brackets
            (``{}``) in the command are replaced by the path of the
            temporary file. Alternatively, use ``{path}``, ``{basename}`` or
            ``{directory}`` for more explicit control in your command.
        ``cmd``
            If set, the request is not processed by this module. Instead,
            the payload is written to a temporary file and this command is
            run in a subprocess. Its output is parsed as the response to the
            request. The same placeholders as for ``dump_tool`` can be used.

        Examples
        --------
        Copy the request to a given location via rsync::

            dump_tool = 'rsync {} .'

        Copy the request via SSH to another server::

            dump_tool = 'scp {} user@machine:/some/folder/'

        Print the request to stdout and delete the temporary file::

            dump_tool = 'cat {path} && rm {path}'

        Cat the request (i.e. always return the input to the sender)::

            cmd = 'cat {}'

        Copy the file via scp and run some command to process it on a remote
        machine::

            dump_tool = 'scp {} user@machine:/some/folder/'
            cmd = 'some-command /some/folder/{basename}'
        """
        consumer = cls._create_consumer()
        consumer.wait_for_request()

    @classmethod
    def _validate_request(
        cls: Type[BackendModule],
        request: Union[BackendModule, IO, Dict[str, Any]],
    ) -> BackendModule:
        """Transform a request into a validated instance of this model.

        Parameters
        ----------
        request: BackendModule, dict or file-like object
            File-like objects are parsed as JSON, everything else is
            validated as python object.
        """
        if isinstance(request, io.IOBase):
            # read() hands the content over as is and therefore works for
            # text and binary streams alike
            return cls.model_validate_json(request.read())  # type: ignore
        if hasattr(request, "root"):
            return cls.model_validate(cast(BackendModule, request).root)
        return cls.model_validate(request)

    @classmethod
    def send_request(
        cls: Type[BackendModule],
        request: Union[BackendModule, IO, Dict[str, Any]],
    ) -> BaseModel:
        """Send a request to the backend via the message broker.

        Parameters
        ----------
        request: BackendModule, dict or file-like object
            A request to the backend module.

        Returns
        -------
        BaseModel
            The response, validated with the return model of the requested
            member.

        Raises
        ------
        ValueError
            If the response reports an error or an unknown status.
        """
        model = cls._validate_request(request)

        payload = base64.b64encode(
            model.model_dump_json().encode("utf-8")
        ).decode("utf-8")
        message = {
            "properties": {},
            "payload": payload,
        }

        producer = cls.backend_config.messaging_config.producer

        response = utils.run_async(producer.send_request, message)

        status = response[PropertyKeys.STATUS]
        if status == Status.SUCCESS:
            logger.debug("request successful")
            result = response["msg"]
        elif status == Status.ERROR:
            logger.error("request failed: %s", response["msg"])
            # NOTE(review): the error is logged from "msg" but raised from
            # "error" -- confirm that the response contains both keys
            raise ValueError(response["error"])
        else:
            raise ValueError("Unknonw status message %s" % (status,))
        return model.root.return_model.model_validate_json(result)

    def compute(self) -> BaseModel:
        """Send this request to the backend module and return the result.

        The model itself is not modified, the response of the backend is
        returned (see :meth:`send_request`).
        """
        return self.send_request(self)

    @classmethod
    def process_request(
        cls: Type[BackendModule],
        request: Union[BackendModule, IO, Dict[str, Any]],
    ) -> ReturnModel:
        """Process a request locally, without the message broker.

        Parameters
        ----------
        request: BackendModule, dict or file-like object
            The request that we should process.

        Returns
        -------
        ReturnModel
            The result of calling the requested member.
        """
        model = cls._validate_request(request)
        return model()

    @classmethod
    def shell(cls):
        """Start a shell with the module defined."""
        from IPython import start_ipython

        start_ipython(argv=[], user_ns=dict(Model=cls))

    @classmethod
    def generate(
        cls,
        line_length: int = 79,
        use_formatters: bool = True,
        use_autoflake: bool = True,
        use_black: bool = True,
        use_isort: bool = True,
    ) -> str:
        """Generate the code for the frontend module.

        Parameters
        ----------
        line_length: int
            The line length that is passed to black.
        use_formatters: bool
            If False, the rendered code is not formatted at all.
        use_autoflake: bool
            If True, remove unused imports with autoflake.
        use_black: bool
            If True, format the code with black.
        use_isort: bool
            If True, sort the imports with isort.

        Returns
        -------
        str
            The generated code, ending with a single newline.
        """
        code = cls.backend_config.render()

        if use_formatters:
            # the formatters are imported lazily such that only the ones
            # that are really used need to be installed
            if use_isort:
                import isort

                code = isort.code(code, float_to_top=True, profile="black")
            if use_black:
                import black

                code = black.format_str(
                    code, mode=black.Mode(line_length=line_length)
                )
            # remove unused imports
            if use_autoflake:
                import autoflake

                code = autoflake.fix_code(code, remove_all_unused_imports=True)
            # NOTE(review): the indentation of this second isort pass could
            # not be recovered from the source; it is assumed to be on the
            # same level as the autoflake step -- confirm
            if use_isort:
                code = isort.code(code, float_to_top=True, profile="black")

        if cls.backend_config.module:
            # remove __main__, etc.
            name = cls.backend_config.module.__name__
            code = code.replace(name + ".", "")

        return code.strip() + "\n"

    @classmethod
    def handle_message(
        cls,
        request_msg: dict,
    ):
        """Validate and execute a request and send back the response.

        The payload of the message is validated with this model and then
        executed. On success, the result is sent with
        ``cls.pulsar.send_response``. If the validation or the execution
        fails, the error is logged and sent with ``cls.pulsar.send_error``.

        Parameters
        ----------
        request_msg : dict
            The message to handle. It must contain the keys ``"messageId"``
            and ``"payload"`` (the base64-encoded JSON request).
        """
        message_id = request_msg["messageId"]
        logger.info("processing request %s", message_id)

        def handle_error(header: str, e: Exception):
            """Log the current exception and report it to the requester."""
            logger.error(
                "Failed processing request %s", message_id, exc_info=True
            )
            # the full traceback is only exposed in debug mode
            if cls.backend_config.debug:
                msg = traceback.format_exc()
            else:
                msg = str(e)
            cls.pulsar.send_error(
                request=request_msg,
                error_message="{}: {}".format(header, msg),
            )

        payload = base64.b64decode(request_msg["payload"]).decode("utf-8")

        try:
            model = cls.model_validate_json(payload)
        except ValidationError as e:
            handle_error("error validating request", e)
            return
        except Exception as e:
            handle_error("error processing request", e)
            return

        try:
            result = cls._execute_request(request_msg, model, payload)
        except Exception as e:
            handle_error("error executing request", e)
        else:
            cls.pulsar.send_response(
                request=request_msg,
                response_properties={PropertyKeys.STATUS: Status.SUCCESS},
                response_payload=result.model_dump_json(),
            )

    @classmethod
    def _execute_request(
        cls, request_msg: dict, model: BackendModule, payload: str
    ) -> ReturnModel:
        """Dump and execute a validated request as set in ``listen_config``.

        Parameters
        ----------
        request_msg : dict
            The original message of the request.
        model : BackendModule
            The validated request.
        payload : str
            The decoded JSON payload of the request.

        Returns
        -------
        ReturnModel
            The result of the request.
        """
        listen_config = cls.backend_config.listen_config
        message_id = request_msg["messageId"]

        digest = hashlib.sha256(payload.encode("utf-8")).hexdigest()
        request_file_name = "%s_%s_%s.json" % (
            utils.slugify(dt.datetime.now().isoformat()),
            utils.slugify(message_id),
            digest,
        )

        # dump the file if necessary
        if listen_config.dump_to:
            target_file = Path(listen_config.dump_to) / request_file_name
            logger.debug(
                "Dumping payload of %s to %s", message_id, target_file
            )
            with target_file.open("w", encoding="utf-8") as f:
                f.write(payload)

        if not (listen_config.dump_tool or listen_config.cmd):
            return cls._process_model(request_msg, model)

        # the commands need the request as file. The temporary directory
        # (and the file in it) is removed as soon as we are done
        with tempfile.TemporaryDirectory() as tmpdir:
            target_file = Path(tmpdir) / request_file_name
            with target_file.open("w", encoding="utf-8") as f:
                f.write(payload)
            if listen_config.dump_tool:
                logger.debug("Running dump_tool for %s", target_file)
                dump_cmd = cls._format_cmd(
                    listen_config.dump_tool, target_file
                )
                logger.debug(" --> %s", dump_cmd)
                # the command template is taken from the configuration of
                # the backend module and deliberately run through the shell
                spr.check_output(dump_cmd, stderr=spr.STDOUT, shell=True)
            if listen_config.cmd:
                compiled = cls._format_cmd(listen_config.cmd, target_file)
                return cls._process_cmd(request_msg, model, compiled)
            return cls._process_model(request_msg, model)

    @classmethod
    def _format_cmd(cls, template: str, target_file: Path) -> str:
        """Utility function to turn a template into a command-line command.

        ``{}`` and ``{path}`` in the template are replaced by the path of
        `target_file`, ``{basename}`` by its file name and ``{directory}``
        by its parent directory.
        """
        return template.format(
            str(target_file),
            path=str(target_file),
            basename=target_file.name,
            directory=target_file.parent,
        )

    @classmethod
    def _process_model(cls, request_msg, model) -> ReturnModel:
        """Connect the reporters of the request to the pulsar and run it."""
        reporter_args = model.root.backend_config.reporter_args
        for key in reporter_args:
            member_reporter = getattr(model.root, key)
            if member_reporter and isinstance(member_reporter, BaseReport):
                member_reporter._pulsar = cls.pulsar
                member_reporter._request = request_msg
        return model()

    @classmethod
    def _process_cmd(cls, request_msg, model, cmd) -> ReturnModel:
        """Run a shell command and parse its output as the response."""
        result = spr.check_output(cmd, stderr=spr.STDOUT, shell=True)
        return model.root.return_model.model_validate_json(
            result.decode("utf-8")
        )

    @classmethod
    def model_json_schema(cls, *args, **kwargs) -> Dict[str, Any]:
        """Get the JSON schema, merged with the configured extra schema.

        All arguments are passed on to the ``model_json_schema`` method of
        the parent class. If ``json_schema_extra`` is set in the
        :attr:`backend_config`, it is merged into the schema.
        """
        ret = super().model_json_schema(*args, **kwargs)
        if cls.backend_config.json_schema_extra:
            ret = merge_config(ret, cls.backend_config.json_schema_extra)
        return ret
# Resolve the forward references of ModuleConfig. ``model_rebuild`` is tried
# first; if that raises an AttributeError, ``update_forward_refs`` is used
# instead.
# NOTE(review): presumably a fallback for older pydantic versions that do not
# provide ``model_rebuild`` -- confirm that it is still needed.
try:
    ModuleConfig.model_rebuild()
except AttributeError:
    ModuleConfig.update_forward_refs()