|
7 | 7 | import logging |
8 | 8 | from dataclasses import dataclass, field |
9 | 9 | from datetime import datetime, timedelta, timezone |
| 10 | +from enum import Enum, auto |
10 | 11 | from heapq import heappop, heappush |
| 12 | +from typing import Callable |
11 | 13 |
|
12 | 14 | import grpc.aio |
13 | 15 | from frequenz.channels import Broadcast, Receiver, select, selected_from |
|
23 | 25 | """The logger for this module.""" |
24 | 26 |
|
25 | 27 |
|
| 28 | +class UnifyStrategy(Enum): |
| 29 | + """The strategy to unify running intervals.""" |
| 30 | + |
| 31 | + TYPE = auto() |
| 32 | + """Unify running intervals based on the dispatch type.""" |
| 33 | + |
| 34 | + TYPE_TARGET = auto() |
| 35 | + """Unify running intervals based on the dispatch type and target.""" |
| 36 | + |
| 37 | + |
26 | 38 | # pylint: disable=too-many-instance-attributes |
27 | 39 | class DispatchScheduler(BackgroundService): |
28 | 40 | """Dispatch background service. |
@@ -119,54 +131,87 @@ def new_lifecycle_events_receiver(self, type: str) -> Receiver[DispatchEvent]: |
119 | 131 | ) |
120 | 132 |
|
121 | 133 | async def new_running_state_event_receiver( |
122 | | - self, type: str, *, unify_running_intervals: bool = True |
| 134 | + self, type: str, *, unify_running_intervals: UnifyStrategy | None = None |
123 | 135 | ) -> Receiver[Dispatch]: |
124 | 136 | """Create a new receiver for running state events of the specified type. |
125 | 137 |
|
126 | | - If `unify_running_intervals` is True, running intervals from multiple |
127 | | - dispatches of the same type are considered as one continuous running |
128 | | - period. In this mode, any stop events are ignored as long as at least |
129 | | - one dispatch remains active. |
| 138 | + If `unify_running_intervals` is set, running intervals from multiple |
| 139 | + dispatches of the same type/type&target (depending on the chosen |
| 140 | + strategy) are considered as one continuous running |
| 141 | + period. |
| 142 | + In this mode, stop events are ignored as long as at least one (criteria |
| 143 | + matching) dispatch remains active. |
130 | 144 |
|
131 | 145 | Args: |
132 | 146 | type: The type of events to receive. |
133 | | - unify_running_intervals: Whether to unify running intervals. |
134 | | -
|
| 147 | + unify_running_intervals: The strategy to unify running intervals. |
135 | 148 | Returns: |
136 | 149 | A new receiver for running state status. |
137 | 150 | """ |
138 | | - # Find all matching dispatches based on the type and collect them |
139 | 151 | dispatches = [ |
140 | 152 | dispatch for dispatch in self._dispatches.values() if dispatch.type == type |
141 | 153 | ] |
142 | 154 |
|
143 | | - # Create receiver with enough capacity to hold all matching dispatches |
144 | 155 | receiver = self._running_state_status_channel.new_receiver( |
145 | 156 | limit=max(1, len(dispatches)) |
146 | 157 | ).filter(lambda dispatch: dispatch.type == type) |
147 | 158 |
|
148 | 159 | if unify_running_intervals: |
149 | 160 |
|
150 | | - def _is_type_still_running(new_dispatch: Dispatch) -> bool: |
151 | | - """Merge time windows of running dispatches. |
| 161 | + def is_still_running_filter( |
| 162 | + unify_strategy: UnifyStrategy, |
| 163 | + ) -> Callable[[Dispatch], bool]: |
| 164 | + """Create a filter function based on the provided UnifyStrategy. |
| 165 | +
|
| 166 | + Args: |
| 167 | + unify_strategy: The strategy to use for unifying running intervals. |
152 | 168 |
|
153 | | - Any event that would cause a stop is filtered if at least one |
154 | | - dispatch of the same type is running. |
| 169 | + Returns: |
| 170 | + A callable that takes a Dispatch and returns True if it |
| 171 | + should be kept, False otherwise. |
155 | 172 | """ |
156 | | - if new_dispatch.started: |
157 | | - return True |
158 | 173 |
|
159 | | - other_dispatches_running = any( |
160 | | - dispatch.started |
161 | | - for dispatch in self._dispatches.values() |
162 | | - if dispatch.type == type |
163 | | - ) |
164 | | - # If no other dispatches are running, we can allow the stop event |
165 | | - return not other_dispatches_running |
| 174 | + def filter_func(new_dispatch: Dispatch) -> bool: |
| 175 | + """Filter dispatches based on the provided unification strategy. |
| 176 | +
|
| 177 | + Keeps start events. |
| 178 | + Keeps stop events only if no other dispatches matching the |
| 179 | + strategy's criteria are running. |
| 180 | + """ |
| 181 | + if new_dispatch.started: |
| 182 | + return True |
| 183 | + |
| 184 | + # Define the criteria for checking other running dispatches |
| 185 | + # based on strategy |
| 186 | + if unify_strategy == UnifyStrategy.TYPE: |
| 187 | + # criteria = lambda dispatch: dispatch.type == type |
| 188 | + def criteria(dispatch: Dispatch) -> bool: |
| 189 | + return dispatch.type == type |
| 190 | + |
| 191 | + elif unify_strategy == UnifyStrategy.TYPE_TARGET: |
| 192 | + |
| 193 | + def criteria(dispatch: Dispatch) -> bool: |
| 194 | + return ( |
| 195 | + dispatch.type == type |
| 196 | + and dispatch.target == new_dispatch.target |
| 197 | + ) |
| 198 | + |
| 199 | + else: # Default case (should not normally happen) |
| 200 | + assert False, f"Unknown unify strategy: {unify_strategy}" |
| 201 | + |
| 202 | + other_dispatches_running = any( |
| 203 | + dispatch.started |
| 204 | + for dispatch in self._dispatches.values() |
| 205 | + if criteria(dispatch) |
| 206 | + ) |
| 207 | + |
| 208 | + return not other_dispatches_running |
| 209 | + |
| 210 | + return filter_func |
166 | 211 |
|
167 | | - receiver = receiver.filter(_is_type_still_running) |
| 212 | + receiver = receiver.filter(is_still_running_filter(unify_running_intervals)) |
| 213 | + # End of unify_running_intervals block |
168 | 214 |
|
169 | | - # Send all matching dispatches to the receiver |
170 | 215 | for dispatch in dispatches: |
171 | 216 | await self._send_running_state_change(dispatch) |
172 | 217 |
|
|
0 commit comments