Skip to content

Commit bf60a02

Browse files
committed
Ensure that start events are executed before stop events
This allows us to filter the `stop` events when we desire to merge consecutive events. Signed-off-by: Mathias L. Baumann <[email protected]>
1 parent 9b1af27 commit bf60a02

File tree

2 files changed

+47
-5
lines changed

2 files changed

+47
-5
lines changed

src/frequenz/dispatch/_bg_service.py

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,13 +36,24 @@ class QueueItem:
3636
"""A queue item for the scheduled events."""
3737

3838
time: datetime
39+
stop_event: bool
40+
"""Whether this is a stop event.
41+
42+
Exists to make sure that when two events are scheduled at the same time,
43+
the start event is executed first, allowing filters down the data pipeline
44+
to consider the start event when deciding whether to execute the
45+
stop event.
46+
"""
3947
dispatch_id: int
4048
dispatch: Dispatch = field(compare=False)
4149

42-
def __init__(self, time: datetime, dispatch: Dispatch) -> None:
50+
def __init__(
51+
self, time: datetime, dispatch: Dispatch, stop_event: bool
52+
) -> None:
4353
"""Initialize the queue item."""
4454
self.time = time
4555
self.dispatch_id = dispatch.id
56+
self.stop_event = stop_event
4657
self.dispatch = dispatch
4758

4859
# pylint: disable=too-many-arguments
@@ -365,7 +376,10 @@ def _schedule_start(self, dispatch: Dispatch) -> None:
365376
# Schedule the next run
366377
try:
367378
if next_run := dispatch.next_run:
368-
heappush(self._scheduled_events, self.QueueItem(next_run, dispatch))
379+
heappush(
380+
self._scheduled_events,
381+
self.QueueItem(next_run, dispatch, stop_event=False),
382+
)
369383
_logger.debug(
370384
"Scheduled dispatch %s to start at %s", dispatch.id, next_run
371385
)
@@ -384,7 +398,9 @@ def _schedule_stop(self, dispatch: Dispatch) -> None:
384398
if dispatch.duration and dispatch.duration > timedelta(seconds=0):
385399
until = dispatch.until
386400
assert until is not None
387-
heappush(self._scheduled_events, self.QueueItem(until, dispatch))
401+
heappush(
402+
self._scheduled_events, self.QueueItem(until, dispatch, stop_event=True)
403+
)
388404
_logger.debug("Scheduled dispatch %s to stop at %s", dispatch, until)
389405

390406
def _update_changed_running_state(

tests/test_mananging_actor.py

Lines changed: 28 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -147,14 +147,40 @@ def test_heapq_dispatch_compare(test_env: TestEnv) -> None:
147147
# Push two events with the same 'until' time onto the heap
148148
heapq.heappush(
149149
scheduled_events,
150-
DispatchScheduler.QueueItem(until_time, Dispatch(dispatch1)),
150+
DispatchScheduler.QueueItem(until_time, Dispatch(dispatch1), True),
151151
)
152152
heapq.heappush(
153153
scheduled_events,
154-
DispatchScheduler.QueueItem(until_time, Dispatch(dispatch2)),
154+
DispatchScheduler.QueueItem(until_time, Dispatch(dispatch2), True),
155155
)
156156

157157

158+
def test_heapq_dispatch_start_stop_compare(test_env: TestEnv) -> None:
159+
"""Test that the heapq compare function works."""
160+
dispatch1 = test_env.generator.generate_dispatch()
161+
dispatch2 = test_env.generator.generate_dispatch()
162+
163+
# Simulate two dispatches with the same 'until' time
164+
now = datetime.now(timezone.utc)
165+
until_time = now + timedelta(minutes=5)
166+
167+
# Create the heap
168+
scheduled_events: list[DispatchScheduler.QueueItem] = []
169+
170+
# Push two events with the same 'until' time onto the heap
171+
heapq.heappush(
172+
scheduled_events,
173+
DispatchScheduler.QueueItem(until_time, Dispatch(dispatch1), stop_event=False),
174+
)
175+
heapq.heappush(
176+
scheduled_events,
177+
DispatchScheduler.QueueItem(until_time, Dispatch(dispatch2), stop_event=True),
178+
)
179+
180+
assert scheduled_events[0].dispatch_id == dispatch1.id
181+
assert scheduled_events[1].dispatch_id == dispatch2.id
182+
183+
158184
async def test_dry_run(test_env: TestEnv, fake_time: time_machine.Coordinates) -> None:
159185
"""Test the dry run mode."""
160186
dispatch = test_env.generator.generate_dispatch()

0 commit comments

Comments
 (0)