Skip to content

Commit c6ddf66

Browse files
committed
Rewrite merging strategy design
Implement merge strategy based on TYPE+TARGET Signed-off-by: Mathias L. Baumann <[email protected]>
1 parent 96c8bc8 commit c6ddf66

File tree

5 files changed

+141
-53
lines changed

5 files changed

+141
-53
lines changed

RELEASE_NOTES.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313

1414
## New Features
1515

16-
* A new feature "unify running intervals" has been added to the `Dispatcher.new_running_state_event_receiver` method. Using it, you can automatically merge & unify consecutive and overlapping dispatch start/stop events of the same type. E.g. dispatch `A` starting at 10:10 and ending at 10:30 and dispatch `B` starts at 10:30 until 11:00, with the feature enabled this would in total trigger one start event, one reconfigure event at 10:30 and one stop event at 11:00.
16+
* A new feature "merger strategy" (`MergeByType`, `MergeByTypeTarget`) has been added to the `Dispatcher.new_running_state_event_receiver` method. Using it, you can automatically merge & unify consecutive and overlapping dispatch start/stop events of the same type. E.g. dispatch `A` starting at 10:10 and ending at 10:30 and dispatch `B` starts at 10:30 until 11:00, with the feature enabled this would in total trigger one start event, one reconfigure event at 10:30 and one stop event at 11:00.
1717

1818
* The SDK dependency was widened to allow versions up to (excluding) v1.0.0-rc1600.
1919

src/frequenz/dispatch/__init__.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
1616
"""
1717

18+
from ._bg_service import MergeByType, MergeByTypeTarget, MergeStrategy
1819
from ._dispatch import Dispatch
1920
from ._dispatcher import Dispatcher
2021
from ._event import Created, Deleted, DispatchEvent, Updated
@@ -29,4 +30,7 @@
2930
"Dispatch",
3031
"DispatchManagingActor",
3132
"DispatchUpdate",
33+
"MergeStrategy", # To allow for user strategies
34+
"MergeByType",
35+
"MergeByTypeTarget",
3236
]

src/frequenz/dispatch/_bg_service.py

Lines changed: 88 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -3,18 +3,24 @@
33

44
"""The dispatch background service."""
55

6+
from __future__ import annotations
7+
68
import asyncio
79
import logging
10+
from abc import ABC, abstractmethod
11+
from collections.abc import ValuesView
812
from dataclasses import dataclass, field
913
from datetime import datetime, timedelta, timezone
1014
from heapq import heappop, heappush
15+
from typing import Callable
1116

1217
import grpc.aio
1318
from frequenz.channels import Broadcast, Receiver, select, selected_from
1419
from frequenz.channels.timer import SkipMissedAndResync, Timer
1520
from frequenz.client.dispatch import Client
1621
from frequenz.client.dispatch.types import Event
1722
from frequenz.sdk.actor import BackgroundService
23+
from typing_extensions import override
1824

1925
from ._dispatch import Dispatch
2026
from ._event import Created, Deleted, DispatchEvent, Updated
@@ -23,6 +29,73 @@
2329
"""The logger for this module."""
2430

2531

32+
class MergeStrategy(ABC):
33+
"""Base class for strategies to merge running intervals."""
34+
35+
def __init__(self, scheduler: DispatchScheduler) -> None:
36+
"""Initialize the strategy.
37+
38+
Args:
39+
scheduler: The dispatch scheduler.
40+
"""
41+
self._scheduler = scheduler
42+
43+
@property
44+
@abstractmethod
45+
def filter(self) -> Callable[[Dispatch], bool]:
46+
"""Get a filter function for dispatches.
47+
48+
Returns:
49+
A filter function.
50+
"""
51+
52+
53+
class MergeByType(MergeStrategy):
54+
"""Merge running intervals based on the dispatch type."""
55+
56+
@property
57+
@override
58+
def filter(self) -> Callable[[Dispatch], bool]:
59+
"""Get a filter function for dispatches."""
60+
return self._filter_func
61+
62+
def criteria(self, previous_dispatch: Dispatch, new_dispatch: Dispatch) -> bool:
63+
"""Define the criteria for checking other running dispatches."""
64+
return previous_dispatch.type == new_dispatch.type
65+
66+
def _filter_func(self, new_dispatch: Dispatch) -> bool:
67+
"""Filter dispatches based on the merge strategy.
68+
69+
Keeps start events.
70+
Keeps stop events only if no other dispatches matching the
71+
strategy's criteria are running.
72+
"""
73+
if new_dispatch.started:
74+
return True
75+
76+
# pylint: disable=protected-access
77+
other_dispatches_running = any(
78+
dispatch.started
79+
for dispatch in self._scheduler.dispatches
80+
if self.criteria(dispatch, new_dispatch)
81+
)
82+
# pylint: enable=protected-access
83+
84+
return not other_dispatches_running
85+
86+
87+
class MergeByTypeTarget(MergeByType):
88+
"""Merge running intervals based on the dispatch type and target."""
89+
90+
@override
91+
def criteria(self, previous_dispatch: Dispatch, new_dispatch: Dispatch) -> bool:
92+
"""Define the criteria for checking other running dispatches."""
93+
return (
94+
previous_dispatch.type == new_dispatch.type
95+
and previous_dispatch.target == new_dispatch.target
96+
)
97+
98+
2699
# pylint: disable=too-many-instance-attributes
27100
class DispatchScheduler(BackgroundService):
28101
"""Dispatch background service.
@@ -104,6 +177,11 @@ def __init__(
104177
always at index 0.
105178
"""
106179

180+
@property
181+
def dispatches(self) -> ValuesView[Dispatch]:
182+
"""All currently cached dispatches."""
183+
return self._dispatches.values()
184+
107185
# pylint: disable=redefined-builtin
108186
def new_lifecycle_events_receiver(self, type: str) -> Receiver[DispatchEvent]:
109187
"""Create a new receiver for lifecycle events.
@@ -119,54 +197,34 @@ def new_lifecycle_events_receiver(self, type: str) -> Receiver[DispatchEvent]:
119197
)
120198

