diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index 057a3ef83..46fd449ae 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -2,7 +2,7 @@ ## Summary - +This release includes a new `ConfigManager` class to simplify managing the configuration, and ships other improvements and fixes to the config system in general. ## Upgrading @@ -11,7 +11,9 @@ * `LoggingConfigUpdater` + Renamed to `LoggingConfigUpdatingActor` to follow the actor naming convention. - + Make all arguments to the constructor keyword-only. + + The actor must now be constructed using a `ConfigManager` instead of a receiver. + + Make all arguments to the constructor keyword-only, except for the `config_manager` argument. + + If the configuration is removed, the actor will now load back the default configuration. * `LoggingConfig` @@ -31,12 +33,30 @@ + The `base_schema` argument is now keyword-only. + The arguments forwarded to `marshmallow.Schema.load()` now must be passed explicitly via the `marshmallow_load_kwargs` argument, as a `dict`, to improve the type-checking. + * `ConfigManagingActor`: Raise a `ValueError` if the `config_files` argument an empty sequence. + ## New Features -- `LoggingConfigUpdatingActor` +- `frequenz.sdk.config` + + - Logging was improved in general. + + - Added documentation and user guide. + + - `LoggingConfigUpdatingActor` - * Added a new `name` argument to the constructor to be able to override the actor's name. + * Added a new `name` argument to the constructor to be able to override the actor's name. + + - `ConfigManager`: Added a class to simplify managing the configuration. It takes care of instantiating the config actors and provides a convenient method for creating receivers with a lot of common functionality. + + - `BaseConfigSchema`: Added a `marshmallow` base `Schema` that includes custom fields for `frequenz-quantities`. In the futute more commonly used fields might be added. + + - `wait_for_first()`: Added a function to make it easy to wait for the first configuration to be received with a timeout. + + - `ConfigManagingActor`: Allow passing a single configuration file. ## Bug Fixes - Fix a bug in `BackgroundService` where it won't try to `self.cancel()` and `await self.wait()` if there are no internal tasks. This prevented to properly implement custom stop logic without having to redefine the `stop()` method too. + +- Fix a bug where if a string was passed to the `ConfigManagingActor` it would be interpreted as a sequence of 1 character strings. diff --git a/docs/user-guide/config.md b/docs/user-guide/config.md new file mode 100644 index 000000000..be4e42f77 --- /dev/null +++ b/docs/user-guide/config.md @@ -0,0 +1,9 @@ +# Configuration + +::: frequenz.sdk.config + options: + members: [] + show_bases: false + show_root_heading: false + show_root_toc_entry: false + show_source: false diff --git a/mkdocs.yml b/mkdocs.yml index 5a6343e20..bb59da720 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -119,6 +119,7 @@ plugins: - https://docs.python.org/3/objects.inv - https://frequenz-floss.github.io/frequenz-channels-python/v1.1/objects.inv - https://frequenz-floss.github.io/frequenz-client-microgrid-python/v0.5/objects.inv + - https://frequenz-floss.github.io/frequenz-quantities-python/v1/objects.inv - https://lovasoa.github.io/marshmallow_dataclass/html/objects.inv - https://marshmallow.readthedocs.io/en/stable/objects.inv - https://networkx.org/documentation/stable/objects.inv diff --git a/pyproject.toml b/pyproject.toml index 84f91fb7a..10f132936 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -30,8 +30,8 @@ dependencies = [ # changing the version # (plugins.mkdocstrings.handlers.python.import) "frequenz-client-microgrid >= 0.6.0, < 0.7.0", - "frequenz-channels >= 1.2.0, < 2.0.0", - "frequenz-quantities >= 1.0.0rc3, < 2.0.0", + "frequenz-channels >= 1.4.0, < 2.0.0", + "frequenz-quantities[marshmallow] >= 1.0.0, < 2.0.0", "networkx >= 2.8, < 4", "numpy >= 1.26.4, < 2", "typing_extensions >= 4.6.1, < 5", diff --git a/src/frequenz/sdk/config/__init__.py b/src/frequenz/sdk/config/__init__.py index 50a16cb79..bf564fa26 100644 --- a/src/frequenz/sdk/config/__init__.py +++ b/src/frequenz/sdk/config/__init__.py @@ -1,16 +1,445 @@ # License: MIT # Copyright © 2024 Frequenz Energy-as-a-Service GmbH -"""Read and update config variables.""" +"""Configuration management. +# Overview + +To provide dynamic configurations to an application, you can use the +[`ConfigManager`][frequenz.sdk.config.ConfigManager] class. This class provides +a convenient interface to manage configurations from multiple config files and receive +updates when the configurations change. Users can create a receiver to receive +configurations from the manager. + +# Setup + +To use the `ConfigManager`, you need to create an instance of it and pass the +paths to the configuration files. The configuration files must be in the TOML +format. + +When specifying multiple files order matters, as the configuration will be read and +updated in the order of the paths, so the last path will override the configuration set +by the previous paths. Dict keys will be merged recursively, but other objects (like +lists) will be replaced by the value in the last path. + +```python +from frequenz.sdk.config import ConfigManager + +async with ConfigManager(["base-config.toml", "overrides.toml"]) as config_manager: + ... +``` + +# Logging + +The `ConfigManager` can also instantiate +a [`LoggingConfigUpdatingActor`][frequenz.sdk.config.LoggingConfigUpdatingActor] to +monitor logging configurations. This actor will listen for logging configuration changes +and update the logging configuration accordingly. + +This feature is enabled by default using the key `logging` in the configuration file. To +disable it you can pass `logging_config_key=None` to the `ConfigManager`. + +# Receiving configurations + +To receive configurations, you can create a receiver using the [`new_receiver()`][ +frequenz.sdk.config.ConfigManager.new_receiver] method. The receiver will receive +configurations from the manager for a particular key, and validate and load the +configurations to a dataclass using [`marshmallow_dataclass`][]. + +If the key is a sequence of strings, it will be treated as a nested key and the +receiver will receive the configuration under the nested key. For example +`["key", "subkey"]` will get only `config["key"]["subkey"]`. + +Besides a configuration instance, the receiver can also receive exceptions if there are +errors loading the configuration (typically +a [`ValidationError`][marshmallow.ValidationError]), or `None` if there is no +configuration for the key. + +The value under `key` must be another mapping, otherwise +a [`InvalidValueForKeyError`][frequenz.sdk.config.InvalidValueForKeyError] instance will +be sent to the receiver. + +If there were any errors loading the configuration, the error will be logged too. + +```python +from dataclasses import dataclass +from frequenz.sdk.config import ConfigManager + +@dataclass(frozen=True, kw_only=True) +class AppConfig: + test: int + +async with ConfigManager("config.toml") as config_manager: + receiver = config_manager.new_receiver("app", AppConfig) + app_config = await receiver.receive() + match app_config: + case AppConfig(test=42): + print("App configured with 42") + case Exception() as error: + print(f"Error loading configuration: {error}") + case None: + print("There is no configuration for the app key") +``` + +## Validation and loading + +The configuration class used to create the configuration instance is expected to be +a [`dataclasses.dataclass`][], which is used to create a [`marshmallow.Schema`][] via +the [`marshmallow_dataclass.class_schema`][] function. + +This means you can customize the schema derived from the configuration +dataclass using [`marshmallow_dataclass`][] to specify extra validation and +options via field metadata. + +Customization can also be done via a `base_schema`. By default +[`BaseConfigSchema`][frequenz.sdk.config.BaseConfigSchema] is used to provide support +for some extra commonly used fields (like [quantities][frequenz.quantities]). + +```python +import marshmallow.validate +from dataclasses import dataclass, field + +@dataclass(frozen=True, kw_only=True) +class Config: + test: int = field( + metadata={"validate": marshmallow.validate.Range(min=0)}, + ) +``` + +Additional arguments can be passed to [`marshmallow.Schema.load`][] using +the `marshmallow_load_kwargs` keyword arguments. + +If unspecified, the `marshmallow_load_kwargs` will have the `unknown` key set to +[`marshmallow.EXCLUDE`][] (instead of the normal [`marshmallow.RAISE`][] +default). + +But when [`marshmallow.EXCLUDE`][] is used, a warning will be logged if there are extra +fields in the configuration that are excluded. This is useful, for example, to catch +typos in the configuration file. + +## Skipping superfluous updates + +If there is a burst of configuration updates, the receiver will only receive the +last configuration, older configurations will be ignored. + +If `skip_unchanged` is set to `True`, then a configuration that didn't change +compared to the last one received will be ignored and not sent to the receiver. +The comparison is done using the *raw* `dict` to determine if the configuration +has changed. + +## Error handling + +The value under `key` must be another mapping, otherwise an error +will be logged and a [`frequenz.sdk.config.InvalidValueForKeyError`][] instance +will be sent to the receiver. + +Configurations that don't pass the validation will be logged as an error and +the [`ValidationError`][marshmallow.ValidationError] sent to the receiver. + +Any other unexpected error raised during the configuration loading will be +logged as an error and the error instance sent to the receiver. + +## Further customization + +If you have special needs for receiving the configurations (for example validating using +`marshmallow` doesn't fit your needs), you can create a custom receiver using +[`config_channel.new_receiver()`][frequenz.sdk.config.ConfigManager.config_channel] +directly. Please bear in mind that this provides a low-level access to the whole config +in the file as a raw Python mapping. + +# Recommended usage + +Actors that need to be reconfigured should take a configuration manager and a key to +receive configurations updates, and instantiate the new receiver themselves. This allows +actors to have full control over how the configuration is loaded (for example providing +a custom base schema or marshmallow options). + +Passing the key explicitly too allows application to structure the configuration in +whatever way is most convenient for the application. + +Actors can use the [`wait_for_first()`][frequenz.sdk.config.wait_for_first] function to +wait for the first configuration to be received, and cache the configuration for later +use and in case the actor is restarted. If the configuration is not received after some +timeout, a [`asyncio.TimeoutError`][] will be raised (and if uncaught, the actor will +be automatically restarted after some delay). + +Example: Actor that can run without a configuration (using a default configuration) + ```python title="actor.py" hl_lines="18 34 42 62 64" + import dataclasses + import logging + from collections.abc import Sequence + from datetime import timedelta + from typing import assert_never + + from frequenz.channels import select, selected_from + from frequenz.channels.event import Event + + from frequenz.sdk.actor import Actor + from frequenz.sdk.config import ConfigManager, wait_for_first + + _logger = logging.getLogger(__name__) + + @dataclasses.dataclass(frozen=True, kw_only=True) + class MyActorConfig: + some_config: timedelta = dataclasses.field( + default=timedelta(seconds=42), # (1)! + metadata={"metadata": {"description": "Some optional configuration"}}, + ) + + class MyActor(Actor): + def __init__( + self, + config_manager: ConfigManager, + /, + *, + config_key: str | Sequence[str], + name: str | None = None, + ) -> None: + super().__init__(name=name) + self._config_manager = config_manager + self._config_key = config_key + self._config: MyActorConfig = MyActorConfig() # (2)! + + async def _run(self) -> None: + config_receiver = self._config_manager.new_receiver( + self._config_key, MyActorConfig + ) + self._update_config( + await wait_for_first( + config_receiver, receiver_name=str(self), allow_none=True # (3)! + ) + ) + + other_receiver = Event() + + async for selected in select(config_receiver, other_receiver): + if selected_from(selected, config_receiver): + self._update_config(selected.message) + elif selected_from(selected, other_receiver): + # Do something else + ... + + def _update_config(self, config_update: MyActorConfig | Exception | None) -> None: + match config_update: + case MyActorConfig() as config: + _logger.info("New configuration received, updating.") + self._reconfigure(config) + case None: + _logger.info("Configuration was unset, resetting to the default") + self._reconfigure(MyActorConfig()) # (4)! + case Exception(): + _logger.info( # (5)! + "New configuration has errors, keeping the old configuration." + ) + case unexpected: + assert_never(unexpected) + + def _reconfigure(self, config: MyActorConfig) -> None: + self._config = config + # Do something with the new configuration + ``` + + 1. This is different when the actor requires a configuration to run. Here, the + config has a default value. + 2. This is different when the actor requires a configuration to run. Here, the actor + can just instantiate a default configuration. + 3. This is different when the actor requires a configuration to run. Here, the actor + can accept a `None` configuration. + 4. This is different when the actor requires a configuration to run. Here, the actor + can reset to a default configuration. + 5. There is no need to log the error itself, the configuration manager will log it + automatically. + +Example: Actor that requires a configuration to run + ```python title="actor.py" hl_lines="17 33 40 58 60" + import dataclasses + import logging + from collections.abc import Sequence + from datetime import timedelta + from typing import assert_never + + from frequenz.channels import select, selected_from + from frequenz.channels.event import Event + + from frequenz.sdk.actor import Actor + from frequenz.sdk.config import ConfigManager, wait_for_first + + _logger = logging.getLogger(__name__) + + @dataclasses.dataclass(frozen=True, kw_only=True) + class MyActorConfig: + some_config: timedelta = dataclasses.field( # (1)! + metadata={"metadata": {"description": "Some required configuration"}}, + ) + + class MyActor(Actor): + def __init__( + self, + config_manager: ConfigManager, + /, + *, + config_key: str | Sequence[str], + name: str | None = None, + ) -> None: + super().__init__(name=name) + self._config_manager = config_manager + self._config_key = config_key + self._config: MyActorConfig # (2)! + + async def _run(self) -> None: + config_receiver = self._config_manager.new_receiver( + self._config_key, MyActorConfig + ) + self._update_config( + await wait_for_first(config_receiver, receiver_name=str(self)) # (3)! + ) + + other_receiver = Event() + + async for selected in select(config_receiver, other_receiver): + if selected_from(selected, config_receiver): + self._update_config(selected.message) + elif selected_from(selected, other_receiver): + # Do something else + ... + + def _update_config(self, config_update: MyActorConfig | Exception | None) -> None: + match config_update: + case MyActorConfig() as config: + _logger.info("New configuration received, updating.") + self._reconfigure(config) + case None: + _logger.info("Configuration was unset, keeping the old configuration.") # (4)! + case Exception(): + _logger.info( # (5)! + "New configuration has errors, keeping the old configuration." + ) + case unexpected: + assert_never(unexpected) + + def _reconfigure(self, config: MyActorConfig) -> None: + self._config = config + # Do something with the new configuration + ``` + + 1. This is different when the actor can use a default configuration. Here, the + field is required, so there is no default configuration possible. + 2. This is different when the actor can use a default configuration. Here, the + assignment of the configuration is delayed to the `_run()` method. + 3. This is different when the actor can use a default configuration. Here, the actor + doesn't accept `None` as a valid configuration as it can't create a default + configuration. + 4. This is different when the actor can use a default configuration. Here, the actor + doesn't accept `None` as a valid configuration as it can't create a default + configuration, so it needs to keep the old configuration. + 5. There is no need to log the error itself, the configuration manager will log it + automatically. + + +Example: Application + The pattern used by the application is very similar to the one used by actors. In + this case the application requires a configuration to run, but if it could also use + a default configuration, the changes would be the same as in the actor examples. + + ```python title="app.py" hl_lines="14" + import asyncio + import dataclasses + import logging + import pathlib + from collections.abc import Sequence + from datetime import timedelta + from typing import Sequence, assert_never + + from frequenz.sdk.actor import Actor + from frequenz.sdk.config import ConfigManager, wait_for_first + + _logger = logging.getLogger(__name__) + + class MyActor(Actor): # (1)! + def __init__( + self, config_manager: ConfigManager, /, *, config_key: str | Sequence[str] + ) -> None: + super().__init__() + self._config_manager = config_manager + self._config_key = config_key + async def _run(self) -> None: ... + + @dataclasses.dataclass(frozen=True, kw_only=True) + class AppConfig: + enable_actor: bool = dataclasses.field( + metadata={"metadata": {"description": "Whether to enable the actor"}}, + ) + + class App: + def __init__(self, *, config_paths: Sequence[pathlib.Path]): + self._config_manager = ConfigManager(config_paths) + self._config_receiver = self._config_manager.new_receiver("app", AppConfig) + self._actor = MyActor(self._config_manager, config_key="actor") + + async def _update_config(self, config_update: AppConfig | Exception | None) -> None: + match config_update: + case AppConfig() as config: + _logger.info("New configuration received, updating.") + await self._reconfigure(config) + case None: + _logger.info("Configuration was unset, keeping the old configuration.") + case Exception(): + _logger.info("New configuration has errors, keeping the old configuration.") + case unexpected: + assert_never(unexpected) + + async def _reconfigure(self, config: AppConfig) -> None: + if config.enable_actor: + self._actor.start() + else: + await self._actor.stop() + + async def run(self) -> None: + _logger.info("Starting App...") + + async with self._config_manager: + await self._update_config( + await wait_for_first(self._config_receiver, receiver_name="app") + ) + + _logger.info("Waiting for configuration updates...") + async for config_update in self._config_receiver: + await self._reconfigure(config_update) + + if __name__ == "__main__": + asyncio.run(App(config_paths="config.toml").run()) + ``` + + 1. Look for the actor examples for a proper implementation of the actor. + + Example configuration file: + + ```toml title="config.toml" + [app] + enable_actor = true + + [actor] + some_config = 10 + + [logging.root_logger] + level = "DEBUG" + ``` +""" + +from ._base_schema import BaseConfigSchema from ._logging_actor import LoggerConfig, LoggingConfig, LoggingConfigUpdatingActor +from ._manager import ConfigManager, InvalidValueForKeyError, wait_for_first from ._managing_actor import ConfigManagingActor from ._util import load_config __all__ = [ + "BaseConfigSchema", + "ConfigManager", "ConfigManagingActor", + "InvalidValueForKeyError", "LoggerConfig", "LoggingConfig", "LoggingConfigUpdatingActor", "load_config", + "wait_for_first", ] diff --git a/src/frequenz/sdk/config/_base_schema.py b/src/frequenz/sdk/config/_base_schema.py new file mode 100644 index 000000000..d25611958 --- /dev/null +++ b/src/frequenz/sdk/config/_base_schema.py @@ -0,0 +1,10 @@ +# License: MIT +# Copyright © 2024 Frequenz Energy-as-a-Service GmbH + +"""Base schema for configuration classes.""" + +from frequenz.quantities.experimental.marshmallow import QuantitySchema + + +class BaseConfigSchema(QuantitySchema): + """A base schema for configuration classes.""" diff --git a/src/frequenz/sdk/config/_logging_actor.py b/src/frequenz/sdk/config/_logging_actor.py index d5d75bf5c..739e0be07 100644 --- a/src/frequenz/sdk/config/_logging_actor.py +++ b/src/frequenz/sdk/config/_logging_actor.py @@ -4,16 +4,14 @@ """Read and update logging severity from config.""" import logging -from collections.abc import Mapping from dataclasses import dataclass, field -from typing import Annotated, Any +from typing import Annotated, Sequence, assert_never import marshmallow import marshmallow.validate -from frequenz.channels import Receiver from ..actor import Actor -from ._util import load_config +from ._manager import ConfigManager, wait_for_first _logger = logging.getLogger(__name__) @@ -84,26 +82,13 @@ class LoggingConfigUpdatingActor(Actor): ```python import asyncio - from collections.abc import Mapping - from typing import Any - from frequenz.channels import Broadcast - from frequenz.sdk.config import LoggingConfigUpdatingActor, ConfigManagingActor + from frequenz.sdk.config import LoggingConfigUpdatingActor from frequenz.sdk.actor import run as run_actors async def run() -> None: - config_channel = Broadcast[Mapping[str, Any]](name="config", resend_latest=True) - actors = [ - ConfigManagingActor( - config_paths=["config.toml"], output=config_channel.new_sender() - ), - LoggingConfigUpdatingActor( - config_recv=config_channel.new_receiver(limit=1)).map( - lambda app_config: app_config.get("logging", {} - ) - ), - ] - await run_actors(*actors) + config_manager: ConfigManager = ... + await run_actors(LoggingConfigUpdatingActor(config_manager)) asyncio.run(run()) ``` @@ -112,10 +97,13 @@ async def run() -> None: will be updated as well. """ + # pylint: disable-next=too-many-arguments def __init__( self, + config_manager: ConfigManager, + /, *, - config_recv: Receiver[Mapping[str, Any]], + config_key: str | Sequence[str] = "logging", log_datefmt: str = "%Y-%m-%dT%H:%M:%S%z", log_format: str = "%(asctime)s %(levelname)-8s %(name)s:%(lineno)s: %(message)s", name: str | None = None, @@ -123,7 +111,9 @@ def __init__( """Initialize this instance. Args: - config_recv: The receiver to listen for configuration changes. + config_manager: The configuration manager to use. + config_key: The key to use to retrieve the configuration from the + configuration manager. If `None`, the whole configuration will be used. log_datefmt: Use the specified date/time format in logs. log_format: Use the specified format string in logs. name: The name of this actor. If `None`, `str(id(self))` will be used. This @@ -135,7 +125,9 @@ def __init__( in the application (through a previous `basicConfig()` call), then the format settings specified here will be ignored. """ - self._config_recv = config_recv + self._config_receiver = config_manager.new_receiver( + config_key, LoggingConfig, base_schema=None + ) # Setup default configuration. # This ensures logging is configured even if actor fails to start or @@ -149,21 +141,45 @@ def __init__( datefmt=log_datefmt, level=logging.INFO, ) - self._update_logging(self._current_config) + _logger.info("Applying initial default logging configuration...") + self._reconfigure(self._current_config) async def _run(self) -> None: """Listen for configuration changes and update logging.""" - async for message in self._config_recv: - try: - new_config = load_config(LoggingConfig, message) - except marshmallow.ValidationError: - _logger.exception( - "Invalid logging configuration received. Skipping config update" - ) - continue + self._reconfigure( + await wait_for_first( + self._config_receiver, receiver_name=str(self), allow_none=True + ) + ) + async for config_update in self._config_receiver: + self._reconfigure(config_update) + + def _reconfigure(self, config_update: LoggingConfig | Exception | None) -> None: + """Update the logging configuration. - if new_config != self._current_config: - self._update_logging(new_config) + Args: + config_update: The new configuration, or an exception if there was an error + parsing the configuration, or `None` if the configuration was unset. + """ + match config_update: + case LoggingConfig(): + _logger.info( + "New configuration received, updating logging configuration." + ) + self._update_logging(config_update) + case None: + _logger.info( + "Configuration was unset, resetting to the default " + "logging configuration." + ) + self._update_logging(LoggingConfig()) + case Exception(): + _logger.info( + "New configuration has errors, keeping the old logging " + "configuration." + ) + case unexpected: + assert_never(unexpected) def _update_logging(self, config: LoggingConfig) -> None: """Configure the logging level.""" diff --git a/src/frequenz/sdk/config/_manager.py b/src/frequenz/sdk/config/_manager.py new file mode 100644 index 000000000..61d969915 --- /dev/null +++ b/src/frequenz/sdk/config/_manager.py @@ -0,0 +1,515 @@ +# License: MIT +# Copyright © 2024 Frequenz Energy-as-a-Service GmbH + +"""Management of configuration.""" + +import asyncio +import logging +import pathlib +from collections.abc import Mapping, Sequence +from dataclasses import is_dataclass +from datetime import timedelta +from typing import Any, Final, Literal, TypeGuard, overload + +import marshmallow +from frequenz.channels import Broadcast, Receiver, ReceiverStoppedError +from frequenz.channels.experimental import WithPrevious +from marshmallow import Schema, ValidationError +from typing_extensions import override + +from ..actor._background_service import BackgroundService +from ._base_schema import BaseConfigSchema +from ._managing_actor import ConfigManagingActor +from ._util import DataclassT, load_config + +_logger = logging.getLogger(__name__) + + +class InvalidValueForKeyError(ValueError): + """An error indicating that the value under the specified key is invalid.""" + + def __init__(self, msg: str, *, key: Sequence[str], value: Any) -> None: + """Initialize this error. + + Args: + msg: The error message. + key: The key that has an invalid value. + value: The actual value that was found that is not a mapping. + """ + super().__init__(msg) + + self.key: Final[Sequence[str]] = key + """The key that has an invalid value.""" + + self.value: Final[Any] = value + """The actual value that was found that is not a mapping.""" + + +class ConfigManager(BackgroundService): + """A manager for configuration files. + + This class reads configuration files and sends the configuration to the receivers, + providing configuration key filtering and value validation. + + For a more in-depth introduction and examples, please read the [module + documentation][frequenz.sdk.config]. + """ + + def __init__( # pylint: disable=too-many-arguments + self, + config_paths: str | pathlib.Path | Sequence[pathlib.Path | str], + /, + *, + force_polling: bool = True, + logging_config_key: str | Sequence[str] | None = "logging", + name: str | None = None, + polling_interval: timedelta = timedelta(seconds=1), + ) -> None: + """Initialize this config manager. + + Args: + config_paths: The paths to the TOML files with the configuration. Order + matters, as the configuration will be read and updated in the order + of the paths, so the last path will override the configuration set by + the previous paths. Dict keys will be merged recursively, but other + objects (like lists) will be replaced by the value in the last path. + force_polling: Whether to force file polling to check for changes. + logging_config_key: The key to use for the logging configuration. If `None`, + logging configuration will not be managed. If a key is provided, the + manager update the logging configuration whenever the configuration + changes. + name: A name to use when creating actors. If `None`, `str(id(self))` will + be used. This is used mostly for debugging purposes. + polling_interval: The interval to poll for changes. Only relevant if + polling is enabled. + """ + super().__init__(name=name) + + self.config_channel: Final[Broadcast[Mapping[str, Any]]] = Broadcast( + name=f"{self}_config", resend_latest=True + ) + """The channel used for sending configuration updates (resends the latest value). + + This is the channel used to communicate with the + [`ConfigManagingActor`][frequenz.sdk.config.ConfigManager.config_actor] and will + receive the complete raw configuration as a mapping. + """ + + self.config_actor: Final[ConfigManagingActor] = ConfigManagingActor( + config_paths, + self.config_channel.new_sender(), + name=self.name, + force_polling=force_polling, + polling_interval=polling_interval, + ) + """The actor that manages the configuration for this manager.""" + + # pylint: disable-next=import-outside-toplevel,cyclic-import + from ._logging_actor import LoggingConfigUpdatingActor + + self.logging_actor: Final[LoggingConfigUpdatingActor | None] = ( + None + if logging_config_key is None + else LoggingConfigUpdatingActor( + self, config_key=logging_config_key, name=self.name + ) + ) + """The actor that manages the logging configuration for this manager.""" + + @override + def start(self) -> None: + """Start this config manager.""" + self.config_actor.start() + if self.logging_actor: + self.logging_actor.start() + + @property + @override + def is_running(self) -> bool: + """Whether this config manager is running.""" + return self.config_actor.is_running or ( + self.logging_actor is not None and self.logging_actor.is_running + ) + + @override + def cancel(self, msg: str | None = None) -> None: + """Cancel all running tasks and actors spawned by this config manager. + + Args: + msg: The message to be passed to the tasks being cancelled. + """ + if self.logging_actor: + self.logging_actor.cancel(msg) + self.config_actor.cancel(msg) + + @override + async def wait(self) -> None: + """Wait this config manager to finish. + + Wait until all tasks and actors are finished. + + Raises: + BaseExceptionGroup: If any of the tasks spawned by this service raised an + exception (`CancelError` is not considered an error and not returned in + the exception group). + """ + exceptions: list[BaseException] = [] + if self.logging_actor: + try: + await self.logging_actor + except BaseExceptionGroup as err: # pylint: disable=try-except-raise + exceptions.append(err) + + try: + await self.config_actor + except BaseExceptionGroup as err: # pylint: disable=try-except-raise + exceptions.append(err) + + if exceptions: + raise BaseExceptionGroup(f"Error while stopping {self!r}", exceptions) + + @override + def __repr__(self) -> str: + """Return a string representation of this config manager.""" + logging_actor = ( + f"logging_actor={self.logging_actor!r}, " if self.logging_actor else "" + ) + return ( + f"<{self.__class__.__name__}: " + f"name={self.name!r}, " + f"config_channel={self.config_channel!r}, " + + logging_actor + + f"config_actor={self.config_actor!r}>" + ) + + def new_receiver( # pylint: disable=too-many-arguments + self, + # This is tricky, because a str is also a Sequence[str], if we would use only + # Sequence[str], then a regular string would also be accepted and taken as + # a sequence, like "key" -> ["k", "e", "y"]. We should never remove the str from + # the allowed types without changing Sequence[str] to something more specific, + # like list[str] or tuple[str] (but both have their own problems). + key: str | Sequence[str], + config_class: type[DataclassT], + /, + *, + skip_unchanged: bool = True, + base_schema: type[Schema] | None = BaseConfigSchema, + marshmallow_load_kwargs: dict[str, Any] | None = None, + ) -> Receiver[DataclassT | Exception | None]: + """Create a new receiver for receiving the configuration for a particular key. + + This method has a lot of features and functionalities to make it easier to + receive configurations, but it also imposes some restrictions on how the + configurations are received. If you need more control over the configuration + receiver, you can create a receiver directly using + [`config_channel.new_receiver()`][frequenz.sdk.config.ConfigManager.config_channel]. + + For a more in-depth introduction and examples, please read the [module + documentation][frequenz.sdk.config]. + + Args: + key: The configuration key to be read by the receiver. If a sequence of + strings is used, it is used as a sub-key. + config_class: The class object to use to instantiate a configuration. The + configuration will be validated against this type too using + [`marshmallow_dataclass`][]. + skip_unchanged: Whether to skip sending the configuration if it hasn't + changed compared to the last one received. + base_schema: An optional class to be used as a base schema for the + configuration class. This allow using custom fields for example. Will be + passed to [`marshmallow_dataclass.class_schema`][]. + marshmallow_load_kwargs: Additional arguments to be passed to + [`marshmallow.Schema.load`][]. + + Returns: + The receiver for the configuration. + + Raises: + ValueError: If the `unknown` option in `marshmallow_load_kwargs` is set to + [`marshmallow.INCLUDE`][]. + """ + marshmallow_load_kwargs = ( + {} if marshmallow_load_kwargs is None else marshmallow_load_kwargs.copy() + ) + + if "unknown" not in marshmallow_load_kwargs: + marshmallow_load_kwargs["unknown"] = marshmallow.EXCLUDE + elif marshmallow_load_kwargs["unknown"] == marshmallow.INCLUDE: + raise ValueError( + "The 'unknown' option can't be 'INCLUDE' when loading to a dataclass" + ) + + receiver = self.config_channel.new_receiver(name=f"{self}:{key}", limit=1).map( + lambda config: _load_config_with_logging_and_errors( + config, + config_class, + key=key, + base_schema=base_schema, + marshmallow_load_kwargs=marshmallow_load_kwargs, + ) + ) + + if skip_unchanged: + # For some reason the type argument for WithPrevious is not inferred + # correctly, so we need to specify it explicitly. + return receiver.filter( + WithPrevious[DataclassT | Exception | None]( + lambda old, new: _not_equal_with_logging( + key=key, old_value=old, new_value=new + ) + ) + ) + + return receiver + + +@overload +async def wait_for_first( + receiver: Receiver[DataclassT | Exception | None], + /, + *, + receiver_name: str | None = None, + allow_none: Literal[False] = False, + timeout: timedelta = timedelta(minutes=1), +) -> DataclassT: ... + + +@overload +async def wait_for_first( + receiver: Receiver[DataclassT | Exception | None], + /, + *, + receiver_name: str | None = None, + allow_none: Literal[True] = True, + timeout: timedelta = timedelta(minutes=1), +) -> DataclassT | None: ... + + +async def wait_for_first( + receiver: Receiver[DataclassT | Exception | None], + /, + *, + receiver_name: str | None = None, + allow_none: bool = False, + timeout: timedelta = timedelta(minutes=1), +) -> DataclassT | None: + """Wait for and receive the the first configuration. + + For a more in-depth introduction and examples, please read the [module + documentation][frequenz.sdk.config]. + + Args: + receiver: The receiver to receive the first configuration from. + receiver_name: The name of the receiver, used for logging. If `None`, the + string representation of the receiver will be used. + allow_none: Whether consider a `None` value as a valid configuration. + timeout: The timeout in seconds to wait for the first configuration. + + Returns: + The first configuration received. + + Raises: + asyncio.TimeoutError: If the first configuration is not received within the + timeout. + ReceiverStoppedError: If the receiver is stopped before the first configuration + is received. + """ + if receiver_name is None: + receiver_name = str(receiver) + + # We need this type guard because we can't use a TypeVar for isinstance checks or + # match cases. + def is_config_class(value: DataclassT | Exception | None) -> TypeGuard[DataclassT]: + return is_dataclass(value) if value is not None else False + + _logger.info( + "%s: Waiting %s seconds for the first configuration to arrive...", + receiver_name, + timeout.total_seconds(), + ) + try: + async with asyncio.timeout(timeout.total_seconds()): + async for config in receiver: + match config: + case None: + if allow_none: + return None + _logger.error( + "%s: Received empty configuration, waiting again for " + "a first configuration to be set.", + receiver_name, + ) + case Exception() as error: + _logger.error( + "%s: Error while receiving the first configuration, " + "will keep waiting for an update: %s.", + receiver_name, + error, + ) + case config if is_config_class(config): + _logger.info("%s: Received first configuration.", receiver_name) + return config + case unexpected: + assert ( + False + ), f"{receiver_name}: Unexpected value received: {unexpected!r}." + except asyncio.TimeoutError: + _logger.error("%s: No configuration received in time.", receiver_name) + raise + raise ReceiverStoppedError(receiver) + + +def _not_equal_with_logging( + *, + key: str | Sequence[str], + old_value: DataclassT | Exception | None, + new_value: DataclassT | Exception | None, +) -> bool: + """Return whether the two mappings are not equal, logging if they are the same.""" + if old_value == new_value: + _logger.info("Configuration has not changed for key %r, skipping update.", key) + return False + + if isinstance(new_value, InvalidValueForKeyError) and not isinstance( + old_value, InvalidValueForKeyError + ): + subkey_str = "" + if key != new_value.key: + subkey_str = f"When looking for sub-key {key!r}: " + _logger.error( + "%sConfiguration for key %r has an invalid value: %r", + subkey_str, + new_value.key, + new_value.value, + ) + return True + + +def _load_config_with_logging_and_errors( + config: Mapping[str, Any], + config_class: type[DataclassT], + *, + key: str | Sequence[str], + base_schema: type[Schema] | None = None, + marshmallow_load_kwargs: dict[str, Any] | None = None, +) -> DataclassT | Exception | None: + """Load the configuration for the specified key, logging errors and returning them.""" + try: + sub_config = _get_key(config, key) + if sub_config is None: + _logger.debug("Configuration key %r not found, sending None", key) + return None + + loaded_config = _load_config( + sub_config, + config_class, + key=key, + base_schema=base_schema, + marshmallow_load_kwargs=marshmallow_load_kwargs, + ) + _logger.debug("Received new configuration: %s", loaded_config) + return loaded_config + except InvalidValueForKeyError as error: + if len(key) > 1 and key != error.key: + _logger.error("Error when looking for sub-key %r: %s", key, error) + else: + _logger.error(str(error)) + return error + except ValidationError as error: + _logger.error("The configuration for key %r is invalid: %s", key, error) + return error + except Exception as error: # pylint: disable=broad-except + _logger.exception( + "An unexpected error occurred while loading the configuration for key %r: %s", + key, + error, + ) + return error + + +def _get_key( + config: Mapping[str, Any], + # This is tricky, because a str is also a Sequence[str], if we would use only + # Sequence[str], then a regular string would also be accepted and taken as + # a sequence, like "key" -> ["k", "e", "y"]. We should never remove the str from + # the allowed types without changing Sequence[str] to something more specific, + # like list[str] or tuple[str]. + key: str | Sequence[str], +) -> Mapping[str, Any] | None: + """Get the value from the configuration under the specified key. + + Args: + config: The configuration to get the value from. + key: The key to get the value for. + + Returns: + The value under the key, or `None` if the key is not found. + + Raises: + InvalidValueForKeyError: If the value under the key is not a mapping. + """ + # We first normalize to a Sequence[str] to make it easier to work with. + if isinstance(key, str): + key = (key,) + value = config + current_path = [] + for subkey in key: + current_path.append(subkey) + if value is None: + return None + match value.get(subkey): + case None: + return None + case Mapping() as new_value: + value = new_value + case invalid_value: + raise InvalidValueForKeyError( + f"Value for key {current_path!r} is not a mapping: {invalid_value!r}", + key=current_path, + value=invalid_value, + ) + value = new_value + return value + + +def _load_config( + config: Mapping[str, Any], + config_class: type[DataclassT], + *, + key: str | Sequence[str], + base_schema: type[Schema] | None = BaseConfigSchema, + marshmallow_load_kwargs: dict[str, Any] | None = None, +) -> DataclassT | InvalidValueForKeyError | ValidationError | None: + """Try to load a configuration and log any validation errors.""" + loaded_config = load_config( + config_class, + config, + base_schema=base_schema, + marshmallow_load_kwargs=marshmallow_load_kwargs, + ) + + marshmallow_load_kwargs = ( + {} if marshmallow_load_kwargs is None else marshmallow_load_kwargs.copy() + ) + + unknown = marshmallow_load_kwargs.get("unknown") + if unknown == marshmallow.EXCLUDE: + # When excluding unknown fields we still want to notify the user, as + # this could mean there is a typo in the configuration and some value is + # not being loaded as desired. + marshmallow_load_kwargs["unknown"] = marshmallow.RAISE + try: + load_config( + config_class, + config, + base_schema=base_schema, + marshmallow_load_kwargs=marshmallow_load_kwargs, + ) + except ValidationError as err: + _logger.warning( + "The configuration for key %r has extra fields that will be ignored: %s", + key, + err, + ) + return loaded_config diff --git a/src/frequenz/sdk/config/_managing_actor.py b/src/frequenz/sdk/config/_managing_actor.py index 4bb322803..9c91cac21 100644 --- a/src/frequenz/sdk/config/_managing_actor.py +++ b/src/frequenz/sdk/config/_managing_actor.py @@ -72,7 +72,7 @@ class ConfigManagingActor(Actor): # pylint: disable-next=too-many-arguments def __init__( self, - config_paths: abc.Sequence[pathlib.Path | str], + config_paths: str | pathlib.Path | abc.Sequence[pathlib.Path | str], output: Sender[abc.Mapping[str, Any]], *, name: str | None = None, @@ -93,16 +93,29 @@ def __init__( force_polling: Whether to force file polling to check for changes. polling_interval: The interval to poll for changes. Only relevant if polling is enabled. + + Raises: + ValueError: If no configuration path is provided. """ super().__init__(name=name) - self._config_paths: list[pathlib.Path] = [ - ( - config_path - if isinstance(config_path, pathlib.Path) - else pathlib.Path(config_path) - ) - for config_path in config_paths - ] + match config_paths: + case str(): + self._config_paths = [pathlib.Path(config_paths)] + case pathlib.Path(): + self._config_paths = [config_paths] + case abc.Sequence() as seq if len(seq) == 0: + raise ValueError("At least one config path is required.") + case abc.Sequence(): + self._config_paths = [ + ( + config_path + if isinstance(config_path, pathlib.Path) + else pathlib.Path(config_path) + ) + for config_path in config_paths + ] + case unexpected: + assert_never(unexpected) self._output: Sender[abc.Mapping[str, Any]] = output self._force_polling: bool = force_polling self._polling_interval: timedelta = polling_interval @@ -117,24 +130,26 @@ def _read_config(self) -> abc.Mapping[str, Any] | None: config: dict[str, Any] = {} for config_path in self._config_paths: - _logger.info("%s: Reading configuration file %s...", self, config_path) + _logger.info( + "[%s] Reading configuration file %r...", self.name, str(config_path) + ) try: with config_path.open("rb") as toml_file: data = tomllib.load(toml_file) _logger.info( - "%s: Configuration file %r read successfully.", - self, + "[%s] Configuration file %r read successfully.", + self.name, str(config_path), ) config = _recursive_update(config, data) except ValueError as err: - _logger.error("%s: Can't read config file, err: %s", self, err) + _logger.error("[%s] Can't read config file, err: %s", self.name, err) error_count += 1 except OSError as err: # It is ok for config file to don't exist. _logger.error( - "%s: Error reading config file %r (%s). Ignoring it.", - self, + "[%s] Error reading config file %r (%s). Ignoring it.", + self.name, str(config_path), err, ) @@ -142,13 +157,13 @@ def _read_config(self) -> abc.Mapping[str, Any] | None: if error_count == len(self._config_paths): _logger.error( - "%s: Can't read any of the config files, ignoring config update.", self + "[%s] Can't read any of the config files, ignoring config update.", self ) return None _logger.info( - "%s: Read %s/%s configuration files successfully.", - self, + "[%s] Read %s/%s configuration files successfully.", + self.name, len(self._config_paths) - error_count, len(self._config_paths), ) @@ -185,8 +200,8 @@ async def _run(self) -> None: async for event in file_watcher: if not event.path.exists(): _logger.error( - "%s: Received event %s, but the watched path %s doesn't exist.", - self, + "[%s] Received event %s, but the watched path %s doesn't exist.", + self.name, event, event.path, ) @@ -207,23 +222,23 @@ async def _run(self) -> None: match event.type: case EventType.CREATE: _logger.info( - "%s: The configuration file %s was created, sending new config...", - self, + "[%s] The configuration file %s was created, sending new config...", + self.name, event.path, ) await self.send_config() case EventType.MODIFY: _logger.info( - "%s: The configuration file %s was modified, sending update...", - self, + "[%s] The configuration file %s was modified, sending update...", + self.name, event.path, ) await self.send_config() case EventType.DELETE: _logger.error( - "%s: Unexpected DELETE event for path %s. Please report this " + "[%s] Unexpected DELETE event for path %s. Please report this " "issue to Frequenz.", - self, + self.name, event.path, ) case _: diff --git a/tests/config/test_logging_actor.py b/tests/config/test_logging_actor.py index 75c1e256c..71b020b41 100644 --- a/tests/config/test_logging_actor.py +++ b/tests/config/test_logging_actor.py @@ -5,7 +5,6 @@ import asyncio import logging -from collections.abc import Mapping from typing import Any import pytest @@ -77,78 +76,69 @@ async def test_logging_config_updating_actor( # is not working anyway - python ignores it. mocker.patch("frequenz.sdk.config._logging_actor.logging.basicConfig") - config_channel = Broadcast[Mapping[str, Any]](name="config") - config_sender = config_channel.new_sender() - async with LoggingConfigUpdatingActor( - config_recv=config_channel.new_receiver().map( - lambda app_config: app_config.get("logging", {}) - ) - ) as actor: + # Mock ConfigManager + mock_config_manager = mocker.Mock() + mock_config_manager.config_channel = Broadcast[LoggingConfig | Exception | None]( + name="config" + ) + mock_config_manager.new_receiver = mocker.Mock( + return_value=mock_config_manager.config_channel.new_receiver() + ) + + async with LoggingConfigUpdatingActor(mock_config_manager) as actor: assert logging.getLogger("frequenz.sdk.actor").level == logging.NOTSET assert logging.getLogger("frequenz.sdk.timeseries").level == logging.NOTSET update_logging_spy = mocker.spy(actor, "_update_logging") # Send first config - await config_sender.send( - { - "logging": { - "root_logger": {"level": "ERROR"}, - "loggers": { - "frequenz.sdk.actor": {"level": "DEBUG"}, - "frequenz.sdk.timeseries": {"level": "ERROR"}, - }, - } - } + expected_config = LoggingConfig( + root_logger=LoggerConfig(level="ERROR"), + loggers={ + "frequenz.sdk.actor": LoggerConfig(level="DEBUG"), + "frequenz.sdk.timeseries": LoggerConfig(level="ERROR"), + }, ) + await mock_config_manager.config_channel.new_sender().send(expected_config) await asyncio.sleep(0.01) - update_logging_spy.assert_called_once_with( - LoggingConfig( - root_logger=LoggerConfig(level="ERROR"), - loggers={ - "frequenz.sdk.actor": LoggerConfig(level="DEBUG"), - "frequenz.sdk.timeseries": LoggerConfig(level="ERROR"), - }, - ) - ) + update_logging_spy.assert_called_once_with(expected_config) assert logging.getLogger("frequenz.sdk.actor").level == logging.DEBUG assert logging.getLogger("frequenz.sdk.timeseries").level == logging.ERROR update_logging_spy.reset_mock() - # Update config - await config_sender.send( - { - "logging": { - "root_logger": {"level": "WARNING"}, - "loggers": { - "frequenz.sdk.actor": {"level": "INFO"}, - }, - } - } + # Send an exception and verify the previous config is maintained + await mock_config_manager.config_channel.new_sender().send( + ValueError("Test error") ) await asyncio.sleep(0.01) + update_logging_spy.assert_not_called() # Should not try to update logging + # Previous config should be maintained + assert logging.getLogger("frequenz.sdk.actor").level == logging.DEBUG + assert logging.getLogger("frequenz.sdk.timeseries").level == logging.ERROR + assert ( + actor._current_config == expected_config # pylint: disable=protected-access + ) # pylint: disable=protected-access + update_logging_spy.reset_mock() + + # Update config expected_config = LoggingConfig( root_logger=LoggerConfig(level="WARNING"), loggers={ "frequenz.sdk.actor": LoggerConfig(level="INFO"), }, ) + await mock_config_manager.config_channel.new_sender().send(expected_config) + await asyncio.sleep(0.01) update_logging_spy.assert_called_once_with(expected_config) assert logging.getLogger("frequenz.sdk.actor").level == logging.INFO assert logging.getLogger("frequenz.sdk.timeseries").level == logging.NOTSET update_logging_spy.reset_mock() - # Send invalid config to make sure actor doesn't crash and doesn't setup invalid config. - await config_sender.send({"logging": {"root_logger": {"level": "UNKNOWN"}}}) - await asyncio.sleep(0.01) - update_logging_spy.assert_not_called() - assert actor._current_config == expected_config - update_logging_spy.reset_mock() - - # Send empty config to reset logging to default - await config_sender.send({"other": {"var1": 1}}) + # Send a None config to make sure actor doesn't crash and configures a default logging + await mock_config_manager.config_channel.new_sender().send(None) await asyncio.sleep(0.01) update_logging_spy.assert_called_once_with(LoggingConfig()) - assert logging.getLogger("frequenz.sdk.actor").level == logging.NOTSET - assert logging.getLogger("frequenz.sdk.timeseries").level == logging.NOTSET + assert ( + actor._current_config == LoggingConfig() # pylint: disable=protected-access + ) update_logging_spy.reset_mock() diff --git a/tests/config/test_manager.py b/tests/config/test_manager.py new file mode 100644 index 000000000..b36154758 --- /dev/null +++ b/tests/config/test_manager.py @@ -0,0 +1,462 @@ +# License: MIT +# Copyright © 2024 Frequenz Energy-as-a-Service GmbH + +"""Tests for the config manager module.""" + + +import asyncio +import dataclasses +import logging +import pathlib +from collections.abc import Mapping, Sequence +from dataclasses import dataclass +from datetime import timedelta +from typing import Any, assert_never + +import marshmallow +import pytest +import pytest_mock + +from frequenz.sdk.config import ConfigManager, InvalidValueForKeyError, wait_for_first +from frequenz.sdk.config._manager import _get_key + + +@dataclass +class SimpleConfig: + """A simple configuration class for testing.""" + + name: str = dataclasses.field(metadata={"validate": lambda s: s.startswith("test")}) + value: int + + +@dataclass(frozen=True, kw_only=True) +class ReceiverTestCase: + """A test case for testing new_receiver configurations.""" + + title: str + key: str | tuple[str, ...] + config_class: type[SimpleConfig] + input_config: dict[str, Any] + expected_output: Any | None + base_schema: type[marshmallow.Schema] | None = None + marshmallow_load_kwargs: dict[str, Any] | None = None + + +# Test cases for new_receiver +receiver_test_cases = [ + ReceiverTestCase( + title="Basic Config", + key="test", + config_class=SimpleConfig, + input_config={"test": {"name": "test1", "value": 42}}, + expected_output=SimpleConfig(name="test1", value=42), + ), + ReceiverTestCase( + title="Nested Key Config", + key=("nested", "config"), + config_class=SimpleConfig, + input_config={"nested": {"config": {"name": "test2", "value": 43}}}, + expected_output=SimpleConfig(name="test2", value=43), + ), + ReceiverTestCase( + title="Validation Error", + key="test", + config_class=SimpleConfig, + input_config={"test": {"name": "no-test1", "value": 42}}, + expected_output="{'name': ['Invalid value.']}", + ), + ReceiverTestCase( + title="Invalid Value Type", + key="test", + config_class=SimpleConfig, + input_config={"test": "not a mapping"}, + expected_output="Value for key ['test'] is not a mapping: 'not a mapping'", + ), + ReceiverTestCase( + title="Raise on unknown", + key="test", + config_class=SimpleConfig, + marshmallow_load_kwargs={"unknown": marshmallow.RAISE}, + input_config={"test": {"name": "test3", "value": 44, "not_allowed": 42}}, + expected_output="{'not_allowed': ['Unknown field.']}", + ), + ReceiverTestCase( + title="Missing Key", + key="missing", + config_class=SimpleConfig, + input_config={"test": {"name": "test3", "value": 44}}, + expected_output=None, + ), +] + + +@pytest.mark.parametrize("test_case", receiver_test_cases, ids=lambda tc: tc.title) +async def test_new_receiver_configurations( + test_case: ReceiverTestCase, mocker: pytest_mock.MockFixture +) -> None: + """Test different configurations for new_receiver.""" + mocker.patch("frequenz.sdk.config._manager.ConfigManagingActor") + config_manager = ConfigManager([pathlib.Path("dummy.toml")]) + await config_manager.config_channel.new_sender().send(test_case.input_config) + receiver = config_manager.new_receiver( + test_case.key, + test_case.config_class, + base_schema=test_case.base_schema, + marshmallow_load_kwargs=test_case.marshmallow_load_kwargs, + ) + + async with asyncio.timeout(1): + result = await receiver.receive() + match result: + case SimpleConfig() | None: + assert result == test_case.expected_output + case Exception(): + assert str(result) == str(test_case.expected_output) + case unexpected: + assert_never(unexpected) + + +async def test_warn_on_unknown_key( + mocker: pytest_mock.MockerFixture, caplog: pytest.LogCaptureFixture +) -> None: + """Test that a warning is logged when an unknown key is received.""" + mocker.patch("frequenz.sdk.config._manager.ConfigManagingActor") + config_manager = ConfigManager([pathlib.Path("dummy.toml")]) + await config_manager.config_channel.new_sender().send( + {"test": {"name": "test3", "value": 44, "not_allowed": 42}} + ) + receiver = config_manager.new_receiver("test", SimpleConfig) + + async with asyncio.timeout(1): + await receiver.receive() + + expected_log_entry = ( + "frequenz.sdk.config._manager", + logging.WARNING, + "The configuration for key 'test' has extra fields that will be ignored: " + "{'not_allowed': ['Unknown field.']}", + ) + assert expected_log_entry in caplog.record_tuples + + +async def test_skip_config_update_bursts(mocker: pytest_mock.MockerFixture) -> None: + """Test that a burst of updates will only send the last update.""" + mocker.patch("frequenz.sdk.config._manager.ConfigManagingActor") + config_manager = ConfigManager([pathlib.Path("dummy.toml")]) + sender = config_manager.config_channel.new_sender() + receiver = config_manager.new_receiver( + "test", + SimpleConfig, + skip_unchanged=True, + ) + + await sender.send({"test": {"name": "test1", "value": 42}}) + await sender.send({"test": {"name": "test2", "value": 43}}) + await sender.send({"test": {"name": "test3", "value": 44}}) + + # Should only receive one orig_config and then the changed_config + async with asyncio.timeout(1): + result = await receiver.receive() + assert result == SimpleConfig(name="test3", value=44) + + # There should be no more messages + with pytest.raises(asyncio.TimeoutError): + async with asyncio.timeout(0.1): + await receiver.receive() + + +async def test_skip_unchanged_config(mocker: pytest_mock.MockerFixture) -> None: + """Test that unchanged configurations are skipped when skip_unchanged is True.""" + mocker.patch("frequenz.sdk.config._manager.ConfigManagingActor") + config_manager = ConfigManager([pathlib.Path("dummy.toml")]) + sender = config_manager.config_channel.new_sender() + receiver = config_manager.new_receiver( + "test", + SimpleConfig, + skip_unchanged=True, + ) + + # A first config should be received + orig_config = {"test": {"name": "test1", "value": 42}} + await sender.send(orig_config) + async with asyncio.timeout(1): + result = await receiver.receive() + assert result == SimpleConfig(name="test1", value=42) + + # An unchanged config should be skipped (no message received) + await sender.send(orig_config) + with pytest.raises(asyncio.TimeoutError): + async with asyncio.timeout(0.1): + await receiver.receive() + + # A changed config should be received + changed_config = {"test": {"name": "test2", "value": 43}} + await sender.send(changed_config) + async with asyncio.timeout(1): + result = await receiver.receive() + assert result == SimpleConfig(name="test2", value=43) + + # There should be no more messages + with pytest.raises(asyncio.TimeoutError): + async with asyncio.timeout(0.1): + await receiver.receive() + + +async def test_wait_for_first(mocker: pytest_mock.MockerFixture) -> None: + """Test wait_for_first function.""" + mocker.patch("frequenz.sdk.config._manager.ConfigManagingActor") + config_manager = ConfigManager([pathlib.Path("dummy.toml")]) + + receiver = config_manager.new_receiver( + "test", + SimpleConfig, + ) + + async with asyncio.timeout(0.2): + with pytest.raises(asyncio.TimeoutError): + await wait_for_first(receiver, timeout=timedelta(seconds=0.1)) + + # Test successful wait + await config_manager.config_channel.new_sender().send( + {"test": {"name": "test1", "value": 42}} + ) + async with asyncio.timeout(0.2): + result = await wait_for_first(receiver, timeout=timedelta(seconds=0.1)) + assert result == SimpleConfig(name="test1", value=42) + + +def test_unknown_include_not_supported() -> None: + """Test that unknown marshmallow load kwargs are not supported.""" + with pytest.raises(ValueError): + ConfigManager([pathlib.Path("dummy.toml")]).new_receiver( + "test", + SimpleConfig, + marshmallow_load_kwargs={"unknown": marshmallow.INCLUDE}, + ) + + +@pytest.mark.integration +class TestConfigManagerIntegration: + """Integration tests for ConfigManager.""" + + @pytest.fixture + def config_file(self, tmp_path: pathlib.Path) -> pathlib.Path: + """Create a temporary config file for testing.""" + config_file = tmp_path / "config.toml" + config_file.write_text( + """ + [test] + name = "test1" + value = 42 + + [logging.loggers.test] + level = "DEBUG" + """ + ) + return config_file + + async def test_full_config_flow(self, config_file: pathlib.Path) -> None: + """Test the complete flow of configuration management.""" + async with ( + # Disabling force_polling is a hack because of a bug in watchfiles not + # detecting sub-second changes when using polling. + ConfigManager([config_file], force_polling=False) as config_manager, + asyncio.timeout(1), + ): + receiver = config_manager.new_receiver("test", SimpleConfig) + first_config = await wait_for_first(receiver) + assert first_config == SimpleConfig(name="test1", value=42) + assert logging.getLogger("test").level == logging.DEBUG + + # Update config file + config_file.write_text( + """ + [test] + name = "test2" + value = 43 + + [logging.loggers.test] + level = "INFO" + """ + ) + + # Check updated config + config = await receiver.receive() + assert config == SimpleConfig(name="test2", value=43) + + # Check updated logging config + assert logging.getLogger("test").level == logging.INFO + + async def test_full_config_flow_without_logging( + self, config_file: pathlib.Path + ) -> None: + """Test the complete flow of configuration management without logging.""" + logging.getLogger("test").setLevel(logging.WARNING) + async with ( + # Disabling force_polling is a hack because of a bug in watchfiles not + # detecting sub-second changes when using polling. + ConfigManager( + [config_file], logging_config_key=None, force_polling=False + ) as config_manager, + asyncio.timeout(1), + ): + receiver = config_manager.new_receiver("test", SimpleConfig) + first_config = await wait_for_first(receiver) + assert first_config == SimpleConfig(name="test1", value=42) + assert logging.getLogger("test").level == logging.WARNING + + # Update config file + config_file.write_text( + """ + [test] + name = "test2" + value = 43 + + [logging.loggers.test] + level = "DEBUG" + """ + ) + + # Check updated config + config = await receiver.receive() + assert config == SimpleConfig(name="test2", value=43) + + # Check updated logging config + assert logging.getLogger("test").level == logging.WARNING + + +@dataclass(frozen=True) +class GetKeyTestCase: + """Test case for _get_key function.""" + + title: str + config: dict[str, Any] + key: str | Sequence[str] + expected_result: Mapping[str, Any] | None | type[InvalidValueForKeyError] + expected_error_key: list[str] | None = None + expected_error_value: Any | None = None + + +_get_key_test_cases = [ + # Simple string key tests + GetKeyTestCase( + title="Simple string key - exists", + config={"a": {"b": 1}}, + key="a", + expected_result={"b": 1}, + ), + GetKeyTestCase( + title="Simple string key - doesn't exist", + config={"a": {"b": 1}}, + key="x", + expected_result=None, + ), + GetKeyTestCase( + title="Simple string key - invalid value type", + config={"a": 42}, + key="a", + expected_result=InvalidValueForKeyError, + expected_error_key=["a"], + expected_error_value=42, + ), + # Sequence key tests + GetKeyTestCase( + title="Sequence key - all exist", + config={"a": {"b": {"c": {"d": 1}}}}, + key=["a", "b", "c"], + expected_result={"d": 1}, + ), + GetKeyTestCase( + title="Sequence key - middle doesn't exist", + config={"a": {"b": {"c": 1}}}, + key=["a", "x", "c"], + expected_result=None, + ), + GetKeyTestCase( + title="Sequence key - invalid value in middle", + config={"a": {"b": 42, "c": 1}}, + key=["a", "b", "c"], + expected_result=InvalidValueForKeyError, + expected_error_key=["a", "b"], + expected_error_value=42, + ), + GetKeyTestCase( + title="Empty sequence key", + config={"a": 1}, + key=[], + expected_result={"a": 1}, + ), + # Empty string tests + GetKeyTestCase( + title="Empty string key", + config={"": {"a": 1}}, + key="", + expected_result={"a": 1}, + ), + GetKeyTestCase( + title="Empty string in sequence", + config={"": {"": {"a": 1}}}, + key=["", ""], + expected_result={"a": 1}, + ), + # None value tests + GetKeyTestCase( + title="Value is None", + config={"a": None}, + key="a", + expected_result=None, + ), + GetKeyTestCase( + title="Nested None value", + config={"a": {"b": None}}, + key=["a", "b", "c"], + expected_result=None, + ), + # Special string key tests to verify string handling + GetKeyTestCase( + title="String that looks like a sequence", + config={"key": {"e": 1}}, + key="key", + expected_result={"e": 1}, + ), + GetKeyTestCase( + title="String with special characters", + config={"a.b": {"c": 1}}, + key="a.b", + expected_result={"c": 1}, + ), + # Nested mapping tests + GetKeyTestCase( + title="Deeply nested valid path", + config={"a": {"b": {"c": {"d": {"e": {"f": 1}}}}}}, + key=["a", "b", "c", "d", "e"], + expected_result={"f": 1}, + ), + GetKeyTestCase( + title="Mixed type nested invalid", + config={"a": {"b": [1, 2, 3]}}, + key=["a", "b"], + expected_result=InvalidValueForKeyError, + expected_error_key=["a", "b"], + expected_error_value=[1, 2, 3], + ), +] + + +@pytest.mark.parametrize("test_case", _get_key_test_cases, ids=lambda tc: tc.title) +def test_get_key(test_case: GetKeyTestCase) -> None: + """Test the _get_key function with various inputs. + + Args: + test_case: The test case to run. + """ + if test_case.expected_result is InvalidValueForKeyError: + with pytest.raises(InvalidValueForKeyError) as exc_info: + _get_key(test_case.config, test_case.key) + + # Verify the error details + assert exc_info.value.key == test_case.expected_error_key + assert exc_info.value.value == test_case.expected_error_value + else: + result = _get_key(test_case.config, test_case.key) + assert result == test_case.expected_result