Skip to content

Commit 7324eda

Browse files
committed
with now waits till dispatch is initialized
Signed-off-by: Mathias L. Baumann <[email protected]>
1 parent babed85 commit 7324eda

File tree

4 files changed

+42
-4
lines changed

4 files changed

+42
-4
lines changed

RELEASE_NOTES.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,3 +25,5 @@ This release introduces a more flexible and powerful mechanism for managing disp
2525
* `Dispatcher.start_dispatching(dispatch_type, actor_factory, merge_strategy)` to manage your actor for the given type and merge strategy. All you need provide is an actor factory.
2626
* `Dispatcher.stop_dispatching(dispatch_type)` to stop dispatching for the given type.
2727
* `Dispatcher.is_managed(dispatch_type)` to check if dispatching is active for the given type.
28+
* A new method `Dispatcher.wait_for_initialization()` has been added to wait for all actors to be initialized.
29+
* When using `async with Dispatcher(..) as dispatcher`, the dispatcher will first wait for the dispatch service to be initialized before entering the block.

src/frequenz/dispatch/_bg_service.py

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,13 @@ def __init__(
129129
always at index 0.
130130
"""
131131

132+
self._initial_fetch_event = asyncio.Event()
133+
"""The initial fetch event."""
134+
135+
async def wait_for_initialization(self) -> None:
136+
"""Wait for the initial fetch to complete."""
137+
await self._initial_fetch_event.wait()
138+
132139
# pylint: disable=redefined-builtin
133140
def new_lifecycle_events_receiver(self, type: str) -> Receiver[DispatchEvent]:
134141
"""Create a new receiver for lifecycle events.
@@ -144,7 +151,10 @@ def new_lifecycle_events_receiver(self, type: str) -> Receiver[DispatchEvent]:
144151
)
145152

146153
async def new_running_state_event_receiver(
147-
self, type: str, *, merge_strategy: MergeStrategy | None = None
154+
self,
155+
type: str,
156+
*,
157+
merge_strategy: MergeStrategy | None = None,
148158
) -> Receiver[Dispatch]:
149159
"""Create a new receiver for running state events of the specified type.
150160
@@ -171,6 +181,7 @@ async def new_running_state_event_receiver(
171181
Args:
172182
type: The type of events to receive.
173183
merge_strategy: The merge strategy to use.
184+
174185
Returns:
175186
A new receiver for running state status.
176187
"""
@@ -272,6 +283,8 @@ async def _fetch(self) -> None:
272283
This is used for the initial fetch and for re-fetching all dispatches
273284
if the connection was lost.
274285
"""
286+
self._initial_fetch_event.clear()
287+
275288
old_dispatches = self._dispatches
276289
self._dispatches = {}
277290

@@ -311,6 +324,8 @@ async def _fetch(self) -> None:
311324
dispatch._set_deleted() # pylint: disable=protected-access
312325
await self._lifecycle_events_tx.send(Deleted(dispatch=dispatch))
313326

327+
self._initial_fetch_event.set()
328+
314329
async def _update_dispatch_schedule_and_notify(
315330
self, dispatch: Dispatch | None, old_dispatch: Dispatch | None
316331
) -> None:

src/frequenz/dispatch/_dispatcher.py

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
import asyncio
99
import logging
1010
from asyncio import Event
11-
from typing import Awaitable, Callable
11+
from typing import Awaitable, Callable, Self
1212

1313
from frequenz.channels import Receiver
1414
from frequenz.client.dispatch import Client
@@ -221,6 +221,10 @@ def cancel(self, msg: str | None = None) -> None:
221221
for instance in self._actor_dispatchers.values():
222222
instance.cancel()
223223

224+
async def wait_for_initialization(self) -> None:
225+
"""Wait until the background service is initialized."""
226+
await self._bg_service.wait_for_initialization()
227+
224228
def is_managed(self, dispatch_type: str) -> bool:
225229
"""Check if the dispatcher is managing actors for a given dispatch type.
226230
@@ -271,7 +275,8 @@ def id_identity(dispatch: Dispatch) -> int:
271275
dispatcher = ActorDispatcher(
272276
actor_factory=actor_factory,
273277
running_status_receiver=await self.new_running_state_event_receiver(
274-
dispatch_type, merge_strategy=merge_strategy
278+
dispatch_type,
279+
merge_strategy=merge_strategy,
275280
),
276281
dispatch_identity=(
277282
id_identity if merge_strategy is None else merge_strategy.identity
@@ -299,6 +304,19 @@ def client(self) -> Client:
299304
"""Return the client."""
300305
return self._client
301306

307+
@override
308+
async def __aenter__(self) -> Self:
309+
"""Enter an async context.
310+
311+
Start this background service.
312+
313+
Returns:
314+
This background service.
315+
"""
316+
self.start()
317+
await self.wait_for_initialization()
318+
return self
319+
302320
def new_lifecycle_events_receiver(
303321
self, dispatch_type: str
304322
) -> Receiver[DispatchEvent]:
@@ -368,5 +386,6 @@ async def new_running_state_event_receiver(
368386
A new receiver for dispatches whose running status changed.
369387
"""
370388
return await self._bg_service.new_running_state_event_receiver(
371-
dispatch_type, merge_strategy=merge_strategy
389+
dispatch_type,
390+
merge_strategy=merge_strategy,
372391
)

tests/test_mananging_actor.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -311,7 +311,9 @@ async def new_mock_receiver(
311311
# pylint: disable=protected-access
312312
assert "MANAGE_TEST" in dispatcher._actor_dispatchers
313313
actor_manager = dispatcher._actor_dispatchers["MANAGE_TEST"]
314+
# pylint: disable=comparison-with-callable
314315
assert actor_manager._actor_factory == MockActor.create
316+
# pylint: enable=comparison-with-callable
315317

316318
dispatch = Dispatch(
317319
replace(

0 commit comments

Comments
 (0)