Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions RELEASE_NOTES.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
* Two new parameters were added to the `Dispatcher` constructor:
* `sign_secret`: A secret key used for signing messages.
* `auth_key`: An authentication key for the Dispatch API.
* `Dispatcher` now only fetches ongoing dispatches, excluding completed ones, to optimize performance and relevance.

## Bug Fixes

Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ dependencies = [
# plugins.mkdocstrings.handlers.python.import)
"frequenz-sdk >= 1.0.0-rc2100, < 1.0.0-rc2200",
"frequenz-channels >= 1.6.1, < 2.0.0",
"frequenz-client-dispatch >= 0.11.2, < 0.12.0",
"frequenz-client-dispatch >= 0.11.3, < 0.12.0",
"frequenz-client-base >= 0.11.0, < 0.12.0",
]
dynamic = ["version"]
Expand Down
9 changes: 8 additions & 1 deletion src/frequenz/dispatch/_bg_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -365,19 +365,26 @@ async def _fetch(self, timer: Timer) -> None:
if the connection was lost.
"""
self._initial_fetch_event.clear()
# We fetch dispatches that would have ended 5 seconds ago to
# avoid missing any updates that happened while we were disconnected.
past_time_buffer = timedelta(seconds=5)

new_dispatches = {}

try:
_logger.debug("Fetching dispatches for microgrid %s", self._microgrid_id)
async for page in self._client.list(microgrid_id=self._microgrid_id):
now = datetime.now(timezone.utc)
async for page in self._client.list(
microgrid_id=self._microgrid_id, end_from=now - past_time_buffer
):
for client_dispatch in page:
deleted_timestamp = self._deleted_dispatches.get(client_dispatch.id)
if (
deleted_timestamp
and client_dispatch.update_time < deleted_timestamp
):
continue

dispatch = Dispatch(client_dispatch)

new_dispatches[dispatch.id] = dispatch
Expand Down
54 changes: 47 additions & 7 deletions tests/test_frequenz_dispatch.py
Original file line number Diff line number Diff line change
Expand Up @@ -476,9 +476,7 @@ async def test_dispatch_new_but_finished(
)
fake_time.shift(timedelta(seconds=1))

await asyncio.sleep(1)
# Process the lifecycle event caused by the old dispatch at startup
await test_env.lifecycle_events.receive()
# Expecting no event for a past and finished dispatch as it should be filtered out

# Create another dispatch the normal way
new_dispatch = generator.generate_dispatch()
Expand Down Expand Up @@ -514,7 +512,7 @@ async def test_notification_on_actor_start(
running_dispatch,
active=True,
duration=timedelta(seconds=10),
start_time=_now() - timedelta(seconds=5),
start_time=_now() + timedelta(seconds=5),
recurrence=RecurrenceRule(),
type="TEST_TYPE",
)
Expand All @@ -524,7 +522,7 @@ async def test_notification_on_actor_start(
stopped_dispatch,
active=False,
duration=timedelta(seconds=5),
start_time=_now() - timedelta(seconds=5),
start_time=_now() + timedelta(seconds=5),
recurrence=RecurrenceRule(),
type="TEST_TYPE",
)
Expand All @@ -536,8 +534,50 @@ async def test_notification_on_actor_start(
)
test_env.service.start()

fake_time.shift(timedelta(seconds=1))
await asyncio.sleep(1)
fake_time.shift(timedelta(seconds=7))
await asyncio.sleep(6)

# Expect notification of the running dispatch being ready to run
ready_dispatch = await test_env.running_state_change.receive()
assert ready_dispatch.started


async def test_notification_on_actor_start_inf_duration(
test_env: _TestEnv,
generator: DispatchGenerator,
fake_time: time_machine.Coordinates,
) -> None:
"""Test that the actor sends notifications for indefinite dispatches on start."""
# Generate a dispatch that is already running
running_dispatch = generator.generate_dispatch()
running_dispatch = replace(
running_dispatch,
active=True,
duration=None,
start_time=_now() + timedelta(seconds=5),
recurrence=RecurrenceRule(),
type="TEST_TYPE",
)
# Generate a dispatch that is not running
stopped_dispatch = generator.generate_dispatch()
stopped_dispatch = replace(
stopped_dispatch,
active=False,
duration=timedelta(seconds=5),
start_time=_now() + timedelta(seconds=5),
recurrence=RecurrenceRule(),
type="TEST_TYPE",
)
await test_env.service.stop()

# Create the dispatches
test_env.client.set_dispatches(
test_env.microgrid_id, [running_dispatch, stopped_dispatch]
)
test_env.service.start()

fake_time.shift(timedelta(seconds=7))
await asyncio.sleep(6)

# Expect notification of the running dispatch being ready to run
ready_dispatch = await test_env.running_state_change.receive()
Expand Down