From 2d282e4fe65b8055b553e6f47fc63f240942249a Mon Sep 17 00:00:00 2001 From: Leandro Lucarella Date: Fri, 22 Nov 2024 13:17:37 +0100 Subject: [PATCH 01/25] Bump the minimum required channels version to v1.4.0 This is to be able to use type guards with `Receiver.filter` to narrow the received type and the new `WithPrevious` class. Signed-off-by: Leandro Lucarella --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 84f91fb7a..07db7ec80 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -30,7 +30,7 @@ dependencies = [ # changing the version # (plugins.mkdocstrings.handlers.python.import) "frequenz-client-microgrid >= 0.6.0, < 0.7.0", - "frequenz-channels >= 1.2.0, < 2.0.0", + "frequenz-channels >= 1.4.0, < 2.0.0", "frequenz-quantities >= 1.0.0rc3, < 2.0.0", "networkx >= 2.8, < 4", "numpy >= 1.26.4, < 2", From b67acd62c5fe40398dacef4bf25d7371e0dc176d Mon Sep 17 00:00:00 2001 From: Leandro Lucarella Date: Fri, 15 Nov 2024 13:18:21 +0100 Subject: [PATCH 02/25] Add a `ConfigManager` class This class instantiates and starts the `ConfigManagingActor` and creates the channel needed to communicate with it. It offers a convenience method to get receivers to receive configuration updates. 
Signed-off-by: Leandro Lucarella --- src/frequenz/sdk/config/__init__.py | 4 +- src/frequenz/sdk/config/_manager.py | 99 +++++++++++++++++++++++++++++ 2 files changed, 102 insertions(+), 1 deletion(-) create mode 100644 src/frequenz/sdk/config/_manager.py diff --git a/src/frequenz/sdk/config/__init__.py b/src/frequenz/sdk/config/__init__.py index 50a16cb79..327777d31 100644 --- a/src/frequenz/sdk/config/__init__.py +++ b/src/frequenz/sdk/config/__init__.py @@ -1,13 +1,15 @@ # License: MIT # Copyright © 2024 Frequenz Energy-as-a-Service GmbH -"""Read and update config variables.""" +"""Configuration management.""" from ._logging_actor import LoggerConfig, LoggingConfig, LoggingConfigUpdatingActor +from ._manager import ConfigManager from ._managing_actor import ConfigManagingActor from ._util import load_config __all__ = [ + "ConfigManager", "ConfigManagingActor", "LoggerConfig", "LoggingConfig", diff --git a/src/frequenz/sdk/config/_manager.py b/src/frequenz/sdk/config/_manager.py new file mode 100644 index 000000000..32458459b --- /dev/null +++ b/src/frequenz/sdk/config/_manager.py @@ -0,0 +1,99 @@ +# License: MIT +# Copyright © 2024 Frequenz Energy-as-a-Service GmbH + +"""Management of configuration.""" + +import pathlib +from collections.abc import Mapping, Sequence +from datetime import timedelta +from typing import Any, Final + +from frequenz.channels import Broadcast, Receiver + +from ._managing_actor import ConfigManagingActor + + +class ConfigManager: + """A manager for configuration files. + + This class reads configuration files and sends the configuration to the receivers, + providing optional configuration key filtering and schema validation. + """ + + def __init__( # pylint: disable=too-many-arguments + self, + config_paths: Sequence[pathlib.Path], + /, + *, + auto_start: bool = True, + force_polling: bool = True, + name: str | None = None, + polling_interval: timedelta = timedelta(seconds=5), + ) -> None: + """Initialize this config manager. 
+ + Args: + config_paths: The paths to the TOML files with the configuration. Order + matters, as the configuration will be read and updated in the order + of the paths, so the last path will override the configuration set by + the previous paths. Dict keys will be merged recursively, but other + objects (like lists) will be replaced by the value in the last path. + auto_start: Whether to start the actor automatically. If `False`, the actor + will need to be started manually by calling `start()` on the actor. + force_polling: Whether to force file polling to check for changes. + name: A name to use when creating actors. If `None`, `str(id(self))` will + be used. This is used mostly for debugging purposes. + polling_interval: The interval to poll for changes. Only relevant if + polling is enabled. + """ + self.name: Final[str] = str(id(self)) if name is None else name + """The name of this config manager.""" + + self.config_channel: Final[Broadcast[Mapping[str, Any]]] = Broadcast( + name=f"{self}_config", resend_latest=True + ) + """The broadcast channel for the configuration.""" + + self.actor: Final[ConfigManagingActor] = ConfigManagingActor( + config_paths, + self.config_channel.new_sender(), + name=str(self), + force_polling=force_polling, + polling_interval=polling_interval, + ) + """The actor that manages the configuration.""" + + if auto_start: + self.actor.start() + + def __repr__(self) -> str: + """Return a string representation of this config manager.""" + return ( + f"<{self.__class__.__name__}: " + f"name={self.name!r}, " + f"config_channel={self.config_channel!r}, " + f"actor={self.actor!r}>" + ) + + def __str__(self) -> str: + """Return a string representation of this config manager.""" + return f"{type(self).__name__}[{self.name}]" + + # The noqa DOC502 is needed because we raise TimeoutError indirectly. + async def new_receiver(self) -> Receiver[Mapping[str, Any] | None]: # noqa: DOC502 + """Create a new receiver for the configuration. 
+ + Note: + If there is a burst of configuration updates, the receiver will only + receive the last configuration, older configurations will be ignored. + + Example: + ```python + # TODO: Add Example + ``` + + Returns: + The receiver for the configuration. + """ + recv_name = f"{self}_receiver" + return self.config_channel.new_receiver(name=recv_name, limit=1) From 29725c4a87095ce82ffec3e294342e8115802299 Mon Sep 17 00:00:00 2001 From: Leandro Lucarella Date: Fri, 22 Nov 2024 12:50:57 +0100 Subject: [PATCH 03/25] Add an option to wait for the first configuration This option is useful for when retrieving the configuration for the first time, as users might want to block the program's initialization until the configuration is read. Signed-off-by: Leandro Lucarella --- src/frequenz/sdk/config/_manager.py | 52 +++++++++++++++++++++++++++-- 1 file changed, 50 insertions(+), 2 deletions(-) diff --git a/src/frequenz/sdk/config/_manager.py b/src/frequenz/sdk/config/_manager.py index 32458459b..fc1a0b4ce 100644 --- a/src/frequenz/sdk/config/_manager.py +++ b/src/frequenz/sdk/config/_manager.py @@ -3,6 +3,7 @@ """Management of configuration.""" +import asyncio import pathlib from collections.abc import Mapping, Sequence from datetime import timedelta @@ -29,6 +30,7 @@ def __init__( # pylint: disable=too-many-arguments force_polling: bool = True, name: str | None = None, polling_interval: timedelta = timedelta(seconds=5), + wait_for_first_timeout: timedelta = timedelta(seconds=5), ) -> None: """Initialize this config manager. @@ -45,6 +47,10 @@ def __init__( # pylint: disable=too-many-arguments be used. This is used mostly for debugging purposes. polling_interval: The interval to poll for changes. Only relevant if polling is enabled. + wait_for_first_timeout: The timeout to use when waiting for the first + configuration in + [`new_receiver`][frequenz.sdk.config.ConfigManager.new_receiver] if + `wait_for_first` is `True`. 
""" self.name: Final[str] = str(id(self)) if name is None else name """The name of this config manager.""" @@ -63,6 +69,14 @@ def __init__( # pylint: disable=too-many-arguments ) """The actor that manages the configuration.""" + self.wait_for_first_timeout: timedelta = wait_for_first_timeout + """The timeout to use when waiting for the first configuration. + + When passing `wait_for_first` as `True` to + [`new_receiver`][frequenz.sdk.config.ConfigManager.new_receiver], this timeout + will be used to wait for the first configuration to be received. + """ + if auto_start: self.actor.start() @@ -71,6 +85,7 @@ def __repr__(self) -> str: return ( f"<{self.__class__.__name__}: " f"name={self.name!r}, " + f"wait_for_first_timeout={self.wait_for_first_timeout!r}, " f"config_channel={self.config_channel!r}, " f"actor={self.actor!r}>" ) @@ -80,20 +95,53 @@ def __str__(self) -> str: return f"{type(self).__name__}[{self.name}]" # The noqa DOC502 is needed because we raise TimeoutError indirectly. - async def new_receiver(self) -> Receiver[Mapping[str, Any] | None]: # noqa: DOC502 + async def new_receiver( # pylint: disable=too-many-arguments # noqa: DOC502 + self, + *, + wait_for_first: bool = False, + ) -> Receiver[Mapping[str, Any] | None]: """Create a new receiver for the configuration. Note: If there is a burst of configuration updates, the receiver will only receive the last configuration, older configurations will be ignored. + ### Waiting for the first configuration + + If `wait_for_first` is `True`, the receiver will wait for the first + configuration to be received before returning the receiver. If the + configuration can't be received in time, a timeout error will be raised. + + If the configuration is received successfully, the first configuration can be + simply retrieved by calling [`consume()`][frequenz.channels.Receiver.consume] on + the receiver without blocking. 
+ Example: ```python # TODO: Add Example ``` + Args: + wait_for_first: Whether to wait for the first configuration to be received + before returning the receiver. If the configuration can't be received + for + [`wait_for_first_timeout`][frequenz.sdk.config.ConfigManager.wait_for_first_timeout] + time, a timeout error will be raised. If receiving was successful, the + first configuration can be simply retrieved by calling + [`consume()`][frequenz.channels.Receiver.consume] on the receiver. + Returns: The receiver for the configuration. + + Raises: + asyncio.TimeoutError: If `wait_for_first` is `True` and the first + configuration can't be received in time. """ recv_name = f"{self}_receiver" - return self.config_channel.new_receiver(name=recv_name, limit=1) + receiver = self.config_channel.new_receiver(name=recv_name, limit=1) + + if wait_for_first: + async with asyncio.timeout(self.wait_for_first_timeout.total_seconds()): + await receiver.ready() + + return receiver From 2921ed85b9c2c277f8a631a5d5e36ed74b847af4 Mon Sep 17 00:00:00 2001 From: Leandro Lucarella Date: Fri, 22 Nov 2024 13:07:34 +0100 Subject: [PATCH 04/25] Add option to only send changed configurations When this option is enabled, configurations will be sent to the receiver only if they are different from the previously received configuration. 
Signed-off-by: Leandro Lucarella --- src/frequenz/sdk/config/_manager.py | 31 +++++++++++++++++++++++++++++ 1 file changed, 31 insertions(+) diff --git a/src/frequenz/sdk/config/_manager.py b/src/frequenz/sdk/config/_manager.py index fc1a0b4ce..5f5173e33 100644 --- a/src/frequenz/sdk/config/_manager.py +++ b/src/frequenz/sdk/config/_manager.py @@ -4,15 +4,19 @@ """Management of configuration.""" import asyncio +import logging import pathlib from collections.abc import Mapping, Sequence from datetime import timedelta from typing import Any, Final from frequenz.channels import Broadcast, Receiver +from frequenz.channels.experimental import WithPrevious from ._managing_actor import ConfigManagingActor +_logger = logging.getLogger(__name__) + class ConfigManager: """A manager for configuration files. @@ -99,13 +103,24 @@ async def new_receiver( # pylint: disable=too-many-arguments # noqa: DOC502 self, *, wait_for_first: bool = False, + skip_unchanged: bool = True, ) -> Receiver[Mapping[str, Any] | None]: """Create a new receiver for the configuration. + This method has a lot of features and functionalities to make it easier to + receive configurations. + Note: If there is a burst of configuration updates, the receiver will only receive the last configuration, older configurations will be ignored. + ### Skipping superfluous updates + + If `skip_unchanged` is set to `True`, then a configuration that didn't change + compared to the last one received will be ignored and not sent to the receiver. + The comparison is done using the *raw* `dict` to determine if the configuration + has changed. + ### Waiting for the first configuration If `wait_for_first` is `True`, the receiver will wait for the first @@ -129,6 +144,8 @@ async def new_receiver( # pylint: disable=too-many-arguments # noqa: DOC502 time, a timeout error will be raised. 
If receiving was successful, the first configuration can be simply retrieved by calling [`consume()`][frequenz.channels.Receiver.consume] on the receiver. + skip_unchanged: Whether to skip sending the configuration if it hasn't + changed compared to the last one received. Returns: The receiver for the configuration. @@ -140,8 +157,22 @@ async def new_receiver( # pylint: disable=too-many-arguments # noqa: DOC502 recv_name = f"{self}_receiver" receiver = self.config_channel.new_receiver(name=recv_name, limit=1) + if skip_unchanged: + receiver = receiver.filter(WithPrevious(not_equal_with_logging)) + if wait_for_first: async with asyncio.timeout(self.wait_for_first_timeout.total_seconds()): await receiver.ready() return receiver + + +def not_equal_with_logging( + old_config: Mapping[str, Any], new_config: Mapping[str, Any] +) -> bool: + """Return whether the two mappings are not equal, logging if they are the same.""" + if old_config == new_config: + _logger.info("Configuration has not changed, skipping update") + _logger.debug("Old configuration being kept: %r", old_config) + return False + return True From 751d20e3b7e2b35bd074fff67a3a06a187b18fd6 Mon Sep 17 00:00:00 2001 From: Leandro Lucarella Date: Fri, 22 Nov 2024 13:21:12 +0100 Subject: [PATCH 05/25] Add support for subscribing to specific config keys This is useful for actors to be able to subscribe to a particular key with the actor configuration, avoiding spurious updates and noise in the received configuration. 
Signed-off-by: Leandro Lucarella --- src/frequenz/sdk/config/_manager.py | 96 ++++++++++++++++++++++++----- 1 file changed, 82 insertions(+), 14 deletions(-) diff --git a/src/frequenz/sdk/config/_manager.py b/src/frequenz/sdk/config/_manager.py index 5f5173e33..48cad6642 100644 --- a/src/frequenz/sdk/config/_manager.py +++ b/src/frequenz/sdk/config/_manager.py @@ -8,7 +8,7 @@ import pathlib from collections.abc import Mapping, Sequence from datetime import timedelta -from typing import Any, Final +from typing import Any, Final, overload from frequenz.channels import Broadcast, Receiver from frequenz.channels.experimental import WithPrevious @@ -98,13 +98,31 @@ def __str__(self) -> str: """Return a string representation of this config manager.""" return f"{type(self).__name__}[{self.name}]" + @overload + async def new_receiver( + self, + *, + wait_for_first: bool = True, + skip_unchanged: bool = True, + ) -> Receiver[Mapping[str, Any]]: ... + + @overload + async def new_receiver( + self, + *, + wait_for_first: bool = True, + skip_unchanged: bool = True, + key: str, + ) -> Receiver[Mapping[str, Any] | None]: ... + # The noqa DOC502 is needed because we raise TimeoutError indirectly. async def new_receiver( # pylint: disable=too-many-arguments # noqa: DOC502 self, *, wait_for_first: bool = False, skip_unchanged: bool = True, - ) -> Receiver[Mapping[str, Any] | None]: + key: str | None = None, + ) -> Receiver[Mapping[str, Any]] | Receiver[Mapping[str, Any] | None]: """Create a new receiver for the configuration. This method has a lot of features and functionalities to make it easier to @@ -121,6 +139,14 @@ async def new_receiver( # pylint: disable=too-many-arguments # noqa: DOC502 The comparison is done using the *raw* `dict` to determine if the configuration has changed. + ### Filtering + + The configuration can be filtered by a `key`. 
+ + If the `key` is `None`, the receiver will receive the full configuration, + otherwise only the part of the configuration under the specified key is + received, or `None` if the key is not found. + ### Waiting for the first configuration If `wait_for_first` is `True`, the receiver will wait for the first @@ -146,6 +172,8 @@ async def new_receiver( # pylint: disable=too-many-arguments # noqa: DOC502 [`consume()`][frequenz.channels.Receiver.consume] on the receiver. skip_unchanged: Whether to skip sending the configuration if it hasn't changed compared to the last one received. + key: The key to filter the configuration. If `None`, the full configuration + will be received. Returns: The receiver for the configuration. @@ -154,25 +182,65 @@ async def new_receiver( # pylint: disable=too-many-arguments # noqa: DOC502 asyncio.TimeoutError: If `wait_for_first` is `True` and the first configuration can't be received in time. """ - recv_name = f"{self}_receiver" + recv_name = f"{self}_receiver" if key is None else f"{self}_receiver_{key}" receiver = self.config_channel.new_receiver(name=recv_name, limit=1) if skip_unchanged: - receiver = receiver.filter(WithPrevious(not_equal_with_logging)) + receiver = receiver.filter(WithPrevious(_NotEqualWithLogging(key))) if wait_for_first: async with asyncio.timeout(self.wait_for_first_timeout.total_seconds()): await receiver.ready() - return receiver + if key is None: + return receiver + + return receiver.map(lambda config: config.get(key)) -def not_equal_with_logging( - old_config: Mapping[str, Any], new_config: Mapping[str, Any] -) -> bool: - """Return whether the two mappings are not equal, logging if they are the same.""" - if old_config == new_config: - _logger.info("Configuration has not changed, skipping update") - _logger.debug("Old configuration being kept: %r", old_config) - return False - return True +class _NotEqualWithLogging: + """A predicate that returns whether the two mappings are not equal. 
+ + If the mappings are equal, a logging message will be emitted indicating that the + configuration has not changed for the specified key. + """ + + def __init__(self, key: str | None = None) -> None: + """Initialize this instance. + + Args: + key: The key to use in the logging message. + """ + self._key = key + + def __call__( + self, old_config: Mapping[str, Any] | None, new_config: Mapping[str, Any] | None + ) -> bool: + """Return whether the two mappings are not equal, logging if they are the same.""" + if self._key is None: + has_changed = new_config != old_config + else: + match (new_config, old_config): + case (None, None): + has_changed = False + case (None, Mapping()): + has_changed = old_config.get(self._key) is not None + case (Mapping(), None): + has_changed = new_config.get(self._key) is not None + case (Mapping(), Mapping()): + has_changed = new_config.get(self._key) != old_config.get(self._key) + case unexpected: + # We can't use `assert_never` here because `mypy` is having trouble + # narrowing the types of a tuple. See for example: + # https://github.com/python/mypy/issues/16722 + # https://github.com/python/mypy/issues/16650 + # https://github.com/python/mypy/issues/14833 + # assert_never(unexpected) + assert False, f"Unexpected match: {unexpected}" + + if not has_changed: + key_str = f" for key '{self._key}'" if self._key else "" + _logger.info("Configuration%s has not changed, skipping update", key_str) + _logger.debug("Old configuration%s being kept: %r", key_str, old_config) + + return has_changed From 3ccb4eb795320c99e21ef1c67142e3068d1acbbd Mon Sep 17 00:00:00 2001 From: Leandro Lucarella Date: Thu, 28 Nov 2024 09:28:29 +0100 Subject: [PATCH 06/25] Add support for validating configurations with a schema This commit adds support for validating configurations with a schema. This is useful to ensure the configuration is correct and to receive it as an instance of a dataclass. 
The `new_receiver` method now accepts a `schema` argument that is used to validate the configuration. If the configuration doesn't pass the validation, it will be ignored and an error will be logged. The schema is expected to be a dataclass, which is used to create a marshmallow schema to validate the configuration. To customize the schema, you can use `marshmallow_dataclass.dataclass` to specify extra metadata. Signed-off-by: Leandro Lucarella --- src/frequenz/sdk/config/_manager.py | 187 +++++++++++++++++++++++++++- 1 file changed, 181 insertions(+), 6 deletions(-) diff --git a/src/frequenz/sdk/config/_manager.py b/src/frequenz/sdk/config/_manager.py index 48cad6642..1ff049786 100644 --- a/src/frequenz/sdk/config/_manager.py +++ b/src/frequenz/sdk/config/_manager.py @@ -8,12 +8,14 @@ import pathlib from collections.abc import Mapping, Sequence from datetime import timedelta -from typing import Any, Final, overload +from typing import Any, Final, TypeGuard, overload from frequenz.channels import Broadcast, Receiver from frequenz.channels.experimental import WithPrevious +from marshmallow import Schema, ValidationError from ._managing_actor import ConfigManagingActor +from ._util import DataclassT, load_config _logger = logging.getLogger(__name__) @@ -106,6 +108,21 @@ async def new_receiver( skip_unchanged: bool = True, ) -> Receiver[Mapping[str, Any]]: ... + @overload + async def new_receiver( # pylint: disable=too-many-arguments + self, + *, + wait_for_first: bool = True, + skip_unchanged: bool = True, + # We need to specify the key here because we have kwargs, so if it is not + # present is not considered None as the only possible value, as any value can be + # accepted as part of the kwargs. + key: None = None, + schema: type[DataclassT], + base_schema: type[Schema] | None = None, + **marshmallow_load_kwargs: Any, + ) -> Receiver[DataclassT]: ... 
+ @overload async def new_receiver( self, @@ -115,6 +132,18 @@ async def new_receiver( key: str, ) -> Receiver[Mapping[str, Any] | None]: ... + @overload + async def new_receiver( # pylint: disable=too-many-arguments + self, + *, + wait_for_first: bool = True, + skip_unchanged: bool = True, + key: str, + schema: type[DataclassT], + base_schema: type[Schema] | None, + **marshmallow_load_kwargs: Any, + ) -> Receiver[DataclassT | None]: ... + # The noqa DOC502 is needed because we raise TimeoutError indirectly. async def new_receiver( # pylint: disable=too-many-arguments # noqa: DOC502 self, @@ -122,7 +151,15 @@ async def new_receiver( # pylint: disable=too-many-arguments # noqa: DOC502 wait_for_first: bool = False, skip_unchanged: bool = True, key: str | None = None, - ) -> Receiver[Mapping[str, Any]] | Receiver[Mapping[str, Any] | None]: + schema: type[DataclassT] | None = None, + base_schema: type[Schema] | None = None, + **marshmallow_load_kwargs: Any, + ) -> ( + Receiver[Mapping[str, Any]] + | Receiver[Mapping[str, Any] | None] + | Receiver[DataclassT] + | Receiver[DataclassT | None] + ): """Create a new receiver for the configuration. This method has a lot of features and functionalities to make it easier to @@ -147,6 +184,26 @@ async def new_receiver( # pylint: disable=too-many-arguments # noqa: DOC502 otherwise only the part of the configuration under the specified key is received, or `None` if the key is not found. + ### Schema validation + + The configuration is received as a dictionary unless a `schema` is provided. In + this case, the configuration will be validated against the schema and received + as an instance of the configuration class. + + The configuration `schema` class is expected to be + a [`dataclasses.dataclass`][], which is used to create + a [`marshmallow.Schema`][] schema to validate the configuration dictionary. 
+ + To customize the schema derived from the configuration dataclass, you can + use [`marshmallow_dataclass.dataclass`][] to specify extra metadata. + + Configurations that don't pass the validation will be ignored and not sent to + the receiver, but an error will be logged. Errors other than `ValidationError` + will not be handled and will be raised. + + Additional arguments can be passed to [`marshmallow.Schema.load`][] using keyword + arguments. + ### Waiting for the first configuration If `wait_for_first` is `True`, the receiver will wait for the first @@ -174,6 +231,13 @@ async def new_receiver( # pylint: disable=too-many-arguments # noqa: DOC502 changed compared to the last one received. key: The key to filter the configuration. If `None`, the full configuration will be received. + schema: The type of the configuration. If provided, the configuration + will be validated against this type. + base_schema: An optional class to be used as a base schema for the + configuration class. This allow using custom fields for example. Will be + passed to [`marshmallow_dataclass.class_schema`][]. + **marshmallow_load_kwargs: Additional arguments to be passed to + [`marshmallow.Schema.load`][]. Returns: The receiver for the configuration. @@ -182,6 +246,79 @@ async def new_receiver( # pylint: disable=too-many-arguments # noqa: DOC502 asyncio.TimeoutError: If `wait_for_first` is `True` and the first configuration can't be received in time. """ + # All supporting generic function (using DataclassT) need to be nested + # here. For some reasons mypy has trouble if these functions are + # global, it consider the DataclassT used by this method and the global + # functions to be different, leading to very hard to find typing + # errors. + + @overload + def _load_config_with_logging( + config: Mapping[str, Any], + schema: type[DataclassT], + *, + key: None = None, + base_schema: type[Schema] | None = None, + **marshmallow_load_kwargs: Any, + ) -> DataclassT | _InvalidConfig: ... 
+ + @overload + def _load_config_with_logging( + config: Mapping[str, Any], + schema: type[DataclassT], + *, + key: str, + base_schema: type[Schema] | None = None, + **marshmallow_load_kwargs: Any, + ) -> DataclassT | None | _InvalidConfig: ... + + def _load_config_with_logging( + config: Mapping[str, Any], + schema: type[DataclassT], + *, + key: str | None = None, + base_schema: type[Schema] | None = None, + **marshmallow_load_kwargs: Any, + ) -> DataclassT | None | _InvalidConfig: + """Try to load a configuration and log any validation errors.""" + if key is not None: + maybe_config = config.get(key, None) + if maybe_config is None: + _logger.debug( + "Configuration key %s not found, sending None: config=%r", + key, + config, + ) + return None + config = maybe_config + + try: + return load_config( + schema, config, base_schema=base_schema, **marshmallow_load_kwargs + ) + except ValidationError as err: + key_str = "" + if key: + key_str = f" for key '{key}'" + _logger.error( + "The configuration%s is invalid, the configuration update will be skipped: %s", + key_str, + err, + ) + return _INVALID_CONFIG + + def _is_valid_or_none( + config: DataclassT | _InvalidConfig | None, + ) -> TypeGuard[DataclassT | None]: + """Return whether the configuration is valid or `None`.""" + return config is not _INVALID_CONFIG + + def _is_valid( + config: DataclassT | _InvalidConfig, + ) -> TypeGuard[DataclassT]: + """Return whether the configuration is valid and not `None`.""" + return config is not _INVALID_CONFIG + recv_name = f"{self}_receiver" if key is None else f"{self}_receiver_{key}" receiver = self.config_channel.new_receiver(name=recv_name, limit=1) @@ -192,10 +329,40 @@ async def new_receiver( # pylint: disable=too-many-arguments # noqa: DOC502 async with asyncio.timeout(self.wait_for_first_timeout.total_seconds()): await receiver.ready() - if key is None: - return receiver - - return receiver.map(lambda config: config.get(key)) + match (key, schema): + case (None, None): 
+ return receiver + case (None, type()): + return receiver.map( + lambda config: _load_config_with_logging( + config, + schema, + # we need to pass it explicitly because of the + # variadic keyword arguments, otherwise key + # could be included in marshmallow_load_kwargs + # with a value different than None. + key=None, + base_schema=base_schema, + **marshmallow_load_kwargs, + ) + ).filter(_is_valid) + case (str(), None): + return receiver.map(lambda config: config.get(key)) + case (str(), type()): + return receiver.map( + lambda config: _load_config_with_logging( + config, + schema, + key=key, + base_schema=base_schema, + **marshmallow_load_kwargs, + ) + ).filter(_is_valid_or_none) + case unexpected: + # We can't use `assert_never` here because `mypy` is + # having trouble + # narrowing the types of a tuple. + assert False, f"Unexpected match: {unexpected}" class _NotEqualWithLogging: @@ -244,3 +411,11 @@ def __call__( _logger.debug("Old configuration%s being kept: %r", key_str, old_config) return has_changed + + +class _InvalidConfig: + """A sentinel to represent an invalid configuration.""" + + +_INVALID_CONFIG = _InvalidConfig() +"""A sentinel singleton instance to represent an invalid configuration.""" From e39cd6b6aa4b389a4d26160c1566af93099003f4 Mon Sep 17 00:00:00 2001 From: Leandro Lucarella Date: Mon, 25 Nov 2024 09:47:36 +0100 Subject: [PATCH 07/25] Add support to filter a sub-key Configuration can be nested, and actors could have sub-actors that need their own configuration, so we need to support filtering the configuration by a sequence of keys. For example, if the configuration is `{"key": {"subkey": "value"}}`, and the key is `["key", "subkey"]`, then the receiver will get only `"value"`. 
Signed-off-by: Leandro Lucarella --- src/frequenz/sdk/config/_manager.py | 79 ++++++++++++++++++++++++----- 1 file changed, 66 insertions(+), 13 deletions(-) diff --git a/src/frequenz/sdk/config/_manager.py b/src/frequenz/sdk/config/_manager.py index 1ff049786..782f328fa 100644 --- a/src/frequenz/sdk/config/_manager.py +++ b/src/frequenz/sdk/config/_manager.py @@ -129,7 +129,7 @@ async def new_receiver( *, wait_for_first: bool = True, skip_unchanged: bool = True, - key: str, + key: str | Sequence[str], ) -> Receiver[Mapping[str, Any] | None]: ... @overload @@ -138,7 +138,7 @@ async def new_receiver( # pylint: disable=too-many-arguments *, wait_for_first: bool = True, skip_unchanged: bool = True, - key: str, + key: str | Sequence[str], schema: type[DataclassT], base_schema: type[Schema] | None, **marshmallow_load_kwargs: Any, @@ -150,7 +150,12 @@ async def new_receiver( # pylint: disable=too-many-arguments # noqa: DOC502 *, wait_for_first: bool = False, skip_unchanged: bool = True, - key: str | None = None, + # This is tricky, because a str is also a Sequence[str], if we would use only + # Sequence[str], then a regular string would also be accepted and taken as + # a sequence, like "key" -> ["k", "e", "y"]. We should never remove the str from + # the allowed types without changing Sequence[str] to something more specific, + # like list[str] or tuple[str]. + key: str | Sequence[str] | None = None, schema: type[DataclassT] | None = None, base_schema: type[Schema] | None = None, **marshmallow_load_kwargs: Any, @@ -184,6 +189,10 @@ async def new_receiver( # pylint: disable=too-many-arguments # noqa: DOC502 otherwise only the part of the configuration under the specified key is received, or `None` if the key is not found. + If the key is a sequence of strings, it will be treated as a nested key and the + receiver will receive the configuration under the nested key. For example + `["key", "subkey"]` will get only `config["key"]["subkey"]`. 
+ ### Schema validation The configuration is received as a dictionary unless a `schema` is provided. In @@ -267,7 +276,7 @@ def _load_config_with_logging( config: Mapping[str, Any], schema: type[DataclassT], *, - key: str, + key: str | Sequence[str], base_schema: type[Schema] | None = None, **marshmallow_load_kwargs: Any, ) -> DataclassT | None | _InvalidConfig: ... @@ -276,13 +285,13 @@ def _load_config_with_logging( config: Mapping[str, Any], schema: type[DataclassT], *, - key: str | None = None, + key: str | Sequence[str] | None = None, base_schema: type[Schema] | None = None, **marshmallow_load_kwargs: Any, ) -> DataclassT | None | _InvalidConfig: """Try to load a configuration and log any validation errors.""" if key is not None: - maybe_config = config.get(key, None) + maybe_config = _get_key(config, key) if maybe_config is None: _logger.debug( "Configuration key %s not found, sending None: config=%r", @@ -347,7 +356,7 @@ def _is_valid( ) ).filter(_is_valid) case (str(), None): - return receiver.map(lambda config: config.get(key)) + return receiver.map(lambda config: _get_key(config, key)) case (str(), type()): return receiver.map( lambda config: _load_config_with_logging( @@ -372,7 +381,7 @@ class _NotEqualWithLogging: configuration has not changed for the specified key. """ - def __init__(self, key: str | None = None) -> None: + def __init__(self, key: str | Sequence[str] | None) -> None: """Initialize this instance. 
Args: @@ -384,18 +393,19 @@ def __call__( self, old_config: Mapping[str, Any] | None, new_config: Mapping[str, Any] | None ) -> bool: """Return whether the two mappings are not equal, logging if they are the same.""" - if self._key is None: + key = self._key + if key is None: has_changed = new_config != old_config else: match (new_config, old_config): case (None, None): has_changed = False case (None, Mapping()): - has_changed = old_config.get(self._key) is not None + has_changed = _get_key(old_config, key) is not None case (Mapping(), None): - has_changed = new_config.get(self._key) is not None + has_changed = _get_key(new_config, key) is not None case (Mapping(), Mapping()): - has_changed = new_config.get(self._key) != old_config.get(self._key) + has_changed = _get_key(new_config, key) != _get_key(old_config, key) case unexpected: # We can't use `assert_never` here because `mypy` is having trouble # narrowing the types of a tuple. See for example: @@ -406,13 +416,56 @@ def __call__( assert False, f"Unexpected match: {unexpected}" if not has_changed: - key_str = f" for key '{self._key}'" if self._key else "" + key_str = f" for key '{key}'" if key else "" _logger.info("Configuration%s has not changed, skipping update", key_str) _logger.debug("Old configuration%s being kept: %r", key_str, old_config) return has_changed +def _get_key( + config: Mapping[str, Any], + # This is tricky, because a str is also a Sequence[str], if we would use only + # Sequence[str], then a regular string would also be accepted and taken as + # a sequence, like "key" -> ["k", "e", "y"]. We should never remove the str from + # the allowed types without changing Sequence[str] to something more specific, + # like list[str] or tuple[str]. 
+ key: str | Sequence[str] | None, +) -> Mapping[str, Any] | None: + """Get the value from the configuration under the specified key.""" + if key is None: + return config + # Order here is very important too, str() needs to come first, otherwise a regular + # will also match the Sequence[str] case. + # TODO: write tests to validate this works correctly. + if isinstance(key, str): + key = (key,) + value = config + current_path = [] + for subkey in key: + current_path.append(subkey) + if value is None: + return None + match value.get(subkey): + case None: + return None + case Mapping() as new_value: + value = new_value + case _: + subkey_str = "" + if len(key) > 1: + subkey_str = f" when looking for sub-key {key!r}" + _logger.error( + "Found key %r%s but it's not a mapping, returning None: config=%r", + current_path[0] if len(current_path) == 1 else current_path, + subkey_str, + config, + ) + return None + value = new_value + return value + + class _InvalidConfig: """A sentinel to represent an invalid configuration.""" From 3129c420eb8716cad9b651b5159b9184a32d106d Mon Sep 17 00:00:00 2001 From: Leandro Lucarella Date: Fri, 6 Dec 2024 12:53:04 +0100 Subject: [PATCH 08/25] Assert the receiver has the correct type For every path where we create a new receiver, we now assert we end up with the correct type, as the code proved to be a bit weak, and we sometimes ended up with a `Receiver[Any]` without noticing. 
Signed-off-by: Leandro Lucarella --- src/frequenz/sdk/config/_manager.py | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/src/frequenz/sdk/config/_manager.py b/src/frequenz/sdk/config/_manager.py index 782f328fa..04d7dc59c 100644 --- a/src/frequenz/sdk/config/_manager.py +++ b/src/frequenz/sdk/config/_manager.py @@ -8,7 +8,7 @@ import pathlib from collections.abc import Mapping, Sequence from datetime import timedelta -from typing import Any, Final, TypeGuard, overload +from typing import Any, Final, TypeGuard, assert_type, overload from frequenz.channels import Broadcast, Receiver from frequenz.channels.experimental import WithPrevious @@ -340,9 +340,10 @@ def _is_valid( match (key, schema): case (None, None): + assert_type(receiver, Receiver[Mapping[str, Any]]) return receiver case (None, type()): - return receiver.map( + recv_dataclass = receiver.map( lambda config: _load_config_with_logging( config, schema, @@ -355,10 +356,14 @@ def _is_valid( **marshmallow_load_kwargs, ) ).filter(_is_valid) + assert_type(recv_dataclass, Receiver[DataclassT]) + return recv_dataclass case (str(), None): - return receiver.map(lambda config: _get_key(config, key)) + recv_map_or_none = receiver.map(lambda config: _get_key(config, key)) + assert_type(recv_map_or_none, Receiver[Mapping[str, Any] | None]) + return recv_map_or_none case (str(), type()): - return receiver.map( + recv_dataclass_or_none = receiver.map( lambda config: _load_config_with_logging( config, schema, @@ -367,6 +372,8 @@ def _is_valid( **marshmallow_load_kwargs, ) ).filter(_is_valid_or_none) + assert_type(recv_dataclass_or_none, Receiver[DataclassT | None]) + return recv_dataclass_or_none case unexpected: # We can't use `assert_never` here because `mypy` is # having trouble From 63c96a08163bd6a123f2a0b69f20c4a7e101d400 Mon Sep 17 00:00:00 2001 From: Leandro Lucarella Date: Fri, 6 Dec 2024 14:44:05 +0100 Subject: [PATCH 09/25] Add support for skipping `None` configs This is useful 
for cases where the receiver can't react to `None` configurations, either because it is handled externally or because it should just keep the previous configuration. Signed-off-by: Leandro Lucarella --- src/frequenz/sdk/config/_manager.py | 72 +++++++++++++++++++++++++++-- 1 file changed, 67 insertions(+), 5 deletions(-) diff --git a/src/frequenz/sdk/config/_manager.py b/src/frequenz/sdk/config/_manager.py index 04d7dc59c..4008b5e9d 100644 --- a/src/frequenz/sdk/config/_manager.py +++ b/src/frequenz/sdk/config/_manager.py @@ -8,7 +8,7 @@ import pathlib from collections.abc import Mapping, Sequence from datetime import timedelta -from typing import Any, Final, TypeGuard, assert_type, overload +from typing import Any, Final, Literal, TypeGuard, assert_type, cast, overload from frequenz.channels import Broadcast, Receiver from frequenz.channels.experimental import WithPrevious @@ -106,6 +106,7 @@ async def new_receiver( *, wait_for_first: bool = True, skip_unchanged: bool = True, + skip_none: Literal[False] = False, ) -> Receiver[Mapping[str, Any]]: ... @overload @@ -114,6 +115,7 @@ async def new_receiver( # pylint: disable=too-many-arguments *, wait_for_first: bool = True, skip_unchanged: bool = True, + skip_none: Literal[False] = False, # We need to specify the key here because we have kwargs, so if it is not # present is not considered None as the only possible value, as any value can be # accepted as part of the kwargs. @@ -129,27 +131,54 @@ async def new_receiver( *, wait_for_first: bool = True, skip_unchanged: bool = True, + skip_none: Literal[False] = False, key: str | Sequence[str], ) -> Receiver[Mapping[str, Any] | None]: ... + @overload + async def new_receiver( + self, + *, + wait_for_first: bool = True, + skip_unchanged: bool = True, + skip_none: Literal[True] = True, + key: str | Sequence[str], + ) -> Receiver[Mapping[str, Any]]: ... 
+ @overload async def new_receiver( # pylint: disable=too-many-arguments self, *, wait_for_first: bool = True, skip_unchanged: bool = True, + skip_none: Literal[False] = False, key: str | Sequence[str], schema: type[DataclassT], base_schema: type[Schema] | None, **marshmallow_load_kwargs: Any, ) -> Receiver[DataclassT | None]: ... + @overload + async def new_receiver( # pylint: disable=too-many-arguments + self, + *, + wait_for_first: bool = True, + skip_unchanged: bool = True, + skip_none: Literal[True] = True, + key: str | Sequence[str], + schema: type[DataclassT], + base_schema: type[Schema] | None, + **marshmallow_load_kwargs: Any, + ) -> Receiver[DataclassT]: ... + # The noqa DOC502 is needed because we raise TimeoutError indirectly. - async def new_receiver( # pylint: disable=too-many-arguments # noqa: DOC502 + # pylint: disable-next=too-many-arguments,too-many-locals + async def new_receiver( # noqa: DOC502 self, *, wait_for_first: bool = False, skip_unchanged: bool = True, + skip_none: bool = True, # This is tricky, because a str is also a Sequence[str], if we would use only # Sequence[str], then a regular string would also be accepted and taken as # a sequence, like "key" -> ["k", "e", "y"]. We should never remove the str from @@ -181,6 +210,13 @@ async def new_receiver( # pylint: disable=too-many-arguments # noqa: DOC502 The comparison is done using the *raw* `dict` to determine if the configuration has changed. + If `skip_none` is set to `True`, then a configuration that is `None` will be + ignored and not sent to the receiver. This is useful for cases where the the + receiver can't react to `None` configurations, either because it is handled + externally or because it should just keep the previous configuration. + This can only be used when `key` is not `None` as when `key` is `None`, the + configuration can never be `None`. + ### Filtering The configuration can be filtered by a `key`. 
@@ -238,6 +274,8 @@ async def new_receiver( # pylint: disable=too-many-arguments # noqa: DOC502 [`consume()`][frequenz.channels.Receiver.consume] on the receiver. skip_unchanged: Whether to skip sending the configuration if it hasn't changed compared to the last one received. + skip_none: Whether to skip sending the configuration if it is `None`. Only + valid when `key` is not `None`. key: The key to filter the configuration. If `None`, the full configuration will be received. schema: The type of the configuration. If provided, the configuration @@ -322,12 +360,22 @@ def _is_valid_or_none( """Return whether the configuration is valid or `None`.""" return config is not _INVALID_CONFIG - def _is_valid( - config: DataclassT | _InvalidConfig, + def _is_valid_and_not_none( + config: DataclassT | _InvalidConfig | None, ) -> TypeGuard[DataclassT]: """Return whether the configuration is valid and not `None`.""" return config is not _INVALID_CONFIG + def _is_dataclass(config: DataclassT | None) -> TypeGuard[DataclassT]: + """Return whether the configuration is a dataclass.""" + return config is not None + + def _is_mapping( + config: Mapping[str, Any] | None + ) -> TypeGuard[Mapping[str, Any]]: + """Return whether the configuration is a mapping.""" + return config is not None + recv_name = f"{self}_receiver" if key is None else f"{self}_receiver_{key}" receiver = self.config_channel.new_receiver(name=recv_name, limit=1) @@ -355,12 +403,22 @@ def _is_valid( base_schema=base_schema, **marshmallow_load_kwargs, ) - ).filter(_is_valid) + ).filter(_is_valid_and_not_none) assert_type(recv_dataclass, Receiver[DataclassT]) return recv_dataclass case (str(), None): recv_map_or_none = receiver.map(lambda config: _get_key(config, key)) assert_type(recv_map_or_none, Receiver[Mapping[str, Any] | None]) + if skip_none: + # For some reason mypy is having trouble narrowing the type here, + # so we need to cast it (pyright narrowes it correctly). 
+ recv_map = cast( + Receiver[Mapping[str, Any]], + recv_map_or_none.filter(_is_mapping), + ) + assert_type(recv_map, Receiver[Mapping[str, Any]]) + return recv_map + assert_type(recv_map_or_none, Receiver[Mapping[str, Any] | None]) return recv_map_or_none case (str(), type()): recv_dataclass_or_none = receiver.map( @@ -373,6 +431,10 @@ def _is_valid( ) ).filter(_is_valid_or_none) assert_type(recv_dataclass_or_none, Receiver[DataclassT | None]) + if skip_none: + recv_dataclass = recv_dataclass_or_none.filter(_is_dataclass) + assert_type(recv_dataclass, Receiver[DataclassT]) + return recv_dataclass return recv_dataclass_or_none case unexpected: # We can't use `assert_never` here because `mypy` is From 77295dcf6ea3f5cd79472fc3fa4e94cc860a21db Mon Sep 17 00:00:00 2001 From: Leandro Lucarella Date: Fri, 22 Nov 2024 12:43:08 +0100 Subject: [PATCH 10/25] Add a global instance for the config manager This global instance can be used as a single point where any actor can obtain a receiver to receive configuration updates. 
Signed-off-by: Leandro Lucarella --- src/frequenz/sdk/config/__init__.py | 8 ++ src/frequenz/sdk/config/_global.py | 146 ++++++++++++++++++++++++++++ 2 files changed, 154 insertions(+) create mode 100644 src/frequenz/sdk/config/_global.py diff --git a/src/frequenz/sdk/config/__init__.py b/src/frequenz/sdk/config/__init__.py index 327777d31..60300ab06 100644 --- a/src/frequenz/sdk/config/__init__.py +++ b/src/frequenz/sdk/config/__init__.py @@ -3,6 +3,11 @@ """Configuration management.""" +from ._global import ( + get_config_manager, + initialize_config_manager, + shutdown_config_manager, +) from ._logging_actor import LoggerConfig, LoggingConfig, LoggingConfigUpdatingActor from ._manager import ConfigManager from ._managing_actor import ConfigManagingActor @@ -14,5 +19,8 @@ "LoggerConfig", "LoggingConfig", "LoggingConfigUpdatingActor", + "get_config_manager", + "initialize_config_manager", "load_config", + "shutdown_config_manager", ] diff --git a/src/frequenz/sdk/config/_global.py b/src/frequenz/sdk/config/_global.py new file mode 100644 index 000000000..4acda6b7f --- /dev/null +++ b/src/frequenz/sdk/config/_global.py @@ -0,0 +1,146 @@ +# License: MIT +# Copyright © 2024 Frequenz Energy-as-a-Service GmbH + +"""Global config manager.""" + +import asyncio +import logging +import pathlib +from collections.abc import Sequence +from datetime import timedelta + +from ._manager import ConfigManager + +_logger = logging.getLogger(__name__) + +# pylint: disable=global-statement +_CONFIG_MANAGER: ConfigManager | None = None +"""Global instance of the ConfigManagingActor. + +This is the only instance of the ConfigManagingActor that should be used in the +entire application. It is created lazily on the first access and should be +accessed via the `get_config_manager` function. 
+""" + + +def initialize_config_manager( # pylint: disable=too-many-arguments + config_paths: Sequence[pathlib.Path], + /, + *, + force_polling: bool = True, + name: str = "global", + polling_interval: timedelta = timedelta(seconds=5), + wait_for_first_timeout: timedelta = timedelta(seconds=5), +) -> ConfigManager: + """Initialize the singleton instance of the ConfigManagingActor. + + Args: + config_paths: Paths to the TOML configuration files. + force_polling: Whether to force file polling to check for changes. + name: The name of the config manager. + polling_interval: The interval to poll for changes. Only relevant if + polling is enabled. + wait_for_first_timeout: The timeout to use when waiting for the first + configuration in + [`new_receiver`][frequenz.sdk.config.ConfigManager.new_receiver] if + `wait_for_first` is `True`. + + Returns: + The global instance of the ConfigManagingActor. + + Raises: + RuntimeError: If the config manager is already initialized. + """ + _logger.info( + "Initializing config manager %s for %s with force_polling=%s, " + "polling_interval=%s, wait_for_first_timeout=%s", + name, + config_paths, + force_polling, + polling_interval, + wait_for_first_timeout, + ) + + global _CONFIG_MANAGER + if _CONFIG_MANAGER is not None: + raise RuntimeError("Config already initialized") + + _CONFIG_MANAGER = ConfigManager( + config_paths, + name=name, + force_polling=force_polling, + polling_interval=polling_interval, + wait_for_first_timeout=wait_for_first_timeout, + auto_start=True, + ) + + return _CONFIG_MANAGER + + +async def shutdown_config_manager( + *, + msg: str = "Config manager is shutting down", + timeout: timedelta | None = timedelta(seconds=5), +) -> None: + """Shutdown the global config manager. + + This will stop the config manager and release any resources it holds. + + Note: + The config manager must be + [initialized][frequenz.sdk.config.initialize_config] before calling this + function. 
+ + Args: + msg: The message to be passed to the tasks being cancelled. + timeout: The maximum time to wait for the config manager to stop. If `None`, + the method will only cancel the config manager actor and return immediately + without awaiting at all (stopping might continue in the background). If the + time is exceeded, an error will be logged. + + Raises: + RuntimeError: If the config manager is not initialized. + """ + _logger.info("Shutting down config manager (timeout=%s)...", timeout) + + global _CONFIG_MANAGER + if _CONFIG_MANAGER is None: + raise RuntimeError("Config not initialized") + + if timeout is None: + _CONFIG_MANAGER.actor.cancel(msg) + _logger.info( + "Config manager cancelled, stopping might continue in the background." + ) + else: + try: + async with asyncio.timeout(timeout.total_seconds()): + await _CONFIG_MANAGER.actor.stop(msg) + _logger.info("Config manager stopped.") + except asyncio.TimeoutError: + _logger.warning( + "Config manager did not stop within %s seconds, it might continue " + "stopping in the background", + timeout, + ) + + _CONFIG_MANAGER = None + + +def get_config_manager() -> ConfigManager: + """Return the global config manager. + + Note: + The config manager must be + [initialized][frequenz.sdk.config.initialize_config] before calling this + function. + + Returns: + The global instance of the config manager. + + Raises: + RuntimeError: If the config manager is not initialized. + """ + if _CONFIG_MANAGER is None: + raise RuntimeError("Config not initialized") + return _CONFIG_MANAGER From cd08feb44b9151ef1aa802ae19c2f7e6a10beb1a Mon Sep 17 00:00:00 2001 From: Leandro Lucarella Date: Fri, 6 Dec 2024 15:08:12 +0100 Subject: [PATCH 11/25] Support using `BackgroundService` as a *mixin* This means using background service with multiple inheritance, so when calling `super().__init__()` it can properly ignore all the keyword arguments it doesn't use. 
This also means now `Actor` can be used as a *mixin*, as it doesn't provide its own `__init__()`. Signed-off-by: Leandro Lucarella --- src/frequenz/sdk/actor/_background_service.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/src/frequenz/sdk/actor/_background_service.py b/src/frequenz/sdk/actor/_background_service.py index 2d44d2afe..6951eb8b2 100644 --- a/src/frequenz/sdk/actor/_background_service.py +++ b/src/frequenz/sdk/actor/_background_service.py @@ -69,15 +69,20 @@ async def main() -> None: ``` """ - def __init__(self, *, name: str | None = None) -> None: + def __init__(self, *, name: str | None = None, **kwargs: Any) -> None: """Initialize this BackgroundService. Args: name: The name of this background service. If `None`, `str(id(self))` will be used. This is used mostly for debugging purposes. + **kwargs: Additional keyword arguments to be passed to the parent class + constructor. This is only provided to allow this class to be used as + a mixin alonside other classes that require additional keyword + arguments. """ self._name: str = str(id(self)) if name is None else name self._tasks: set[asyncio.Task[Any]] = set() + super().__init__(**kwargs) @abc.abstractmethod def start(self) -> None: From fbe18deefb9ed1dcd077477455615a32555fb8e9 Mon Sep 17 00:00:00 2001 From: Leandro Lucarella Date: Fri, 6 Dec 2024 15:12:19 +0100 Subject: [PATCH 12/25] Add a base config schema that provides quantities support This schema is provided to use as a default, and might be extended in the future to support more commonly used fields that are not provided by marshmallow by default. To use the quantity schema we need to bump the `frequenz-quantities` dependency and add the optional `marshmallow` dependency. 
Signed-off-by: Leandro Lucarella --- pyproject.toml | 2 +- src/frequenz/sdk/config/_base_schema.py | 10 ++++++++++ 2 files changed, 11 insertions(+), 1 deletion(-) create mode 100644 src/frequenz/sdk/config/_base_schema.py diff --git a/pyproject.toml b/pyproject.toml index 07db7ec80..10f132936 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -31,7 +31,7 @@ dependencies = [ # (plugins.mkdocstrings.handlers.python.import) "frequenz-client-microgrid >= 0.6.0, < 0.7.0", "frequenz-channels >= 1.4.0, < 2.0.0", - "frequenz-quantities >= 1.0.0rc3, < 2.0.0", + "frequenz-quantities[marshmallow] >= 1.0.0, < 2.0.0", "networkx >= 2.8, < 4", "numpy >= 1.26.4, < 2", "typing_extensions >= 4.6.1, < 5", diff --git a/src/frequenz/sdk/config/_base_schema.py b/src/frequenz/sdk/config/_base_schema.py new file mode 100644 index 000000000..d25611958 --- /dev/null +++ b/src/frequenz/sdk/config/_base_schema.py @@ -0,0 +1,10 @@ +# License: MIT +# Copyright © 2024 Frequenz Energy-as-a-Service GmbH + +"""Base schema for configuration classes.""" + +from frequenz.quantities.experimental.marshmallow import QuantitySchema + + +class BaseConfigSchema(QuantitySchema): + """A base schema for configuration classes.""" From 3201348b34c9f9fa50ab26399acc4a4b99a51f1f Mon Sep 17 00:00:00 2001 From: Leandro Lucarella Date: Fri, 6 Dec 2024 15:09:34 +0100 Subject: [PATCH 13/25] Add a `Reconfigurable` *mixin* This class is mainly provided as a guideline on how to implement actors that can be reconfigured, so actor authors don't forget to do the basic steps to allow reconfiguration, and to have a common interface and pattern when creating reconfigurable actors. 
Signed-off-by: Leandro Lucarella --- src/frequenz/sdk/config/__init__.py | 2 + src/frequenz/sdk/config/_reconfigurable.py | 140 +++++++++++++++++++++ 2 files changed, 142 insertions(+) create mode 100644 src/frequenz/sdk/config/_reconfigurable.py diff --git a/src/frequenz/sdk/config/__init__.py b/src/frequenz/sdk/config/__init__.py index 60300ab06..5a6b7db08 100644 --- a/src/frequenz/sdk/config/__init__.py +++ b/src/frequenz/sdk/config/__init__.py @@ -11,6 +11,7 @@ from ._logging_actor import LoggerConfig, LoggingConfig, LoggingConfigUpdatingActor from ._manager import ConfigManager from ._managing_actor import ConfigManagingActor +from ._reconfigurable import Reconfigurable from ._util import load_config __all__ = [ @@ -19,6 +20,7 @@ "LoggerConfig", "LoggingConfig", "LoggingConfigUpdatingActor", + "Reconfigurable", "get_config_manager", "initialize_config_manager", "load_config", diff --git a/src/frequenz/sdk/config/_reconfigurable.py b/src/frequenz/sdk/config/_reconfigurable.py new file mode 100644 index 000000000..2ccdcf085 --- /dev/null +++ b/src/frequenz/sdk/config/_reconfigurable.py @@ -0,0 +1,140 @@ +# License: MIT +# Copyright © 2024 Frequenz Energy-as-a-Service GmbH + +"""Mixin for reconfigurable classes.""" + +from __future__ import annotations + +from typing import ( + TYPE_CHECKING, + Any, + Final, + Generic, + Literal, + Sequence, + assert_type, + overload, +) + +import marshmallow +from frequenz.channels import Receiver +from marshmallow import Schema + +from . import _global +from ._base_schema import BaseConfigSchema +from ._manager import ConfigManager +from ._util import DataclassT + + +class Reconfigurable(Generic[DataclassT]): + """A mixin for reconfigurable classes. + + This mixin provides a method to initialize the configuration of a class. It is + meant mostly as a guide on how to implement reconfigurable classes. + + TODO: Example in module. 
+ """ + + def __init__( + self, + *, + config_key: str | Sequence[str], + config_schema: type[DataclassT], + config_manager: ConfigManager | None = None, + **kwargs: Any, + ) -> None: + """Initialize this reconfigurable mixin. + + Args: + config_key: The key to use to retrieve the configuration from the + configuration manager. + config_schema: The schema to use to load the configuration. + config_manager: The configuration manager to use. If `None`, the [global + configuration manager][frequenz.sdk.config.get_config_manager] will be + used. + **kwargs: Additional keyword arguments to be passed to the parent class + constructor. This is only provided to allow this class to be used as + a mixin alonside other classes that require additional keyword + arguments. + """ + self.config_schema: Final[type[DataclassT]] = config_schema + if not isinstance(config_key, (str, tuple)): + config_key = tuple(config_key) + self.config_key: Final[str | tuple[str, ...]] = config_key + if config_manager is None: + config_manager = _global.get_config_manager() + self.config_manager: Final[ConfigManager] = config_manager + super().__init__(**kwargs) + + @overload + async def initialize_config( # noqa: DOC502 + self, + *, + skip_unchanged: bool = True, + skip_none: Literal[True] = True, + base_schema: type[Schema] | None = BaseConfigSchema, + **marshmallow_load_kwargs: Any, + ) -> Receiver[DataclassT]: ... + + @overload + async def initialize_config( # noqa: DOC502 + self, + *, + skip_unchanged: bool = True, + skip_none: Literal[False] = False, + base_schema: type[Schema] | None = BaseConfigSchema, + **marshmallow_load_kwargs: Any, + ) -> Receiver[DataclassT | None]: ... + + # The noqa DOC502 is needed because we raise TimeoutError indirectly. 
+ async def initialize_config( # noqa: DOC502 + self, + *, + skip_unchanged: bool = True, + skip_none: bool = True, + base_schema: type[Schema] | None = BaseConfigSchema, + **marshmallow_load_kwargs: Any, + ) -> Receiver[DataclassT] | Receiver[DataclassT | None]: + """Initialize the configuration. + + Args: + skip_unchanged: Whether to skip unchanged configurations. + skip_none: Whether to skip sending the configuration if it is `None`. Only + valid when `key` is not `None`. + base_schema: The base schema to use for the configuration schema. + **marshmallow_load_kwargs: Additional arguments to pass to + `marshmallow.Schema.load`. + + Returns: + A receiver to get configuration updates, + [ready][frequenz.channels.Receiver.ready] to receive the first + configuration. + + Raises: + asyncio.TimeoutError: If the first configuration can't be received in time. + """ + if "unknown" not in marshmallow_load_kwargs: + marshmallow_load_kwargs["unknown"] = marshmallow.EXCLUDE + if skip_none: + recv_not_none = await self.config_manager.new_receiver( + wait_for_first=True, + skip_unchanged=skip_unchanged, + skip_none=True, + key=self.config_key, + schema=self.config_schema, + base_schema=base_schema, + **marshmallow_load_kwargs, + ) + assert_type(recv_not_none, Receiver[DataclassT]) + return recv_not_none + recv_none = await self.config_manager.new_receiver( + wait_for_first=True, + skip_unchanged=skip_unchanged, + skip_none=False, + key=self.config_key, + schema=self.config_schema, + base_schema=base_schema, + **marshmallow_load_kwargs, + ) + assert_type(recv_none, Receiver[DataclassT | None]) + return recv_none From 2878a2ab65e034b4b444b6d3e409846ce8be876d Mon Sep 17 00:00:00 2001 From: Leandro Lucarella Date: Mon, 9 Dec 2024 11:57:15 +0100 Subject: [PATCH 14/25] Make the `LoggingConfigUpdatingActor` `Reconfigurable` The `LoggingConfigUpdatingActor` now inherits also from `Reconfigurable` to ensure a consistent initialization. 
This also means the actor can now receive `None` as configuration (for example if the configuration is removed), in which case it will go back to the default configuration. Signed-off-by: Leandro Lucarella --- RELEASE_NOTES.md | 1 + src/frequenz/sdk/config/_logging_actor.py | 68 +++++++++------------- tests/config/test_logging_actor.py | 71 ++++++++--------------- 3 files changed, 54 insertions(+), 86 deletions(-) diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index 1bbbd075f..7c6a0ced9 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -12,6 +12,7 @@ + Renamed to `LoggingConfigUpdatingActor` to follow the actor naming convention. + Make all arguments to the constructor keyword-only. + + If the configuration is removed, the actor will now load back the default configuration. * `LoggingConfig` diff --git a/src/frequenz/sdk/config/_logging_actor.py b/src/frequenz/sdk/config/_logging_actor.py index 7ba2f6b21..8f27d7d13 100644 --- a/src/frequenz/sdk/config/_logging_actor.py +++ b/src/frequenz/sdk/config/_logging_actor.py @@ -3,17 +3,18 @@ """Read and update logging severity from config.""" +from __future__ import annotations + import logging -from collections.abc import Mapping from dataclasses import dataclass, field -from typing import Annotated, Any +from typing import Annotated, Sequence import marshmallow import marshmallow.validate -from frequenz.channels import Receiver from ..actor import Actor -from ._util import load_config +from ._manager import ConfigManager +from ._reconfigurable import Reconfigurable _logger = logging.getLogger(__name__) @@ -66,7 +67,7 @@ class LoggingConfig: """The list of loggers configurations.""" -class LoggingConfigUpdatingActor(Actor): +class LoggingConfigUpdatingActor(Actor, Reconfigurable[LoggingConfig]): """Actor that listens for logging configuration changes and sets them. 
Example: @@ -84,26 +85,12 @@ class LoggingConfigUpdatingActor(Actor): ```python import asyncio - from collections.abc import Mapping - from typing import Any - from frequenz.channels import Broadcast - from frequenz.sdk.config import LoggingConfigUpdatingActor, ConfigManagingActor + from frequenz.sdk.config import LoggingConfigUpdatingActor from frequenz.sdk.actor import run as run_actors async def run() -> None: - config_channel = Broadcast[Mapping[str, Any]](name="config", resend_latest=True) - actors = [ - ConfigManagingActor( - config_paths=["config.toml"], output=config_channel.new_sender() - ), - LoggingConfigUpdatingActor( - config_recv=config_channel.new_receiver(limit=1)).map( - lambda app_config: app_config.get("logging", {} - ) - ), - ] - await run_actors(*actors) + await run_actors(LoggingConfigUpdatingActor()) asyncio.run(run()) ``` @@ -112,10 +99,12 @@ async def run() -> None: will be updated as well. """ + # pylint: disable-next=too-many-arguments def __init__( self, *, - config_recv: Receiver[Mapping[str, Any]], + config_key: str | Sequence[str] = "logging", + config_manager: ConfigManager | None = None, log_datefmt: str = "%Y-%m-%dT%H:%M:%S%z", log_format: str = "%(asctime)s %(levelname)-8s %(name)s:%(lineno)s: %(message)s", name: str | None = None, @@ -123,7 +112,11 @@ def __init__( """Initialize this instance. Args: - config_recv: The receiver to listen for configuration changes. + config_key: The key to use to retrieve the configuration from the + configuration manager. If `None`, the whole configuration will be used. + config_manager: The configuration manager to use. If `None`, the [global + configuration manager][frequenz.sdk.config.get_config_manager] will be + used. log_datefmt: Use the specified date/time format in logs. log_format: Use the specified format string in logs. name: The name of this actor. If `None`, `str(id(self))` will be used. 
This @@ -135,15 +128,18 @@ def __init__( in the application (through a previous `basicConfig()` call), then the format settings specified here will be ignored. """ - self._config_recv = config_recv + self._current_config: LoggingConfig = LoggingConfig() + + super().__init__( + name=name, + config_key=config_key, + config_manager=config_manager, + config_schema=LoggingConfig, + ) # Setup default configuration. # This ensures logging is configured even if actor fails to start or # if the configuration cannot be loaded. - self._current_config: LoggingConfig = LoggingConfig() - - super().__init__(name=name) - logging.basicConfig( format=log_format, datefmt=log_datefmt, @@ -152,17 +148,11 @@ def __init__( async def _run(self) -> None: """Listen for configuration changes and update logging.""" - async for message in self._config_recv: - try: - new_config = load_config(LoggingConfig, message) - except marshmallow.ValidationError: - _logger.exception( - "Invalid logging configuration received. Skipping config update" - ) - continue - - if new_config != self._current_config: - self._update_logging(new_config) + config_receiver = await self.initialize_config(skip_none=False) + async for new_config in config_receiver: + # When we receive None, we want to reset the logging configuration to the + # default + self._update_logging(new_config or LoggingConfig()) def _update_logging(self, config: LoggingConfig) -> None: """Configure the logging level.""" diff --git a/tests/config/test_logging_actor.py b/tests/config/test_logging_actor.py index 75c1e256c..340a55d0c 100644 --- a/tests/config/test_logging_actor.py +++ b/tests/config/test_logging_actor.py @@ -5,7 +5,6 @@ import asyncio import logging -from collections.abc import Mapping from typing import Any import pytest @@ -77,12 +76,16 @@ async def test_logging_config_updating_actor( # is not working anyway - python ignores it. 
mocker.patch("frequenz.sdk.config._logging_actor.logging.basicConfig") - config_channel = Broadcast[Mapping[str, Any]](name="config") - config_sender = config_channel.new_sender() + # Mock ConfigManager + mock_config_manager = mocker.Mock() + mock_config_manager.config_channel = Broadcast[LoggingConfig | None](name="config") + mock_config_manager.new_receiver = mocker.AsyncMock( + return_value=mock_config_manager.config_channel.new_receiver() + ) + async with LoggingConfigUpdatingActor( - config_recv=config_channel.new_receiver().map( - lambda app_config: app_config.get("logging", {}) - ) + config_key="logging", + config_manager=mock_config_manager, ) as actor: assert logging.getLogger("frequenz.sdk.actor").level == logging.NOTSET assert logging.getLogger("frequenz.sdk.timeseries").level == logging.NOTSET @@ -90,65 +93,39 @@ async def test_logging_config_updating_actor( update_logging_spy = mocker.spy(actor, "_update_logging") # Send first config - await config_sender.send( - { - "logging": { - "root_logger": {"level": "ERROR"}, - "loggers": { - "frequenz.sdk.actor": {"level": "DEBUG"}, - "frequenz.sdk.timeseries": {"level": "ERROR"}, - }, - } - } + expected_config = LoggingConfig( + root_logger=LoggerConfig(level="ERROR"), + loggers={ + "frequenz.sdk.actor": LoggerConfig(level="DEBUG"), + "frequenz.sdk.timeseries": LoggerConfig(level="ERROR"), + }, ) + await mock_config_manager.config_channel.new_sender().send(expected_config) await asyncio.sleep(0.01) - update_logging_spy.assert_called_once_with( - LoggingConfig( - root_logger=LoggerConfig(level="ERROR"), - loggers={ - "frequenz.sdk.actor": LoggerConfig(level="DEBUG"), - "frequenz.sdk.timeseries": LoggerConfig(level="ERROR"), - }, - ) - ) + update_logging_spy.assert_called_once_with(expected_config) assert logging.getLogger("frequenz.sdk.actor").level == logging.DEBUG assert logging.getLogger("frequenz.sdk.timeseries").level == logging.ERROR update_logging_spy.reset_mock() # Update config - await 
config_sender.send( - { - "logging": { - "root_logger": {"level": "WARNING"}, - "loggers": { - "frequenz.sdk.actor": {"level": "INFO"}, - }, - } - } - ) - await asyncio.sleep(0.01) expected_config = LoggingConfig( root_logger=LoggerConfig(level="WARNING"), loggers={ "frequenz.sdk.actor": LoggerConfig(level="INFO"), }, ) + await mock_config_manager.config_channel.new_sender().send(expected_config) + await asyncio.sleep(0.01) update_logging_spy.assert_called_once_with(expected_config) assert logging.getLogger("frequenz.sdk.actor").level == logging.INFO assert logging.getLogger("frequenz.sdk.timeseries").level == logging.NOTSET update_logging_spy.reset_mock() - # Send invalid config to make sure actor doesn't crash and doesn't setup invalid config. - await config_sender.send({"logging": {"root_logger": {"level": "UNKNOWN"}}}) - await asyncio.sleep(0.01) - update_logging_spy.assert_not_called() - assert actor._current_config == expected_config - update_logging_spy.reset_mock() - - # Send empty config to reset logging to default - await config_sender.send({"other": {"var1": 1}}) + # Send a None config to make sure actor doesn't crash and configures a default logging + await mock_config_manager.config_channel.new_sender().send(None) await asyncio.sleep(0.01) update_logging_spy.assert_called_once_with(LoggingConfig()) - assert logging.getLogger("frequenz.sdk.actor").level == logging.NOTSET - assert logging.getLogger("frequenz.sdk.timeseries").level == logging.NOTSET + assert ( + actor._current_config == LoggingConfig() # pylint: disable=protected-access + ) update_logging_spy.reset_mock() From 0b54413bb24eb84b911a48cfa57de2228df635bc Mon Sep 17 00:00:00 2001 From: Leandro Lucarella Date: Mon, 9 Dec 2024 13:56:48 +0100 Subject: [PATCH 15/25] Allow configuring logging via `ConfigManager` Signed-off-by: Leandro Lucarella --- src/frequenz/sdk/config/_global.py | 18 +++++++++++++++--- src/frequenz/sdk/config/_manager.py | 21 ++++++++++++++++++--- 2 files changed, 33 
insertions(+), 6 deletions(-) diff --git a/src/frequenz/sdk/config/_global.py b/src/frequenz/sdk/config/_global.py index 4acda6b7f..886466b90 100644 --- a/src/frequenz/sdk/config/_global.py +++ b/src/frequenz/sdk/config/_global.py @@ -28,6 +28,7 @@ def initialize_config_manager( # pylint: disable=too-many-arguments /, *, force_polling: bool = True, + logging_config_key: str | Sequence[str] | None = "logging", name: str = "global", polling_interval: timedelta = timedelta(seconds=5), wait_for_first_timeout: timedelta = timedelta(seconds=5), @@ -37,6 +38,10 @@ def initialize_config_manager( # pylint: disable=too-many-arguments Args: config_paths: Paths to the TOML configuration files. force_polling: Whether to force file polling to check for changes. + logging_config_key: The key to use for the logging configuration. If `None`, + logging configuration will not be managed. If a key is provided, the + manager update the logging configuration whenever the configuration + changes. name: The name of the config manager. polling_interval: The interval to poll for changes. Only relevant if polling is enabled. 
@@ -53,10 +58,11 @@ def initialize_config_manager( # pylint: disable=too-many-arguments """ _logger.info( "Initializing config manager %s for %s with force_polling=%s, " - "polling_interval=%s, wait_for_first_timeout=%s", + "logging_config_key=%s, polling_interval=%s, wait_for_first_timeout=%s", name, config_paths, force_polling, + logging_config_key, polling_interval, wait_for_first_timeout, ) @@ -69,6 +75,7 @@ def initialize_config_manager( # pylint: disable=too-many-arguments config_paths, name=name, force_polling=force_polling, + logging_config_key=logging_config_key, polling_interval=polling_interval, wait_for_first_timeout=wait_for_first_timeout, auto_start=True, @@ -108,14 +115,19 @@ async def shutdown_config_manager( raise RuntimeError("Config not initialized") if timeout is None: - _CONFIG_MANAGER.actor.cancel(msg) + _CONFIG_MANAGER.config_actor.cancel(msg) + if _CONFIG_MANAGER.logging_actor: + _CONFIG_MANAGER.logging_actor.cancel(msg) _logger.info( "Config manager cancelled, stopping might continue in the background." 
) else: + to_stop = [_CONFIG_MANAGER.config_actor.stop(msg)] + if _CONFIG_MANAGER.logging_actor: + to_stop.append(_CONFIG_MANAGER.logging_actor.stop(msg)) try: async with asyncio.timeout(timeout.total_seconds()): - await _CONFIG_MANAGER.actor.stop(msg) + await asyncio.gather(*to_stop) _logger.info("Config manager stopped.") except asyncio.TimeoutError: _logger.warning( diff --git a/src/frequenz/sdk/config/_manager.py b/src/frequenz/sdk/config/_manager.py index 4008b5e9d..8080ead70 100644 --- a/src/frequenz/sdk/config/_manager.py +++ b/src/frequenz/sdk/config/_manager.py @@ -34,6 +34,7 @@ def __init__( # pylint: disable=too-many-arguments *, auto_start: bool = True, force_polling: bool = True, + logging_config_key: str | Sequence[str] | None = "logging", name: str | None = None, polling_interval: timedelta = timedelta(seconds=5), wait_for_first_timeout: timedelta = timedelta(seconds=5), @@ -49,6 +50,10 @@ def __init__( # pylint: disable=too-many-arguments auto_start: Whether to start the actor automatically. If `False`, the actor will need to be started manually by calling `start()` on the actor. force_polling: Whether to force file polling to check for changes. + logging_config_key: The key to use for the logging configuration. If `None`, + logging configuration will not be managed. If a key is provided, the + manager update the logging configuration whenever the configuration + changes. name: A name to use when creating actors. If `None`, `str(id(self))` will be used. This is used mostly for debugging purposes. polling_interval: The interval to poll for changes. 
Only relevant if @@ -66,7 +71,7 @@ def __init__( # pylint: disable=too-many-arguments ) """The broadcast channel for the configuration.""" - self.actor: Final[ConfigManagingActor] = ConfigManagingActor( + self.config_actor: Final[ConfigManagingActor] = ConfigManagingActor( config_paths, self.config_channel.new_sender(), name=str(self), @@ -83,8 +88,17 @@ def __init__( # pylint: disable=too-many-arguments will be used to wait for the first configuration to be received. """ + # pylint: disable-next=import-outside-toplevel,cyclic-import + from ._logging_actor import LoggingConfigUpdatingActor + + self.logging_actor: Final[LoggingConfigUpdatingActor | None] = ( + None if logging_config_key is None else LoggingConfigUpdatingActor() + ) + if auto_start: - self.actor.start() + self.config_actor.start() + if self.logging_actor: + self.logging_actor.start() def __repr__(self) -> str: """Return a string representation of this config manager.""" @@ -93,7 +107,8 @@ def __repr__(self) -> str: f"name={self.name!r}, " f"wait_for_first_timeout={self.wait_for_first_timeout!r}, " f"config_channel={self.config_channel!r}, " - f"actor={self.actor!r}>" + f"logging_actor={self.logging_actor!r}, " + f"config_actor={self.config_actor!r}>" ) def __str__(self) -> str: From 80fc62604cc2a246322b8a21f4fd459739648bc9 Mon Sep 17 00:00:00 2001 From: Leandro Lucarella Date: Tue, 10 Dec 2024 14:03:02 +0100 Subject: [PATCH 16/25] Revert "Make the `LoggingConfigUpdatingActor` `Reconfigurable`" The `Reconfigurable` mixin will be removed as it seems to be overkill and confusing. This reverts commit 3102990ef2d57c2a804a2c41391a512de13fb189. 
Signed-off-by: Leandro Lucarella --- RELEASE_NOTES.md | 1 - src/frequenz/sdk/config/_logging_actor.py | 68 +++++++++++++--------- tests/config/test_logging_actor.py | 71 +++++++++++++++-------- 3 files changed, 86 insertions(+), 54 deletions(-) diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index 7c6a0ced9..1bbbd075f 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -12,7 +12,6 @@ + Renamed to `LoggingConfigUpdatingActor` to follow the actor naming convention. + Make all arguments to the constructor keyword-only. - + If the configuration is removed, the actor will now load back the default configuration. * `LoggingConfig` diff --git a/src/frequenz/sdk/config/_logging_actor.py b/src/frequenz/sdk/config/_logging_actor.py index 8f27d7d13..7ba2f6b21 100644 --- a/src/frequenz/sdk/config/_logging_actor.py +++ b/src/frequenz/sdk/config/_logging_actor.py @@ -3,18 +3,17 @@ """Read and update logging severity from config.""" -from __future__ import annotations - import logging +from collections.abc import Mapping from dataclasses import dataclass, field -from typing import Annotated, Sequence +from typing import Annotated, Any import marshmallow import marshmallow.validate +from frequenz.channels import Receiver from ..actor import Actor -from ._manager import ConfigManager -from ._reconfigurable import Reconfigurable +from ._util import load_config _logger = logging.getLogger(__name__) @@ -67,7 +66,7 @@ class LoggingConfig: """The list of loggers configurations.""" -class LoggingConfigUpdatingActor(Actor, Reconfigurable[LoggingConfig]): +class LoggingConfigUpdatingActor(Actor): """Actor that listens for logging configuration changes and sets them. 
Example: @@ -85,12 +84,26 @@ class LoggingConfigUpdatingActor(Actor, Reconfigurable[LoggingConfig]): ```python import asyncio + from collections.abc import Mapping + from typing import Any - from frequenz.sdk.config import LoggingConfigUpdatingActor + from frequenz.channels import Broadcast + from frequenz.sdk.config import LoggingConfigUpdatingActor, ConfigManagingActor from frequenz.sdk.actor import run as run_actors async def run() -> None: - await run_actors(LoggingConfigUpdatingActor()) + config_channel = Broadcast[Mapping[str, Any]](name="config", resend_latest=True) + actors = [ + ConfigManagingActor( + config_paths=["config.toml"], output=config_channel.new_sender() + ), + LoggingConfigUpdatingActor( + config_recv=config_channel.new_receiver(limit=1)).map( + lambda app_config: app_config.get("logging", {} + ) + ), + ] + await run_actors(*actors) asyncio.run(run()) ``` @@ -99,12 +112,10 @@ async def run() -> None: will be updated as well. """ - # pylint: disable-next=too-many-arguments def __init__( self, *, - config_key: str | Sequence[str] = "logging", - config_manager: ConfigManager | None = None, + config_recv: Receiver[Mapping[str, Any]], log_datefmt: str = "%Y-%m-%dT%H:%M:%S%z", log_format: str = "%(asctime)s %(levelname)-8s %(name)s:%(lineno)s: %(message)s", name: str | None = None, @@ -112,11 +123,7 @@ def __init__( """Initialize this instance. Args: - config_key: The key to use to retrieve the configuration from the - configuration manager. If `None`, the whole configuration will be used. - config_manager: The configuration manager to use. If `None`, the [global - configuration manager][frequenz.sdk.config.get_config_manager] will be - used. + config_recv: The receiver to listen for configuration changes. log_datefmt: Use the specified date/time format in logs. log_format: Use the specified format string in logs. name: The name of this actor. If `None`, `str(id(self))` will be used. 
This @@ -128,18 +135,15 @@ def __init__( in the application (through a previous `basicConfig()` call), then the format settings specified here will be ignored. """ - self._current_config: LoggingConfig = LoggingConfig() - - super().__init__( - name=name, - config_key=config_key, - config_manager=config_manager, - config_schema=LoggingConfig, - ) + self._config_recv = config_recv # Setup default configuration. # This ensures logging is configured even if actor fails to start or # if the configuration cannot be loaded. + self._current_config: LoggingConfig = LoggingConfig() + + super().__init__(name=name) + logging.basicConfig( format=log_format, datefmt=log_datefmt, @@ -148,11 +152,17 @@ def __init__( async def _run(self) -> None: """Listen for configuration changes and update logging.""" - config_receiver = await self.initialize_config(skip_none=False) - async for new_config in config_receiver: - # When we receive None, we want to reset the logging configuration to the - # default - self._update_logging(new_config or LoggingConfig()) + async for message in self._config_recv: + try: + new_config = load_config(LoggingConfig, message) + except marshmallow.ValidationError: + _logger.exception( + "Invalid logging configuration received. Skipping config update" + ) + continue + + if new_config != self._current_config: + self._update_logging(new_config) def _update_logging(self, config: LoggingConfig) -> None: """Configure the logging level.""" diff --git a/tests/config/test_logging_actor.py b/tests/config/test_logging_actor.py index 340a55d0c..75c1e256c 100644 --- a/tests/config/test_logging_actor.py +++ b/tests/config/test_logging_actor.py @@ -5,6 +5,7 @@ import asyncio import logging +from collections.abc import Mapping from typing import Any import pytest @@ -76,16 +77,12 @@ async def test_logging_config_updating_actor( # is not working anyway - python ignores it. 
mocker.patch("frequenz.sdk.config._logging_actor.logging.basicConfig") - # Mock ConfigManager - mock_config_manager = mocker.Mock() - mock_config_manager.config_channel = Broadcast[LoggingConfig | None](name="config") - mock_config_manager.new_receiver = mocker.AsyncMock( - return_value=mock_config_manager.config_channel.new_receiver() - ) - + config_channel = Broadcast[Mapping[str, Any]](name="config") + config_sender = config_channel.new_sender() async with LoggingConfigUpdatingActor( - config_key="logging", - config_manager=mock_config_manager, + config_recv=config_channel.new_receiver().map( + lambda app_config: app_config.get("logging", {}) + ) ) as actor: assert logging.getLogger("frequenz.sdk.actor").level == logging.NOTSET assert logging.getLogger("frequenz.sdk.timeseries").level == logging.NOTSET @@ -93,39 +90,65 @@ async def test_logging_config_updating_actor( update_logging_spy = mocker.spy(actor, "_update_logging") # Send first config - expected_config = LoggingConfig( - root_logger=LoggerConfig(level="ERROR"), - loggers={ - "frequenz.sdk.actor": LoggerConfig(level="DEBUG"), - "frequenz.sdk.timeseries": LoggerConfig(level="ERROR"), - }, + await config_sender.send( + { + "logging": { + "root_logger": {"level": "ERROR"}, + "loggers": { + "frequenz.sdk.actor": {"level": "DEBUG"}, + "frequenz.sdk.timeseries": {"level": "ERROR"}, + }, + } + } ) - await mock_config_manager.config_channel.new_sender().send(expected_config) await asyncio.sleep(0.01) - update_logging_spy.assert_called_once_with(expected_config) + update_logging_spy.assert_called_once_with( + LoggingConfig( + root_logger=LoggerConfig(level="ERROR"), + loggers={ + "frequenz.sdk.actor": LoggerConfig(level="DEBUG"), + "frequenz.sdk.timeseries": LoggerConfig(level="ERROR"), + }, + ) + ) assert logging.getLogger("frequenz.sdk.actor").level == logging.DEBUG assert logging.getLogger("frequenz.sdk.timeseries").level == logging.ERROR update_logging_spy.reset_mock() # Update config + await 
config_sender.send( + { + "logging": { + "root_logger": {"level": "WARNING"}, + "loggers": { + "frequenz.sdk.actor": {"level": "INFO"}, + }, + } + } + ) + await asyncio.sleep(0.01) expected_config = LoggingConfig( root_logger=LoggerConfig(level="WARNING"), loggers={ "frequenz.sdk.actor": LoggerConfig(level="INFO"), }, ) - await mock_config_manager.config_channel.new_sender().send(expected_config) - await asyncio.sleep(0.01) update_logging_spy.assert_called_once_with(expected_config) assert logging.getLogger("frequenz.sdk.actor").level == logging.INFO assert logging.getLogger("frequenz.sdk.timeseries").level == logging.NOTSET update_logging_spy.reset_mock() - # Send a None config to make sure actor doesn't crash and configures a default logging - await mock_config_manager.config_channel.new_sender().send(None) + # Send invalid config to make sure actor doesn't crash and doesn't setup invalid config. + await config_sender.send({"logging": {"root_logger": {"level": "UNKNOWN"}}}) + await asyncio.sleep(0.01) + update_logging_spy.assert_not_called() + assert actor._current_config == expected_config + update_logging_spy.reset_mock() + + # Send empty config to reset logging to default + await config_sender.send({"other": {"var1": 1}}) await asyncio.sleep(0.01) update_logging_spy.assert_called_once_with(LoggingConfig()) - assert ( - actor._current_config == LoggingConfig() # pylint: disable=protected-access - ) + assert logging.getLogger("frequenz.sdk.actor").level == logging.NOTSET + assert logging.getLogger("frequenz.sdk.timeseries").level == logging.NOTSET update_logging_spy.reset_mock() From c34c199de4881a4d6442097378fb2fcba48c6b35 Mon Sep 17 00:00:00 2001 From: Leandro Lucarella Date: Tue, 10 Dec 2024 14:10:00 +0100 Subject: [PATCH 17/25] Revert "Add a `Reconfigurable` *mixin*" This class proved to be more confusing than helpful when presented to a wider audience, it seems to be better to propose ways to implement actors as documentation. 
This reverts commit 3201348b34c9f9fa50ab26399acc4a4b99a51f1f. Signed-off-by: Leandro Lucarella --- src/frequenz/sdk/config/__init__.py | 2 - src/frequenz/sdk/config/_reconfigurable.py | 140 --------------------- 2 files changed, 142 deletions(-) delete mode 100644 src/frequenz/sdk/config/_reconfigurable.py diff --git a/src/frequenz/sdk/config/__init__.py b/src/frequenz/sdk/config/__init__.py index 5a6b7db08..60300ab06 100644 --- a/src/frequenz/sdk/config/__init__.py +++ b/src/frequenz/sdk/config/__init__.py @@ -11,7 +11,6 @@ from ._logging_actor import LoggerConfig, LoggingConfig, LoggingConfigUpdatingActor from ._manager import ConfigManager from ._managing_actor import ConfigManagingActor -from ._reconfigurable import Reconfigurable from ._util import load_config __all__ = [ @@ -20,7 +19,6 @@ "LoggerConfig", "LoggingConfig", "LoggingConfigUpdatingActor", - "Reconfigurable", "get_config_manager", "initialize_config_manager", "load_config", diff --git a/src/frequenz/sdk/config/_reconfigurable.py b/src/frequenz/sdk/config/_reconfigurable.py deleted file mode 100644 index 2ccdcf085..000000000 --- a/src/frequenz/sdk/config/_reconfigurable.py +++ /dev/null @@ -1,140 +0,0 @@ -# License: MIT -# Copyright © 2024 Frequenz Energy-as-a-Service GmbH - -"""Mixin for reconfigurable classes.""" - -from __future__ import annotations - -from typing import ( - TYPE_CHECKING, - Any, - Final, - Generic, - Literal, - Sequence, - assert_type, - overload, -) - -import marshmallow -from frequenz.channels import Receiver -from marshmallow import Schema - -from . import _global -from ._base_schema import BaseConfigSchema -from ._manager import ConfigManager -from ._util import DataclassT - - -class Reconfigurable(Generic[DataclassT]): - """A mixin for reconfigurable classes. - - This mixin provides a method to initialize the configuration of a class. It is - meant mostly as a guide on how to implement reconfigurable classes. - - TODO: Example in module. 
- """ - - def __init__( - self, - *, - config_key: str | Sequence[str], - config_schema: type[DataclassT], - config_manager: ConfigManager | None = None, - **kwargs: Any, - ) -> None: - """Initialize this reconfigurable mixin. - - Args: - config_key: The key to use to retrieve the configuration from the - configuration manager. - config_schema: The schema to use to load the configuration. - config_manager: The configuration manager to use. If `None`, the [global - configuration manager][frequenz.sdk.config.get_config_manager] will be - used. - **kwargs: Additional keyword arguments to be passed to the parent class - constructor. This is only provided to allow this class to be used as - a mixin alonside other classes that require additional keyword - arguments. - """ - self.config_schema: Final[type[DataclassT]] = config_schema - if not isinstance(config_key, (str, tuple)): - config_key = tuple(config_key) - self.config_key: Final[str | tuple[str, ...]] = config_key - if config_manager is None: - config_manager = _global.get_config_manager() - self.config_manager: Final[ConfigManager] = config_manager - super().__init__(**kwargs) - - @overload - async def initialize_config( # noqa: DOC502 - self, - *, - skip_unchanged: bool = True, - skip_none: Literal[True] = True, - base_schema: type[Schema] | None = BaseConfigSchema, - **marshmallow_load_kwargs: Any, - ) -> Receiver[DataclassT]: ... - - @overload - async def initialize_config( # noqa: DOC502 - self, - *, - skip_unchanged: bool = True, - skip_none: Literal[False] = False, - base_schema: type[Schema] | None = BaseConfigSchema, - **marshmallow_load_kwargs: Any, - ) -> Receiver[DataclassT | None]: ... - - # The noqa DOC502 is needed because we raise TimeoutError indirectly. 
- async def initialize_config( # noqa: DOC502 - self, - *, - skip_unchanged: bool = True, - skip_none: bool = True, - base_schema: type[Schema] | None = BaseConfigSchema, - **marshmallow_load_kwargs: Any, - ) -> Receiver[DataclassT] | Receiver[DataclassT | None]: - """Initialize the configuration. - - Args: - skip_unchanged: Whether to skip unchanged configurations. - skip_none: Whether to skip sending the configuration if it is `None`. Only - valid when `key` is not `None`. - base_schema: The base schema to use for the configuration schema. - **marshmallow_load_kwargs: Additional arguments to pass to - `marshmallow.Schema.load`. - - Returns: - A receiver to get configuration updates, - [ready][frequenz.channels.Receiver.ready] to receive the first - configuration. - - Raises: - asyncio.TimeoutError: If the first configuration can't be received in time. - """ - if "unknown" not in marshmallow_load_kwargs: - marshmallow_load_kwargs["unknown"] = marshmallow.EXCLUDE - if skip_none: - recv_not_none = await self.config_manager.new_receiver( - wait_for_first=True, - skip_unchanged=skip_unchanged, - skip_none=True, - key=self.config_key, - schema=self.config_schema, - base_schema=base_schema, - **marshmallow_load_kwargs, - ) - assert_type(recv_not_none, Receiver[DataclassT]) - return recv_not_none - recv_none = await self.config_manager.new_receiver( - wait_for_first=True, - skip_unchanged=skip_unchanged, - skip_none=False, - key=self.config_key, - schema=self.config_schema, - base_schema=base_schema, - **marshmallow_load_kwargs, - ) - assert_type(recv_none, Receiver[DataclassT | None]) - return recv_none From 05164ef39aac797cc2c74ebd5713c4ce06029f45 Mon Sep 17 00:00:00 2001 From: Leandro Lucarella Date: Tue, 10 Dec 2024 14:05:07 +0100 Subject: [PATCH 18/25] Revert "Support using `BackgroundService` as a *mixin*" Since `Reconfigurable` was removed we don't need this anymore. 
The added possibility to use as a mixin doesn't seem to justify the loss of safety of adding arbitrary keyword arguments to the constructor. This reverts commit 5eb64165e8603bb50a5c0eb661de7c5770022cb2. Signed-off-by: Leandro Lucarella --- src/frequenz/sdk/actor/_background_service.py | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/src/frequenz/sdk/actor/_background_service.py b/src/frequenz/sdk/actor/_background_service.py index 6951eb8b2..2d44d2afe 100644 --- a/src/frequenz/sdk/actor/_background_service.py +++ b/src/frequenz/sdk/actor/_background_service.py @@ -69,20 +69,15 @@ async def main() -> None: ``` """ - def __init__(self, *, name: str | None = None, **kwargs: Any) -> None: + def __init__(self, *, name: str | None = None) -> None: """Initialize this BackgroundService. Args: name: The name of this background service. If `None`, `str(id(self))` will be used. This is used mostly for debugging purposes. - **kwargs: Additional keyword arguments to be passed to the parent class - constructor. This is only provided to allow this class to be used as - a mixin alonside other classes that require additional keyword - arguments. """ self._name: str = str(id(self)) if name is None else name self._tasks: set[asyncio.Task[Any]] = set() - super().__init__(**kwargs) @abc.abstractmethod def start(self) -> None: From dcd76fb86a307acdc388893937426f20efb2219d Mon Sep 17 00:00:00 2001 From: Leandro Lucarella Date: Tue, 10 Dec 2024 14:07:47 +0100 Subject: [PATCH 19/25] Revert "Add a global instance for the config manager" It seems to be more clear and easier to test to always pass a config manager explicitly, so we just remove the global instance, which was also a bit confusing when presented. This reverts commit 77295dcf6ea3f5cd79472fc3fa4e94cc860a21db. 
Signed-off-by: Leandro Lucarella --- src/frequenz/sdk/config/__init__.py | 8 -- src/frequenz/sdk/config/_global.py | 158 ---------------------------- 2 files changed, 166 deletions(-) delete mode 100644 src/frequenz/sdk/config/_global.py diff --git a/src/frequenz/sdk/config/__init__.py b/src/frequenz/sdk/config/__init__.py index 60300ab06..327777d31 100644 --- a/src/frequenz/sdk/config/__init__.py +++ b/src/frequenz/sdk/config/__init__.py @@ -3,11 +3,6 @@ """Configuration management.""" -from ._global import ( - get_config_manager, - initialize_config_manager, - shutdown_config_manager, -) from ._logging_actor import LoggerConfig, LoggingConfig, LoggingConfigUpdatingActor from ._manager import ConfigManager from ._managing_actor import ConfigManagingActor @@ -19,8 +14,5 @@ "LoggerConfig", "LoggingConfig", "LoggingConfigUpdatingActor", - "get_config_manager", - "initialize_config_manager", "load_config", - "shutdown_config_manager", ] diff --git a/src/frequenz/sdk/config/_global.py b/src/frequenz/sdk/config/_global.py deleted file mode 100644 index 886466b90..000000000 --- a/src/frequenz/sdk/config/_global.py +++ /dev/null @@ -1,158 +0,0 @@ -# License: MIT -# Copyright © 2024 Frequenz Energy-as-a-Service GmbH - -"""Global config manager.""" - -import asyncio -import logging -import pathlib -from collections.abc import Sequence -from datetime import timedelta - -from ._manager import ConfigManager - -_logger = logging.getLogger(__name__) - -# pylint: disable=global-statement -_CONFIG_MANAGER: ConfigManager | None = None -"""Global instance of the ConfigManagingActor. - -This is the only instance of the ConfigManagingActor that should be used in the -entire application. It is created lazily on the first access and should be -accessed via the `get_config_manager` function. 
-""" - - -def initialize_config_manager( # pylint: disable=too-many-arguments - config_paths: Sequence[pathlib.Path], - /, - *, - force_polling: bool = True, - logging_config_key: str | Sequence[str] | None = "logging", - name: str = "global", - polling_interval: timedelta = timedelta(seconds=5), - wait_for_first_timeout: timedelta = timedelta(seconds=5), -) -> ConfigManager: - """Initialize the singleton instance of the ConfigManagingActor. - - Args: - config_paths: Paths to the TOML configuration files. - force_polling: Whether to force file polling to check for changes. - logging_config_key: The key to use for the logging configuration. If `None`, - logging configuration will not be managed. If a key is provided, the - manager update the logging configuration whenever the configuration - changes. - name: The name of the config manager. - polling_interval: The interval to poll for changes. Only relevant if - polling is enabled. - wait_for_first_timeout: The timeout to use when waiting for the first - configuration in - [`new_receiver`][frequenz.sdk.config.ConfigManager.new_receiver] if - `wait_for_first` is `True`. - - Returns: - The global instance of the ConfigManagingActor. - - Raises: - RuntimeError: If the config manager is already initialized. 
- """ - _logger.info( - "Initializing config manager %s for %s with force_polling=%s, " - "logging_config_key=%s, polling_interval=%s, wait_for_first_timeout=%s", - name, - config_paths, - force_polling, - logging_config_key, - polling_interval, - wait_for_first_timeout, - ) - - global _CONFIG_MANAGER - if _CONFIG_MANAGER is not None: - raise RuntimeError("Config already initialized") - - _CONFIG_MANAGER = ConfigManager( - config_paths, - name=name, - force_polling=force_polling, - logging_config_key=logging_config_key, - polling_interval=polling_interval, - wait_for_first_timeout=wait_for_first_timeout, - auto_start=True, - ) - - return _CONFIG_MANAGER - - -async def shutdown_config_manager( - *, - msg: str = "Config manager is shutting down", - timeout: timedelta | None = timedelta(seconds=5), -) -> None: - """Shutdown the global config manager. - - This will stop the config manager and release any resources it holds. - - Note: - The config manager must be - [initialized][frequenz.sdk.config.initialize_config] before calling this - function. - - Args: - msg: The message to be passed to the tasks being cancelled. - timeout: The maximum time to wait for the config manager to stop. If `None`, - the method will only cancel the config manager actor and return immediately - without awaiting at all (stopping might continue in the background). If the - time is exceeded, an error will be logged. - - Raises: - RuntimeError: If the config manager is not initialized. - """ - _logger.info("Shutting down config manager (timeout=%s)...", timeout) - - global _CONFIG_MANAGER - if _CONFIG_MANAGER is None: - raise RuntimeError("Config not initialized") - - if timeout is None: - _CONFIG_MANAGER.config_actor.cancel(msg) - if _CONFIG_MANAGER.logging_actor: - _CONFIG_MANAGER.logging_actor.cancel(msg) - _logger.info( - "Config manager cancelled, stopping might continue in the background." 
- ) - else: - to_stop = [_CONFIG_MANAGER.config_actor.stop(msg)] - if _CONFIG_MANAGER.logging_actor: - to_stop.append(_CONFIG_MANAGER.logging_actor.stop(msg)) - try: - async with asyncio.timeout(timeout.total_seconds()): - await asyncio.gather(*to_stop) - _logger.info("Config manager stopped.") - except asyncio.TimeoutError: - _logger.warning( - "Config manager did not stop within %s seconds, it might continue " - "stopping in the background", - timeout, - ) - - _CONFIG_MANAGER = None - - -def get_config_manager() -> ConfigManager: - """Return the global config manager. - - Note: - The config manager must be - [initialized][frequenz.sdk.config.initialize_config] before calling this - function. - - Returns: - The global instance of the config manager. - - Raises: - RuntimeError: If the config manager is not initialized. - """ - if _CONFIG_MANAGER is None: - raise RuntimeError("Config not initialized") - return _CONFIG_MANAGER From 9215c15d21a0015b90a892202930849c46603e4f Mon Sep 17 00:00:00 2001 From: Leandro Lucarella Date: Fri, 22 Nov 2024 13:29:39 +0100 Subject: [PATCH 20/25] WIP: Add full example in the `config` module. --- src/frequenz/sdk/config/__init__.py | 65 ++++++++++++++++++++++++++++- 1 file changed, 64 insertions(+), 1 deletion(-) diff --git a/src/frequenz/sdk/config/__init__.py b/src/frequenz/sdk/config/__init__.py index 327777d31..4b10fa861 100644 --- a/src/frequenz/sdk/config/__init__.py +++ b/src/frequenz/sdk/config/__init__.py @@ -1,7 +1,70 @@ # License: MIT # Copyright © 2024 Frequenz Energy-as-a-Service GmbH -"""Configuration management.""" +"""Configuration management. + +Example: App configuring the global config manager. 
+ ```python + import asyncio + import dataclasses + import sys + + import marshmallow + + from frequenz.channels import select, selected_from + from frequenz.sdk.actor import Actor + from frequenz.sdk.config import ( + initialize_config, + config_manager, + LoggingConfigUpdatingActor, + ConfigManager, + ) + + @dataclasses.dataclass + class ActorConfig: + name: str + + class MyActor(Actor): + def __init__(self, config: ActorConfig) -> None: + self._config = config + super().__init__() + + async def _run(self) -> None: + receiver = ... + config_receiver = await config_manager().new_receiver(schema=ActorConfig) + + async for selected in select(receiver, config_receiver): + if selected_from(selected, receiver): + ... + elif selected_from(selected, config_receiver): + self._config = selected.message + # Restart whatever is needed after a config update + + + @dataclasses.dataclass + class AppConfig: + positive_int: int = dataclasses.field( + default=42, + metadata={"validate": marshmallow.validate.Range(min=0)}, + ) + my_actor: ActorConfig | None = None + logging: LoggingConfig = LoggingConfig() + + async def main() -> None: + config_manager = initialize_config_manager(["config.toml"]) + try: + # Receive the first configuration + initial_config = await config_manager.new_receiver(schema=AppConfig, + wait_for_first=True) + except asyncio.TimeoutError: + print("No configuration received in time") + sys.exit(1) + + actor = MyActor(ActorConfig(name=initial_config.my_actor)) + actor.start() + await actor + ``` +""" from ._logging_actor import LoggerConfig, LoggingConfig, LoggingConfigUpdatingActor from ._manager import ConfigManager From 40dcc3fe728dad3a4ffb6744553479d00a897489 Mon Sep 17 00:00:00 2001 From: Leandro Lucarella Date: Tue, 10 Dec 2024 14:18:54 +0100 Subject: [PATCH 21/25] Revert "Add an option to wait for the first configuration" This feature is adding too much complexity, just to make it slightly easier to debug a very specific case, when a receiver gets stuck 
without being able to receive any messages at all. We will improve logging instead, so we can still debug this case without adding this complexity to the API. This also allows us to make the `new_receiver` method sync. This reverts commit 29725c4a87095ce82ffec3e294342e8115802299. Signed-off-by: Leandro Lucarella --- src/frequenz/sdk/config/_manager.py | 61 ++++------------------------- 1 file changed, 7 insertions(+), 54 deletions(-) diff --git a/src/frequenz/sdk/config/_manager.py b/src/frequenz/sdk/config/_manager.py index 8080ead70..94517eca0 100644 --- a/src/frequenz/sdk/config/_manager.py +++ b/src/frequenz/sdk/config/_manager.py @@ -3,7 +3,6 @@ """Management of configuration.""" -import asyncio import logging import pathlib from collections.abc import Mapping, Sequence @@ -37,7 +36,6 @@ def __init__( # pylint: disable=too-many-arguments logging_config_key: str | Sequence[str] | None = "logging", name: str | None = None, polling_interval: timedelta = timedelta(seconds=5), - wait_for_first_timeout: timedelta = timedelta(seconds=5), ) -> None: """Initialize this config manager. @@ -58,10 +56,6 @@ def __init__( # pylint: disable=too-many-arguments be used. This is used mostly for debugging purposes. polling_interval: The interval to poll for changes. Only relevant if polling is enabled. - wait_for_first_timeout: The timeout to use when waiting for the first - configuration in - [`new_receiver`][frequenz.sdk.config.ConfigManager.new_receiver] if - `wait_for_first` is `True`. """ self.name: Final[str] = str(id(self)) if name is None else name """The name of this config manager.""" @@ -80,14 +74,6 @@ def __init__( # pylint: disable=too-many-arguments ) """The actor that manages the configuration.""" - self.wait_for_first_timeout: timedelta = wait_for_first_timeout - """The timeout to use when waiting for the first configuration. 
- - When passing `wait_for_first` as `True` to - [`new_receiver`][frequenz.sdk.config.ConfigManager.new_receiver], this timeout - will be used to wait for the first configuration to be received. - """ - # pylint: disable-next=import-outside-toplevel,cyclic-import from ._logging_actor import LoggingConfigUpdatingActor @@ -105,7 +91,6 @@ def __repr__(self) -> str: return ( f"<{self.__class__.__name__}: " f"name={self.name!r}, " - f"wait_for_first_timeout={self.wait_for_first_timeout!r}, " f"config_channel={self.config_channel!r}, " f"logging_actor={self.logging_actor!r}, " f"config_actor={self.config_actor!r}>" @@ -116,19 +101,17 @@ def __str__(self) -> str: return f"{type(self).__name__}[{self.name}]" @overload - async def new_receiver( + def new_receiver( self, *, - wait_for_first: bool = True, skip_unchanged: bool = True, skip_none: Literal[False] = False, ) -> Receiver[Mapping[str, Any]]: ... @overload - async def new_receiver( # pylint: disable=too-many-arguments + def new_receiver( # pylint: disable=too-many-arguments self, *, - wait_for_first: bool = True, skip_unchanged: bool = True, skip_none: Literal[False] = False, # We need to specify the key here because we have kwargs, so if it is not @@ -141,30 +124,27 @@ async def new_receiver( # pylint: disable=too-many-arguments ) -> Receiver[DataclassT]: ... @overload - async def new_receiver( + def new_receiver( self, *, - wait_for_first: bool = True, skip_unchanged: bool = True, skip_none: Literal[False] = False, key: str | Sequence[str], ) -> Receiver[Mapping[str, Any] | None]: ... @overload - async def new_receiver( + def new_receiver( self, *, - wait_for_first: bool = True, skip_unchanged: bool = True, skip_none: Literal[True] = True, key: str | Sequence[str], ) -> Receiver[Mapping[str, Any]]: ... 
@overload - async def new_receiver( # pylint: disable=too-many-arguments + def new_receiver( # pylint: disable=too-many-arguments self, *, - wait_for_first: bool = True, skip_unchanged: bool = True, skip_none: Literal[False] = False, key: str | Sequence[str], @@ -174,10 +154,9 @@ async def new_receiver( # pylint: disable=too-many-arguments ) -> Receiver[DataclassT | None]: ... @overload - async def new_receiver( # pylint: disable=too-many-arguments + def new_receiver( # pylint: disable=too-many-arguments self, *, - wait_for_first: bool = True, skip_unchanged: bool = True, skip_none: Literal[True] = True, key: str | Sequence[str], @@ -188,10 +167,9 @@ async def new_receiver( # pylint: disable=too-many-arguments # The noqa DOC502 is needed because we raise TimeoutError indirectly. # pylint: disable-next=too-many-arguments,too-many-locals - async def new_receiver( # noqa: DOC502 + def new_receiver( # noqa: DOC502 self, *, - wait_for_first: bool = False, skip_unchanged: bool = True, skip_none: bool = True, # This is tricky, because a str is also a Sequence[str], if we would use only @@ -264,29 +242,12 @@ async def new_receiver( # noqa: DOC502 Additional arguments can be passed to [`marshmallow.Schema.load`][] using keyword arguments. - ### Waiting for the first configuration - - If `wait_for_first` is `True`, the receiver will wait for the first - configuration to be received before returning the receiver. If the - configuration can't be received in time, a timeout error will be raised. - - If the configuration is received successfully, the first configuration can be - simply retrieved by calling [`consume()`][frequenz.channels.Receiver.consume] on - the receiver without blocking. - Example: ```python # TODO: Add Example ``` Args: - wait_for_first: Whether to wait for the first configuration to be received - before returning the receiver. 
If the configuration can't be received - for - [`wait_for_first_timeout`][frequenz.sdk.config.ConfigManager.wait_for_first_timeout] - time, a timeout error will be raised. If receiving was successful, the - first configuration can be simply retrieved by calling - [`consume()`][frequenz.channels.Receiver.consume] on the receiver. skip_unchanged: Whether to skip sending the configuration if it hasn't changed compared to the last one received. skip_none: Whether to skip sending the configuration if it is `None`. Only @@ -303,10 +264,6 @@ async def new_receiver( # noqa: DOC502 Returns: The receiver for the configuration. - - Raises: - asyncio.TimeoutError: If `wait_for_first` is `True` and the first - configuration can't be received in time. """ # All supporting generic function (using DataclassT) need to be nested # here. For some reasons mypy has trouble if these functions are @@ -397,10 +354,6 @@ def _is_mapping( if skip_unchanged: receiver = receiver.filter(WithPrevious(_NotEqualWithLogging(key))) - if wait_for_first: - async with asyncio.timeout(self.wait_for_first_timeout.total_seconds()): - await receiver.ready() - match (key, schema): case (None, None): assert_type(receiver, Receiver[Mapping[str, Any]]) From 1f96cecd265a97f2216b35de3c5bc0d9955ef0e2 Mon Sep 17 00:00:00 2001 From: Leandro Lucarella Date: Tue, 10 Dec 2024 14:26:53 +0100 Subject: [PATCH 22/25] Improve logging for configuration file reading Signed-off-by: Leandro Lucarella --- src/frequenz/sdk/config/_managing_actor.py | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/src/frequenz/sdk/config/_managing_actor.py b/src/frequenz/sdk/config/_managing_actor.py index b54d07890..aff3833ca 100644 --- a/src/frequenz/sdk/config/_managing_actor.py +++ b/src/frequenz/sdk/config/_managing_actor.py @@ -117,9 +117,16 @@ def _read_config(self) -> abc.Mapping[str, Any] | None: config: dict[str, Any] = {} for config_path in self._config_paths: + _logger.info("%s: Reading configuration file %s...", self, 
config_path) try: with config_path.open("rb") as toml_file: data = tomllib.load(toml_file) + _logger.info( + "%s: Read %s bytes from configuration file %s.", + self, + len(data), + config_path, + ) config = _recursive_update(config, data) except ValueError as err: _logger.error("%s: Can't read config file, err: %s", self, err) @@ -140,6 +147,12 @@ def _read_config(self) -> abc.Mapping[str, Any] | None: ) return None + _logger.info( + "%s: Read %s/%s configuration files successfully.", + self, + len(self._config_paths) - error_count, + len(self._config_paths), + ) return config async def send_config(self) -> None: From 61d4b110272bc1ac68bb227f20a2576c5a7a8d96 Mon Sep 17 00:00:00 2001 From: Leandro Lucarella Date: Tue, 10 Dec 2024 14:50:26 +0100 Subject: [PATCH 23/25] Remove support for receiving raw mapping as configuration This seems to be a very niche feature that adds quite a bit of complexity. Users that need this kind of raw access can just get a receiver from the `config_channel` themselves and do the processing they need. Now the `schema` is required, was renamed to `config_class` for extra clarity and is a positional-only argument.
Signed-off-by: Leandro Lucarella --- src/frequenz/sdk/config/_manager.py | 135 +++++++++------------------- 1 file changed, 44 insertions(+), 91 deletions(-) diff --git a/src/frequenz/sdk/config/_manager.py b/src/frequenz/sdk/config/_manager.py index 94517eca0..4d9d65356 100644 --- a/src/frequenz/sdk/config/_manager.py +++ b/src/frequenz/sdk/config/_manager.py @@ -7,8 +7,9 @@ import pathlib from collections.abc import Mapping, Sequence from datetime import timedelta -from typing import Any, Final, Literal, TypeGuard, assert_type, cast, overload +from typing import Any, Final, Literal, TypeGuard, assert_type, overload +import marshmallow from frequenz.channels import Broadcast, Receiver from frequenz.channels.experimental import WithPrevious from marshmallow import Schema, ValidationError @@ -100,17 +101,11 @@ def __str__(self) -> str: """Return a string representation of this config manager.""" return f"{type(self).__name__}[{self.name}]" - @overload - def new_receiver( - self, - *, - skip_unchanged: bool = True, - skip_none: Literal[False] = False, - ) -> Receiver[Mapping[str, Any]]: ... - @overload def new_receiver( # pylint: disable=too-many-arguments self, + config_class: type[DataclassT], + /, *, skip_unchanged: bool = True, skip_none: Literal[False] = False, @@ -118,37 +113,19 @@ def new_receiver( # pylint: disable=too-many-arguments # present is not considered None as the only possible value, as any value can be # accepted as part of the kwargs. key: None = None, - schema: type[DataclassT], base_schema: type[Schema] | None = None, **marshmallow_load_kwargs: Any, ) -> Receiver[DataclassT]: ... - @overload - def new_receiver( - self, - *, - skip_unchanged: bool = True, - skip_none: Literal[False] = False, - key: str | Sequence[str], - ) -> Receiver[Mapping[str, Any] | None]: ... - - @overload - def new_receiver( - self, - *, - skip_unchanged: bool = True, - skip_none: Literal[True] = True, - key: str | Sequence[str], - ) -> Receiver[Mapping[str, Any]]: ... 
- @overload def new_receiver( # pylint: disable=too-many-arguments self, + config_class: type[DataclassT], + /, *, skip_unchanged: bool = True, skip_none: Literal[False] = False, key: str | Sequence[str], - schema: type[DataclassT], base_schema: type[Schema] | None, **marshmallow_load_kwargs: Any, ) -> Receiver[DataclassT | None]: ... @@ -156,11 +133,12 @@ def new_receiver( # pylint: disable=too-many-arguments @overload def new_receiver( # pylint: disable=too-many-arguments self, + config_class: type[DataclassT], + /, *, skip_unchanged: bool = True, skip_none: Literal[True] = True, key: str | Sequence[str], - schema: type[DataclassT], base_schema: type[Schema] | None, **marshmallow_load_kwargs: Any, ) -> Receiver[DataclassT]: ... @@ -169,6 +147,8 @@ def new_receiver( # pylint: disable=too-many-arguments # pylint: disable-next=too-many-arguments,too-many-locals def new_receiver( # noqa: DOC502 self, + config_class: type[DataclassT], + /, *, skip_unchanged: bool = True, skip_none: bool = True, @@ -178,15 +158,9 @@ def new_receiver( # noqa: DOC502 # the allowed types without changing Sequence[str] to something more specific, # like list[str] or tuple[str]. key: str | Sequence[str] | None = None, - schema: type[DataclassT] | None = None, base_schema: type[Schema] | None = None, **marshmallow_load_kwargs: Any, - ) -> ( - Receiver[Mapping[str, Any]] - | Receiver[Mapping[str, Any] | None] - | Receiver[DataclassT] - | Receiver[DataclassT | None] - ): + ) -> Receiver[DataclassT] | Receiver[DataclassT | None]: """Create a new receiver for the configuration. This method has a lot of features and functionalities to make it easier to @@ -196,6 +170,25 @@ def new_receiver( # noqa: DOC502 If there is a burst of configuration updates, the receiver will only receive the last configuration, older configurations will be ignored. + ### Schema validation + + The raw configuration received as a `Mapping` will be validated and loaded to + as a `config_class`. 
The `config_class` class is expected to be + a [`dataclasses.dataclass`][], which is used to create + a [`marshmallow.Schema`][] via the [`marshmallow_dataclass.class_schema`][] + function. + + This means you can customize the schema derived from the configuration + dataclass using [`marshmallow_dataclass`][] to specify extra validation and + options via field metadata. + + Configurations that don't pass the validation will be ignored and not sent to + the receiver, but an error will be logged. Errors other than `ValidationError` + will not be handled and will be raised. + + Additional arguments can be passed to [`marshmallow.Schema.load`][] using + the `marshmallow_load_kwargs` keyword arguments. + ### Skipping superfluous updates If `skip_unchanged` is set to `True`, then a configuration that didn't change @@ -222,40 +215,20 @@ def new_receiver( # noqa: DOC502 receiver will receive the configuration under the nested key. For example `["key", "subkey"]` will get only `config["key"]["subkey"]`. - ### Schema validation - - The configuration is received as a dictionary unless a `schema` is provided. In - this case, the configuration will be validated against the schema and received - as an instance of the configuration class. - - The configuration `schema` class is expected to be - a [`dataclasses.dataclass`][], which is used to create - a [`marshmallow.Schema`][] schema to validate the configuration dictionary. - - To customize the schema derived from the configuration dataclass, you can - use [`marshmallow_dataclass.dataclass`][] to specify extra metadata. - - Configurations that don't pass the validation will be ignored and not sent to - the receiver, but an error will be logged. Errors other than `ValidationError` - will not be handled and will be raised. - - Additional arguments can be passed to [`marshmallow.Schema.load`][] using keyword - arguments. - Example: ```python # TODO: Add Example ``` Args: + config_class: The type of the configuration. 
If provided, the configuration + will be validated against this type. skip_unchanged: Whether to skip sending the configuration if it hasn't changed compared to the last one received. skip_none: Whether to skip sending the configuration if it is `None`. Only valid when `key` is not `None`. key: The key to filter the configuration. If `None`, the full configuration will be received. - schema: The type of the configuration. If provided, the configuration - will be validated against this type. base_schema: An optional class to be used as a base schema for the configuration class. This allow using custom fields for example. Will be passed to [`marshmallow_dataclass.class_schema`][]. @@ -274,7 +247,7 @@ def new_receiver( # noqa: DOC502 @overload def _load_config_with_logging( config: Mapping[str, Any], - schema: type[DataclassT], + config_class: type[DataclassT], *, key: None = None, base_schema: type[Schema] | None = None, @@ -284,7 +257,7 @@ def _load_config_with_logging( @overload def _load_config_with_logging( config: Mapping[str, Any], - schema: type[DataclassT], + config_class: type[DataclassT], *, key: str | Sequence[str], base_schema: type[Schema] | None = None, @@ -293,7 +266,7 @@ def _load_config_with_logging( def _load_config_with_logging( config: Mapping[str, Any], - schema: type[DataclassT], + config_class: type[DataclassT], *, key: str | Sequence[str] | None = None, base_schema: type[Schema] | None = None, @@ -313,7 +286,10 @@ def _load_config_with_logging( try: return load_config( - schema, config, base_schema=base_schema, **marshmallow_load_kwargs + config_class, + config, + base_schema=base_schema, + **marshmallow_load_kwargs, ) except ValidationError as err: key_str = "" @@ -342,27 +318,18 @@ def _is_dataclass(config: DataclassT | None) -> TypeGuard[DataclassT]: """Return whether the configuration is a dataclass.""" return config is not None - def _is_mapping( - config: Mapping[str, Any] | None - ) -> TypeGuard[Mapping[str, Any]]: - """Return whether 
the configuration is a mapping.""" - return config is not None - recv_name = f"{self}_receiver" if key is None else f"{self}_receiver_{key}" receiver = self.config_channel.new_receiver(name=recv_name, limit=1) if skip_unchanged: receiver = receiver.filter(WithPrevious(_NotEqualWithLogging(key))) - match (key, schema): - case (None, None): - assert_type(receiver, Receiver[Mapping[str, Any]]) - return receiver - case (None, type()): + match key: + case None: recv_dataclass = receiver.map( lambda config: _load_config_with_logging( config, - schema, + config_class, # we need to pass it explicitly because of the # variadic keyword arguments, otherwise key # could be included in marshmallow_load_kwargs @@ -374,25 +341,11 @@ def _is_mapping( ).filter(_is_valid_and_not_none) assert_type(recv_dataclass, Receiver[DataclassT]) return recv_dataclass - case (str(), None): - recv_map_or_none = receiver.map(lambda config: _get_key(config, key)) - assert_type(recv_map_or_none, Receiver[Mapping[str, Any] | None]) - if skip_none: - # For some reason mypy is having trouble narrowing the type here, - # so we need to cast it (pyright narrowes it correctly). 
- recv_map = cast( - Receiver[Mapping[str, Any]], - recv_map_or_none.filter(_is_mapping), - ) - assert_type(recv_map, Receiver[Mapping[str, Any]]) - return recv_map - assert_type(recv_map_or_none, Receiver[Mapping[str, Any] | None]) - return recv_map_or_none - case (str(), type()): + case str(): recv_dataclass_or_none = receiver.map( lambda config: _load_config_with_logging( config, - schema, + config_class, key=key, base_schema=base_schema, **marshmallow_load_kwargs, From 0cc7e9425f10e9617c5c7e534033c4b4c4580c24 Mon Sep 17 00:00:00 2001 From: Leandro Lucarella Date: Tue, 10 Dec 2024 14:55:16 +0100 Subject: [PATCH 24/25] Move note about update bursts to Skipping superfluous updates Signed-off-by: Leandro Lucarella --- src/frequenz/sdk/config/_manager.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/src/frequenz/sdk/config/_manager.py b/src/frequenz/sdk/config/_manager.py index 4d9d65356..90e1708ff 100644 --- a/src/frequenz/sdk/config/_manager.py +++ b/src/frequenz/sdk/config/_manager.py @@ -166,10 +166,6 @@ def new_receiver( # noqa: DOC502 This method has a lot of features and functionalities to make it easier to receive configurations. - Note: - If there is a burst of configuration updates, the receiver will only - receive the last configuration, older configurations will be ignored. - ### Schema validation The raw configuration received as a `Mapping` will be validated and loaded to @@ -191,6 +187,9 @@ def new_receiver( # noqa: DOC502 ### Skipping superfluous updates + If there is a burst of configuration updates, the receiver will only receive the + last configuration, older configurations will be ignored. + If `skip_unchanged` is set to `True`, then a configuration that didn't change compared to the last one received will be ignored and not sent to the receiver. 
The comparison is done using the *raw* `dict` to determine if the configuration From 418906643135cea09f82569021671cde90771f4c Mon Sep 17 00:00:00 2001 From: Leandro Lucarella Date: Tue, 10 Dec 2024 14:55:52 +0100 Subject: [PATCH 25/25] Exclude unknown fields from the config by default This is what most users will need, so we better make it the default. Signed-off-by: Leandro Lucarella --- src/frequenz/sdk/config/_manager.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/src/frequenz/sdk/config/_manager.py b/src/frequenz/sdk/config/_manager.py index 90e1708ff..8434943e6 100644 --- a/src/frequenz/sdk/config/_manager.py +++ b/src/frequenz/sdk/config/_manager.py @@ -185,6 +185,10 @@ def new_receiver( # noqa: DOC502 Additional arguments can be passed to [`marshmallow.Schema.load`][] using the `marshmallow_load_kwargs` keyword arguments. + If unspecified, the `marshmallow_load_kwargs` will have the `unknown` key set to + [`marshmallow.EXCLUDE`][] (instead of the normal [`marshmallow.RAISE`][] + default). + ### Skipping superfluous updates If there is a burst of configuration updates, the receiver will only receive the @@ -317,6 +321,9 @@ def _is_dataclass(config: DataclassT | None) -> TypeGuard[DataclassT]: """Return whether the configuration is a dataclass.""" return config is not None + if "unknown" not in marshmallow_load_kwargs: + marshmallow_load_kwargs["unknown"] = marshmallow.EXCLUDE + recv_name = f"{self}_receiver" if key is None else f"{self}_receiver_{key}" receiver = self.config_channel.new_receiver(name=recv_name, limit=1)