Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions RELEASE_NOTES.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,12 @@
## Upgrading

* Two properties have been replaced by methods that require a type as parameter.
* `Dispatcher.lifecycle_events` has been replaced by the method `Dispatcher.new_lifecycle_events_receiver(self, type: str)`.
* `Dispatcher.running_status_change` has been replaced by the method `Dispatcher.new_running_state_event_receiver(self, type: str)`.
* `Dispatcher.lifecycle_events` has been replaced by the method `Dispatcher.new_lifecycle_events_receiver(self, dispatch_type: str)`.
* `Dispatcher.running_status_change` has been replaced by the method `Dispatcher.new_running_state_event_receiver(self, dispatch_type: str, unify_running_intervals: bool)`.

## New Features

<!-- Here goes the main new features and examples or instructions on how to use them -->
* A new feature "unify running intervals" has been added to the `Dispatcher.new_running_state_event_receiver` method. Using it, you can automatically merge & unify consecutive and overlapping dispatch start/stop events of the same type. E.g. dispatch `A` starting at 10:10 and ending at 10:30 and dispatch `B` starts at 10:30 until 11:00, with the feature enabled this would in total trigger one start event, one reconfigure event at 10:30 and one stop event at 11:00.

## Bug Fixes

Expand Down
62 changes: 56 additions & 6 deletions src/frequenz/dispatch/_bg_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,16 +33,32 @@ class DispatchScheduler(BackgroundService):

@dataclass(order=True)
class QueueItem:
"""A queue item for the scheduled events."""
"""A queue item for the scheduled events.

This class is used to define the order of the queue items based on the
scheduled time, whether it is a stop event and finally the dispatch ID
(for uniqueness).
"""

time: datetime
priority: int
"""Sort priority when the time is the same.

Exists to make sure that when two events are scheduled at the same time,
the start event is executed first, allowing filters down the data pipeline
to consider the start event when deciding whether to execute the
stop event.
"""
dispatch_id: int
dispatch: Dispatch = field(compare=False)

def __init__(self, time: datetime, dispatch: Dispatch) -> None:
def __init__(
self, time: datetime, dispatch: Dispatch, stop_event: bool
) -> None:
"""Initialize the queue item."""
self.time = time
self.dispatch_id = dispatch.id
self.priority = int(stop_event)
self.dispatch = dispatch

# pylint: disable=too-many-arguments
Expand Down Expand Up @@ -102,11 +118,19 @@ def new_lifecycle_events_receiver(self, type: str) -> Receiver[DispatchEvent]:
lambda event: event.dispatch.type == type
)

async def new_running_state_event_receiver(self, type: str) -> Receiver[Dispatch]:
"""Create a new receiver for running state events.
async def new_running_state_event_receiver(
self, type: str, *, unify_running_intervals: bool = True
) -> Receiver[Dispatch]:
"""Create a new receiver for running state events of the specified type.

If `unify_running_intervals` is True, running intervals from multiple
dispatches of the same type are considered as one continuous running
period. In this mode, any stop events are ignored as long as at least
one dispatch remains active.

Args:
type: The type of events to receive.
unify_running_intervals: Whether to unify running intervals.

Returns:
A new receiver for running state status.
Expand All @@ -121,6 +145,27 @@ async def new_running_state_event_receiver(self, type: str) -> Receiver[Dispatch
limit=max(1, len(dispatches))
).filter(lambda dispatch: dispatch.type == type)

if unify_running_intervals:

def _is_type_still_running(new_dispatch: Dispatch) -> bool:
"""Merge time windows of running dispatches.

Any event that would cause a stop is filtered if at least one
dispatch of the same type is running.
"""
if new_dispatch.started:
return True

other_dispatches_running = any(
dispatch.started
for dispatch in self._dispatches.values()
if dispatch.type == type
)
# If no other dispatches are running, we can allow the stop event
return not other_dispatches_running

receiver = receiver.filter(_is_type_still_running)

# Send all matching dispatches to the receiver
for dispatch in dispatches:
await self._send_running_state_change(dispatch)
Expand Down Expand Up @@ -336,7 +381,10 @@ def _schedule_start(self, dispatch: Dispatch) -> None:
# Schedule the next run
try:
if next_run := dispatch.next_run:
heappush(self._scheduled_events, self.QueueItem(next_run, dispatch))
heappush(
self._scheduled_events,
self.QueueItem(next_run, dispatch, stop_event=False),
)
_logger.debug(
"Scheduled dispatch %s to start at %s", dispatch.id, next_run
)
Expand All @@ -355,7 +403,9 @@ def _schedule_stop(self, dispatch: Dispatch) -> None:
if dispatch.duration and dispatch.duration > timedelta(seconds=0):
until = dispatch.until
assert until is not None
heappush(self._scheduled_events, self.QueueItem(until, dispatch))
heappush(
self._scheduled_events, self.QueueItem(until, dispatch, stop_event=True)
)
_logger.debug("Scheduled dispatch %s to stop at %s", dispatch, until)

def _update_changed_running_state(
Expand Down
12 changes: 10 additions & 2 deletions src/frequenz/dispatch/_dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ def new_lifecycle_events_receiver(
return self._bg_service.new_lifecycle_events_receiver(dispatch_type)

async def new_running_state_event_receiver(
self, dispatch_type: str
self, dispatch_type: str, *, unify_running_intervals: bool = True
) -> Receiver[Dispatch]:
"""Return running state event receiver.

Expand Down Expand Up @@ -228,10 +228,18 @@ async def new_running_state_event_receiver(
- The payload changed
- The dispatch was deleted

If `unify_running_intervals` is True, running intervals from multiple
dispatches of the same type are considered as one continuous running
period. In this mode, any stop events are ignored as long as at least
one dispatch remains active.

Args:
dispatch_type: The type of the dispatch to listen for.
unify_running_intervals: Whether to unify running intervals.

Returns:
A new receiver for dispatches whose running status changed.
"""
return await self._bg_service.new_running_state_event_receiver(dispatch_type)
return await self._bg_service.new_running_state_event_receiver(
dispatch_type, unify_running_intervals=unify_running_intervals
)
Loading
Loading