demessaging.config.messaging module
Messaging configuration classes for DASF.
Classes:
|
Base class for messaging configs. |
|
A configuration class to connect to the pulsar messaging framework. |
|
A configuration for a websocket. |
- class demessaging.config.messaging.BaseMessagingConfig(_case_sensitive: bool | None = None, _nested_model_default_partial_update: bool | None = None, _env_prefix: str | None = None, _env_file: DotenvType | None = PosixPath('.'), _env_file_encoding: str | None = None, _env_ignore_empty: bool | None = None, _env_nested_delimiter: str | None = None, _env_nested_max_split: int | None = None, _env_parse_none_str: str | None = None, _env_parse_enums: bool | None = None, _cli_prog_name: str | None = None, _cli_parse_args: bool | list[str] | tuple[str, ...] | None = None, _cli_settings_source: CliSettingsSource[Any] | None = None, _cli_parse_none_str: str | None = None, _cli_hide_none_type: bool | None = None, _cli_avoid_json: bool | None = None, _cli_enforce_required: bool | None = None, _cli_use_class_docs_for_groups: bool | None = None, _cli_exit_on_error: bool | None = None, _cli_prefix: str | None = None, _cli_flag_prefix_char: str | None = None, _cli_implicit_flags: bool | None = None, _cli_ignore_unknown_args: bool | None = None, _cli_kebab_case: bool | Literal['all', 'no_enums'] | None = None, _cli_shortcuts: Mapping[str, str | list[str]] | None = None, _secrets_dir: PathType | None = None, *, topic: str, header: ~typing.Annotated[~typing.Dict[str, ~typing.Any], Json] | ~typing.Dict[str, ~typing.Any] = <factory>, max_workers: ~typing.Annotated[int, ~annotated_types.Gt(gt=0)] | None = None, queue_size: ~typing.Annotated[int, ~annotated_types.Gt(gt=0)] | None = None, max_payload_size: int = 512000, producer_keep_alive: int = 120, producer_connection_timeout: int = 30)[source]
Bases:
BaseSettingsBase class for messaging configs.
- Parameters:
topic (str) – The topic identifier under which to register at the pulsar.
header (Union[Annotated[Dict[str, Any], Json], Dict[str, Any]]) – Header parameters for the request
max_workers (Optional[Annotated[int, Gt(gt=0)]]) – (optional) number of concurrent workers for handling requests, default: number of processors on the machine, multiplied by 5.
queue_size (Optional[Annotated[int, Gt(gt=0)]]) – (optional) size of the request queue, if MAX_WORKERS is set, this needs to be at least as big as MAX_WORKERS, otherwise an AttributeException is raised.
max_payload_size (int) – (optional) maximum payload size, must be smaller than pulsars ‘webSocketMaxTextFrameSize’, which is configured e.g.via ‘pulsar/conf/standalone.conf’.default: 512000 (500kb).
producer_keep_alive (int) – The amount of time that the websocket connection to a producer should be kept open. By default, 2 minutes (120 seconds). On each outgoing message, the timer will be reset. Set this to 0 to immediately close the connection when a message has been sent and acknowledged.
producer_connection_timeout (int) – The amount of time that we grant producers to establish a connection to the message broker in order to send a response. If a connection cannot be established in this time, the response will not be sent and the connection will be closed.
Methods:
get_topic_url(topic[, subscription])Build the URL to connect to a websocket.
model_post_init(context, /)This function is meant to behave like a BaseModel method to initialise private attributes.
Check that the queue_size is smaller than the max_workers.
Attributes:
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
The connected producer for the messaging config
- get_topic_url(topic: str, subscription: str | None = None) str[source]
Build the URL to connect to a websocket.
- model_config: ClassVar[SettingsConfigDict] = {'arbitrary_types_allowed': True, 'case_sensitive': False, 'cli_avoid_json': False, 'cli_enforce_required': False, 'cli_exit_on_error': True, 'cli_flag_prefix_char': '-', 'cli_hide_none_type': False, 'cli_ignore_unknown_args': False, 'cli_implicit_flags': False, 'cli_kebab_case': False, 'cli_parse_args': None, 'cli_parse_none_str': None, 'cli_prefix': '', 'cli_prog_name': None, 'cli_shortcuts': None, 'cli_use_class_docs_for_groups': False, 'enable_decoding': True, 'env_file': None, 'env_file_encoding': None, 'env_ignore_empty': False, 'env_nested_delimiter': None, 'env_nested_max_split': None, 'env_parse_enums': None, 'env_parse_none_str': None, 'env_prefix': 'de_backend_', 'extra': 'forbid', 'json_file': None, 'json_file_encoding': None, 'nested_model_default_partial_update': False, 'protected_namespaces': ('model_validate', 'model_dump', 'settings_customise_sources'), 'secrets_dir': None, 'toml_file': None, 'validate_default': True, 'yaml_config_section': None, 'yaml_file': None, 'yaml_file_encoding': None}
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- model_post_init(context: Any, /) None
This function is meant to behave like a BaseModel method to initialise private attributes.
It takes context as an argument since that’s what pydantic-core passes when calling it.
- Parameters:
self – The BaseModel instance.
context – The context.
- property producer: MessageProducer
The connected producer for the messaging config
- class demessaging.config.messaging.PulsarConfig(_case_sensitive: bool | None = None, _nested_model_default_partial_update: bool | None = None, _env_prefix: str | None = None, _env_file: DotenvType | None = PosixPath('.'), _env_file_encoding: str | None = None, _env_ignore_empty: bool | None = None, _env_nested_delimiter: str | None = None, _env_nested_max_split: int | None = None, _env_parse_none_str: str | None = None, _env_parse_enums: bool | None = None, _cli_prog_name: str | None = None, _cli_parse_args: bool | list[str] | tuple[str, ...] | None = None, _cli_settings_source: CliSettingsSource[Any] | None = None, _cli_parse_none_str: str | None = None, _cli_hide_none_type: bool | None = None, _cli_avoid_json: bool | None = None, _cli_enforce_required: bool | None = None, _cli_use_class_docs_for_groups: bool | None = None, _cli_exit_on_error: bool | None = None, _cli_prefix: str | None = None, _cli_flag_prefix_char: str | None = None, _cli_implicit_flags: bool | None = None, _cli_ignore_unknown_args: bool | None = None, _cli_kebab_case: bool | Literal['all', 'no_enums'] | None = None, _cli_shortcuts: Mapping[str, str | list[str]] | None = None, _secrets_dir: PathType | None = None, *, topic: str, header: ~typing.Annotated[~typing.Dict[str, ~typing.Any], Json] | ~typing.Dict[str, ~typing.Any] = <factory>, max_workers: ~typing.Annotated[int, ~annotated_types.Gt(gt=0)] | None = None, queue_size: ~typing.Annotated[int, ~annotated_types.Gt(gt=0)] | None = None, max_payload_size: int = 512000, producer_keep_alive: int = 120, producer_connection_timeout: int = 30, host: str = 'localhost', port: str = '8080', persistent: str = 'non-persistent', tenant: str = 'public', namespace: str = 'default')[source]
Bases:
BaseMessagingConfigA configuration class to connect to the pulsar messaging framework.
- Parameters:
topic (str) – The topic identifier under which to register at the pulsar.
header (Union[Annotated[Dict[str, Any], Json], Dict[str, Any]]) – Header parameters for the request
max_workers (Optional[Annotated[int, Gt(gt=0)]]) – (optional) number of concurrent workers for handling requests, default: number of processors on the machine, multiplied by 5.
queue_size (Optional[Annotated[int, Gt(gt=0)]]) – (optional) size of the request queue, if MAX_WORKERS is set, this needs to be at least as big as MAX_WORKERS, otherwise an AttributeException is raised.
max_payload_size (int) – (optional) maximum payload size, must be smaller than pulsars ‘webSocketMaxTextFrameSize’, which is configured e.g.via ‘pulsar/conf/standalone.conf’.default: 512000 (500kb).
producer_keep_alive (int) – The amount of time that the websocket connection to a producer should be kept open. By default, 2 minutes (120 seconds). On each outgoing message, the timer will be reset. Set this to 0 to immediately close the connection when a message has been sent and acknowledged.
producer_connection_timeout (int) – The amount of time that we grant producers to establish a connection to the message broker in order to send a response. If a connection cannot be established in this time, the response will not be sent and the connection will be closed.
host (str) – The remote host of the pulsar.
persistent (str) – None
tenant (str) – None
namespace (str) – None
Methods:
get_topic_url(topic[, subscription])Build the URL to connect to a websocket.
model_post_init(context, /)This function is meant to behave like a BaseModel method to initialise private attributes.
Attributes:
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- get_topic_url(topic: str, subscription: str | None = None) str[source]
Build the URL to connect to a websocket.
- model_config: ClassVar[SettingsConfigDict] = {'arbitrary_types_allowed': True, 'case_sensitive': False, 'cli_avoid_json': False, 'cli_enforce_required': False, 'cli_exit_on_error': True, 'cli_flag_prefix_char': '-', 'cli_hide_none_type': False, 'cli_ignore_unknown_args': False, 'cli_implicit_flags': False, 'cli_kebab_case': False, 'cli_parse_args': None, 'cli_parse_none_str': None, 'cli_prefix': '', 'cli_prog_name': None, 'cli_shortcuts': None, 'cli_use_class_docs_for_groups': False, 'enable_decoding': True, 'env_file': None, 'env_file_encoding': None, 'env_ignore_empty': False, 'env_nested_delimiter': None, 'env_nested_max_split': None, 'env_parse_enums': None, 'env_parse_none_str': None, 'env_prefix': 'de_backend_', 'extra': 'forbid', 'json_file': None, 'json_file_encoding': None, 'nested_model_default_partial_update': False, 'protected_namespaces': ('model_validate', 'model_dump', 'settings_customise_sources'), 'secrets_dir': None, 'toml_file': None, 'validate_default': True, 'yaml_config_section': None, 'yaml_file': None, 'yaml_file_encoding': None}
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- class demessaging.config.messaging.WebsocketURLConfig(_case_sensitive: bool | None = None, _nested_model_default_partial_update: bool | None = None, _env_prefix: str | None = None, _env_file: DotenvType | None = PosixPath('.'), _env_file_encoding: str | None = None, _env_ignore_empty: bool | None = None, _env_nested_delimiter: str | None = None, _env_nested_max_split: int | None = None, _env_parse_none_str: str | None = None, _env_parse_enums: bool | None = None, _cli_prog_name: str | None = None, _cli_parse_args: bool | list[str] | tuple[str, ...] | None = None, _cli_settings_source: CliSettingsSource[Any] | None = None, _cli_parse_none_str: str | None = None, _cli_hide_none_type: bool | None = None, _cli_avoid_json: bool | None = None, _cli_enforce_required: bool | None = None, _cli_use_class_docs_for_groups: bool | None = None, _cli_exit_on_error: bool | None = None, _cli_prefix: str | None = None, _cli_flag_prefix_char: str | None = None, _cli_implicit_flags: bool | None = None, _cli_ignore_unknown_args: bool | None = None, _cli_kebab_case: bool | Literal['all', 'no_enums'] | None = None, _cli_shortcuts: Mapping[str, str | list[str]] | None = None, _secrets_dir: PathType | None = None, *, topic: str, header: ~typing.Annotated[~typing.Dict[str, ~typing.Any], Json] | ~typing.Dict[str, ~typing.Any] = <factory>, max_workers: ~typing.Annotated[int, ~annotated_types.Gt(gt=0)] | None = None, queue_size: ~typing.Annotated[int, ~annotated_types.Gt(gt=0)] | None = None, max_payload_size: int = 512000, producer_keep_alive: int = 120, producer_connection_timeout: int = 30, websocket_url: str = '', producer_url: str | None = None, consumer_url: str | None = None)[source]
Bases:
BaseMessagingConfigA configuration for a websocket.
- Parameters:
topic (str) – The topic identifier under which to register at the pulsar.
header (Union[Annotated[Dict[str, Any], Json], Dict[str, Any]]) – Header parameters for the request
max_workers (Optional[Annotated[int, Gt(gt=0)]]) – (optional) number of concurrent workers for handling requests, default: number of processors on the machine, multiplied by 5.
queue_size (Optional[Annotated[int, Gt(gt=0)]]) – (optional) size of the request queue, if MAX_WORKERS is set, this needs to be at least as big as MAX_WORKERS, otherwise an AttributeException is raised.
max_payload_size (int) – (optional) maximum payload size, must be smaller than pulsars ‘webSocketMaxTextFrameSize’, which is configured e.g.via ‘pulsar/conf/standalone.conf’.default: 512000 (500kb).
producer_keep_alive (int) – The amount of time that the websocket connection to a producer should be kept open. By default, 2 minutes (120 seconds). On each outgoing message, the timer will be reset. Set this to 0 to immediately close the connection when a message has been sent and acknowledged.
producer_connection_timeout (int) – The amount of time that we grant producers to establish a connection to the message broker in order to send a response. If a connection cannot be established in this time, the response will not be sent and the connection will be closed.
websocket_url (str) – The fully qualified URL to the websocket.
producer_url (Optional[str]) – An alternative URL to use for producers. If None, the websocket_url will be used.
consumer_url (Optional[str]) – An alternative URL to use for consumers. If None, the websocket_url will be used.
Attributes:
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
Methods:
get_topic_url(topic[, subscription])Build the URL to connect to a websocket.
model_post_init(context, /)This function is meant to behave like a BaseModel method to initialise private attributes.
- get_topic_url(topic: str, subscription: str | None = None) str[source]
Build the URL to connect to a websocket.
- model_config: ClassVar[SettingsConfigDict] = {'arbitrary_types_allowed': True, 'case_sensitive': False, 'cli_avoid_json': False, 'cli_enforce_required': False, 'cli_exit_on_error': True, 'cli_flag_prefix_char': '-', 'cli_hide_none_type': False, 'cli_ignore_unknown_args': False, 'cli_implicit_flags': False, 'cli_kebab_case': False, 'cli_parse_args': None, 'cli_parse_none_str': None, 'cli_prefix': '', 'cli_prog_name': None, 'cli_shortcuts': None, 'cli_use_class_docs_for_groups': False, 'enable_decoding': True, 'env_file': None, 'env_file_encoding': None, 'env_ignore_empty': False, 'env_nested_delimiter': None, 'env_nested_max_split': None, 'env_parse_enums': None, 'env_parse_none_str': None, 'env_prefix': 'de_backend_', 'extra': 'forbid', 'json_file': None, 'json_file_encoding': None, 'nested_model_default_partial_update': False, 'protected_namespaces': ('model_validate', 'model_dump', 'settings_customise_sources'), 'secrets_dir': None, 'toml_file': None, 'validate_default': True, 'yaml_config_section': None, 'yaml_file': None, 'yaml_file_encoding': None}
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].