77
88import logging
99from asyncio import Event
10- from typing import Any , Callable , Generator
10+ from typing import Callable
1111
1212from frequenz .channels import Receiver
1313from frequenz .client .dispatch import Client
14- from frequenz .sdk .actor import Actor
14+ from frequenz .sdk .actor import Actor , BackgroundService
15+ from typing_extensions import override
1516
1617from ._actor_dispatcher import ActorDispatcher , DispatchInfo
1718from ._bg_service import DispatchScheduler , MergeStrategy
2223_logger = logging .getLogger (__name__ )
2324
2425
25- class Dispatcher :
26+ class Dispatcher ( BackgroundService ) :
2627 """A highlevel interface for the dispatch API.
2728
2829 This class provides a highlevel interface to the dispatch API.
@@ -183,6 +184,8 @@ def __init__(
183184 server_url: The URL of the dispatch service.
184185 key: The key to access the service.
185186 """
187+ super ().__init__ (name = "Dispatcher" )
188+
186189 self ._client = Client (server_url = server_url , key = key )
187190 self ._bg_service = DispatchScheduler (
188191 microgrid_id ,
@@ -192,10 +195,32 @@ def __init__(
192195 self ._empty_event = Event ()
193196 self ._empty_event .set ()
194197
198+ @override
195199 def start (self ) -> None :
196200 """Start the local dispatch service."""
197201 self ._bg_service .start ()
198202
203+ @property
204+ @override
205+ def is_running (self ) -> bool :
206+ """Whether the local dispatch service is running."""
207+ return self ._bg_service .is_running
208+
209+ @override
210+ async def wait (self ) -> None :
211+ """Wait until all actor dispatches are stopped."""
212+ await self ._empty_event .wait ()
213+
214+ @override
215+ def cancel (self , msg : str | None = None ) -> None :
216+ """Stop the local dispatch service."""
217+ self ._bg_service .cancel (msg )
218+
219+ for instance in self ._actor_dispatchers .values ():
220+ instance .cancel ()
221+
222+ self ._actor_dispatchers .clear ()
223+
199224 async def start_dispatching (
200225 self ,
201226 dispatch_type : str ,
@@ -256,10 +281,6 @@ async def stop_dispatching(self, dispatch_type: str) -> None:
256281 if not self ._actor_dispatchers :
257282 self ._empty_event .set ()
258283
259- def __await__ (self ) -> Generator [Any , None , bool ]:
260- """Wait until all actor dispatches are stopped."""
261- return self ._empty_event .wait ().__await__ ()
262-
263284 @property
264285 def client (self ) -> Client :
265286 """Return the client."""
0 commit comments