Skip to content

Commit 412313a

Browse files
committed
Fix filter not seeing the latest dispatches after fetch()
By using non-dict-assigning dispatch update flow. When fetch() which runs periodically would assign a fresh map to `self._dispatches` the filter would from the non operate on an old non-updated map. Signed-off-by: Mathias L. Baumann <[email protected]>
1 parent 436308e commit 412313a

File tree

2 files changed

+14
-12
lines changed

2 files changed

+14
-12
lines changed

RELEASE_NOTES.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,3 +16,4 @@
1616

1717
* The merge by type class now uses the correct logger path.
1818
* The merge by type was made more robust under heavy load, making sure to use the same `now` for all dispatches that are checked.
19+
* Fix that the merge filter was using an outdated dispatches dict once fetch() ran.

src/frequenz/dispatch/_bg_service.py

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -366,8 +366,8 @@ async def _fetch(self, timer: Timer) -> None:
366366
"""
367367
self._initial_fetch_event.clear()
368368

369-
old_dispatches = self._dispatches
370-
self._dispatches = {}
369+
old_dispatches = set(self._dispatches.keys())
370+
new_dispatches = {}
371371

372372
try:
373373
_logger.debug("Fetching dispatches for microgrid %s", self._microgrid_id)
@@ -381,9 +381,9 @@ async def _fetch(self, timer: Timer) -> None:
381381
continue
382382
dispatch = Dispatch(client_dispatch)
383383

384-
self._dispatches[dispatch.id] = dispatch
385-
old_dispatch = old_dispatches.pop(dispatch.id, None)
386-
if not old_dispatch:
384+
new_dispatches[dispatch.id] = dispatch
385+
old_dispatch = self._dispatches.get(dispatch.id, None)
386+
if old_dispatch is None:
387387
_logger.debug("New dispatch: %s", dispatch)
388388
await self._update_dispatch_schedule_and_notify(
389389
dispatch, None, timer
@@ -396,22 +396,23 @@ async def _fetch(self, timer: Timer) -> None:
396396
)
397397
await self._lifecycle_events_tx.send(Updated(dispatch=dispatch))
398398

399-
_logger.debug("Received %s dispatches", len(self._dispatches))
399+
_logger.debug("Received %s dispatches", len(new_dispatches))
400400

401401
except grpc.aio.AioRpcError as error:
402402
_logger.error("Error fetching dispatches: %s", error)
403-
self._dispatches = old_dispatches
404403
return
405404

406-
for dispatch in old_dispatches.values():
405+
# Delete old dispatches
406+
for dispatch_id in old_dispatches:
407+
if dispatch_id in new_dispatches:
408+
continue
409+
410+
dispatch = self._dispatches.pop(dispatch_id)
407411
_logger.debug("Deleted dispatch: %s", dispatch)
408412
await self._lifecycle_events_tx.send(Deleted(dispatch=dispatch))
409413
await self._update_dispatch_schedule_and_notify(None, dispatch, timer)
410414

411-
# Set deleted only here as it influences the result of dispatch.started
412-
# which is used in above in _running_state_change
413-
dispatch._set_deleted() # pylint: disable=protected-access
414-
await self._lifecycle_events_tx.send(Deleted(dispatch=dispatch))
415+
self._dispatches.update(new_dispatches)
415416

416417
self._initial_fetch_event.set()
417418

0 commit comments

Comments
 (0)