Source code for demessaging.messaging.connection

# SPDX-FileCopyrightText: 2019-2025 Helmholtz Centre Potsdam GFZ German Research Centre for Geosciences
# SPDX-FileCopyrightText: 2020-2021 Helmholtz-Zentrum Geesthacht GmbH
# SPDX-FileCopyrightText: 2021-2025 Helmholtz-Zentrum hereon GmbH
#
# SPDX-License-Identifier: Apache-2.0

"""Base module for a websocket connection."""
from __future__ import annotations

import logging
import random
import string
from abc import ABC
from typing import Optional

import websocket
from pydantic import validate_call

from demessaging import config


[docs] def get_random_letters(length: int) -> str: return "".join(random.choice(string.ascii_letters) for i in range(length))
logger = logging.getLogger(__name__)
[docs] class WebsocketConnection(ABC): """Base class to connect to a message broker using a websocket.""" @validate_call def __init__(self, pulsar_config: config.BaseMessagingConfig): self.pulsar_config = pulsar_config
[docs] def generate_response_topic(self, topic: Optional[str] = None) -> str: topic_name = topic or self.pulsar_config.topic or "anonymous" return topic_name + "_" + get_random_letters(8)
[docs] def on_message(self, ws_app: websocket.WebSocketApp, msg): raise NotImplementedError
[docs] def on_ping(self, ws_app: websocket.WebSocketApp, payload): logger.debug("pinged %s", ws_app.url)
[docs] def on_pong(self, ws_app: websocket.WebSocketApp, payload): logger.debug("received pong from %s", ws_app.url)
[docs] def on_close( self, ws_app: websocket.WebSocketApp, close_status_code, close_msg ): logger.debug( "Websocket connection to %s closed with status code %s, message %s.", ws_app.url, close_status_code, close_msg, )
[docs] def create_websocketapp( self, subscription: Optional[str] = None, topic: Optional[str] = None, **app_kws, ) -> websocket.WebSocketApp: topic_name = topic or self.pulsar_config.topic topic_url = self.pulsar_config.get_topic_url(topic_name, subscription) app_kws.setdefault("on_message", self.on_message) app_kws.setdefault("on_ping", self.on_ping) app_kws.setdefault("on_pong", self.on_pong) app_kws.setdefault("on_close", self.on_close) logger.debug("Creating websocket connection for %s", topic_url) app = websocket.WebSocketApp(topic_url, **app_kws) return app
[docs] def open_socket( self, subscription: Optional[str] = None, topic: Optional[str] = None, **connection_kws, ) -> websocket.WebSocket: topic_name = topic or self.pulsar_config.topic topic_url = self.pulsar_config.get_topic_url(topic_name, subscription) sock = websocket.create_connection(topic_url, **connection_kws) if sock: logger.debug("connection to {0} established".format(topic_url)) return sock