Skip to content
Closed
Changes from 1 commit
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
2d282e4
Bump the minimum required channels version to v1.4.0
llucax Nov 22, 2024
b67acd6
Add a `ConfigManager` class
llucax Nov 15, 2024
29725c4
Add an option to wait for the first configuration
llucax Nov 22, 2024
2921ed8
Add option to only send changed configurations
llucax Nov 22, 2024
751d20e
Add support for subscribing to specific config keys
llucax Nov 22, 2024
3ccb4eb
Add support for validating configurations with a schema
llucax Nov 28, 2024
e39cd6b
Add support to filter a sub-key
llucax Nov 25, 2024
3129c42
Assert the receiver has the correct type
llucax Dec 6, 2024
63c96a0
Add support for skipping `None` configs
llucax Dec 6, 2024
77295dc
Add a global instance for the config manager
llucax Nov 22, 2024
cd08feb
Support using `BackgroundService` as a *mixin*
llucax Dec 6, 2024
fbe18de
Add a base config schema that provides quantities support
llucax Dec 6, 2024
3201348
Add a `Reconfigurable` *mixin*
llucax Dec 6, 2024
2878a2a
Make the `LoggingConfigUpdatingActor` `Reconfigurable`
llucax Dec 9, 2024
0b54413
Allow configuring logging via `ConfigManager`
llucax Dec 9, 2024
80fc626
Revert "Make the `LoggingConfigUpdatingActor` `Reconfigurable`"
llucax Dec 10, 2024
c34c199
Revert "Add a `Reconfigurable` *mixin*"
llucax Dec 10, 2024
05164ef
Revert "Support using `BackgroundService` as a *mixin*"
llucax Dec 10, 2024
dcd76fb
Revert "Add a global instance for the config manager"
llucax Dec 10, 2024
9215c15
WIP: Add full example in the `config` module.
llucax Nov 22, 2024
40dcc3f
Revert "Add an option to wait for the first configuration"
llucax Dec 10, 2024
1f96cec
Improve logging for configuration file reading
llucax Dec 10, 2024
61d4b11
Remove support for receiving raw mapping as configuration
llucax Dec 10, 2024
0cc7e94
Move note about update bursts to Skipping superfluous updates
llucax Dec 10, 2024
4189066
Exclude unknown fields from the config by default
llucax Dec 10, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 82 additions & 14 deletions src/frequenz/sdk/config/_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import pathlib
from collections.abc import Mapping, Sequence
from datetime import timedelta
from typing import Any, Final
from typing import Any, Final, overload

from frequenz.channels import Broadcast, Receiver
from frequenz.channels.experimental import WithPrevious
Expand Down Expand Up @@ -98,13 +98,31 @@ def __str__(self) -> str:
"""Return a string representation of this config manager."""
return f"{type(self).__name__}[{self.name}]"

@overload
async def new_receiver(
self,
*,
wait_for_first: bool = True,
skip_unchanged: bool = True,
) -> Receiver[Mapping[str, Any]]: ...

@overload
async def new_receiver(
self,
*,
wait_for_first: bool = True,
skip_unchanged: bool = True,
key: str,
) -> Receiver[Mapping[str, Any] | None]: ...

