Skip to content

Commit c7d8a52

Browse files
committed
Add an option to wait for the first configuration
This option is useful for when retrieving the configuration for the first time, as users might want to block the program's initialization until the configuration is read. Signed-off-by: Leandro Lucarella <[email protected]>
1 parent b206ab7 commit c7d8a52

File tree

1 file changed

+50
-2
lines changed

1 file changed

+50
-2
lines changed

src/frequenz/sdk/config/_manager.py

Lines changed: 50 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33

44
"""Management of configuration."""
55

6+
import asyncio
67
import pathlib
78
from collections.abc import Mapping, Sequence
89
from datetime import timedelta
@@ -29,6 +30,7 @@ def __init__( # pylint: disable=too-many-arguments
2930
force_polling: bool = True,
3031
name: str | None = None,
3132
polling_interval: timedelta = timedelta(seconds=5),
33+
wait_for_first_timeout: timedelta = timedelta(seconds=5),
3234
) -> None:
3335
"""Initialize this config manager.
3436
@@ -45,6 +47,10 @@ def __init__( # pylint: disable=too-many-arguments
4547
be used. This is used mostly for debugging purposes.
4648
polling_interval: The interval to poll for changes. Only relevant if
4749
polling is enabled.
50+
wait_for_first_timeout: The timeout to use when waiting for the first
51+
configuration in
52+
[`new_receiver`][frequenz.sdk.config.ConfigManager.new_receiver] if
53+
`wait_for_first` is `True`.
4854
"""
4955
self.name: Final[str] = str(id(self)) if name is None else name
5056
"""The name of this config manager."""
@@ -63,6 +69,14 @@ def __init__( # pylint: disable=too-many-arguments
6369
)
6470
"""The actor that manages the configuration."""
6571

72+
self.wait_for_first_timeout: timedelta = wait_for_first_timeout
73+
"""The timeout to use when waiting for the first configuration.
74+
75+
When passing `wait_for_first` as `True` to
76+
[`new_receiver`][frequenz.sdk.config.ConfigManager.new_receiver], this timeout
77+
will be used to wait for the first configuration to be received.
78+
"""
79+
6680
if auto_start:
6781
self.actor.start()
6882

@@ -71,6 +85,7 @@ def __repr__(self) -> str:
7185
return (
7286
f"<{self.__class__.__name__}: "
7387
f"name={self.name!r}, "
88+
f"wait_for_first_timeout={self.wait_for_first_timeout!r}, "
7489
f"config_channel={self.config_channel!r}, "
7590
f"actor={self.actor!r}>"
7691
)
@@ -80,20 +95,53 @@ def __str__(self) -> str:
8095
return f"{type(self).__name__}[{self.name}]"
8196

8297
# The noqa DOC502 is needed because we raise TimeoutError indirectly.
83-
async def new_receiver(self) -> Receiver[Mapping[str, Any] | None]: # noqa: DOC502
98+
async def new_receiver( # pylint: disable=too-many-arguments # noqa: DOC502
99+
self,
100+
*,
101+
wait_for_first: bool = False,
102+
) -> Receiver[Mapping[str, Any] | None]:
84103
"""Create a new receiver for the configuration.
85104
86105
Note:
87106
If there is a burst of configuration updates, the receiver will only
88107
receive the last configuration, older configurations will be ignored.
89108
109+
### Waiting for the first configuration
110+
111+
If `wait_for_first` is `True`, the receiver will wait for the first
112+
configuration to be received before returning the receiver. If the
113+
configuration can't be received in time, a timeout error will be raised.
114+
115+
If the configuration is received successfully, the first configuration can be
116+
simply retrieved by calling [`consume()`][frequenz.channels.Receiver.consume] on
117+
the receiver without blocking.
118+
90119
Example:
91120
```python
92121
# TODO: Add Example
93122
```
94123
124+
Args:
125+
wait_for_first: Whether to wait for the first configuration to be received
126+
before returning the receiver. If the configuration can't be received
127+
for
128+
[`wait_for_first_timeout`][frequenz.sdk.config.ConfigManager.wait_for_first_timeout]
129+
time, a timeout error will be raised. If receiving was successful, the
130+
first configuration can be simply retrieved by calling
131+
[`consume()`][frequenz.channels.Receiver.consume] on the receiver.
132+
95133
Returns:
96134
The receiver for the configuration.
135+
136+
Raises:
137+
asyncio.TimeoutError: If `wait_for_first` is `True` and the first
138+
configuration can't be received in time.
97139
"""
98140
recv_name = f"{self}_receiver"
99-
return self.config_channel.new_receiver(name=recv_name, limit=1)
141+
receiver = self.config_channel.new_receiver(name=recv_name, limit=1)
142+
143+
if wait_for_first:
144+
async with asyncio.timeout(self.wait_for_first_timeout.total_seconds()):
145+
await receiver.ready()
146+
147+
return receiver

0 commit comments

Comments
 (0)