Skip to content

Commit f762b5a

Browse files
committed
Add support for subscribing to specific config keys
This is useful for actors to be able to subscribe to a particular key with the actor configuration, avoiding spurious updates and noise in the received configuration. Signed-off-by: Leandro Lucarella <[email protected]>
1 parent 5195d0b commit f762b5a

File tree

1 file changed

+82
-14
lines changed

1 file changed

+82
-14
lines changed

src/frequenz/sdk/config/_manager.py

Lines changed: 82 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
import pathlib
99
from collections.abc import Mapping, Sequence
1010
from datetime import timedelta
11-
from typing import Any, Final
11+
from typing import Any, Final, overload
1212

1313
from frequenz.channels import Broadcast, Receiver
1414
from frequenz.channels.experimental import WithPrevious
@@ -98,13 +98,31 @@ def __str__(self) -> str:
9898
"""Return a string representation of this config manager."""
9999
return f"{type(self).__name__}[{self.name}]"
100100

101+
@overload
102+
async def new_receiver(
103+
self,
104+
*,
105+
wait_for_first: bool = True,
106+
skip_unchanged: bool = True,
107+
) -> Receiver[Mapping[str, Any]]: ...
108+
109+
@overload
110+
async def new_receiver(
111+
self,
112+
*,
113+
wait_for_first: bool = True,
114+
skip_unchanged: bool = True,
115+
key: str,
116+
) -> Receiver[Mapping[str, Any] | None]: ...
117+
101118
# The noqa DOC502 is needed because we raise TimeoutError indirectly.
102119
async def new_receiver( # pylint: disable=too-many-arguments # noqa: DOC502
103120
self,
104121
*,
105122
wait_for_first: bool = False,
106123
skip_unchanged: bool = True,
107-
) -> Receiver[Mapping[str, Any] | None]:
124+
key: str | None = None,
125+
) -> Receiver[Mapping[str, Any]] | Receiver[Mapping[str, Any] | None]:
108126
"""Create a new receiver for the configuration.
109127
110128
This method has a lot of features and functionalities to make it easier to
@@ -121,6 +139,14 @@ async def new_receiver( # pylint: disable=too-many-arguments # noqa: DOC502
121139
The comparison is done using the *raw* `dict` to determine if the configuration
122140
has changed.
123141
142+
### Filtering
143+
144+
The configuration can be filtered by a `key`.
145+
146+
If the `key` is `None`, the receiver will receive the full configuration,
147+
otherwise only the part of the configuration under the specified key is
148+
received, or `None` if the key is not found.
149+
124150
### Waiting for the first configuration
125151
126152
If `wait_for_first` is `True`, the receiver will wait for the first
@@ -146,6 +172,8 @@ async def new_receiver( # pylint: disable=too-many-arguments # noqa: DOC502
146172
[`consume()`][frequenz.channels.Receiver.consume] on the receiver.
147173
skip_unchanged: Whether to skip sending the configuration if it hasn't
148174
changed compared to the last one received.
175+
key: The key to filter the configuration. If `None`, the full configuration
176+
will be received.
149177
150178
Returns:
151179
The receiver for the configuration.
@@ -154,25 +182,65 @@ async def new_receiver( # pylint: disable=too-many-arguments # noqa: DOC502
154182
asyncio.TimeoutError: If `wait_for_first` is `True` and the first
155183
configuration can't be received in time.
156184
"""
157-
recv_name = f"{self}_receiver"
185+
recv_name = f"{self}_receiver" if key is None else f"{self}_receiver_{key}"
158186
receiver = self.config_channel.new_receiver(name=recv_name, limit=1)
159187

160188
if skip_unchanged:
161-
receiver = receiver.filter(WithPrevious(not_equal_with_logging))
189+
receiver = receiver.filter(WithPrevious(_NotEqualWithLogging(key)))
162190

163191
if wait_for_first:
164192
async with asyncio.timeout(self.wait_for_first_timeout.total_seconds()):
165193
await receiver.ready()
166194

167-
return receiver
195+
if key is None:
196+
return receiver
197+
198+
return receiver.map(lambda config: config.get(key))
168199

169200

170-
def not_equal_with_logging(
171-
old_config: Mapping[str, Any], new_config: Mapping[str, Any]
172-
) -> bool:
173-
"""Return whether the two mappings are not equal, logging if they are the same."""
174-
if old_config == new_config:
175-
_logger.info("Configuration has not changed, skipping update")
176-
_logger.debug("Old configuration being kept: %r", old_config)
177-
return False
178-
return True
201+
class _NotEqualWithLogging:
202+
"""A predicate that returns whether the two mappings are not equal.
203+
204+
If the mappings are equal, a logging message will be emitted indicating that the
205+
configuration has not changed for the specified key.
206+
"""
207+
208+
def __init__(self, key: str | None = None) -> None:
209+
"""Initialize this instance.
210+
211+
Args:
212+
key: The key to use in the logging message.
213+
"""
214+
self._key = key
215+
216+
def __call__(
217+
self, old_config: Mapping[str, Any] | None, new_config: Mapping[str, Any] | None
218+
) -> bool:
219+
"""Return whether the two mappings are not equal, logging if they are the same."""
220+
if self._key is None:
221+
has_changed = new_config != old_config
222+
else:
223+
match (new_config, old_config):
224+
case (None, None):
225+
has_changed = False
226+
case (None, Mapping()):
227+
has_changed = old_config.get(self._key) is not None
228+
case (Mapping(), None):
229+
has_changed = new_config.get(self._key) is not None
230+
case (Mapping(), Mapping()):
231+
has_changed = new_config.get(self._key) != old_config.get(self._key)
232+
case unexpected:
233+
# We can't use `assert_never` here because `mypy` is having trouble
234+
# narrowing the types of a tuple. See for example:
235+
# https://github.com/python/mypy/issues/16722
236+
# https://github.com/python/mypy/issues/16650
237+
# https://github.com/python/mypy/issues/14833
238+
# assert_never(unexpected)
239+
assert False, f"Unexpected match: {unexpected}"
240+
241+
if not has_changed:
242+
key_str = f" for key '{self._key}'" if self._key else ""
243+
_logger.info("Configuration%s has not changed, skipping update", key_str)
244+
_logger.debug("Old configuration%s being kept: %r", key_str, old_config)
245+
246+
return has_changed

0 commit comments

Comments
 (0)