Skip to content

Commit 2cc7583

Browse files
committed
Ensure that start events are executed before stop events
This allows us to filter the `stop` events when we desire to merge consecutive events. Signed-off-by: Mathias L. Baumann <[email protected]>
1 parent 9b1af27 commit 2cc7583

File tree

2 files changed

+53
-6
lines changed

2 files changed

+53
-6
lines changed

src/frequenz/dispatch/_bg_service.py

Lines changed: 25 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -33,16 +33,32 @@ class DispatchScheduler(BackgroundService):
3333

3434
@dataclass(order=True)
3535
class QueueItem:
36-
"""A queue item for the scheduled events."""
36+
"""A queue item for the scheduled events.
37+
38+
This class is used to define the order of the queue items based on the
39+
scheduled time, whether it is a stop event and finally the dispatch ID
40+
(for uniqueness).
41+
"""
3742

3843
time: datetime
44+
is_stop_event: bool
45+
"""Whether this is a stop event.
46+
47+
Exists to make sure that when two events are scheduled at the same time,
48+
the start event is executed first, allowing filters down the data pipeline
49+
to consider the start event when deciding whether to execute the
50+
stop event.
51+
"""
3952
dispatch_id: int
4053
dispatch: Dispatch = field(compare=False)
4154

42-
def __init__(self, time: datetime, dispatch: Dispatch) -> None:
55+
def __init__(
56+
self, time: datetime, dispatch: Dispatch, stop_event: bool
57+
) -> None:
4358
"""Initialize the queue item."""
4459
self.time = time
4560
self.dispatch_id = dispatch.id
61+
self.is_stop_event = stop_event
4662
self.dispatch = dispatch
4763

4864
# pylint: disable=too-many-arguments
@@ -365,7 +381,10 @@ def _schedule_start(self, dispatch: Dispatch) -> None:
365381
# Schedule the next run
366382
try:
367383
if next_run := dispatch.next_run:
368-
heappush(self._scheduled_events, self.QueueItem(next_run, dispatch))
384+
heappush(
385+
self._scheduled_events,
386+
self.QueueItem(next_run, dispatch, stop_event=False),
387+
)
369388
_logger.debug(
370389
"Scheduled dispatch %s to start at %s", dispatch.id, next_run
371390
)
@@ -384,7 +403,9 @@ def _schedule_stop(self, dispatch: Dispatch) -> None:
384403
if dispatch.duration and dispatch.duration > timedelta(seconds=0):
385404
until = dispatch.until
386405
assert until is not None
387-
heappush(self._scheduled_events, self.QueueItem(until, dispatch))
406+
heappush(
407+
self._scheduled_events, self.QueueItem(until, dispatch, stop_event=True)
408+
)
388409
_logger.debug("Scheduled dispatch %s to stop at %s", dispatch, until)
389410

390411
def _update_changed_running_state(

tests/test_mananging_actor.py

Lines changed: 28 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -147,14 +147,40 @@ def test_heapq_dispatch_compare(test_env: TestEnv) -> None:
147147
# Push two events with the same 'until' time onto the heap
148148
heapq.heappush(
149149
scheduled_events,
150-
DispatchScheduler.QueueItem(until_time, Dispatch(dispatch1)),
150+
DispatchScheduler.QueueItem(until_time, Dispatch(dispatch1), True),
151151
)
152152
heapq.heappush(
153153
scheduled_events,
154-
DispatchScheduler.QueueItem(until_time, Dispatch(dispatch2)),
154+
DispatchScheduler.QueueItem(until_time, Dispatch(dispatch2), True),
155155
)
156156

157157

158+
def test_heapq_dispatch_start_stop_compare(test_env: TestEnv) -> None:
159+
"""Test that the heapq compare function works."""
160+
dispatch1 = test_env.generator.generate_dispatch()
161+
dispatch2 = test_env.generator.generate_dispatch()
162+
163+
# Simulate two dispatches with the same 'until' time
164+
now = datetime.now(timezone.utc)
165+
until_time = now + timedelta(minutes=5)
166+
167+
# Create the heap
168+
scheduled_events: list[DispatchScheduler.QueueItem] = []
169+
170+
# Push two events with the same 'until' time onto the heap
171+
heapq.heappush(
172+
scheduled_events,
173+
DispatchScheduler.QueueItem(until_time, Dispatch(dispatch1), stop_event=False),
174+
)
175+
heapq.heappush(
176+
scheduled_events,
177+
DispatchScheduler.QueueItem(until_time, Dispatch(dispatch2), stop_event=True),
178+
)
179+
180+
assert scheduled_events[0].dispatch_id == dispatch1.id
181+
assert scheduled_events[1].dispatch_id == dispatch2.id
182+
183+
158184
async def test_dry_run(test_env: TestEnv, fake_time: time_machine.Coordinates) -> None:
159185
"""Test the dry run mode."""
160186
dispatch = test_env.generator.generate_dispatch()

0 commit comments

Comments
 (0)