# The noqa DOC502 is needed because we raise TimeoutError indirectly.
async def new_receiver( # pylint: disable=too-many-arguments # noqa: DOC502
self,
*,
wait_for_first: bool = False,
Copy link
Contributor

@ela-kotulska-frequenz ela-kotulska-frequenz Dec 10, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider

  • removing wait_for_first_timeout from constructor arguments.
  • changing wait_for_first to `wait_for_first_timeout: timedelta | None = None```` :

If wait_for_first is None don't wait for it first config, if timedelta(X)- wait for X seconds.

Interface will simplify a little - we will have one config instead of 2.
Unless you see any use case to wait infinitelly for the first config.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After the presentation of this to a wider audience I'm really considering to remove the wait for first completely, I think it adds a lot of complexity only to deal with a very niche an obscure situation where there might be a bug and for some reason we don't receive the first config.

I think we can improve debugability of that situation by adding a timeout to the reading of the config files in the config managing actor instead (if we don't have it yet), which would be probably the only place where this could really hang. Also we might add more logging before and after reading the first config, so if you see a "Waiting for first config" without an immediate "Got the initial config: %s", config, that would mean the config reading somehow got stuck. We can now do this in the ConfigManager itself, so users don't need to remember to add these log lines.

What do you think?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm even more convinced we should remove this, as if we keep this option, the new_receiver() method needs to be async, and if it is async, it can be used in constructors, which complicates initialization even further.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, lets remove it :) It is not so difficult to write it now (after you simplified other things)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I finally moved it to an utility function. This removes all the complexity from the config manager, and if you need this functionality, you can just do a await wait_for_first() and that will get the default timeout if you don't pass one explicitly.

skip_unchanged: bool = True,
) -> Receiver[Mapping[str, Any] | None]:
key: str | None = None,
) -> Receiver[Mapping[str, Any]] | Receiver[Mapping[str, Any] | None]:
"""Create a new receiver for the configuration.

This method has a lot of features and functionalities to make it easier to
Expand All @@ -121,6 +139,14 @@ async def new_receiver( # pylint: disable=too-many-arguments # noqa: DOC502
The comparison is done using the *raw* `dict` to determine if the configuration
has changed.

### Filtering

The configuration can be filtered by a `key`.

If the `key` is `None`, the receiver will receive the full configuration,
otherwise only the part of the configuration under the specified key is
received, or `None` if the key is not found.

### Waiting for the first configuration

If `wait_for_first` is `True`, the receiver will wait for the first
Expand All @@ -146,6 +172,8 @@ async def new_receiver( # pylint: disable=too-many-arguments # noqa: DOC502
[`consume()`][frequenz.channels.Receiver.consume] on the receiver.
skip_unchanged: Whether to skip sending the configuration if it hasn't
changed compared to the last one received.
key: The key to filter the configuration. If `None`, the full configuration
will be received.

Returns:
The receiver for the configuration.
Expand All @@ -154,25 +182,65 @@ async def new_receiver( # pylint: disable=too-many-arguments # noqa: DOC502
asyncio.TimeoutError: If `wait_for_first` is `True` and the first
configuration can't be received in time.
"""
recv_name = f"{self}_receiver"
recv_name = f"{self}_receiver" if key is None else f"{self}_receiver_{key}"
receiver = self.config_channel.new_receiver(name=recv_name, limit=1)

if skip_unchanged:
receiver = receiver.filter(WithPrevious(not_equal_with_logging))
receiver = receiver.filter(WithPrevious(_NotEqualWithLogging(key)))

if wait_for_first:
async with asyncio.timeout(self.wait_for_first_timeout.total_seconds()):
await receiver.ready()

return receiver
if key is None:
return receiver

return receiver.map(lambda config: config.get(key))


def not_equal_with_logging(
old_config: Mapping[str, Any], new_config: Mapping[str, Any]
) -> bool:
"""Return whether the two mappings are not equal, logging if they are the same."""
if old_config == new_config:
_logger.info("Configuration has not changed, skipping update")
_logger.debug("Old configuration being kept: %r", old_config)
return False
return True
class _NotEqualWithLogging:
Copy link
Contributor

@ela-kotulska-frequenz ela-kotulska-frequenz Dec 12, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It should simplify code a lot if you do map.filter instead of filter.map :)
In other words you first get the key and then apply all filters.

        if key is not None:
            receiver = receiver.map(lambda config: config.get(key))

        if skip_unchanged:
            # Now WithPrevious takes config[key] instead of whole config so it should simplify, too.
            receiver = receiver.filter(WithPrevious(_NotEqualWithLogging(key)))

        return receiver


class _NotEqualWithLogging:
    """A predicate that returns whether the two mappings are not equal.

    If the mappings are equal, a logging message will be emitted indicating that the
    configuration has not changed for the specified key.
    """

    def __init__(self, key: str | None = None) -> None:
        """Initialize this instance.

        Args:
            key: The key to use in the logging message.
        """
        self._key = key

    def __call__(
        self, old_config: Mapping[str, Any] | None, new_config: Mapping[str, Any] | None
    ) -> bool:
        """Return whether the two mappings are not equal, logging if they are the same."""
        if new_config != old_config:
            key_str = f" for key '{self._key}'" if self._key else ""
            _logger.info("Configuration%s has changed, updating", key_str)
            _logger.debug("New configuration%s being applied: %r", key_str, new_config)
            return True
        return False

Copy link
Contributor

@ela-kotulska-frequenz ela-kotulska-frequenz Dec 12, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The above should also work with other commits. But maybe I am missing something


        if key is not None:
            receiver = receiver.map(lambda config: _get_key(config, key))

        if skip_unchanged:
           # _NotEqualWithLogging will simplify
            receiver = receiver.filter(WithPrevious(_NotEqualWithLogging(key)))

        receiver = receiver.map(
            # load_config_with_logging  will simplify
            lambda config: load_config_with_logging(
                config_class, config, base_schema=base_schema, **marshmallow_load_kwargs
            )
        ).filter(_is_valid_or_none)

        if skip_none:
            receiver = receiver.filter(_is_dataclass)
        return receiver

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In other words you first get the key and then apply all filters.

Yes! I can't believe I missed that before, but in the new version I also reversed the oder of map and filter and it made the code much simpler!

"""A predicate that returns whether the two mappings are not equal.

If the mappings are equal, a logging message will be emitted indicating that the
configuration has not changed for the specified key.
"""

def __init__(self, key: str | None = None) -> None:
"""Initialize this instance.

Args:
key: The key to use in the logging message.
"""
self._key = key

def __call__(
self, old_config: Mapping[str, Any] | None, new_config: Mapping[str, Any] | None
) -> bool:
"""Return whether the two mappings are not equal, logging if they are the same."""
if self._key is None:
has_changed = new_config != old_config
else:
match (new_config, old_config):
case (None, None):
has_changed = False
case (None, Mapping()):
has_changed = old_config.get(self._key) is not None
case (Mapping(), None):
has_changed = new_config.get(self._key) is not None
case (Mapping(), Mapping()):
has_changed = new_config.get(self._key) != old_config.get(self._key)
case unexpected:
# We can't use `assert_never` here because `mypy` is having trouble
# narrowing the types of a tuple. See for example:
# https://github.com/python/mypy/issues/16722
# https://github.com/python/mypy/issues/16650
# https://github.com/python/mypy/issues/14833
# assert_never(unexpected)
assert False, f"Unexpected match: {unexpected}"

if not has_changed:
key_str = f" for key '{self._key}'" if self._key else ""
_logger.info("Configuration%s has not changed, skipping update", key_str)
_logger.debug("Old configuration%s being kept: %r", key_str, old_config)

return has_changed