33
44"""A highlevel interface for the dispatch API."""
55
6+ from __future__ import annotations
67
78import logging
8- from typing import Callable
9+ from asyncio import Event
10+ from typing import Any , Callable , Generator
911
1012from frequenz .channels import Receiver
1113from frequenz .client .dispatch import Client
@@ -187,6 +189,8 @@ def __init__(
187189 self ._client ,
188190 )
189191 self ._actor_dispatchers : dict [str , ActorDispatcher ] = {}
192+ self ._empty_event = Event ()
193+ self ._empty_event .set ()
190194
191195 def start (self ) -> None :
192196 """Start the local dispatch service."""
@@ -204,6 +208,10 @@ async def start_dispatching(
204208 Creates and manages an ActorDispatcher for the given type that will
205209 start, stop and reconfigure actors based on received dispatches.
206210
211+ You can await the `Dispatcher` instance to block until all types
212+ registered with `start_dispatching()` are stopped using
213+ `stop_dispatching()`
214+
207215 Args:
208216 dispatch_type: The type of the dispatch to manage.
209217 actor_factory: The factory to create actors.
@@ -217,6 +225,8 @@ async def start_dispatching(
217225 )
218226 return
219227
228+ self ._empty_event .clear ()
229+
220230 def id_identity (dispatch : Dispatch ) -> int :
221231 return dispatch .id
222232
@@ -243,6 +253,13 @@ async def stop_dispatching(self, dispatch_type: str) -> None:
243253 if dispatcher is not None :
244254 await dispatcher .stop ()
245255
256+ if not self ._actor_dispatchers :
257+ self ._empty_event .set ()
258+
259+ def __await__ (self ) -> Generator [Any , None , bool ]:
260+ """Wait until all actor dispatches are stopped."""
261+ return self ._empty_event .wait ().__await__ ()
262+
246263 @property
247264 def client (self ) -> Client :
248265 """Return the client."""
0 commit comments