44"""The dispatch background service."""
55
6+ from __future__ import annotations
7+
68import asyncio
9+ import functools
710import logging
11+ from abc import ABC , abstractmethod
12+ from collections .abc import Mapping
813from dataclasses import dataclass , field
914from datetime import datetime , timedelta , timezone
1015from heapq import heappop , heappush
2328"""The logger for this module."""
2429
2530
31+ class MergeStrategy (ABC ):
32+ """Base class for strategies to merge running intervals."""
33+
34+ @abstractmethod
35+ def filter (self , dispatches : Mapping [int , Dispatch ], dispatch : Dispatch ) -> bool :
36+ """Filter dispatches based on the strategy.
37+
38+ Args:
39+ dispatches: All dispatches, available as context.
40+ dispatch: The dispatch to filter.
41+
42+ Returns:
43+ True if the dispatch should be included, False otherwise.
44+ """
45+
46+
# pylint: disable=too-many-instance-attributes
class DispatchScheduler(BackgroundService):
    """Dispatch background service.
@@ -119,19 +140,36 @@ def new_lifecycle_events_receiver(self, type: str) -> Receiver[DispatchEvent]:
        )

    async def new_running_state_event_receiver(
-        self, type: str, *, unify_running_intervals: bool = True
+        self, type: str, *, merge_strategy: MergeStrategy | None = None
    ) -> Receiver[Dispatch]:
        """Create a new receiver for running state events of the specified type.

-        If `unify_running_intervals` is True, running intervals from multiple
-        dispatches of the same type are considered as one continuous running
-        period. In this mode, any stop events are ignored as long as at least
-        one dispatch remains active.
+        `merge_strategy` is an instance of a class derived from
+        [`MergeStrategy`][frequenz.dispatch.MergeStrategy]. Available strategies
+        are:
+
+        * [`MergeByType`][frequenz.dispatch.MergeByType] — merges all dispatches
+          of the same type
+        * [`MergeByTypeTarget`][frequenz.dispatch.MergeByTypeTarget] — merges all
+          dispatches of the same type and target
+        * `None` — no merging, all events are sent as-is
+
+        You can make your own strategy by subclassing:
+
+        * [`MergeByIdentity`][frequenz.dispatch.MergeByIdentity] — merges
+          dispatches based on a user-defined identity function
+        * [`MergeStrategy`][frequenz.dispatch.MergeStrategy] — merges based
+          on a user-defined filter function
+
+        Running intervals from multiple dispatches will be merged according to
+        the chosen strategy.
+
+        While merging, stop events are ignored as long as at least one
+        merge-criteria-matching dispatch remains active.

        Args:
            type: The type of events to receive.
-            unify_running_intervals: Whether to unify running intervals.
-
+            merge_strategy: The merge strategy to use.
        Returns:
            A new receiver for running state status.
        """
@@ -140,33 +178,21 @@ async def new_running_state_event_receiver(
            dispatch for dispatch in self._dispatches.values() if dispatch.type == type
        ]

-        # Create receiver with enough capacity to hold all matching dispatches
+        # Create a receiver with at least 30 slots, or more if there are more
+        # matching dispatches, so all of them can be sent initially without
+        # filling the receiver. The minimum of 30 keeps slots available for
+        # future dispatches even when there are none initially.
        receiver = self._running_state_status_channel.new_receiver(
-            limit=max(1, len(dispatches))
+            limit=max(30, len(dispatches))
        ).filter(lambda dispatch: dispatch.type == type)

-        if unify_running_intervals:
-
-            def _is_type_still_running(new_dispatch: Dispatch) -> bool:
-                """Merge time windows of running dispatches.
-
-                Any event that would cause a stop is filtered if at least one
-                dispatch of the same type is running.
-                """
-                if new_dispatch.started:
-                    return True
-
-                other_dispatches_running = any(
-                    dispatch.started
-                    for dispatch in self._dispatches.values()
-                    if dispatch.type == type
-                )
-                # If no other dispatches are running, we can allow the stop event
-                return not other_dispatches_running
-
-            receiver = receiver.filter(_is_type_still_running)
+        if merge_strategy:
+            receiver = receiver.filter(
+                functools.partial(merge_strategy.filter, self._dispatches)
+            )

-        # Send all matching dispatches to the receiver
        for dispatch in dispatches:
            await self._send_running_state_change(dispatch)

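The functools.partial call above pre-binds self._dispatches as the first argument of merge_strategy.filter, so the channel's one-argument filter callback still satisfies the two-argument strategy signature. A standalone illustration of that binding, with made-up names unrelated to this module:

import functools

def keep(context: dict[int, str], item: str) -> bool:
    # `context` is pre-bound by partial(); only `item` is supplied per call.
    return item in context.values()

bound = functools.partial(keep, {1: "a", 2: "b"})
print(bound("a"))  # True
print(bound("c"))  # False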
@@ -195,9 +221,6 @@ async def _run(self) -> None:
            if selected_from(selected, self._next_event_timer):
                if not self._scheduled_events:
                    continue
-                _logger.debug(
-                    "Executing scheduled event: %s", self._scheduled_events[0].dispatch
-                )
                await self._execute_scheduled_event(
                    heappop(self._scheduled_events).dispatch
                )
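The scheduled events popped above live in a heap (note the heappop/heappush import at the top of the module), so the event with the earliest run time is always at index 0. A self-contained sketch of that ordering pattern; ScheduledEventSketch is a made-up stand-in, not the module's actual event class:

import heapq
from dataclasses import dataclass, field
from datetime import datetime, timedelta, timezone

@dataclass(order=True)
class ScheduledEventSketch:
    """Heap entry ordered by its scheduled time only."""

    scheduled_at: datetime
    payload: str = field(compare=False)

heap: list[ScheduledEventSketch] = []
now = datetime.now(timezone.utc)
heapq.heappush(heap, ScheduledEventSketch(now + timedelta(minutes=5), "later"))
heapq.heappush(heap, ScheduledEventSketch(now + timedelta(minutes=1), "sooner"))

# A timer would be armed for heap[0].scheduled_at; popping yields "sooner" first.
print(heapq.heappop(heap).payload)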
@@ -227,6 +250,7 @@ async def _execute_scheduled_event(self, dispatch: Dispatch) -> None:
        Args:
            dispatch: The dispatch to execute.
        """
+        _logger.debug("Executing scheduled event: %s (%s)", dispatch, dispatch.started)
        await self._send_running_state_change(dispatch)

        # The timer is always a tiny bit delayed, so we need to check if the
@@ -256,7 +280,7 @@ async def _fetch(self) -> None:
            for client_dispatch in page:
                dispatch = Dispatch(client_dispatch)

-                self._dispatches[dispatch.id] = Dispatch(client_dispatch)
+                self._dispatches[dispatch.id] = dispatch
                old_dispatch = old_dispatches.pop(dispatch.id, None)
                if not old_dispatch:
                    _logger.debug("New dispatch: %s", dispatch)
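The replaced line stored a second Dispatch built from the same client data, so the instance kept in self._dispatches was a different object from the local `dispatch` used for scheduling and notifications; state set on one would not be visible through the other. A standalone illustration of the pitfall, using a made-up Wrapper class as a stand-in:

class Wrapper:
    def __init__(self, data: dict) -> None:
        self.data = data
        self.deleted = False

raw = {"id": 1}
stored = Wrapper(raw)   # analogous to the old, second Dispatch(client_dispatch)
working = Wrapper(raw)  # the local instance the rest of the loop operates on
working.deleted = True
print(stored.deleted)   # False: the stored copy never saw the change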
@@ -310,7 +334,7 @@ async def _update_dispatch_schedule_and_notify(
            self._remove_scheduled(old_dispatch)

            was_running = old_dispatch.started
-            old_dispatch._set_deleted()  # pylint: disable=protected-access)
+            old_dispatch._set_deleted()  # pylint: disable=protected-access

            # If the dispatch was running, we need to notify
            if was_running: