|
3 | 3 |
|
4 | 4 | """A highlevel interface for the dispatch API.""" |
5 | 5 |
|
| 6 | +from __future__ import annotations |
| 7 | + |
| 8 | +import asyncio |
| 9 | +import logging |
| 10 | +from asyncio import Event |
| 11 | +from typing import Callable |
6 | 12 |
|
7 | 13 | from frequenz.channels import Receiver |
8 | 14 | from frequenz.client.dispatch import Client |
| 15 | +from frequenz.sdk.actor import Actor, BackgroundService |
| 16 | +from typing_extensions import override |
9 | 17 |
|
| 18 | +from ._actor_dispatcher import ActorDispatcher, DispatchInfo |
10 | 19 | from ._bg_service import DispatchScheduler, MergeStrategy |
11 | 20 | from ._dispatch import Dispatch |
12 | 21 | from ._event import DispatchEvent |
13 | 22 |
|
| 23 | +_logger = logging.getLogger(__name__) |
| 24 | + |
14 | 25 |
|
15 | | -class Dispatcher: |
| 26 | +class Dispatcher(BackgroundService): |
16 | 27 | """A highlevel interface for the dispatch API. |
17 | 28 |
|
18 | 29 | This class provides a highlevel interface to the dispatch API. |
@@ -173,16 +184,103 @@ def __init__( |
173 | 184 | server_url: The URL of the dispatch service. |
174 | 185 | key: The key to access the service. |
175 | 186 | """ |
| 187 | + super().__init__() |
| 188 | + |
176 | 189 | self._client = Client(server_url=server_url, key=key) |
177 | 190 | self._bg_service = DispatchScheduler( |
178 | 191 | microgrid_id, |
179 | 192 | self._client, |
180 | 193 | ) |
| 194 | + self._actor_dispatchers: dict[str, ActorDispatcher] = {} |
| 195 | + self._empty_event = Event() |
| 196 | + self._empty_event.set() |
181 | 197 |
|
182 | | - async def start(self) -> None: |
| 198 | + @override |
| 199 | + def start(self) -> None: |
183 | 200 | """Start the local dispatch service.""" |
184 | 201 | self._bg_service.start() |
185 | 202 |
|
| 203 | + @property |
| 204 | + @override |
| 205 | + def is_running(self) -> bool: |
| 206 | + """Whether the local dispatch service is running.""" |
| 207 | + return self._bg_service.is_running |
| 208 | + |
| 209 | + @override |
| 210 | + async def wait(self) -> None: |
| 211 | + """Wait until all actor dispatches are stopped.""" |
| 212 | + await asyncio.gather(self._bg_service.wait(), self._empty_event.wait()) |
| 213 | + |
| 214 | + self._actor_dispatchers.clear() |
| 215 | + |
| 216 | + @override |
| 217 | + def cancel(self, msg: str | None = None) -> None: |
| 218 | + """Stop the local dispatch service.""" |
| 219 | + self._bg_service.cancel(msg) |
| 220 | + |
| 221 | + for instance in self._actor_dispatchers.values(): |
| 222 | + instance.cancel() |
| 223 | + |
| 224 | + async def start_dispatching( |
| 225 | + self, |
| 226 | + dispatch_type: str, |
| 227 | + *, |
| 228 | + actor_factory: Callable[[DispatchInfo, Receiver[DispatchInfo]], Actor], |
| 229 | + merge_strategy: MergeStrategy | None = None, |
| 230 | + ) -> None: |
| 231 | + """Manage actors for a given dispatch type. |
| 232 | +
|
| 233 | + Creates and manages an ActorDispatcher for the given type that will |
| 234 | + start, stop and reconfigure actors based on received dispatches. |
| 235 | +
|
| 236 | + You can await the `Dispatcher` instance to block until all types |
| 237 | + registered with `start_dispatching()` are stopped using |
| 238 | + `stop_dispatching()` |
| 239 | +
|
| 240 | + Args: |
| 241 | + dispatch_type: The type of the dispatch to manage. |
| 242 | + actor_factory: The factory to create actors. |
| 243 | + merge_strategy: The strategy to merge running intervals. |
| 244 | + """ |
| 245 | + dispatcher = self._actor_dispatchers.get(dispatch_type) |
| 246 | + |
| 247 | + if dispatcher is not None: |
| 248 | + _logger.debug( |
| 249 | + "Ignoring duplicate actor dispatcher request for %r", dispatch_type |
| 250 | + ) |
| 251 | + return |
| 252 | + |
| 253 | + self._empty_event.clear() |
| 254 | + |
| 255 | + def id_identity(dispatch: Dispatch) -> int: |
| 256 | + return dispatch.id |
| 257 | + |
| 258 | + dispatcher = ActorDispatcher( |
| 259 | + actor_factory=actor_factory, |
| 260 | + running_status_receiver=await self.new_running_state_event_receiver( |
| 261 | + dispatch_type, merge_strategy=merge_strategy |
| 262 | + ), |
| 263 | + dispatch_identity=( |
| 264 | + id_identity if merge_strategy is None else merge_strategy.identity |
| 265 | + ), |
| 266 | + ) |
| 267 | + |
| 268 | + self._actor_dispatchers[dispatch_type] = dispatcher |
| 269 | + dispatcher.start() |
| 270 | + |
| 271 | + async def stop_dispatching(self, dispatch_type: str) -> None: |
| 272 | + """Stop managing actors for a given dispatch type. |
| 273 | +
|
| 274 | + Args: |
| 275 | + dispatch_type: The type of the dispatch to stop managing. |
| 276 | + """ |
| 277 | + dispatcher = self._actor_dispatchers.pop(dispatch_type, None) |
| 278 | + if dispatcher is not None: |
| 279 | + await dispatcher.stop() |
| 280 | + |
| 281 | + if not self._actor_dispatchers: |
| 282 | + self._empty_event.set() |
| 283 | + |
186 | 284 | @property |
187 | 285 | def client(self) -> Client: |
188 | 286 | """Return the client.""" |
|
0 commit comments