Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ dependencies = [
# Make sure to update the version for cross-referencing also in the
# mkdocs.yml file when changing the version here (look for the config key
# plugins.mkdocstrings.handlers.python.import)
"frequenz-sdk >= 1.0.0-rc2002, < 1.0.0-rc2100",
"frequenz-sdk >= 1.0.0-rc2100, < 1.0.0-rc2200",
"frequenz-channels >= 1.6.1, < 2.0.0",
"frequenz-client-dispatch >= 0.11.0, < 0.12.0",
"frequenz-client-common >= 0.3.2, < 0.4.0",
Expand Down
64 changes: 47 additions & 17 deletions src/frequenz/dispatch/_bg_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ def __init__(

self._client = client
self._dispatches: dict[DispatchId, Dispatch] = {}
self._deleted_dispatches: dict[DispatchId, datetime] = {}
self._microgrid_id = microgrid_id

self._lifecycle_events_channel = Broadcast[DispatchEvent](
Expand Down Expand Up @@ -268,34 +269,57 @@ async def _run(self) -> None:
_logger.debug(
"Received dispatch event: %s", selected.message
)
dispatch = Dispatch(selected.message.dispatch)
new_dispatch = Dispatch(selected.message.dispatch)
_existing_dispatch = self._dispatches.get(new_dispatch.id)
is_new_or_newer = (
_existing_dispatch is None
or new_dispatch.update_time
> _existing_dispatch.update_time
)

match selected.message.event:
case Event.CREATED:
self._dispatches[dispatch.id] = dispatch
await self._update_dispatch_schedule_and_notify(
dispatch, None, next_event_timer
)
# Check if the dispatch already exists and
# was updated. The CREATE event is late in
# this case
if is_new_or_newer:
self._dispatches[new_dispatch.id] = new_dispatch
await self._update_dispatch_schedule_and_notify(
new_dispatch, None, next_event_timer
)
await self._lifecycle_events_tx.send(
Created(dispatch=dispatch)
Created(dispatch=new_dispatch)
)
case Event.UPDATED:
await self._update_dispatch_schedule_and_notify(
dispatch,
self._dispatches[dispatch.id],
next_event_timer,
)
self._dispatches[dispatch.id] = dispatch
# We might receive update before we fetched
# the entry, so don't rely on it existing
if is_new_or_newer:
await self._update_dispatch_schedule_and_notify(
new_dispatch,
self._dispatches.get(new_dispatch.id),
next_event_timer,
)
self._dispatches[new_dispatch.id] = new_dispatch
await self._lifecycle_events_tx.send(
Updated(dispatch=dispatch)
Updated(dispatch=new_dispatch)
)
case Event.DELETED:
self._dispatches.pop(dispatch.id)
# The dispatch might already be deleted,
# depending on the exact timing of fetch()
# so we don't rely on it existing.
if is_new_or_newer:
self._dispatches.pop(new_dispatch.id, None)

self._deleted_dispatches[new_dispatch.id] = (
datetime.now(timezone.utc)
)

await self._update_dispatch_schedule_and_notify(
None, dispatch, next_event_timer
None, new_dispatch, next_event_timer
)

await self._lifecycle_events_tx.send(
Deleted(dispatch=dispatch)
Deleted(dispatch=new_dispatch)
)

case StreamRetrying():
Expand Down Expand Up @@ -349,6 +373,12 @@ async def _fetch(self, timer: Timer) -> None:
_logger.debug("Fetching dispatches for microgrid %s", self._microgrid_id)
async for page in self._client.list(microgrid_id=self._microgrid_id):
for client_dispatch in page:
deleted_timestamp = self._deleted_dispatches.get(client_dispatch.id)
if (
deleted_timestamp
and client_dispatch.update_time < deleted_timestamp
):
continue
dispatch = Dispatch(client_dispatch)

self._dispatches[dispatch.id] = dispatch
Expand All @@ -359,7 +389,7 @@ async def _fetch(self, timer: Timer) -> None:
dispatch, None, timer
)
await self._lifecycle_events_tx.send(Created(dispatch=dispatch))
elif dispatch.update_time != old_dispatch.update_time:
elif dispatch.update_time > old_dispatch.update_time:
Comment on lines -362 to +392
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Explictly require newer time when comparing in fetch()

It would be good to specify in the commit message why this change is needed or better. Maybe it is because of the limited diff context, but it is not obvious for me.

_logger.debug("Updated dispatch: %s", dispatch)
await self._update_dispatch_schedule_and_notify(
dispatch, old_dispatch, timer
Expand Down
Loading