Skip to content

Commit 3538405

Browse files
committed
Only fetch relevant dispatches, excluding past ones
Signed-off-by: Mathias L. Baumann <[email protected]>
1 parent 519ae09 commit 3538405

File tree

4 files changed

+21
-9
lines changed

4 files changed

+21
-9
lines changed

RELEASE_NOTES.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
* Two new parameters were added to the `Dispatcher` constructor:
1616
* `sign_secret`: A secret key used for signing messages.
1717
* `auth_key`: An authentication key for the Dispatch API.
18+
* `Dispatcher` now only fetches ongoing dispatches, excluding completed ones, to optimize performance and relevance.
1819

1920
## Bug Fixes
2021

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ dependencies = [
4040
# plugins.mkdocstrings.handlers.python.import)
4141
"frequenz-sdk >= 1.0.0-rc2100, < 1.0.0-rc2200",
4242
"frequenz-channels >= 1.6.1, < 2.0.0",
43-
"frequenz-client-dispatch >= 0.11.2, < 0.12.0",
43+
"frequenz-client-dispatch >= 0.11.3, < 0.12.0",
4444
"frequenz-client-base >= 0.11.0, < 0.12.0",
4545
]
4646
dynamic = ["version"]

src/frequenz/dispatch/_bg_service.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -370,14 +370,18 @@ async def _fetch(self, timer: Timer) -> None:
370370

371371
try:
372372
_logger.debug("Fetching dispatches for microgrid %s", self._microgrid_id)
373-
async for page in self._client.list(microgrid_id=self._microgrid_id):
373+
now = datetime.now(timezone.utc)
374+
async for page in self._client.list(
375+
microgrid_id=self._microgrid_id, end_from=now - timedelta(seconds=5)
376+
):
374377
for client_dispatch in page:
375378
deleted_timestamp = self._deleted_dispatches.get(client_dispatch.id)
376379
if (
377380
deleted_timestamp
378381
and client_dispatch.update_time < deleted_timestamp
379382
):
380383
continue
384+
381385
dispatch = Dispatch(client_dispatch)
382386

383387
new_dispatches[dispatch.id] = dispatch

tests/test_frequenz_dispatch.py

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
"""Tests for the frequenz.dispatch.actor package."""
55

66
import asyncio
7+
import logging
78
from dataclasses import dataclass, replace
89
from datetime import datetime, timedelta, timezone
910
from random import randint
@@ -476,9 +477,7 @@ async def test_dispatch_new_but_finished(
476477
)
477478
fake_time.shift(timedelta(seconds=1))
478479

479-
await asyncio.sleep(1)
480-
# Process the lifecycle event caused by the old dispatch at startup
481-
await test_env.lifecycle_events.receive()
480+
# Expecting no event for a past and finished dispatch as it should be filtered out
482481

483482
# Create another dispatch the normal way
484483
new_dispatch = generator.generate_dispatch()
@@ -514,7 +513,7 @@ async def test_notification_on_actor_start(
514513
running_dispatch,
515514
active=True,
516515
duration=timedelta(seconds=10),
517-
start_time=_now() - timedelta(seconds=5),
516+
start_time=_now() + timedelta(seconds=5),
518517
recurrence=RecurrenceRule(),
519518
type="TEST_TYPE",
520519
)
@@ -524,23 +523,31 @@ async def test_notification_on_actor_start(
524523
stopped_dispatch,
525524
active=False,
526525
duration=timedelta(seconds=5),
527-
start_time=_now() - timedelta(seconds=5),
526+
start_time=_now() + timedelta(seconds=5),
528527
recurrence=RecurrenceRule(),
529528
type="TEST_TYPE",
530529
)
530+
logging.warning("Running dispatch: %s", running_dispatch)
531+
logging.warning("Stopped dispatch: %s", stopped_dispatch)
532+
531533
await test_env.service.stop()
532534

535+
logging.warning("Restarting service")
533536
# Create the dispatches
534537
test_env.client.set_dispatches(
535538
test_env.microgrid_id, [running_dispatch, stopped_dispatch]
536539
)
540+
logging.warning("Dispatches set in client")
537541
test_env.service.start()
542+
logging.warning("Service started")
538543

539-
fake_time.shift(timedelta(seconds=1))
540-
await asyncio.sleep(1)
544+
fake_time.shift(timedelta(seconds=7))
545+
await asyncio.sleep(6)
546+
logging.warning("Time shifted")
541547

542548
# Expect notification of the running dispatch being ready to run
543549
ready_dispatch = await test_env.running_state_change.receive()
550+
logging.warning("Received running state change: %s", ready_dispatch)
544551
assert ready_dispatch.started
545552

546553

0 commit comments

Comments
 (0)