Skip to content

Commit 023e50c

Browse files
authored
Fix crash when dispatches share equal start time (frequenz-floss#75)
- **Fix exception when two dispatches have equal start time** - **Ensure we don't accidentally work on invalid rrules** fixes frequenz-floss#74
2 parents fb64157 + 86b28ce commit 023e50c

File tree

4 files changed

+68
-10
lines changed

4 files changed

+68
-10
lines changed

RELEASE_NOTES.md

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,4 +2,16 @@
22

33
## Summary
44

5-
This is a hot fix for recurrence not working
5+
<!-- Here goes a general summary of what this release is about -->
6+
7+
## Upgrading
8+
9+
<!-- Here goes notes on how to upgrade from previous versions, including deprecations and what they should be replaced with -->
10+
11+
## New Features
12+
13+
<!-- Here goes the main new features and examples or instructions on how to use them -->
14+
15+
## Bug Fixes
16+
17+
* Fixed a crash in the `DispatchManagingActor` when dispatches shared an equal start time.

src/frequenz/dispatch/_dispatch.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -211,12 +211,18 @@ def _prepare_rrule(self) -> rrule.rrule:
211211
212212
Returns:
213213
The rrule object.
214+
215+
Raises:
216+
ValueError: If the interval is invalid.
214217
"""
215218
count, until = (None, None)
216219
if end := self.recurrence.end_criteria:
217220
count = end.count
218221
until = end.until
219222

223+
if self.recurrence.interval is None or self.recurrence.interval < 1:
224+
raise ValueError("Interval must be at least 1")
225+
220226
rrule_obj = rrule.rrule(
221227
freq=_RRULE_FREQ_MAP[self.recurrence.frequency],
222228
dtstart=self.start_time,

src/frequenz/dispatch/actor.py

Lines changed: 26 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
"""The dispatch actor."""
55

66
import logging
7+
from dataclasses import dataclass, field
78
from datetime import datetime, timedelta, timezone
89
from heapq import heappop, heappush
910

@@ -30,6 +31,20 @@ class DispatchingActor(Actor):
3031
dispatches as necessary.
3132
"""
3233

34+
@dataclass(order=True)
35+
class QueueItem:
36+
"""A queue item for the scheduled events."""
37+
38+
time: datetime
39+
dispatch_id: int
40+
dispatch: Dispatch = field(compare=False)
41+
42+
def __init__(self, time: datetime, dispatch: Dispatch) -> None:
43+
"""Initialize the queue item."""
44+
self.time = time
45+
self.dispatch_id = dispatch.id
46+
self.dispatch = dispatch
47+
3348
# pylint: disable=too-many-arguments
3449
def __init__(
3550
self,
@@ -61,7 +76,7 @@ def __init__(
6176
Interval is chosen arbitrarily, as it will be reset on the first event.
6277
"""
6378

64-
self._scheduled_events: list[tuple[datetime, Dispatch]] = []
79+
self._scheduled_events: list["DispatchingActor.QueueItem"] = []
6580
"""The scheduled events, sorted by time.
6681
6782
Each event is a tuple of the scheduled time and the dispatch.
@@ -84,9 +99,11 @@ async def _run(self) -> None:
8499
if not self._scheduled_events:
85100
continue
86101
_logger.debug(
87-
"Executing scheduled event: %s", self._scheduled_events[0][1]
102+
"Executing scheduled event: %s", self._scheduled_events[0].dispatch
103+
)
104+
await self._execute_scheduled_event(
105+
heappop(self._scheduled_events).dispatch
88106
)
89-
await self._execute_scheduled_event(heappop(self._scheduled_events)[1])
90107
elif selected_from(selected, stream):
91108
_logger.debug("Received dispatch event: %s", selected.message)
92109
dispatch = Dispatch(selected.message.dispatch)
@@ -243,9 +260,9 @@ async def _update_dispatch_schedule_and_notify(
243260
def _update_timer(self) -> None:
244261
"""Update the timer to the next event."""
245262
if self._scheduled_events:
246-
due_at: datetime = self._scheduled_events[0][0]
263+
due_at: datetime = self._scheduled_events[0].time
247264
self._next_event_timer.reset(interval=due_at - datetime.now(timezone.utc))
248-
_logger.debug("Next event scheduled at %s", self._scheduled_events[0][0])
265+
_logger.debug("Next event scheduled at %s", self._scheduled_events[0].time)
249266

250267
def _remove_scheduled(self, dispatch: Dispatch) -> bool:
251268
"""Remove a dispatch from the scheduled events.
@@ -256,8 +273,8 @@ def _remove_scheduled(self, dispatch: Dispatch) -> bool:
256273
Returns:
257274
True if the dispatch was found and removed, False otherwise.
258275
"""
259-
for idx, (_, sched_dispatch) in enumerate(self._scheduled_events):
260-
if dispatch.id == sched_dispatch.id:
276+
for idx, item in enumerate(self._scheduled_events):
277+
if dispatch.id == item.dispatch.id:
261278
self._scheduled_events.pop(idx)
262279
return True
263280

@@ -276,7 +293,7 @@ def _schedule_start(self, dispatch: Dispatch) -> None:
276293
# Schedule the next run
277294
try:
278295
if next_run := dispatch.next_run:
279-
heappush(self._scheduled_events, (next_run, dispatch))
296+
heappush(self._scheduled_events, self.QueueItem(next_run, dispatch))
280297
_logger.debug(
281298
"Scheduled dispatch %s to start at %s", dispatch.id, next_run
282299
)
@@ -295,7 +312,7 @@ def _schedule_stop(self, dispatch: Dispatch) -> None:
295312
if dispatch.duration and dispatch.duration > timedelta(seconds=0):
296313
until = dispatch.until
297314
assert until is not None
298-
heappush(self._scheduled_events, (until, dispatch))
315+
heappush(self._scheduled_events, self.QueueItem(until, dispatch))
299316
_logger.debug("Scheduled dispatch %s to stop at %s", dispatch, until)
300317

301318
def _update_changed_running_state(

tests/test_mananging_actor.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
"""Test the dispatch runner."""
55

66
import asyncio
7+
import heapq
78
from dataclasses import dataclass, replace
89
from datetime import datetime, timedelta, timezone
910
from typing import AsyncIterator, Iterator
@@ -17,6 +18,7 @@
1718
from pytest import fixture
1819

1920
from frequenz.dispatch import Dispatch, DispatchManagingActor, DispatchUpdate
21+
from frequenz.dispatch.actor import DispatchingActor
2022

2123

2224
@fixture
@@ -128,6 +130,27 @@ async def test_simple_start_stop(
128130
assert test_env.actor.is_running is False
129131

130132

133+
def test_heapq_dispatch_compare(test_env: TestEnv) -> None:
134+
"""Test that the heapq compare function works."""
135+
dispatch1 = test_env.generator.generate_dispatch()
136+
dispatch2 = test_env.generator.generate_dispatch()
137+
138+
# Simulate two dispatches with the same 'until' time
139+
now = datetime.now(timezone.utc)
140+
until_time = now + timedelta(minutes=5)
141+
142+
# Create the heap
143+
scheduled_events: list[DispatchingActor.QueueItem] = []
144+
145+
# Push two events with the same 'until' time onto the heap
146+
heapq.heappush(
147+
scheduled_events, DispatchingActor.QueueItem(until_time, Dispatch(dispatch1))
148+
)
149+
heapq.heappush(
150+
scheduled_events, DispatchingActor.QueueItem(until_time, Dispatch(dispatch2))
151+
)
152+
153+
131154
async def test_dry_run(test_env: TestEnv, fake_time: time_machine.Coordinates) -> None:
132155
"""Test the dry run mode."""
133156
dispatch = test_env.generator.generate_dispatch()

0 commit comments

Comments
 (0)