Skip to content

Commit cb74e80

Browse files
authored
Fetch only ongoing dispatches (#195)
Based on frequenz-floss/frequenz-client-dispatch-python#212
2 parents 519ae09 + c36017b commit cb74e80

File tree

4 files changed

+57
-9
lines changed

4 files changed

+57
-9
lines changed

RELEASE_NOTES.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
* Two new parameters were added to the `Dispatcher` constructor:
1616
* `sign_secret`: A secret key used for signing messages.
1717
* `auth_key`: An authentication key for the Dispatch API.
18+
* `Dispatcher` now only fetches ongoing dispatches, excluding completed ones, to optimize performance and relevance.
1819

1920
## Bug Fixes
2021

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ dependencies = [
4040
# plugins.mkdocstrings.handlers.python.import)
4141
"frequenz-sdk >= 1.0.0-rc2100, < 1.0.0-rc2200",
4242
"frequenz-channels >= 1.6.1, < 2.0.0",
43-
"frequenz-client-dispatch >= 0.11.2, < 0.12.0",
43+
"frequenz-client-dispatch >= 0.11.3, < 0.12.0",
4444
"frequenz-client-base >= 0.11.0, < 0.12.0",
4545
]
4646
dynamic = ["version"]

src/frequenz/dispatch/_bg_service.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -365,19 +365,26 @@ async def _fetch(self, timer: Timer) -> None:
365365
if the connection was lost.
366366
"""
367367
self._initial_fetch_event.clear()
368+
# We fetch dispatches that would have ended 5 seconds ago to
369+
# avoid missing any updates that happened while we were disconnected.
370+
past_time_buffer = timedelta(seconds=5)
368371

369372
new_dispatches = {}
370373

371374
try:
372375
_logger.debug("Fetching dispatches for microgrid %s", self._microgrid_id)
373-
async for page in self._client.list(microgrid_id=self._microgrid_id):
376+
now = datetime.now(timezone.utc)
377+
async for page in self._client.list(
378+
microgrid_id=self._microgrid_id, end_from=now - past_time_buffer
379+
):
374380
for client_dispatch in page:
375381
deleted_timestamp = self._deleted_dispatches.get(client_dispatch.id)
376382
if (
377383
deleted_timestamp
378384
and client_dispatch.update_time < deleted_timestamp
379385
):
380386
continue
387+
381388
dispatch = Dispatch(client_dispatch)
382389

383390
new_dispatches[dispatch.id] = dispatch

tests/test_frequenz_dispatch.py

Lines changed: 47 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -476,9 +476,7 @@ async def test_dispatch_new_but_finished(
476476
)
477477
fake_time.shift(timedelta(seconds=1))
478478

479-
await asyncio.sleep(1)
480-
# Process the lifecycle event caused by the old dispatch at startup
481-
await test_env.lifecycle_events.receive()
479+
# Expecting no event for a past and finished dispatch as it should be filtered out
482480

483481
# Create another dispatch the normal way
484482
new_dispatch = generator.generate_dispatch()
@@ -514,7 +512,7 @@ async def test_notification_on_actor_start(
514512
running_dispatch,
515513
active=True,
516514
duration=timedelta(seconds=10),
517-
start_time=_now() - timedelta(seconds=5),
515+
start_time=_now() + timedelta(seconds=5),
518516
recurrence=RecurrenceRule(),
519517
type="TEST_TYPE",
520518
)
@@ -524,7 +522,7 @@ async def test_notification_on_actor_start(
524522
stopped_dispatch,
525523
active=False,
526524
duration=timedelta(seconds=5),
527-
start_time=_now() - timedelta(seconds=5),
525+
start_time=_now() + timedelta(seconds=5),
528526
recurrence=RecurrenceRule(),
529527
type="TEST_TYPE",
530528
)
@@ -536,8 +534,50 @@ async def test_notification_on_actor_start(
536534
)
537535
test_env.service.start()
538536

539-
fake_time.shift(timedelta(seconds=1))
540-
await asyncio.sleep(1)
537+
fake_time.shift(timedelta(seconds=7))
538+
await asyncio.sleep(6)
539+
540+
# Expect notification of the running dispatch being ready to run
541+
ready_dispatch = await test_env.running_state_change.receive()
542+
assert ready_dispatch.started
543+
544+
545+
async def test_notification_on_actor_start_inf_duration(
546+
test_env: _TestEnv,
547+
generator: DispatchGenerator,
548+
fake_time: time_machine.Coordinates,
549+
) -> None:
550+
"""Test that the actor sends notifications for indefinite dispatches on start."""
551+
# Generate a dispatch that is already running
552+
running_dispatch = generator.generate_dispatch()
553+
running_dispatch = replace(
554+
running_dispatch,
555+
active=True,
556+
duration=None,
557+
start_time=_now() + timedelta(seconds=5),
558+
recurrence=RecurrenceRule(),
559+
type="TEST_TYPE",
560+
)
561+
# Generate a dispatch that is not running
562+
stopped_dispatch = generator.generate_dispatch()
563+
stopped_dispatch = replace(
564+
stopped_dispatch,
565+
active=False,
566+
duration=timedelta(seconds=5),
567+
start_time=_now() + timedelta(seconds=5),
568+
recurrence=RecurrenceRule(),
569+
type="TEST_TYPE",
570+
)
571+
await test_env.service.stop()
572+
573+
# Create the dispatches
574+
test_env.client.set_dispatches(
575+
test_env.microgrid_id, [running_dispatch, stopped_dispatch]
576+
)
577+
test_env.service.start()
578+
579+
fake_time.shift(timedelta(seconds=7))
580+
await asyncio.sleep(6)
541581

542582
# Expect notification of the running dispatch being ready to run
543583
ready_dispatch = await test_env.running_state_change.receive()

0 commit comments

Comments
 (0)