Skip to content

Commit 1657e3a

Browse files
committed
Harden sync for delete events between stream/fetch
We remember delete events with their timestamp, so when can check in fetch() if the dispatch we just fetched is actually already deleted before we received it. Signed-off-by: Mathias L. Baumann <[email protected]>
1 parent b0990e5 commit 1657e3a

File tree

1 file changed

+28
-1
lines changed

1 file changed

+28
-1
lines changed

src/frequenz/dispatch/_bg_service.py

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,7 @@ def __init__(
113113

114114
self._client = client
115115
self._dispatches: dict[DispatchId, Dispatch] = {}
116+
self._deleted_dispatches: dict[DispatchId, datetime] = {}
116117
self._microgrid_id = microgrid_id
117118

118119
self._lifecycle_events_channel = Broadcast[DispatchEvent](
@@ -263,6 +264,17 @@ async def _run(self) -> None:
263264
heappop(self._scheduled_events).dispatch, next_event_timer
264265
)
265266
elif selected_from(selected, stream):
267+
268+
def is_more_relevant(
269+
dispatch: Dispatch,
270+
) -> bool:
271+
existing_dispatch = self._dispatches.get(dispatch.id)
272+
273+
return (
274+
not existing_dispatch
275+
or dispatch.update_time > existing_dispatch.update_time
276+
)
277+
266278
match selected.message:
267279
case ApiDispatchEvent():
268280
_logger.debug(
@@ -289,7 +301,16 @@ async def _run(self) -> None:
289301
Updated(dispatch=dispatch)
290302
)
291303
case Event.DELETED:
292-
self._dispatches.pop(dispatch.id)
304+
# The dispatch might already be deleted,
305+
# depending on the exact timing of fetch()
306+
# so we don't rely on it existing.
307+
if is_more_relevant(dispatch):
308+
self._dispatches.pop(dispatch.id, None)
309+
310+
self._deleted_dispatches[dispatch.id] = (
311+
datetime.now(timezone.utc)
312+
)
313+
293314
await self._update_dispatch_schedule_and_notify(
294315
None, dispatch, next_event_timer
295316
)
@@ -349,6 +370,12 @@ async def _fetch(self, timer: Timer) -> None:
349370
_logger.debug("Fetching dispatches for microgrid %s", self._microgrid_id)
350371
async for page in self._client.list(microgrid_id=self._microgrid_id):
351372
for client_dispatch in page:
373+
deleted_timestamp = self._deleted_dispatches.get(client_dispatch.id)
374+
if (
375+
deleted_timestamp
376+
and client_dispatch.update_time < deleted_timestamp
377+
):
378+
continue
352379
dispatch = Dispatch(client_dispatch)
353380

354381
self._dispatches[dispatch.id] = dispatch

0 commit comments

Comments
 (0)