Skip to content

Commit e7fd295

Browse files
authored
Merge & unify running state event intervals (#94)
- **Add unify merge filter for dispatches** - **Ensure that `start` events are executed before `stop` events** - **Update release notes**
2 parents 1c88c58 + 3a627f6 commit e7fd295

File tree

5 files changed

+325
-14
lines changed

5 files changed

+325
-14
lines changed

RELEASE_NOTES.md

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,12 @@
77
## Upgrading
88

99
* Two properties have been replaced by methods that require a type as parameter.
10-
* `Dispatcher.lifecycle_events` has been replaced by the method `Dispatcher.new_lifecycle_events_receiver(self, type: str)`.
11-
* `Dispatcher.running_status_change` has been replaced by the method `Dispatcher.new_running_state_event_receiver(self, type: str)`.
10+
* `Dispatcher.lifecycle_events` has been replaced by the method `Dispatcher.new_lifecycle_events_receiver(self, dispatch_type: str)`.
11+
* `Dispatcher.running_status_change` has been replaced by the method `Dispatcher.new_running_state_event_receiver(self, dispatch_type: str, unify_running_intervals: bool)`.
1212

1313
## New Features
1414

15-
<!-- Here goes the main new features and examples or instructions on how to use them -->
15+
* A new feature "unify running intervals" has been added to the `Dispatcher.new_running_state_event_receiver` method. Using it, you can automatically merge & unify consecutive and overlapping dispatch start/stop events of the same type. E.g. dispatch `A` starting at 10:10 and ending at 10:30 and dispatch `B` starts at 10:30 until 11:00, with the feature enabled this would in total trigger one start event, one reconfigure event at 10:30 and one stop event at 11:00.
1616

1717
## Bug Fixes
1818

src/frequenz/dispatch/_bg_service.py

Lines changed: 56 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -33,16 +33,32 @@ class DispatchScheduler(BackgroundService):
3333

3434
@dataclass(order=True)
3535
class QueueItem:
36-
"""A queue item for the scheduled events."""
36+
"""A queue item for the scheduled events.
37+
38+
This class is used to define the order of the queue items based on the
39+
scheduled time, whether it is a stop event and finally the dispatch ID
40+
(for uniqueness).
41+
"""
3742

3843
time: datetime
44+
priority: int
45+
"""Sort priority when the time is the same.
46+
47+
Exists to make sure that when two events are scheduled at the same time,
48+
the start event is executed first, allowing filters down the data pipeline
49+
to consider the start event when deciding whether to execute the
50+
stop event.
51+
"""
3952
dispatch_id: int
4053
dispatch: Dispatch = field(compare=False)
4154

42-
def __init__(self, time: datetime, dispatch: Dispatch) -> None:
55+
def __init__(
56+
self, time: datetime, dispatch: Dispatch, stop_event: bool
57+
) -> None:
4358
"""Initialize the queue item."""
4459
self.time = time
4560
self.dispatch_id = dispatch.id
61+
self.priority = int(stop_event)
4662
self.dispatch = dispatch
4763

4864
# pylint: disable=too-many-arguments
@@ -102,11 +118,19 @@ def new_lifecycle_events_receiver(self, type: str) -> Receiver[DispatchEvent]:
102118
lambda event: event.dispatch.type == type
103119
)
104120

105-
async def new_running_state_event_receiver(self, type: str) -> Receiver[Dispatch]:
106-
"""Create a new receiver for running state events.
121+
async def new_running_state_event_receiver(
122+
self, type: str, *, unify_running_intervals: bool = True
123+
) -> Receiver[Dispatch]:
124+
"""Create a new receiver for running state events of the specified type.
125+
126+
If `unify_running_intervals` is True, running intervals from multiple
127+
dispatches of the same type are considered as one continuous running
128+
period. In this mode, any stop events are ignored as long as at least
129+
one dispatch remains active.
107130
108131
Args:
109132
type: The type of events to receive.
133+
unify_running_intervals: Whether to unify running intervals.
110134
111135
Returns:
112136
A new receiver for running state status.
@@ -121,6 +145,27 @@ async def new_running_state_event_receiver(self, type: str) -> Receiver[Dispatch
121145
limit=max(1, len(dispatches))
122146
).filter(lambda dispatch: dispatch.type == type)
123147

148+
if unify_running_intervals:
149+
150+
def _is_type_still_running(new_dispatch: Dispatch) -> bool:
151+
"""Merge time windows of running dispatches.
152+
153+
Any event that would cause a stop is filtered if at least one
154+
dispatch of the same type is running.
155+
"""
156+
if new_dispatch.started:
157+
return True
158+
159+
other_dispatches_running = any(
160+
dispatch.started
161+
for dispatch in self._dispatches.values()
162+
if dispatch.type == type
163+
)
164+
# If no other dispatches are running, we can allow the stop event
165+
return not other_dispatches_running
166+
167+
receiver = receiver.filter(_is_type_still_running)
168+
124169
# Send all matching dispatches to the receiver
125170
for dispatch in dispatches:
126171
await self._send_running_state_change(dispatch)
@@ -336,7 +381,10 @@ def _schedule_start(self, dispatch: Dispatch) -> None:
336381
# Schedule the next run
337382
try:
338383
if next_run := dispatch.next_run:
339-
heappush(self._scheduled_events, self.QueueItem(next_run, dispatch))
384+
heappush(
385+
self._scheduled_events,
386+
self.QueueItem(next_run, dispatch, stop_event=False),
387+
)
340388
_logger.debug(
341389
"Scheduled dispatch %s to start at %s", dispatch.id, next_run
342390
)
@@ -355,7 +403,9 @@ def _schedule_stop(self, dispatch: Dispatch) -> None:
355403
if dispatch.duration and dispatch.duration > timedelta(seconds=0):
356404
until = dispatch.until
357405
assert until is not None
358-
heappush(self._scheduled_events, self.QueueItem(until, dispatch))
406+
heappush(
407+
self._scheduled_events, self.QueueItem(until, dispatch, stop_event=True)
408+
)
359409
_logger.debug("Scheduled dispatch %s to stop at %s", dispatch, until)
360410

361411
def _update_changed_running_state(

src/frequenz/dispatch/_dispatcher.py

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -200,7 +200,7 @@ def new_lifecycle_events_receiver(
200200
return self._bg_service.new_lifecycle_events_receiver(dispatch_type)
201201

202202
async def new_running_state_event_receiver(
203-
self, dispatch_type: str
203+
self, dispatch_type: str, *, unify_running_intervals: bool = True
204204
) -> Receiver[Dispatch]:
205205
"""Return running state event receiver.
206206
@@ -228,10 +228,18 @@ async def new_running_state_event_receiver(
228228
- The payload changed
229229
- The dispatch was deleted
230230
231+
If `unify_running_intervals` is True, running intervals from multiple
232+
dispatches of the same type are considered as one continuous running
233+
period. In this mode, any stop events are ignored as long as at least
234+
one dispatch remains active.
235+
231236
Args:
232237
dispatch_type: The type of the dispatch to listen for.
238+
unify_running_intervals: Whether to unify running intervals.
233239
234240
Returns:
235241
A new receiver for dispatches whose running status changed.
236242
"""
237-
return await self._bg_service.new_running_state_event_receiver(dispatch_type)
243+
return await self._bg_service.new_running_state_event_receiver(
244+
dispatch_type, unify_running_intervals=unify_running_intervals
245+
)

0 commit comments

Comments
 (0)