121199
async def new_running_state_event_receiver(
122-
self, type: str, *, unify_running_intervals: bool = True
200+
self, type: str, *, merge_strategy: type[MergeStrategy] | None = None
123201
) -> Receiver[Dispatch]:
124202
"""Create a new receiver for running state events of the specified type.
125203
126-
If `unify_running_intervals` is True, running intervals from multiple
127-
dispatches of the same type are considered as one continuous running
128-
period. In this mode, any stop events are ignored as long as at least
129-
one dispatch remains active.
204+
`merge_strategy` can be one of `MergeByType` or `MergeByTypeTarget`.
205+
If set, running intervals from multiple dispatches will be merged,
206+
depending on the chosen strategy.
207+
When merging, stop events are ignored as long as at least one
208+
merge-criteria-matching dispatch remains active.
130209
131210
Args:
132211
type: The type of events to receive.
133-
unify_running_intervals: Whether to unify running intervals.
134-
212+
merge_strategy: The merge strategy to use.
135213
Returns:
136214
A new receiver for running state status.
137215
"""
138-
# Find all matching dispatches based on the type and collect them
139216
dispatches = [
140217
dispatch for dispatch in self._dispatches.values() if dispatch.type == type
141218
]
142219

143-
# Create receiver with enough capacity to hold all matching dispatches
144220
receiver = self._running_state_status_channel.new_receiver(
145221
limit=max(1, len(dispatches))
146222
).filter(lambda dispatch: dispatch.type == type)
147223

148-
if unify_running_intervals:
149-
150-
def _is_type_still_running(new_dispatch: Dispatch) -> bool:
151-
"""Merge time windows of running dispatches.
152-
153-
Any event that would cause a stop is filtered if at least one
154-
dispatch of the same type is running.
155-
"""
156-
if new_dispatch.started:
157-
return True
158-
159-
other_dispatches_running = any(
160-
dispatch.started
161-
for dispatch in self._dispatches.values()
162-
if dispatch.type == type
163-
)
164-
# If no other dispatches are running, we can allow the stop event
165-
return not other_dispatches_running
166-
167-
receiver = receiver.filter(_is_type_still_running)
224+
if merge_strategy:
225+
instance = merge_strategy(self)
226+
receiver = receiver.filter(instance.filter)
168227

169-
# Send all matching dispatches to the receiver
170228
for dispatch in dispatches:
171229
await self._send_running_state_change(dispatch)
172230

src/frequenz/dispatch/_dispatcher.py

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
from frequenz.channels import Receiver
88
from frequenz.client.dispatch import Client
99

10-
from ._bg_service import DispatchScheduler
10+
from ._bg_service import DispatchScheduler, MergeStrategy
1111
from ._dispatch import Dispatch
1212
from ._event import DispatchEvent
1313

@@ -200,7 +200,10 @@ def new_lifecycle_events_receiver(
200200
return self._bg_service.new_lifecycle_events_receiver(dispatch_type)
201201

202202
async def new_running_state_event_receiver(
203-
self, dispatch_type: str, *, unify_running_intervals: bool = True
203+
self,
204+
dispatch_type: str,
205+
*,
206+
merge_strategy: type[MergeStrategy] | None = None,
204207
) -> Receiver[Dispatch]:
205208
"""Return running state event receiver.
206209
@@ -228,18 +231,21 @@ async def new_running_state_event_receiver(
228231
- The payload changed
229232
- The dispatch was deleted
230233
231-
If `unify_running_intervals` is True, running intervals from multiple
232-
dispatches of the same type are considered as one continuous running
233-
period. In this mode, any stop events are ignored as long as at least
234-
one dispatch remains active.
234+
If `unify_running_intervals` is set, running intervals from multiple
235+
dispatches of the same type/type&target (depending on the chosen
236+
strategy) are considered as one continuous running
237+
period.
238+
In this mode, stop events are ignored as long as at least one (criteria
239+
matching) dispatch remains active.
235240
236241
Args:
237242
dispatch_type: The type of the dispatch to listen for.
238-
unify_running_intervals: Whether to unify running intervals.
243+
merge_strategy: The strategy to merge running intervals. One of
244+
`MergeByType` or `MergeByTypeTarget`.
239245
240246
Returns:
241247
A new receiver for dispatches whose running status changed.
242248
"""
243249
return await self._bg_service.new_running_state_event_receiver(
244-
dispatch_type, unify_running_intervals=unify_running_intervals
250+
dispatch_type, merge_strategy=merge_strategy
245251
)

0 commit comments

Comments
 (0)