diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index 090a007..0fc1f87 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -15,6 +15,7 @@ * Two new parameters were added to the `Dispatcher` constructor: * `sign_secret`: A secret key used for signing messages. * `auth_key`: An authentication key for the Dispatch API. +* `Dispatcher` now only fetches ongoing dispatches, excluding completed ones, to optimize performance and relevance. ## Bug Fixes diff --git a/pyproject.toml b/pyproject.toml index 1039528..aa2d742 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -40,7 +40,7 @@ dependencies = [ # plugins.mkdocstrings.handlers.python.import) "frequenz-sdk >= 1.0.0-rc2100, < 1.0.0-rc2200", "frequenz-channels >= 1.6.1, < 2.0.0", - "frequenz-client-dispatch >= 0.11.2, < 0.12.0", + "frequenz-client-dispatch >= 0.11.3, < 0.12.0", "frequenz-client-base >= 0.11.0, < 0.12.0", ] dynamic = ["version"] diff --git a/src/frequenz/dispatch/_bg_service.py b/src/frequenz/dispatch/_bg_service.py index 9556c6c..f5697ea 100644 --- a/src/frequenz/dispatch/_bg_service.py +++ b/src/frequenz/dispatch/_bg_service.py @@ -365,12 +365,18 @@ async def _fetch(self, timer: Timer) -> None: if the connection was lost. """ self._initial_fetch_event.clear() + # We fetch dispatches that would have ended 5 seconds ago to + # avoid missing any updates that happened while we were disconnected. + past_time_buffer = timedelta(seconds=5) new_dispatches = {} try: _logger.debug("Fetching dispatches for microgrid %s", self._microgrid_id) - async for page in self._client.list(microgrid_id=self._microgrid_id): + now = datetime.now(timezone.utc) + async for page in self._client.list( + microgrid_id=self._microgrid_id, end_from=now - past_time_buffer + ): for client_dispatch in page: deleted_timestamp = self._deleted_dispatches.get(client_dispatch.id) if ( @@ -378,6 +384,7 @@ async def _fetch(self, timer: Timer) -> None: and client_dispatch.update_time < deleted_timestamp ): continue + dispatch = Dispatch(client_dispatch) new_dispatches[dispatch.id] = dispatch diff --git a/tests/test_frequenz_dispatch.py b/tests/test_frequenz_dispatch.py index d17239f..9dcc299 100644 --- a/tests/test_frequenz_dispatch.py +++ b/tests/test_frequenz_dispatch.py @@ -476,9 +476,7 @@ async def test_dispatch_new_but_finished( ) fake_time.shift(timedelta(seconds=1)) - await asyncio.sleep(1) - # Process the lifecycle event caused by the old dispatch at startup - await test_env.lifecycle_events.receive() + # Expecting no event for a past and finished dispatch as it should be filtered out # Create another dispatch the normal way new_dispatch = generator.generate_dispatch() @@ -514,7 +512,7 @@ async def test_notification_on_actor_start( running_dispatch, active=True, duration=timedelta(seconds=10), - start_time=_now() - timedelta(seconds=5), + start_time=_now() + timedelta(seconds=5), recurrence=RecurrenceRule(), type="TEST_TYPE", ) @@ -524,7 +522,7 @@ async def test_notification_on_actor_start( stopped_dispatch, active=False, duration=timedelta(seconds=5), - start_time=_now() - timedelta(seconds=5), + start_time=_now() + timedelta(seconds=5), recurrence=RecurrenceRule(), type="TEST_TYPE", ) @@ -536,8 +534,50 @@ async def test_notification_on_actor_start( ) test_env.service.start() - fake_time.shift(timedelta(seconds=1)) - await asyncio.sleep(1) + fake_time.shift(timedelta(seconds=7)) + await asyncio.sleep(6) + + # Expect notification of the running dispatch being ready to run + ready_dispatch = await test_env.running_state_change.receive() + assert ready_dispatch.started + + +async def test_notification_on_actor_start_inf_duration( + test_env: _TestEnv, + generator: DispatchGenerator, + fake_time: time_machine.Coordinates, +) -> None: + """Test that the actor sends notifications for indefinite dispatches on start.""" + # Generate a dispatch that is already running + running_dispatch = generator.generate_dispatch() + running_dispatch = replace( + running_dispatch, + active=True, + duration=None, + start_time=_now() + timedelta(seconds=5), + recurrence=RecurrenceRule(), + type="TEST_TYPE", + ) + # Generate a dispatch that is not running + stopped_dispatch = generator.generate_dispatch() + stopped_dispatch = replace( + stopped_dispatch, + active=False, + duration=timedelta(seconds=5), + start_time=_now() + timedelta(seconds=5), + recurrence=RecurrenceRule(), + type="TEST_TYPE", + ) + await test_env.service.stop() + + # Create the dispatches + test_env.client.set_dispatches( + test_env.microgrid_id, [running_dispatch, stopped_dispatch] + ) + test_env.service.start() + + fake_time.shift(timedelta(seconds=7)) + await asyncio.sleep(6) # Expect notification of the running dispatch being ready to run ready_dispatch = await test_env.running_state_change.receive()