Skip to content

Commit 59eb1b3

Browse files
committed
Refresh the dispatches after a stream restart.
To ensure we didn't miss any. Signed-off-by: Mathias L. Baumann <[email protected]>
1 parent c8973fc commit 59eb1b3

File tree

3 files changed

+40
-19
lines changed

3 files changed

+40
-19
lines changed

RELEASE_NOTES.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
* This release now supports the sdk up to rc2000.
1414
* Less logs are now on `INFO` level, and more on `DEBUG` level, making the output less verbose.
1515
* Changed the type of `DispatchInfo.components` from `list[int] | list[ComponentCategory]` to `list[ComponentId] | list[ComponentCategory]`, where `ComponentId` is imported from `frequenz.client.microgrid`.
16+
* While the dispatch stream restarts we refresh our dispatch cache as well, to ensure we didn't miss any updates.
1617

1718
## Bug Fixes
1819

pyproject.toml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,9 @@ dependencies = [
4040
# plugins.mkdocstrings.handlers.python.import)
4141
"frequenz-sdk >= 1.0.0-rc2000, < 1.0.0-rc2100",
4242
"frequenz-channels >= 1.6.1, < 2.0.0",
43-
"frequenz-client-dispatch >= 0.10.1, < 0.11.0",
43+
"frequenz-client-dispatch >= 0.10.2, < 0.11.0",
44+
#"frequenz-client-base >= 0.11.0, < 0.13.0",
45+
"frequenz-client-base @ git+https://github.com/frequenz-floss/frequenz-client-base-python@refs/pull/146/head",
4446
]
4547
dynamic = ["version"]
4648

src/frequenz/dispatch/_bg_service.py

Lines changed: 36 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,9 @@
1717
import grpc.aio
1818
from frequenz.channels import Broadcast, Receiver, select, selected_from
1919
from frequenz.channels.timer import SkipMissedAndResync, Timer
20+
from frequenz.client.base.streaming import StreamStartedEvent
2021
from frequenz.client.dispatch import DispatchApiClient
22+
from frequenz.client.dispatch.types import DispatchEvent as ApiDispatchEvent
2123
from frequenz.client.dispatch.types import Event
2224
from frequenz.sdk.actor import BackgroundService
2325

@@ -244,24 +246,40 @@ async def _run(self) -> None:
244246
heappop(self._scheduled_events).dispatch
245247
)
246248
elif selected_from(selected, stream):
247-
_logger.debug("Received dispatch event: %s", selected.message)
248-
dispatch = Dispatch(selected.message.dispatch)
249-
match selected.message.event:
250-
case Event.CREATED:
251-
self._dispatches[dispatch.id] = dispatch
252-
await self._update_dispatch_schedule_and_notify(dispatch, None)
253-
await self._lifecycle_events_tx.send(Created(dispatch=dispatch))
254-
case Event.UPDATED:
255-
await self._update_dispatch_schedule_and_notify(
256-
dispatch, self._dispatches[dispatch.id]
257-
)
258-
self._dispatches[dispatch.id] = dispatch
259-
await self._lifecycle_events_tx.send(Updated(dispatch=dispatch))
260-
case Event.DELETED:
261-
self._dispatches.pop(dispatch.id)
262-
await self._update_dispatch_schedule_and_notify(None, dispatch)
263-
264-
await self._lifecycle_events_tx.send(Deleted(dispatch=dispatch))
249+
match selected.message:
250+
case StreamStartedEvent():
251+
_logger.info("Dispatch stream restarted, refreshing dispatches")
252+
await self._fetch()
253+
254+
case ApiDispatchEvent():
255+
_logger.debug("Received dispatch event: %s", selected.message)
256+
dispatch = Dispatch(selected.message.dispatch)
257+
match selected.message.event:
258+
case Event.CREATED:
259+
self._dispatches[dispatch.id] = dispatch
260+
await self._update_dispatch_schedule_and_notify(
261+
dispatch, None
262+
)
263+
await self._lifecycle_events_tx.send(
264+
Created(dispatch=dispatch)
265+
)
266+
case Event.UPDATED:
267+
await self._update_dispatch_schedule_and_notify(
268+
dispatch, self._dispatches[dispatch.id]
269+
)
270+
self._dispatches[dispatch.id] = dispatch
271+
await self._lifecycle_events_tx.send(
272+
Updated(dispatch=dispatch)
273+
)
274+
case Event.DELETED:
275+
self._dispatches.pop(dispatch.id)
276+
await self._update_dispatch_schedule_and_notify(
277+
None, dispatch
278+
)
279+
280+
await self._lifecycle_events_tx.send(
281+
Deleted(dispatch=dispatch)
282+
)
265283

266284
async def _execute_scheduled_event(self, dispatch: Dispatch) -> None:
267285
"""Execute a scheduled event.

0 commit comments

Comments
 (0)