Skip to content

Commit 85fb413

Browse files
committed
Add support for subscribing to specific config keys
This is useful for actors to be able to subscribe to a particular key with the actor configuration, avoiding spurious updates and noise in the received configuration. Signed-off-by: Leandro Lucarella <[email protected]>
1 parent 3015347 commit 85fb413

File tree

1 file changed

+63
-8
lines changed

1 file changed

+63
-8
lines changed

src/frequenz/sdk/config/_manager.py

Lines changed: 63 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
import pathlib
99
from collections.abc import Mapping, MutableMapping, Sequence, Set
1010
from datetime import timedelta
11-
from typing import Any
11+
from typing import Any, assert_never, overload
1212

1313
from frequenz.channels import Broadcast, Receiver
1414
from frequenz.channels.file_watcher import EventType
@@ -120,12 +120,30 @@ def __str__(self) -> str:
120120
"""Return a string representation of this config manager."""
121121
return f"{type(self).__name__}[{self._name}]"
122122

123+
@overload
124+
async def new_receiver(
125+
self,
126+
*,
127+
wait_for_first: bool = True,
128+
skip_unchanged: bool = True,
129+
) -> Receiver[Mapping[str, Any]]: ...
130+
131+
@overload
132+
async def new_receiver(
133+
self,
134+
*,
135+
wait_for_first: bool = True,
136+
skip_unchanged: bool = True,
137+
key: str,
138+
) -> Receiver[Mapping[str, Any] | None]: ...
139+
123140
# The noqa DOC502 is needed because we raise TimeoutError indirectly.
124141
async def new_receiver( # pylint: disable=too-many-arguments # noqa: DOC502
125142
self,
126143
*,
127144
wait_for_first: bool = False,
128145
skip_unchanged: bool = True,
146+
key: str | None = None,
129147
) -> Receiver[Mapping[str, Any] | None]:
130148
"""Create a new receiver for the configuration.
131149
@@ -143,6 +161,13 @@ async def new_receiver( # pylint: disable=too-many-arguments # noqa: DOC502
143161
The comparison is done using the *raw* `dict` to determine if the configuration
144162
has changed.
145163
164+
### Filtering
165+
166+
The configuration can be filtered by a `key`. If the `key` is `None`, the
167+
receiver will receive the full configuration, otherwise only the part of the
168+
configuration under the specified key is received, or `None` if the key is not
169+
found.
170+
146171
### Waiting for the first configuration
147172
148173
If `wait_for_first` is `True`, the receiver will wait for the first
@@ -168,6 +193,9 @@ async def new_receiver( # pylint: disable=too-many-arguments # noqa: DOC502
168193
[`consume()`][frequenz.channels.Receiver.consume] on the receiver.
169194
skip_unchanged: Whether to skip sending the configuration if it hasn't
170195
changed compared to the last one received.
196+
key: The key to filter the configuration. A nested key can be specified by
197+
using a dot (`.`) as separator. For example "key.subkey" will get only
198+
`config[key][subkey]` If `None`, all configurations will be received.
171199
172200
Returns:
173201
The receiver for the configuration.
@@ -176,11 +204,18 @@ async def new_receiver( # pylint: disable=too-many-arguments # noqa: DOC502
176204
asyncio.TimeoutError: If `wait_for_first` is `True` and the first
177205
configuration can't be received in time.
178206
"""
179-
recv_name = f"{self._name}_receiver"
207+
recv_name = (
208+
f"{self._name}_receiver" if key is None else f"{self._name}_receiver_{key}"
209+
)
180210
receiver = self._config_channel.new_receiver(name=recv_name, limit=1)
181211

182212
if skip_unchanged:
183-
receiver = receiver.filter(self._config_has_changed)
213+
receiver = receiver.filter(
214+
lambda config: self._config_has_changed(config, key)
215+
)
216+
217+
if key is not None:
218+
receiver = receiver.map(lambda config: config.get(key))
184219

185220
if wait_for_first:
186221
async with asyncio.timeout(self.wait_for_first_timeout.total_seconds()):
@@ -191,11 +226,31 @@ async def new_receiver( # pylint: disable=too-many-arguments # noqa: DOC502
191226
def _config_has_changed(
192227
self,
193228
new_config: Mapping[str, Any] | None,
229+
key: str | None,
194230
) -> bool:
195231
"""Return whether the configuration has changed."""
196232
old_config = self._last_raw_config
197-
if new_config != old_config:
198-
return True
199-
_logger.info("Configuration has not changed, skipping update")
200-
_logger.debug("Old configuration being kept: %r", old_config)
201-
return False
233+
if key is None:
234+
has_changed = new_config != old_config
235+
else:
236+
match (new_config, old_config):
237+
case (None, None):
238+
has_changed = False
239+
case (None, old):
240+
has_changed = key in old
241+
case (new, None):
242+
has_changed = key in new
243+
case (new, old):
244+
has_changed = new.get(key) != old.get(key)
245+
case unexpected:
246+
assert_never(unexpected)
247+
248+
if not has_changed:
249+
key_str = f" for key '{key}'" if key else ""
250+
_logger.info(
251+
"Configuration%s has not changed, skipping update",
252+
key_str,
253+
)
254+
_logger.debug("Old configuration%s being kept: %r", key_str, old_config)
255+
256+
return has_changed

0 commit comments

Comments
 (0)