diff --git a/pyproject.toml b/pyproject.toml index 144591d..3e15c63 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -38,7 +38,7 @@ dependencies = [ # Make sure to update the version for cross-referencing also in the # mkdocs.yml file when changing the version here (look for the config key # plugins.mkdocstrings.handlers.python.import) - "frequenz-sdk >= 1.0.0-rc2002, < 1.0.0-rc2100", + "frequenz-sdk >= 1.0.0-rc2100, < 1.0.0-rc2200", "frequenz-channels >= 1.6.1, < 2.0.0", "frequenz-client-dispatch >= 0.11.0, < 0.12.0", "frequenz-client-common >= 0.3.2, < 0.4.0", diff --git a/src/frequenz/dispatch/_bg_service.py b/src/frequenz/dispatch/_bg_service.py index d1bb4e2..c00ab3f 100644 --- a/src/frequenz/dispatch/_bg_service.py +++ b/src/frequenz/dispatch/_bg_service.py @@ -113,6 +113,7 @@ def __init__( self._client = client self._dispatches: dict[DispatchId, Dispatch] = {} + self._deleted_dispatches: dict[DispatchId, datetime] = {} self._microgrid_id = microgrid_id self._lifecycle_events_channel = Broadcast[DispatchEvent]( @@ -268,34 +269,57 @@ async def _run(self) -> None: _logger.debug( "Received dispatch event: %s", selected.message ) - dispatch = Dispatch(selected.message.dispatch) + new_dispatch = Dispatch(selected.message.dispatch) + _existing_dispatch = self._dispatches.get(new_dispatch.id) + is_new_or_newer = ( + _existing_dispatch is None + or new_dispatch.update_time + > _existing_dispatch.update_time + ) + match selected.message.event: case Event.CREATED: - self._dispatches[dispatch.id] = dispatch - await self._update_dispatch_schedule_and_notify( - dispatch, None, next_event_timer - ) + # Check if the dispatch already exists and + # was updated. The CREATE event is late in + # this case + if is_new_or_newer: + self._dispatches[new_dispatch.id] = new_dispatch + await self._update_dispatch_schedule_and_notify( + new_dispatch, None, next_event_timer + ) await self._lifecycle_events_tx.send( - Created(dispatch=dispatch) + Created(dispatch=new_dispatch) ) case Event.UPDATED: - await self._update_dispatch_schedule_and_notify( - dispatch, - self._dispatches[dispatch.id], - next_event_timer, - ) - self._dispatches[dispatch.id] = dispatch + # We might receive update before we fetched + # the entry, so don't rely on it existing + if is_new_or_newer: + await self._update_dispatch_schedule_and_notify( + new_dispatch, + self._dispatches.get(new_dispatch.id), + next_event_timer, + ) + self._dispatches[new_dispatch.id] = new_dispatch await self._lifecycle_events_tx.send( - Updated(dispatch=dispatch) + Updated(dispatch=new_dispatch) ) case Event.DELETED: - self._dispatches.pop(dispatch.id) + # The dispatch might already be deleted, + # depending on the exact timing of fetch() + # so we don't rely on it existing. + if is_new_or_newer: + self._dispatches.pop(new_dispatch.id, None) + + self._deleted_dispatches[new_dispatch.id] = ( + datetime.now(timezone.utc) + ) + await self._update_dispatch_schedule_and_notify( - None, dispatch, next_event_timer + None, new_dispatch, next_event_timer ) await self._lifecycle_events_tx.send( - Deleted(dispatch=dispatch) + Deleted(dispatch=new_dispatch) ) case StreamRetrying(): @@ -349,6 +373,12 @@ async def _fetch(self, timer: Timer) -> None: _logger.debug("Fetching dispatches for microgrid %s", self._microgrid_id) async for page in self._client.list(microgrid_id=self._microgrid_id): for client_dispatch in page: + deleted_timestamp = self._deleted_dispatches.get(client_dispatch.id) + if ( + deleted_timestamp + and client_dispatch.update_time < deleted_timestamp + ): + continue dispatch = Dispatch(client_dispatch) self._dispatches[dispatch.id] = dispatch @@ -359,7 +389,7 @@ async def _fetch(self, timer: Timer) -> None: dispatch, None, timer ) await self._lifecycle_events_tx.send(Created(dispatch=dispatch)) - elif dispatch.update_time != old_dispatch.update_time: + elif dispatch.update_time > old_dispatch.update_time: _logger.debug("Updated dispatch: %s", dispatch) await self._update_dispatch_schedule_and_notify( dispatch, old_dispatch, timer