Skip to content

Commit 135ec57

Browse files
authored
Bug fixes after testing with edge app (#169)
- **Fix sdk depency after yanked release** - **Explictly require newer time when comparing in fetch()** - **Harden sync for delete events between stream/fetch** - **Harden stream/fetch sync for CREATED and UPDATED**
2 parents a7e380b + 9ff5eac commit 135ec57

File tree

2 files changed

+48
-18
lines changed

2 files changed

+48
-18
lines changed

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ dependencies = [
3838
# Make sure to update the version for cross-referencing also in the
3939
# mkdocs.yml file when changing the version here (look for the config key
4040
# plugins.mkdocstrings.handlers.python.import)
41-
"frequenz-sdk >= 1.0.0-rc2002, < 1.0.0-rc2100",
41+
"frequenz-sdk >= 1.0.0-rc2100, < 1.0.0-rc2200",
4242
"frequenz-channels >= 1.6.1, < 2.0.0",
4343
"frequenz-client-dispatch >= 0.11.0, < 0.12.0",
4444
"frequenz-client-common >= 0.3.2, < 0.4.0",

src/frequenz/dispatch/_bg_service.py

Lines changed: 47 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,7 @@ def __init__(
113113

114114
self._client = client
115115
self._dispatches: dict[DispatchId, Dispatch] = {}
116+
self._deleted_dispatches: dict[DispatchId, datetime] = {}
116117
self._microgrid_id = microgrid_id
117118

118119
self._lifecycle_events_channel = Broadcast[DispatchEvent](
@@ -268,34 +269,57 @@ async def _run(self) -> None:
268269
_logger.debug(
269270
"Received dispatch event: %s", selected.message
270271
)
271-
dispatch = Dispatch(selected.message.dispatch)
272+
new_dispatch = Dispatch(selected.message.dispatch)
273+
_existing_dispatch = self._dispatches.get(new_dispatch.id)
274+
is_new_or_newer = (
275+
_existing_dispatch is None
276+
or new_dispatch.update_time
277+
> _existing_dispatch.update_time
278+
)
279+
272280
match selected.message.event:
273281
case Event.CREATED:
274-
self._dispatches[dispatch.id] = dispatch
275-
await self._update_dispatch_schedule_and_notify(
276-
dispatch, None, next_event_timer
277-
)
282+
# Check if the dispatch already exists and
283+
# was updated. The CREATE event is late in
284+
# this case
285+
if is_new_or_newer:
286+
self._dispatches[new_dispatch.id] = new_dispatch
287+
await self._update_dispatch_schedule_and_notify(
288+
new_dispatch, None, next_event_timer
289+
)
278290
await self._lifecycle_events_tx.send(
279-
Created(dispatch=dispatch)
291+
Created(dispatch=new_dispatch)
280292
)
281293
case Event.UPDATED:
282-
await self._update_dispatch_schedule_and_notify(
283-
dispatch,
284-
self._dispatches[dispatch.id],
285-
next_event_timer,
286-
)
287-
self._dispatches[dispatch.id] = dispatch
294+
# We might receive update before we fetched
295+
# the entry, so don't rely on it existing
296+
if is_new_or_newer:
297+
await self._update_dispatch_schedule_and_notify(
298+
new_dispatch,
299+
self._dispatches.get(new_dispatch.id),
300+
next_event_timer,
301+
)
302+
self._dispatches[new_dispatch.id] = new_dispatch
288303
await self._lifecycle_events_tx.send(
289-
Updated(dispatch=dispatch)
304+
Updated(dispatch=new_dispatch)
290305
)
291306
case Event.DELETED:
292-
self._dispatches.pop(dispatch.id)
307+
# The dispatch might already be deleted,
308+
# depending on the exact timing of fetch()
309+
# so we don't rely on it existing.
310+
if is_new_or_newer:
311+
self._dispatches.pop(new_dispatch.id, None)
312+
313+
self._deleted_dispatches[new_dispatch.id] = (
314+
datetime.now(timezone.utc)
315+
)
316+
293317
await self._update_dispatch_schedule_and_notify(
294-
None, dispatch, next_event_timer
318+
None, new_dispatch, next_event_timer
295319
)
296320

297321
await self._lifecycle_events_tx.send(
298-
Deleted(dispatch=dispatch)
322+
Deleted(dispatch=new_dispatch)
299323
)
300324

301325
case StreamRetrying():
@@ -349,6 +373,12 @@ async def _fetch(self, timer: Timer) -> None:
349373
_logger.debug("Fetching dispatches for microgrid %s", self._microgrid_id)
350374
async for page in self._client.list(microgrid_id=self._microgrid_id):
351375
for client_dispatch in page:
376+
deleted_timestamp = self._deleted_dispatches.get(client_dispatch.id)
377+
if (
378+
deleted_timestamp
379+
and client_dispatch.update_time < deleted_timestamp
380+
):
381+
continue
352382
dispatch = Dispatch(client_dispatch)
353383

354384
self._dispatches[dispatch.id] = dispatch
@@ -359,7 +389,7 @@ async def _fetch(self, timer: Timer) -> None:
359389
dispatch, None, timer
360390
)
361391
await self._lifecycle_events_tx.send(Created(dispatch=dispatch))
362-
elif dispatch.update_time != old_dispatch.update_time:
392+
elif dispatch.update_time > old_dispatch.update_time:
363393
_logger.debug("Updated dispatch: %s", dispatch)
364394
await self._update_dispatch_schedule_and_notify(
365395
dispatch, old_dispatch, timer

0 commit comments

Comments
 (0)