demessaging.config.messaging module

Messaging configuration classes for DASF.

Classes:

BaseMessagingConfig(_case_sensitive, ...)

Base class for messaging configs.

PulsarConfig(_case_sensitive, ...)

A configuration class to connect to the pulsar messaging framework.

WebsocketURLConfig(_case_sensitive, ...)

A configuration for a websocket.

class demessaging.config.messaging.BaseMessagingConfig(_case_sensitive: bool | None = None, _nested_model_default_partial_update: bool | None = None, _env_prefix: str | None = None, _env_file: DotenvType | None = PosixPath('.'), _env_file_encoding: str | None = None, _env_ignore_empty: bool | None = None, _env_nested_delimiter: str | None = None, _env_nested_max_split: int | None = None, _env_parse_none_str: str | None = None, _env_parse_enums: bool | None = None, _cli_prog_name: str | None = None, _cli_parse_args: bool | list[str] | tuple[str, ...] | None = None, _cli_settings_source: CliSettingsSource[Any] | None = None, _cli_parse_none_str: str | None = None, _cli_hide_none_type: bool | None = None, _cli_avoid_json: bool | None = None, _cli_enforce_required: bool | None = None, _cli_use_class_docs_for_groups: bool | None = None, _cli_exit_on_error: bool | None = None, _cli_prefix: str | None = None, _cli_flag_prefix_char: str | None = None, _cli_implicit_flags: bool | None = None, _cli_ignore_unknown_args: bool | None = None, _cli_kebab_case: bool | Literal['all', 'no_enums'] | None = None, _cli_shortcuts: Mapping[str, str | list[str]] | None = None, _secrets_dir: PathType | None = None, *, topic: str, header: ~typing.Annotated[~typing.Dict[str, ~typing.Any], Json] | ~typing.Dict[str, ~typing.Any] = <factory>, max_workers: ~typing.Annotated[int, ~annotated_types.Gt(gt=0)] | None = None, queue_size: ~typing.Annotated[int, ~annotated_types.Gt(gt=0)] | None = None, max_payload_size: int = 512000, producer_keep_alive: int = 120, producer_connection_timeout: int = 30)[source]

Bases: BaseSettings

Base class for messaging configs.

Parameters:
  • topic (str) – The topic identifier under which to register at the pulsar.

  • header (Union[Annotated[Dict[str, Any], Json], Dict[str, Any]]) – Header parameters for the request

  • max_workers (Optional[Annotated[int, Gt(gt=0)]]) – (optional) number of concurrent workers for handling requests, default: number of processors on the machine, multiplied by 5.

  • queue_size (Optional[Annotated[int, Gt(gt=0)]]) – (optional) size of the request queue, if MAX_WORKERS is set, this needs to be at least as big as MAX_WORKERS, otherwise an AttributeException is raised.

  • max_payload_size (int) – (optional) maximum payload size, must be smaller than Pulsar’s ‘webSocketMaxTextFrameSize’, which is configured e.g. via ‘pulsar/conf/standalone.conf’. Default: 512000 (500 KiB).

  • producer_keep_alive (int) – The amount of time that the websocket connection to a producer should be kept open. By default, 2 minutes (120 seconds). On each outgoing message, the timer will be reset. Set this to 0 to immediately close the connection when a message has been sent and acknowledged.

  • producer_connection_timeout (int) – The amount of time that we grant producers to establish a connection to the message broker in order to send a response. If a connection cannot be established in this time, the response will not be sent and the connection will be closed.
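As a rough illustration of the max_payload_size limit, the sketch below checks a text payload against the default of 512000 (500 × 1024). The helper name fits_payload_limit is hypothetical and not part of demessaging, and the sketch assumes the limit is counted in bytes of the UTF-8 encoded text. How DASF itself handles oversized payloads is not shown here.

```python
DEFAULT_MAX_PAYLOAD_SIZE = 512000  # default from the class signature (500 * 1024)


def fits_payload_limit(
    payload: str, max_payload_size: int = DEFAULT_MAX_PAYLOAD_SIZE
) -> bool:
    """Return True if the encoded payload does not exceed the limit.

    Hypothetical helper: assumes the limit is measured in UTF-8 bytes.
    """
    return len(payload.encode("utf-8")) <= max_payload_size


small_ok = fits_payload_limit("x" * 1000)
large_ok = fits_payload_limit("x" * (DEFAULT_MAX_PAYLOAD_SIZE + 1))
print(small_ok, large_ok)
```

A check like this is only useful if the configured value stays below the broker's own frame size limit, as the parameter description requires.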

Methods:

get_topic_url(topic[, subscription])

Build the URL to connect to a websocket.

model_post_init(context, /)

This function is meant to behave like a BaseModel method to initialise private attributes.

validate_queue_size()

Check that the queue_size is at least as large as max_workers.

Attributes:

header

max_payload_size

max_workers

model_config

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

producer

The connected producer for the messaging config

producer_connection_timeout

producer_keep_alive

queue_size

topic

get_topic_url(topic: str, subscription: str | None = None) → str[source]

Build the URL to connect to a websocket.

header: Json[Dict[str, Any]] | Dict[str, Any]
max_payload_size: int
max_workers: PositiveInt | None
model_config: ClassVar[SettingsConfigDict] = {'arbitrary_types_allowed': True, 'case_sensitive': False, 'cli_avoid_json': False, 'cli_enforce_required': False, 'cli_exit_on_error': True, 'cli_flag_prefix_char': '-', 'cli_hide_none_type': False, 'cli_ignore_unknown_args': False, 'cli_implicit_flags': False, 'cli_kebab_case': False, 'cli_parse_args': None, 'cli_parse_none_str': None, 'cli_prefix': '', 'cli_prog_name': None, 'cli_shortcuts': None, 'cli_use_class_docs_for_groups': False, 'enable_decoding': True, 'env_file': None, 'env_file_encoding': None, 'env_ignore_empty': False, 'env_nested_delimiter': None, 'env_nested_max_split': None, 'env_parse_enums': None, 'env_parse_none_str': None, 'env_prefix': 'de_backend_', 'extra': 'forbid', 'json_file': None, 'json_file_encoding': None, 'nested_model_default_partial_update': False, 'protected_namespaces': ('model_validate', 'model_dump', 'settings_customise_sources'), 'secrets_dir': None, 'toml_file': None, 'validate_default': True, 'yaml_config_section': None, 'yaml_file': None, 'yaml_file_encoding': None}

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.
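The model_config shown here sets env_prefix to 'de_backend_' and case_sensitive to False, which in pydantic-settings means that fields can be supplied through environment variables such as DE_BACKEND_TOPIC. The following stand-in illustrates only the name mapping; the function collect_prefixed_settings and the example values are hypothetical, and the real lookup, validation, and type conversion are done by pydantic-settings.

```python
import os

ENV_PREFIX = "de_backend_"  # value of env_prefix in model_config


def collect_prefixed_settings(environ=None):
    """Map prefixed environment variables to lower-case field names.

    Illustration only: values stay strings, nothing is validated.
    """
    environ = os.environ if environ is None else environ
    settings = {}
    for name, value in environ.items():
        lowered = name.lower()  # case_sensitive is False
        if lowered.startswith(ENV_PREFIX):
            settings[lowered[len(ENV_PREFIX):]] = value
    return settings


# Hypothetical environment for demonstration purposes.
example_env = {
    "DE_BACKEND_TOPIC": "my-topic",
    "DE_BACKEND_MAX_WORKERS": "4",
    "HOME": "/root",
}
collected = collect_prefixed_settings(example_env)
print(collected)
```

Because 'extra' is set to 'forbid', unknown field names passed directly to the constructor are rejected rather than silently ignored.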

model_post_init(context: Any, /) → None

This function is meant to behave like a BaseModel method to initialise private attributes.

It takes context as an argument since that’s what pydantic-core passes when calling it.

Parameters:
  • self – The BaseModel instance.

  • context – The context.

property producer: MessageProducer

The connected producer for the messaging config

producer_connection_timeout: int
producer_keep_alive: int
queue_size: PositiveInt | None
topic: str
validate_queue_size()[source]

Check that the queue_size is at least as large as max_workers.
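The constraint from the parameter description (if max_workers is set, queue_size needs to be at least as big) can be sketched as a plain function. This is a minimal stand-in, not the validator's actual code: the name check_queue_size is hypothetical, and raising ValueError is an assumption, since the exception type used by demessaging is not confirmed here.

```python
from typing import Optional


def check_queue_size(max_workers: Optional[int], queue_size: Optional[int]) -> None:
    """Raise if both values are set and queue_size is below max_workers.

    Hypothetical stand-in; the exception type is an assumption.
    """
    if (
        max_workers is not None
        and queue_size is not None
        and queue_size < max_workers
    ):
        raise ValueError(
            f"queue_size ({queue_size}) must be at least "
            f"max_workers ({max_workers})"
        )


check_queue_size(max_workers=4, queue_size=8)     # accepted
check_queue_size(max_workers=None, queue_size=2)  # accepted, max_workers unset

try:
    check_queue_size(max_workers=8, queue_size=4)
    rejected = False
except ValueError:
    rejected = True
print(rejected)
```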

class demessaging.config.messaging.PulsarConfig(_case_sensitive: bool | None = None, _nested_model_default_partial_update: bool | None = None, _env_prefix: str | None = None, _env_file: DotenvType | None = PosixPath('.'), _env_file_encoding: str | None = None, _env_ignore_empty: bool | None = None, _env_nested_delimiter: str | None = None, _env_nested_max_split: int | None = None, _env_parse_none_str: str | None = None, _env_parse_enums: bool | None = None, _cli_prog_name: str | None = None, _cli_parse_args: bool | list[str] | tuple[str, ...] | None = None, _cli_settings_source: CliSettingsSource[Any] | None = None, _cli_parse_none_str: str | None = None, _cli_hide_none_type: bool | None = None, _cli_avoid_json: bool | None = None, _cli_enforce_required: bool | None = None, _cli_use_class_docs_for_groups: bool | None = None, _cli_exit_on_error: bool | None = None, _cli_prefix: str | None = None, _cli_flag_prefix_char: str | None = None, _cli_implicit_flags: bool | None = None, _cli_ignore_unknown_args: bool | None = None, _cli_kebab_case: bool | Literal['all', 'no_enums'] | None = None, _cli_shortcuts: Mapping[str, str | list[str]] | None = None, _secrets_dir: PathType | None = None, *, topic: str, header: ~typing.Annotated[~typing.Dict[str, ~typing.Any], Json] | ~typing.Dict[str, ~typing.Any] = <factory>, max_workers: ~typing.Annotated[int, ~annotated_types.Gt(gt=0)] | None = None, queue_size: ~typing.Annotated[int, ~annotated_types.Gt(gt=0)] | None = None, max_payload_size: int = 512000, producer_keep_alive: int = 120, producer_connection_timeout: int = 30, host: str = 'localhost', port: str = '8080', persistent: str = 'non-persistent', tenant: str = 'public', namespace: str = 'default')[source]

Bases: BaseMessagingConfig

A configuration class to connect to the pulsar messaging framework.

Parameters:
  • topic (str) – The topic identifier under which to register at the pulsar.

  • header (Union[Annotated[Dict[str, Any], Json], Dict[str, Any]]) – Header parameters for the request

  • max_workers (Optional[Annotated[int, Gt(gt=0)]]) – (optional) number of concurrent workers for handling requests, default: number of processors on the machine, multiplied by 5.

  • queue_size (Optional[Annotated[int, Gt(gt=0)]]) – (optional) size of the request queue, if MAX_WORKERS is set, this needs to be at least as big as MAX_WORKERS, otherwise an AttributeException is raised.

  • max_payload_size (int) – (optional) maximum payload size, must be smaller than pulsars ‘webSocketMaxTextFrameSize’, which is configured e.g.via ‘pulsar/conf/standalone.conf’.default: 512000 (500kb).

  • producer_keep_alive (int) – The amount of time that the websocket connection to a producer should be kept open. By default, 2 minutes (120 seconds). On each outgoing message, the timer will be reset. Set this to 0 to immediately close the connection when a message has been sent and acknowledged.

  • producer_connection_timeout (int) – The amount of time that we grant producers to establish a connection to the message broker in order to send a response. If a connection cannot be established in this time, the response will not be sent and the connection will be closed.

  • host (str) – The remote host of the pulsar.

  • port (str) – The port of the pulsar at the given host.

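  • persistent (str) – The persistence type of the topic, ‘persistent’ or ‘non-persistent’. Default: ‘non-persistent’.

  • tenant (str) – The Pulsar tenant of the topic. Default: ‘public’.

  • namespace (str) – The Pulsar namespace of the topic. Default: ‘default’.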

Methods:

get_topic_url(topic[, subscription])

Build the URL to connect to a websocket.

model_post_init(context, /)

This function is meant to behave like a BaseModel method to initialise private attributes.

Attributes:

host

model_config

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

namespace

persistent

port

tenant

get_topic_url(topic: str, subscription: str | None = None) → str[source]

Build the URL to connect to a websocket.
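For orientation, the sketch below assembles a URL from the PulsarConfig fields following the endpoint layout of Pulsar's WebSocket API (/ws/v2/producer/... and /ws/v2/consumer/.../subscription). Whether get_topic_url produces exactly this string is an assumption; the function name build_pulsar_ws_url, the ws:// scheme, and the rule that a given subscription selects the consumer endpoint are all hypothetical choices made for this example.

```python
from typing import Optional


def build_pulsar_ws_url(
    topic: str,
    subscription: Optional[str] = None,
    host: str = "localhost",
    port: str = "8080",
    persistent: str = "non-persistent",
    tenant: str = "public",
    namespace: str = "default",
) -> str:
    """Sketch of a Pulsar WebSocket endpoint URL (hypothetical helper).

    Defaults mirror the PulsarConfig signature.
    """
    # Assumption: a subscription means we connect as a consumer.
    role = "consumer" if subscription else "producer"
    url = f"ws://{host}:{port}/ws/v2/{role}/{persistent}/{tenant}/{namespace}/{topic}"
    if subscription:
        url += f"/{subscription}"
    return url


producer_url = build_pulsar_ws_url("my-topic")
consumer_url = build_pulsar_ws_url("my-topic", subscription="my-sub")
print(producer_url)
print(consumer_url)
```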

header: Json[Dict[str, Any]] | Dict[str, Any]
host: str
max_payload_size: int
max_workers: PositiveInt | None
model_config: ClassVar[SettingsConfigDict] = {'arbitrary_types_allowed': True, 'case_sensitive': False, 'cli_avoid_json': False, 'cli_enforce_required': False, 'cli_exit_on_error': True, 'cli_flag_prefix_char': '-', 'cli_hide_none_type': False, 'cli_ignore_unknown_args': False, 'cli_implicit_flags': False, 'cli_kebab_case': False, 'cli_parse_args': None, 'cli_parse_none_str': None, 'cli_prefix': '', 'cli_prog_name': None, 'cli_shortcuts': None, 'cli_use_class_docs_for_groups': False, 'enable_decoding': True, 'env_file': None, 'env_file_encoding': None, 'env_ignore_empty': False, 'env_nested_delimiter': None, 'env_nested_max_split': None, 'env_parse_enums': None, 'env_parse_none_str': None, 'env_prefix': 'de_backend_', 'extra': 'forbid', 'json_file': None, 'json_file_encoding': None, 'nested_model_default_partial_update': False, 'protected_namespaces': ('model_validate', 'model_dump', 'settings_customise_sources'), 'secrets_dir': None, 'toml_file': None, 'validate_default': True, 'yaml_config_section': None, 'yaml_file': None, 'yaml_file_encoding': None}

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

model_post_init(context: Any, /) → None

This function is meant to behave like a BaseModel method to initialise private attributes.

It takes context as an argument since that’s what pydantic-core passes when calling it.

Parameters:
  • self – The BaseModel instance.

  • context – The context.

namespace: str
persistent: str
port: str
producer_connection_timeout: int
producer_keep_alive: int
queue_size: PositiveInt | None
tenant: str
topic: str
class demessaging.config.messaging.WebsocketURLConfig(_case_sensitive: bool | None = None, _nested_model_default_partial_update: bool | None = None, _env_prefix: str | None = None, _env_file: DotenvType | None = PosixPath('.'), _env_file_encoding: str | None = None, _env_ignore_empty: bool | None = None, _env_nested_delimiter: str | None = None, _env_nested_max_split: int | None = None, _env_parse_none_str: str | None = None, _env_parse_enums: bool | None = None, _cli_prog_name: str | None = None, _cli_parse_args: bool | list[str] | tuple[str, ...] | None = None, _cli_settings_source: CliSettingsSource[Any] | None = None, _cli_parse_none_str: str | None = None, _cli_hide_none_type: bool | None = None, _cli_avoid_json: bool | None = None, _cli_enforce_required: bool | None = None, _cli_use_class_docs_for_groups: bool | None = None, _cli_exit_on_error: bool | None = None, _cli_prefix: str | None = None, _cli_flag_prefix_char: str | None = None, _cli_implicit_flags: bool | None = None, _cli_ignore_unknown_args: bool | None = None, _cli_kebab_case: bool | Literal['all', 'no_enums'] | None = None, _cli_shortcuts: Mapping[str, str | list[str]] | None = None, _secrets_dir: PathType | None = None, *, topic: str, header: ~typing.Annotated[~typing.Dict[str, ~typing.Any], Json] | ~typing.Dict[str, ~typing.Any] = <factory>, max_workers: ~typing.Annotated[int, ~annotated_types.Gt(gt=0)] | None = None, queue_size: ~typing.Annotated[int, ~annotated_types.Gt(gt=0)] | None = None, max_payload_size: int = 512000, producer_keep_alive: int = 120, producer_connection_timeout: int = 30, websocket_url: str = '', producer_url: str | None = None, consumer_url: str | None = None)[source]

Bases: BaseMessagingConfig

A configuration for a websocket.

Parameters:
  • topic (str) – The topic identifier under which to register at the pulsar.

  • header (Union[Annotated[Dict[str, Any], Json], Dict[str, Any]]) – Header parameters for the request

  • max_workers (Optional[Annotated[int, Gt(gt=0)]]) – (optional) number of concurrent workers for handling requests, default: number of processors on the machine, multiplied by 5.

  • queue_size (Optional[Annotated[int, Gt(gt=0)]]) – (optional) size of the request queue, if MAX_WORKERS is set, this needs to be at least as big as MAX_WORKERS, otherwise an AttributeException is raised.

  • max_payload_size (int) – (optional) maximum payload size, must be smaller than pulsars ‘webSocketMaxTextFrameSize’, which is configured e.g.via ‘pulsar/conf/standalone.conf’.default: 512000 (500kb).

  • producer_keep_alive (int) – The amount of time that the websocket connection to a producer should be kept open. By default, 2 minutes (120 seconds). On each outgoing message, the timer will be reset. Set this to 0 to immediately close the connection when a message has been sent and acknowledged.

  • producer_connection_timeout (int) – The amount of time that we grant producers to establish a connection to the message broker in order to send a response. If a connection cannot be established in this time, the response will not be sent and the connection will be closed.

  • websocket_url (str) – The fully qualified URL to the websocket.

  • producer_url (Optional[str]) – An alternative URL to use for producers. If None, the websocket_url will be used.

  • consumer_url (Optional[str]) – An alternative URL to use for consumers. If None, the websocket_url will be used.

Attributes:

consumer_url

model_config

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

producer_url

websocket_url

Methods:

get_topic_url(topic[, subscription])

Build the URL to connect to a websocket.

model_post_init(context, /)

This function is meant to behave like a BaseModel method to initialise private attributes.

consumer_url: str | None
get_topic_url(topic: str, subscription: str | None = None) → str[source]

Build the URL to connect to a websocket.
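The fallback described for producer_url and consumer_url (use websocket_url when the specific URL is None) can be sketched as follows. The function select_base_url is hypothetical, and the example assumes that passing a subscription means a consumer connection. How the topic and subscription are then combined with the base URL is not covered here.

```python
from typing import Optional


def select_base_url(
    websocket_url: str,
    producer_url: Optional[str] = None,
    consumer_url: Optional[str] = None,
    subscription: Optional[str] = None,
) -> str:
    """Pick the base URL for a connection (hypothetical helper).

    Assumption: a subscription indicates a consumer connection.
    """
    specific = consumer_url if subscription is not None else producer_url
    # Fall back to the general websocket_url when no specific URL is set.
    return websocket_url if specific is None else specific


# Hypothetical URLs for demonstration purposes.
base = "ws://example.org/ws"
print(select_base_url(base))
print(select_base_url(base, consumer_url="ws://example.org/in", subscription="s"))
```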

header: Json[Dict[str, Any]] | Dict[str, Any]
max_payload_size: int
max_workers: PositiveInt | None
model_config: ClassVar[SettingsConfigDict] = {'arbitrary_types_allowed': True, 'case_sensitive': False, 'cli_avoid_json': False, 'cli_enforce_required': False, 'cli_exit_on_error': True, 'cli_flag_prefix_char': '-', 'cli_hide_none_type': False, 'cli_ignore_unknown_args': False, 'cli_implicit_flags': False, 'cli_kebab_case': False, 'cli_parse_args': None, 'cli_parse_none_str': None, 'cli_prefix': '', 'cli_prog_name': None, 'cli_shortcuts': None, 'cli_use_class_docs_for_groups': False, 'enable_decoding': True, 'env_file': None, 'env_file_encoding': None, 'env_ignore_empty': False, 'env_nested_delimiter': None, 'env_nested_max_split': None, 'env_parse_enums': None, 'env_parse_none_str': None, 'env_prefix': 'de_backend_', 'extra': 'forbid', 'json_file': None, 'json_file_encoding': None, 'nested_model_default_partial_update': False, 'protected_namespaces': ('model_validate', 'model_dump', 'settings_customise_sources'), 'secrets_dir': None, 'toml_file': None, 'validate_default': True, 'yaml_config_section': None, 'yaml_file': None, 'yaml_file_encoding': None}

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

model_post_init(context: Any, /) → None

This function is meant to behave like a BaseModel method to initialise private attributes.

It takes context as an argument since that’s what pydantic-core passes when calling it.

Parameters:
  • self – The BaseModel instance.

  • context – The context.

producer_connection_timeout: int
producer_keep_alive: int
producer_url: str | None
queue_size: PositiveInt | None
topic: str
websocket_url: str