Skip to content

Commit 83c9fdb

Browse files
committed
Fix filter not seeing the latest dispatches after periodic fetch()'s
By using a workflow that avoids reassigning the dispatches dict. When `fetch()`, which runs periodically, would assign a fresh map to `self._dispatches` the filter would not see the latest dispatches, as it would still reference the old map. Signed-off-by: Mathias L. Baumann <[email protected]>
1 parent ea9961a commit 83c9fdb

File tree

2 files changed

+18
-10
lines changed

2 files changed

+18
-10
lines changed

RELEASE_NOTES.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,3 +16,4 @@
1616

1717
* The merge by type class now uses the correct logger path.
1818
* The merge by type was made more robust under heavy load, making sure to use the same `now` for all dispatches that are checked.
19+
* Fix that the merge filter was using an outdated dispatches dict once fetch() ran.

src/frequenz/dispatch/_bg_service.py

Lines changed: 17 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -366,8 +366,8 @@ async def _fetch(self, timer: Timer) -> None:
366366
"""
367367
self._initial_fetch_event.clear()
368368

369-
old_dispatches = self._dispatches
370-
self._dispatches = {}
369+
old_dispatches = set(self._dispatches.keys())
370+
new_dispatches = {}
371371

372372
try:
373373
_logger.debug("Fetching dispatches for microgrid %s", self._microgrid_id)
@@ -381,9 +381,9 @@ async def _fetch(self, timer: Timer) -> None:
381381
continue
382382
dispatch = Dispatch(client_dispatch)
383383

384-
self._dispatches[dispatch.id] = dispatch
385-
old_dispatch = old_dispatches.pop(dispatch.id, None)
386-
if not old_dispatch:
384+
new_dispatches[dispatch.id] = dispatch
385+
old_dispatch = self._dispatches.get(dispatch.id, None)
386+
if old_dispatch is None:
387387
_logger.debug("New dispatch: %s", dispatch)
388388
await self._update_dispatch_schedule_and_notify(
389389
dispatch, None, timer
@@ -396,23 +396,30 @@ async def _fetch(self, timer: Timer) -> None:
396396
)
397397
await self._lifecycle_events_tx.send(Updated(dispatch=dispatch))
398398

399-
_logger.debug("Received %s dispatches", len(self._dispatches))
399+
_logger.debug("Received %s dispatches", len(new_dispatches))
400400

401401
except grpc.aio.AioRpcError as error:
402402
_logger.error("Error fetching dispatches: %s", error)
403-
self._dispatches = old_dispatches
404403
return
405404

406-
for dispatch in old_dispatches.values():
405+
# Delete old dispatches
406+
for dispatch_id in old_dispatches:
407+
if dispatch_id in new_dispatches:
408+
continue
409+
410+
dispatch = self._dispatches.pop(dispatch_id)
407411
_logger.debug("Deleted dispatch: %s", dispatch)
408-
await self._lifecycle_events_tx.send(Deleted(dispatch=dispatch))
409412
await self._update_dispatch_schedule_and_notify(None, dispatch, timer)
410413

411414
# Set deleted only here as it influences the result of dispatch.started
412-
# which is used in above in _running_state_change
415+
# which is used in the func call above ^
413416
dispatch._set_deleted() # pylint: disable=protected-access
414417
await self._lifecycle_events_tx.send(Deleted(dispatch=dispatch))
415418

419+
# Update the dispatch list with the dispatches
420+
self._dispatches.update(new_dispatches)
421+
422+
# Set event to indicate fetch ran at least once
416423
self._initial_fetch_event.set()
417424

418425
async def _update_dispatch_schedule_and_notify(

0 commit comments

Comments
 (0)