Source code for demessaging.messaging.producer

# SPDX-FileCopyrightText: 2019-2025 Helmholtz Centre Potsdam GFZ German Research Centre for Geosciences
# SPDX-FileCopyrightText: 2020-2021 Helmholtz-Zentrum Geesthacht GmbH
# SPDX-FileCopyrightText: 2021-2025 Helmholtz-Zentrum hereon GmbH
#
# SPDX-License-Identifier: Apache-2.0

"""Producer of messages submitted for the message broker."""
from __future__ import annotations

import atexit
import base64
import json
import logging
import queue
import threading
import time
from datetime import datetime
from itertools import count
from typing import TYPE_CHECKING, Any, Dict, Optional

from deprogressapi import BaseReport
from websocket import WebSocketApp

from demessaging.messaging.connection import WebsocketConnection
from demessaging.PulsarMessageConstants import (
    MessageType,
    PropertyKeys,
    Status,
)

if TYPE_CHECKING:
    from demessaging.config import BaseMessagingConfig


logger = logging.getLogger(__name__)


[docs] class MessageProducer(WebsocketConnection): """Producer class to send requests to a registered backend module (topic)""" SOCKET_PING_INTERVAL = 60 # 1min RECONNECT_TIMEOUT_SLEEP = 5 # seconds out_app: WebSocketApp in_app: WebSocketApp def __init__( self, pulsar_config: BaseMessagingConfig, topic: Optional[str] = None ): super().__init__(pulsar_config) self.subscription_name: str = ( "python-backend-" + datetime.now().isoformat() ) self.context_counter = count() self._message_queues: Dict[int, queue.Queue] = {} # establish connections out_topic = self.pulsar_config.topic # topic override if given if topic is not None: out_topic = topic self.response_topic: str = self.generate_response_topic() self.out_topic = out_topic self.setup_subscription()
[docs] def setup_subscription(self): self.out_app = self.create_websocketapp( topic=self.out_topic, header=self.pulsar_config.header, on_message=self.on_out_message, ) # To be thread safe, we generate the response topic here self.in_app = self.create_websocketapp( subscription=self.subscription_name, topic=self.response_topic, header=self.pulsar_config.header, on_message=self.on_in_message, )
[docs] def connect(self): self.out_app_thread = threading.Thread( target=self.out_app.run_forever, kwargs=dict( reconnect=self.RECONNECT_TIMEOUT_SLEEP, ping_interval=self.SOCKET_PING_INTERVAL, ), daemon=True, ) self.in_app_thread = threading.Thread( target=self.in_app.run_forever, kwargs=dict( reconnect=self.RECONNECT_TIMEOUT_SLEEP, ping_interval=self.SOCKET_PING_INTERVAL, ), daemon=True, ) self.out_app_thread.start() self.in_app_thread.start() atexit.register(self.disconnect)
@property def is_connected(self) -> bool: """Check if the websocket apps are connected.""" return bool( self.out_app.sock and self.out_app.sock.connected and self.in_app.sock and self.in_app.sock.connected )
[docs] def wait_for_connection(self, timeout=10): """Wait until the websockets are connected""" start = time.time() while not self.is_connected and time.time() - start < timeout: time.sleep(0.1) return self.is_connected
[docs] def on_out_message(self, ws_app: WebSocketApp, ack): """Message handler for the outgoing websocket connection.""" # here we only expect acknowledgement messages. ack = json.loads(ack) if "context" in ack and "result" in ack: if "error" in ack["result"]: self._message_queues[int(ack["context"])].put_nowait( { "status": "error", "error": "error sending the request", "msg": ack, } ) else: logger.error("Invalid message from outgoing websocket: %s", ack)
[docs] def on_in_message(self, ws_app: WebSocketApp, response): """Message handler for the incoming websocket connection.""" # parse json message response = json.loads(response) props = response["properties"] # acknowledge the response self.in_app.send(json.dumps({"messageId": response["messageId"]})) # mapping from ids to existing reports reports: Dict[str, BaseReport] = {} if props[PropertyKeys.MESSAGE_TYPE] == MessageType.PROGRESS: # we received a progress report - print and ignore # decode progress data progress_data = base64.b64decode(response["payload"]).decode( "utf-8" ) report = BaseReport.from_payload(progress_data) if report.report_id in reports: base_report = reports[report.report_id] for field in report.model_fields: setattr(base_report, field, getattr(report, field)) else: reports[report.report_id] = base_report = report if base_report.status != Status.RUNNING: base_report.complete(base_report.status) else: base_report.submit() return # assert that we received a 'response' message and check for matching context if props[PropertyKeys.MESSAGE_TYPE] != MessageType.RESPONSE: msg = { "status": "error", "error": "received message is not a response to the sent request", "msg": response, } elif "info" in props: msg = { "status": props.get("status", "success"), "msg": props["info"], } elif "api_info" in props: msg = { "status": props.get("status", "success"), "msg": props["api_info"], } elif "payload" in response: status = "success" if "status" in props: # we might successfully get a response, but it might contain an error from the backend status = props["status"] # decode b64 payload msg payload: str = response["payload"] try: payload = base64.b64decode(payload).decode("utf-8") except Exception as e: status = "error" payload = "error decoding payload: {0}".format(e) if status == "error": msg = { "status": "error", "error": payload, "msg": response, } else: msg = {"status": status, "msg": payload} else: msg = { "status": "error", "error": "missing response payload", "msg": response, } context = int(props[PropertyKeys.REQUEST_CONTEXT]) self._message_queues[context].put_nowait(msg)
[docs] async def send_request(self, request_msg) -> Any: """Sends the given request to the backend module bound to the topic provided in the pulsar configuration. In order to increase re-usability the destination topic can be overridden with the optional topic argument. :param request_msg: dictionary providing a 'property' dictionary, a payload string, or both :param topic: overrides the used topic for this request :return: received response from the backend module """ if not self.is_connected: connected = self.wait_for_connection() if not connected: raise ValueError( "No websocket connection has been established!" ) # create message context (from counter) context = next(self.context_counter) request_msg["context"] = context if "properties" not in request_msg: request_msg["properties"] = {} request_msg["properties"][ PropertyKeys.RESPONSE_TOPIC ] = self.response_topic request_msg["properties"][PropertyKeys.REQUEST_CONTEXT] = request_msg[ "context" ] request_msg["properties"].setdefault( PropertyKeys.MESSAGE_TYPE, MessageType.REQUEST ) q = self._message_queues[context] = queue.Queue() # send message via outgoing connection to request topic self.out_app.send(json.dumps(request_msg)) # wait for the response. In order to be able to use a keyboard # interrupt here, we are using timeouts of 1ms logger.info("start waiting for message") while True: try: return q.get(timeout=1) # Allow check for Ctrl-C every second except queue.Empty: pass
[docs] def disconnect(self): """Disconnect in- and out-sockets.""" if hasattr(self, "in_app"): try: self.in_app.close() except Exception: logger.error( "Failed to close incoming websocket connection for producer", exc_info=True, ) finally: del self.in_app if hasattr(self, "in_app"): try: self.out_app.close() except Exception: logger.error( "Failed to close outgoing websocket connection for producer", exc_info=True, ) finally: del self.out_app