Skip to content

Commit 2ab7437

Browse files
committed
Fix exception when two dispatches have equal start time
The heapq will first look at the given time to compare, but when it is identical it will look at the attached dispatch objects for comparison. But those just aren't ready for so much responsibility. UNTIL NOW! Cudos to Merlin for finding this bug! Signed-off-by: Mathias L. Baumann <[email protected]>
1 parent 633cc73 commit 2ab7437

File tree

4 files changed

+63
-11
lines changed

4 files changed

+63
-11
lines changed

RELEASE_NOTES.md

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,4 +2,16 @@
22

33
## Summary
44

5-
This is a hot fix for recurrence not working
5+
<!-- Here goes a general summary of what this release is about -->
6+
7+
## Upgrading
8+
9+
<!-- Here goes notes on how to upgrade from previous versions, including deprecations and what they should be replaced with -->
10+
11+
## New Features
12+
13+
<!-- Here goes the main new features and examples or instructions on how to use them -->
14+
15+
## Bug Fixes
16+
17+
* Fixed a crash in the `DispatchManagingActor` when dispatches shared an equal start time.

src/frequenz/dispatch/_dispatch.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
from dataclasses import dataclass
99
from datetime import datetime, timezone
1010
from enum import Enum
11-
from typing import Iterator, cast
11+
from typing import Any, Iterator, Self, cast
1212

1313
from dateutil import rrule
1414
from frequenz.client.dispatch.types import Dispatch as BaseDispatch

src/frequenz/dispatch/actor.py

Lines changed: 26 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
"""The dispatch actor."""
55

66
import logging
7+
from dataclasses import dataclass, field
78
from datetime import datetime, timedelta, timezone
89
from heapq import heappop, heappush
910

@@ -30,6 +31,18 @@ class DispatchingActor(Actor):
3031
dispatches as necessary.
3132
"""
3233

34+
@dataclass(order=True)
35+
class QueueItem:
36+
"""A queue item for the scheduled events."""
37+
38+
priority: tuple[datetime, int]
39+
dispatch: Dispatch = field(compare=False)
40+
41+
def __init__(self, time: datetime, dispatch: Dispatch) -> None:
42+
"""Initialize the queue item."""
43+
self.priority = (time, dispatch.id)
44+
self.dispatch = dispatch
45+
3346
# pylint: disable=too-many-arguments
3447
def __init__(
3548
self,
@@ -61,7 +74,7 @@ def __init__(
6174
Interval is chosen arbitrarily, as it will be reset on the first event.
6275
"""
6376

64-
self._scheduled_events: list[tuple[datetime, Dispatch]] = []
77+
self._scheduled_events: list["DispatchingActor.QueueItem"] = []
6578
"""The scheduled events, sorted by time.
6679
6780
Each event is a tuple of the scheduled time and the dispatch.
@@ -84,9 +97,11 @@ async def _run(self) -> None:
8497
if not self._scheduled_events:
8598
continue
8699
_logger.debug(
87-
"Executing scheduled event: %s", self._scheduled_events[0][1]
100+
"Executing scheduled event: %s", self._scheduled_events[0].dispatch
101+
)
102+
await self._execute_scheduled_event(
103+
heappop(self._scheduled_events).dispatch
88104
)
89-
await self._execute_scheduled_event(heappop(self._scheduled_events)[1])
90105
elif selected_from(selected, stream):
91106
_logger.debug("Received dispatch event: %s", selected.message)
92107
dispatch = Dispatch(selected.message.dispatch)
@@ -243,9 +258,11 @@ async def _update_dispatch_schedule_and_notify(
243258
def _update_timer(self) -> None:
244259
"""Update the timer to the next event."""
245260
if self._scheduled_events:
246-
due_at: datetime = self._scheduled_events[0][0]
261+
due_at: datetime = self._scheduled_events[0].priority[0]
247262
self._next_event_timer.reset(interval=due_at - datetime.now(timezone.utc))
248-
_logger.debug("Next event scheduled at %s", self._scheduled_events[0][0])
263+
_logger.debug(
264+
"Next event scheduled at %s", self._scheduled_events[0].priority[0]
265+
)
249266

250267
def _remove_scheduled(self, dispatch: Dispatch) -> bool:
251268
"""Remove a dispatch from the scheduled events.
@@ -256,8 +273,8 @@ def _remove_scheduled(self, dispatch: Dispatch) -> bool:
256273
Returns:
257274
True if the dispatch was found and removed, False otherwise.
258275
"""
259-
for idx, (_, sched_dispatch) in enumerate(self._scheduled_events):
260-
if dispatch.id == sched_dispatch.id:
276+
for idx, item in enumerate(self._scheduled_events):
277+
if dispatch.id == item.dispatch.id:
261278
self._scheduled_events.pop(idx)
262279
return True
263280

@@ -276,7 +293,7 @@ def _schedule_start(self, dispatch: Dispatch) -> None:
276293
# Schedule the next run
277294
try:
278295
if next_run := dispatch.next_run:
279-
heappush(self._scheduled_events, (next_run, dispatch))
296+
heappush(self._scheduled_events, self.QueueItem(next_run, dispatch))
280297
_logger.debug(
281298
"Scheduled dispatch %s to start at %s", dispatch.id, next_run
282299
)
@@ -295,7 +312,7 @@ def _schedule_stop(self, dispatch: Dispatch) -> None:
295312
if dispatch.duration and dispatch.duration > timedelta(seconds=0):
296313
until = dispatch.until
297314
assert until is not None
298-
heappush(self._scheduled_events, (until, dispatch))
315+
heappush(self._scheduled_events, self.QueueItem(until, dispatch))
299316
_logger.debug("Scheduled dispatch %s to stop at %s", dispatch, until)
300317

301318
def _update_changed_running_state(

tests/test_mananging_actor.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
"""Test the dispatch runner."""
55

66
import asyncio
7+
import heapq
78
from dataclasses import dataclass, replace
89
from datetime import datetime, timedelta, timezone
910
from typing import AsyncIterator, Iterator
@@ -17,6 +18,7 @@
1718
from pytest import fixture
1819

1920
from frequenz.dispatch import Dispatch, DispatchManagingActor, DispatchUpdate
21+
from frequenz.dispatch.actor import DispatchingActor
2022

2123

2224
@fixture
@@ -128,6 +130,27 @@ async def test_simple_start_stop(
128130
assert test_env.actor.is_running is False
129131

130132

133+
def test_heapq_dispatch_compare(test_env: TestEnv) -> None:
134+
"""Test that the heapq compare function works."""
135+
dispatch1 = test_env.generator.generate_dispatch()
136+
dispatch2 = test_env.generator.generate_dispatch()
137+
138+
# Simulate two dispatches with the same 'until' time
139+
now = datetime.now(timezone.utc)
140+
until_time = now + timedelta(minutes=5)
141+
142+
# Create the heap
143+
scheduled_events: list[DispatchingActor.QueueItem] = []
144+
145+
# Push two events with the same 'until' time onto the heap
146+
heapq.heappush(
147+
scheduled_events, DispatchingActor.QueueItem(until_time, Dispatch(dispatch1))
148+
)
149+
heapq.heappush(
150+
scheduled_events, DispatchingActor.QueueItem(until_time, Dispatch(dispatch2))
151+
)
152+
153+
131154
async def test_dry_run(test_env: TestEnv, fake_time: time_machine.Coordinates) -> None:
132155
"""Test the dry run mode."""
133156
dispatch = test_env.generator.generate_dispatch()

0 commit comments

Comments
 (0)