|
17 | 17 | import grpc.aio |
18 | 18 | from frequenz.channels import Broadcast, Receiver, select, selected_from |
19 | 19 | from frequenz.channels.timer import SkipMissedAndResync, Timer |
| 20 | +from frequenz.client.base.streaming import StreamStartedEvent |
20 | 21 | from frequenz.client.dispatch import DispatchApiClient |
| 22 | +from frequenz.client.dispatch.types import DispatchEvent as ApiDispatchEvent |
21 | 23 | from frequenz.client.dispatch.types import Event |
22 | 24 | from frequenz.sdk.actor import BackgroundService |
23 | 25 |
|
@@ -244,24 +246,40 @@ async def _run(self) -> None: |
244 | 246 | heappop(self._scheduled_events).dispatch |
245 | 247 | ) |
246 | 248 | elif selected_from(selected, stream): |
247 | | - _logger.debug("Received dispatch event: %s", selected.message) |
248 | | - dispatch = Dispatch(selected.message.dispatch) |
249 | | - match selected.message.event: |
250 | | - case Event.CREATED: |
251 | | - self._dispatches[dispatch.id] = dispatch |
252 | | - await self._update_dispatch_schedule_and_notify(dispatch, None) |
253 | | - await self._lifecycle_events_tx.send(Created(dispatch=dispatch)) |
254 | | - case Event.UPDATED: |
255 | | - await self._update_dispatch_schedule_and_notify( |
256 | | - dispatch, self._dispatches[dispatch.id] |
257 | | - ) |
258 | | - self._dispatches[dispatch.id] = dispatch |
259 | | - await self._lifecycle_events_tx.send(Updated(dispatch=dispatch)) |
260 | | - case Event.DELETED: |
261 | | - self._dispatches.pop(dispatch.id) |
262 | | - await self._update_dispatch_schedule_and_notify(None, dispatch) |
263 | | - |
264 | | - await self._lifecycle_events_tx.send(Deleted(dispatch=dispatch)) |
| 249 | + match selected.message: |
| 250 | + case StreamStartedEvent(): |
| 251 | + _logger.info("Dispatch stream restarted, refreshing dispatches") |
| 252 | + await self._fetch() |
| 253 | + |
| 254 | + case ApiDispatchEvent(): |
| 255 | + _logger.debug("Received dispatch event: %s", selected.message) |
| 256 | + dispatch = Dispatch(selected.message.dispatch) |
| 257 | + match selected.message.event: |
| 258 | + case Event.CREATED: |
| 259 | + self._dispatches[dispatch.id] = dispatch |
| 260 | + await self._update_dispatch_schedule_and_notify( |
| 261 | + dispatch, None |
| 262 | + ) |
| 263 | + await self._lifecycle_events_tx.send( |
| 264 | + Created(dispatch=dispatch) |
| 265 | + ) |
| 266 | + case Event.UPDATED: |
| 267 | + await self._update_dispatch_schedule_and_notify( |
| 268 | + dispatch, self._dispatches[dispatch.id] |
| 269 | + ) |
| 270 | + self._dispatches[dispatch.id] = dispatch |
| 271 | + await self._lifecycle_events_tx.send( |
| 272 | + Updated(dispatch=dispatch) |
| 273 | + ) |
| 274 | + case Event.DELETED: |
| 275 | + self._dispatches.pop(dispatch.id) |
| 276 | + await self._update_dispatch_schedule_and_notify( |
| 277 | + None, dispatch |
| 278 | + ) |
| 279 | + |
| 280 | + await self._lifecycle_events_tx.send( |
| 281 | + Deleted(dispatch=dispatch) |
| 282 | + ) |
265 | 283 |
|
266 | 284 | async def _execute_scheduled_event(self, dispatch: Dispatch) -> None: |
267 | 285 | """Execute a scheduled event. |
|
0 commit comments