diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index 86ebc24..75e865b 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -7,12 +7,12 @@ ## Upgrading * Two properties have been replaced by methods that require a type as parameter. - * `Dispatcher.lifecycle_events` has been replaced by the method `Dispatcher.new_lifecycle_events_receiver(self, type: str)`. - * `Dispatcher.running_status_change` has been replaced by the method `Dispatcher.new_running_state_event_receiver(self, type: str)`. + * `Dispatcher.lifecycle_events` has been replaced by the method `Dispatcher.new_lifecycle_events_receiver(self, dispatch_type: str)`. + * `Dispatcher.running_status_change` has been replaced by the method `Dispatcher.new_running_state_event_receiver(self, dispatch_type: str, *, unify_running_intervals: bool = True)`. ## New Features - +* A new feature "unify running intervals" has been added to the `Dispatcher.new_running_state_event_receiver` method. Using it, you can automatically merge & unify consecutive and overlapping dispatch start/stop events of the same type. E.g. if dispatch `A` runs from 10:10 to 10:30 and dispatch `B` runs from 10:30 to 11:00, then with the feature enabled this triggers in total one start event at 10:10, one reconfigure event at 10:30 and one stop event at 11:00. ## Bug Fixes diff --git a/src/frequenz/dispatch/_bg_service.py b/src/frequenz/dispatch/_bg_service.py index 5ee058d..a546850 100644 --- a/src/frequenz/dispatch/_bg_service.py +++ b/src/frequenz/dispatch/_bg_service.py @@ -33,16 +33,32 @@ class DispatchScheduler(BackgroundService): @dataclass(order=True) class QueueItem: - """A queue item for the scheduled events.""" + """A queue item for the scheduled events. + + This class is used to define the order of the queue items based on the + scheduled time, whether it is a stop event and finally the dispatch ID + (for uniqueness). + """ time: datetime + priority: int + """Sort priority when the time is the same. 
+ + Exists to make sure that when two events are scheduled at the same time, + the start event is executed first, allowing filters down the data pipeline + to consider the start event when deciding whether to execute the + stop event. + """ dispatch_id: int dispatch: Dispatch = field(compare=False) - def __init__(self, time: datetime, dispatch: Dispatch) -> None: + def __init__( + self, time: datetime, dispatch: Dispatch, stop_event: bool + ) -> None: """Initialize the queue item.""" self.time = time self.dispatch_id = dispatch.id + self.priority = int(stop_event) self.dispatch = dispatch # pylint: disable=too-many-arguments @@ -102,11 +118,19 @@ def new_lifecycle_events_receiver(self, type: str) -> Receiver[DispatchEvent]: lambda event: event.dispatch.type == type ) - async def new_running_state_event_receiver(self, type: str) -> Receiver[Dispatch]: - """Create a new receiver for running state events. + async def new_running_state_event_receiver( + self, type: str, *, unify_running_intervals: bool = True + ) -> Receiver[Dispatch]: + """Create a new receiver for running state events of the specified type. + + If `unify_running_intervals` is True, running intervals from multiple + dispatches of the same type are considered as one continuous running + period. In this mode, any stop events are ignored as long as at least + one dispatch remains active. Args: type: The type of events to receive. + unify_running_intervals: Whether to unify running intervals. Returns: A new receiver for running state status. @@ -121,6 +145,27 @@ async def new_running_state_event_receiver(self, type: str) -> Receiver[Dispatch limit=max(1, len(dispatches)) ).filter(lambda dispatch: dispatch.type == type) + if unify_running_intervals: + + def _is_type_still_running(new_dispatch: Dispatch) -> bool: + """Merge time windows of running dispatches. + + Any event that would cause a stop is filtered if at least one + dispatch of the same type is running. 
+ """ + if new_dispatch.started: + return True + + other_dispatches_running = any( + dispatch.started + for dispatch in self._dispatches.values() + if dispatch.type == type + ) + # If no other dispatches are running, we can allow the stop event + return not other_dispatches_running + + receiver = receiver.filter(_is_type_still_running) + # Send all matching dispatches to the receiver for dispatch in dispatches: await self._send_running_state_change(dispatch) @@ -336,7 +381,10 @@ def _schedule_start(self, dispatch: Dispatch) -> None: # Schedule the next run try: if next_run := dispatch.next_run: - heappush(self._scheduled_events, self.QueueItem(next_run, dispatch)) + heappush( + self._scheduled_events, + self.QueueItem(next_run, dispatch, stop_event=False), + ) _logger.debug( "Scheduled dispatch %s to start at %s", dispatch.id, next_run ) @@ -355,7 +403,9 @@ def _schedule_stop(self, dispatch: Dispatch) -> None: if dispatch.duration and dispatch.duration > timedelta(seconds=0): until = dispatch.until assert until is not None - heappush(self._scheduled_events, self.QueueItem(until, dispatch)) + heappush( + self._scheduled_events, self.QueueItem(until, dispatch, stop_event=True) + ) _logger.debug("Scheduled dispatch %s to stop at %s", dispatch, until) def _update_changed_running_state( diff --git a/src/frequenz/dispatch/_dispatcher.py b/src/frequenz/dispatch/_dispatcher.py index 1c7baa1..512318d 100644 --- a/src/frequenz/dispatch/_dispatcher.py +++ b/src/frequenz/dispatch/_dispatcher.py @@ -200,7 +200,7 @@ def new_lifecycle_events_receiver( return self._bg_service.new_lifecycle_events_receiver(dispatch_type) async def new_running_state_event_receiver( - self, dispatch_type: str + self, dispatch_type: str, *, unify_running_intervals: bool = True ) -> Receiver[Dispatch]: """Return running state event receiver. 
@@ -228,10 +228,18 @@ async def new_running_state_event_receiver( - The payload changed - The dispatch was deleted + If `unify_running_intervals` is True, running intervals from multiple + dispatches of the same type are considered as one continuous running + period. In this mode, any stop events are ignored as long as at least + one dispatch remains active. + Args: dispatch_type: The type of the dispatch to listen for. + unify_running_intervals: Whether to unify running intervals. Returns: A new receiver for dispatches whose running status changed. """ - return await self._bg_service.new_running_state_event_receiver(dispatch_type) + return await self._bg_service.new_running_state_event_receiver( + dispatch_type, unify_running_intervals=unify_running_intervals + ) diff --git a/tests/test_frequenz_dispatch.py b/tests/test_frequenz_dispatch.py index dc87b64..41f4c9b 100644 --- a/tests/test_frequenz_dispatch.py +++ b/tests/test_frequenz_dispatch.py @@ -77,7 +77,7 @@ async def test_env() -> AsyncIterator[TestEnv]: service=service, lifecycle_events=service.new_lifecycle_events_receiver("TEST_TYPE"), running_state_change=await service.new_running_state_event_receiver( - "TEST_TYPE" + "TEST_TYPE", unify_running_intervals=False ), client=client, microgrid_id=microgrid_id, @@ -371,6 +371,8 @@ async def test_dispatch_schedule( done_dispatch = await test_env.running_state_change.receive() assert done_dispatch == dispatch + await asyncio.sleep(1) + async def test_dispatch_inf_duration_updated_to_finite_and_continues( test_env: TestEnv, @@ -459,6 +461,8 @@ async def test_dispatch_new_but_finished( assert await test_env.running_state_change.receive() == new_dispatch + await asyncio.sleep(1) + async def test_notification_on_actor_start( test_env: TestEnv, @@ -500,3 +504,226 @@ async def test_notification_on_actor_start( # Expect notification of the running dispatch being ready to run ready_dispatch = await test_env.running_state_change.receive() assert ready_dispatch.started + + 
+async def test_multiple_dispatches_unify_running_intervals( + fake_time: time_machine.Coordinates, + generator: DispatchGenerator, +) -> None: + """Test that multiple dispatches are merged into a single running interval.""" + microgrid_id = randint(1, 100) + client = FakeClient() + service = DispatchScheduler( + microgrid_id=microgrid_id, + client=client, + ) + service.start() + + receiver = await service.new_running_state_event_receiver( + "TEST_TYPE", unify_running_intervals=True + ) + + # Create two overlapping dispatches + dispatch1 = replace( + generator.generate_dispatch(), + active=True, + duration=timedelta(seconds=30), + start_time=_now() + timedelta(seconds=5), + recurrence=RecurrenceRule(), + type="TEST_TYPE", + ) + dispatch2 = replace( + generator.generate_dispatch(), + active=True, + duration=timedelta(seconds=10), + start_time=_now() + timedelta(seconds=10), # starts after dispatch1 + recurrence=RecurrenceRule(), + type="TEST_TYPE", + ) + lifecycle_events = service.new_lifecycle_events_receiver("TEST_TYPE") + + await client.create(**to_create_params(microgrid_id, dispatch1)) + await client.create(**to_create_params(microgrid_id, dispatch2)) + + # Wait for both to be registered + await lifecycle_events.receive() + await lifecycle_events.receive() + + # Move time forward to start both dispatches + fake_time.shift(timedelta(seconds=15)) + await asyncio.sleep(1) + + started1 = await receiver.receive() + started2 = await receiver.receive() + + assert started1.started + assert started2.started + + # Stop dispatch2 first, but unify_running_intervals=True means as long as dispatch1 runs, + # we do not send a stop event + await client.update( + microgrid_id=microgrid_id, dispatch_id=started2.id, new_fields={"active": False} + ) + fake_time.shift(timedelta(seconds=5)) + await asyncio.sleep(1) + + # Now stop dispatch1 as well + fake_time.shift(timedelta(seconds=15)) + await asyncio.sleep(1) + + # Now we expect a single stop event for the merged window + stopped 
= await receiver.receive() + assert not stopped.started + + await service.stop() + + +async def test_multiple_dispatches_sequential_intervals_unify( + fake_time: time_machine.Coordinates, + generator: DispatchGenerator, +) -> None: + """Test that multiple dispatches are merged into a single running interval. + + Even if dispatches don't overlap but are consecutive, + unify_running_intervals=True should treat them as continuous if any event tries to stop. + """ + microgrid_id = randint(1, 100) + client = FakeClient() + service = DispatchScheduler(microgrid_id=microgrid_id, client=client) + service.start() + + receiver = await service.new_running_state_event_receiver( + "TEST_TYPE", unify_running_intervals=True + ) + + dispatch1 = replace( + generator.generate_dispatch(), + active=True, + duration=timedelta(seconds=5), + start_time=_now() + timedelta(seconds=5), + recurrence=RecurrenceRule(), + type="TEST_TYPE", + ) + assert dispatch1.duration is not None + dispatch2 = replace( + generator.generate_dispatch(), + active=True, + duration=timedelta(seconds=5), + start_time=dispatch1.start_time + dispatch1.duration, + recurrence=RecurrenceRule(), + type="TEST_TYPE", + ) + lifecycle = service.new_lifecycle_events_receiver("TEST_TYPE") + + await client.create(**to_create_params(microgrid_id, dispatch1)) + await client.create(**to_create_params(microgrid_id, dispatch2)) + + # Consume lifecycle events + await lifecycle.receive() + await lifecycle.receive() + + fake_time.shift(timedelta(seconds=11)) + await asyncio.sleep(1) + started1 = await receiver.receive() + assert started1.started + + # Wait for the second dispatch to start + fake_time.shift(timedelta(seconds=3)) + await asyncio.sleep(1) + started2 = await receiver.receive() + assert started2.started + + # Now stop the second dispatch + fake_time.shift(timedelta(seconds=5)) + await asyncio.sleep(1) + stopped = await receiver.receive() + assert not stopped.started + + await service.stop() + await asyncio.sleep(1) + + 
+async def test_at_least_one_running_filter( + fake_time: time_machine.Coordinates, + generator: DispatchGenerator, +) -> None: + """Test scenarios directly tied to the _at_least_one_running logic.""" + microgrid_id = randint(1, 100) + client = FakeClient() + service = DispatchScheduler(microgrid_id=microgrid_id, client=client) + service.start() + + # unify_running_intervals is True, so we use merged intervals + receiver = await service.new_running_state_event_receiver( + "TEST_TYPE", unify_running_intervals=True + ) + + # Single dispatch that starts and stops normally + dispatch = replace( + generator.generate_dispatch(), + active=True, + duration=timedelta(seconds=10), + start_time=_now() + timedelta(seconds=5), + recurrence=RecurrenceRule(), + type="TEST_TYPE", + ) + lifecycle = service.new_lifecycle_events_receiver("TEST_TYPE") + await client.create(**to_create_params(microgrid_id, dispatch)) + await lifecycle.receive() + + # Move time so it starts + fake_time.shift(timedelta(seconds=6)) + await asyncio.sleep(1) + started = await receiver.receive() + assert started.started + + # Now stop it + await client.update( + microgrid_id=microgrid_id, dispatch_id=started.id, new_fields={"active": False} + ) + fake_time.shift(timedelta(seconds=2)) + await asyncio.sleep(1) + stopped = await receiver.receive() + assert not stopped.started + + # Now test scenario with multiple dispatches: one never starts, one starts and stops + dispatch_a = replace( + generator.generate_dispatch(), + active=False, + duration=timedelta(seconds=10), + start_time=_now() + timedelta(seconds=50), + recurrence=RecurrenceRule(), + type="TEST_TYPE", + ) + dispatch_b = replace( + generator.generate_dispatch(), + active=True, + duration=timedelta(seconds=5), + start_time=_now() + timedelta(seconds=5), + recurrence=RecurrenceRule(), + type="TEST_TYPE", + ) + await client.create(**to_create_params(microgrid_id, dispatch_a)) + await client.create(**to_create_params(microgrid_id, dispatch_b)) + lifecycle 
= service.new_lifecycle_events_receiver("TEST_TYPE") + await lifecycle.receive() + await lifecycle.receive() + + fake_time.shift(timedelta(seconds=6)) + await asyncio.sleep(1) + started_b = await receiver.receive() + assert started_b.started + + # Stop dispatch_b before dispatch_a ever becomes active + await client.update( + microgrid_id=microgrid_id, + dispatch_id=started_b.id, + new_fields={"active": False}, + ) + fake_time.shift(timedelta(seconds=2)) + await asyncio.sleep(1) + stopped_b = await receiver.receive() + assert not stopped_b.started + + # Since dispatch_a never started, no merging logic needed here. + await service.stop() diff --git a/tests/test_mananging_actor.py b/tests/test_mananging_actor.py index 7722b63..07fda2f 100644 --- a/tests/test_mananging_actor.py +++ b/tests/test_mananging_actor.py @@ -147,14 +147,40 @@ def test_heapq_dispatch_compare(test_env: TestEnv) -> None: # Push two events with the same 'until' time onto the heap heapq.heappush( scheduled_events, - DispatchScheduler.QueueItem(until_time, Dispatch(dispatch1)), + DispatchScheduler.QueueItem(until_time, Dispatch(dispatch1), True), ) heapq.heappush( scheduled_events, - DispatchScheduler.QueueItem(until_time, Dispatch(dispatch2)), + DispatchScheduler.QueueItem(until_time, Dispatch(dispatch2), True), ) +def test_heapq_dispatch_start_stop_compare(test_env: TestEnv) -> None: + """Test that the heapq compare function works.""" + dispatch1 = test_env.generator.generate_dispatch() + dispatch2 = test_env.generator.generate_dispatch() + + # Simulate two dispatches with the same 'until' time + now = datetime.now(timezone.utc) + until_time = now + timedelta(minutes=5) + + # Create the heap + scheduled_events: list[DispatchScheduler.QueueItem] = [] + + # Push two events with the same 'until' time onto the heap + heapq.heappush( + scheduled_events, + DispatchScheduler.QueueItem(until_time, Dispatch(dispatch1), stop_event=False), + ) + heapq.heappush( + scheduled_events, + 
DispatchScheduler.QueueItem(until_time, Dispatch(dispatch2), stop_event=True), + ) + + assert scheduled_events[0].dispatch_id == dispatch1.id + assert scheduled_events[1].dispatch_id == dispatch2.id + + async def test_dry_run(test_env: TestEnv, fake_time: time_machine.Coordinates) -> None: """Test the dry run mode.""" dispatch = test_env.generator.generate_dispatch()