Skip to content

Commit aef44b0

Browse files
committed
Ensure that start events are executed before stop events
This allows us to filter the `stop` events when we desire to merge consecutive events. Signed-off-by: Mathias L. Baumann <[email protected]>
1 parent 9b1af27 commit aef44b0

File tree

3 files changed

+56
-9
lines changed

3 files changed

+56
-9
lines changed

src/frequenz/dispatch/_bg_service.py

Lines changed: 26 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -33,16 +33,32 @@ class DispatchScheduler(BackgroundService):
3333

3434
@dataclass(order=True)
3535
class QueueItem:
36-
"""A queue item for the scheduled events."""
36+
"""A queue item for the scheduled events.
37+
38+
This class is used to define the order of the queue items based on the
39+
scheduled time, whether it is a stop event and finally the dispatch ID
40+
(for uniqueness).
41+
"""
3742

3843
time: datetime
44+
priority: int
45+
"""Sort priority when the time is the same.
46+
47+
Exists to make sure that when two events are scheduled at the same time,
48+
the start event is executed first, allowing filters down the data pipeline
49+
to consider the start event when deciding whether to execute the
50+
stop event.
51+
"""
3952
dispatch_id: int
4053
dispatch: Dispatch = field(compare=False)
4154

42-
def __init__(self, time: datetime, dispatch: Dispatch) -> None:
55+
def __init__(
56+
self, time: datetime, dispatch: Dispatch, stop_event: bool
57+
) -> None:
4358
"""Initialize the queue item."""
4459
self.time = time
4560
self.dispatch_id = dispatch.id
61+
self.priority = int(stop_event)
4662
self.dispatch = dispatch
4763

4864
# pylint: disable=too-many-arguments
@@ -103,7 +119,7 @@ def new_lifecycle_events_receiver(self, type: str) -> Receiver[DispatchEvent]:
103119
)
104120

105121
async def new_running_state_event_receiver(
106-
self, type: str, unify_running_intervals: bool = True
122+
self, type: str, *, unify_running_intervals: bool = True
107123
) -> Receiver[Dispatch]:
108124
"""Create a new receiver for running state events of the specified type.
109125
@@ -365,7 +381,10 @@ def _schedule_start(self, dispatch: Dispatch) -> None:
365381
# Schedule the next run
366382
try:
367383
if next_run := dispatch.next_run:
368-
heappush(self._scheduled_events, self.QueueItem(next_run, dispatch))
384+
heappush(
385+
self._scheduled_events,
386+
self.QueueItem(next_run, dispatch, stop_event=False),
387+
)
369388
_logger.debug(
370389
"Scheduled dispatch %s to start at %s", dispatch.id, next_run
371390
)
@@ -384,7 +403,9 @@ def _schedule_stop(self, dispatch: Dispatch) -> None:
384403
if dispatch.duration and dispatch.duration > timedelta(seconds=0):
385404
until = dispatch.until
386405
assert until is not None
387-
heappush(self._scheduled_events, self.QueueItem(until, dispatch))
406+
heappush(
407+
self._scheduled_events, self.QueueItem(until, dispatch, stop_event=True)
408+
)
388409
_logger.debug("Scheduled dispatch %s to stop at %s", dispatch, until)
389410

390411
def _update_changed_running_state(

src/frequenz/dispatch/_dispatcher.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -200,7 +200,7 @@ def new_lifecycle_events_receiver(
200200
return self._bg_service.new_lifecycle_events_receiver(dispatch_type)
201201

202202
async def new_running_state_event_receiver(
203-
self, dispatch_type: str, unify_running_intervals: bool = True
203+
self, dispatch_type: str, *, unify_running_intervals: bool = True
204204
) -> Receiver[Dispatch]:
205205
"""Return running state event receiver.
206206
@@ -241,5 +241,5 @@ async def new_running_state_event_receiver(
241241
A new receiver for dispatches whose running status changed.
242242
"""
243243
return await self._bg_service.new_running_state_event_receiver(
244-
dispatch_type, unify_running_intervals
244+
dispatch_type, unify_running_intervals=unify_running_intervals
245245
)

tests/test_mananging_actor.py

Lines changed: 28 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -147,14 +147,40 @@ def test_heapq_dispatch_compare(test_env: TestEnv) -> None:
147147
# Push two events with the same 'until' time onto the heap
148148
heapq.heappush(
149149
scheduled_events,
150-
DispatchScheduler.QueueItem(until_time, Dispatch(dispatch1)),
150+
DispatchScheduler.QueueItem(until_time, Dispatch(dispatch1), True),
151151
)
152152
heapq.heappush(
153153
scheduled_events,
154-
DispatchScheduler.QueueItem(until_time, Dispatch(dispatch2)),
154+
DispatchScheduler.QueueItem(until_time, Dispatch(dispatch2), True),
155155
)
156156

157157

158+
def test_heapq_dispatch_start_stop_compare(test_env: TestEnv) -> None:
159+
"""Test that the heapq compare function works."""
160+
dispatch1 = test_env.generator.generate_dispatch()
161+
dispatch2 = test_env.generator.generate_dispatch()
162+
163+
# Simulate two dispatches with the same 'until' time
164+
now = datetime.now(timezone.utc)
165+
until_time = now + timedelta(minutes=5)
166+
167+
# Create the heap
168+
scheduled_events: list[DispatchScheduler.QueueItem] = []
169+
170+
# Push two events with the same 'until' time onto the heap
171+
heapq.heappush(
172+
scheduled_events,
173+
DispatchScheduler.QueueItem(until_time, Dispatch(dispatch1), stop_event=False),
174+
)
175+
heapq.heappush(
176+
scheduled_events,
177+
DispatchScheduler.QueueItem(until_time, Dispatch(dispatch2), stop_event=True),
178+
)
179+
180+
assert scheduled_events[0].dispatch_id == dispatch1.id
181+
assert scheduled_events[1].dispatch_id == dispatch2.id
182+
183+
158184
async def test_dry_run(test_env: TestEnv, fake_time: time_machine.Coordinates) -> None:
159185
"""Test the dry run mode."""
160186
dispatch = test_env.generator.generate_dispatch()

0 commit comments

Comments
 (0)