Skip to content

Commit f774825

Browse files
committed
Rewrite merging strategy design
Implement merge strategy based on TYPE+TARGET Signed-off-by: Mathias L. Baumann <[email protected]>
1 parent 96c8bc8 commit f774825

File tree

5 files changed

+144
-53
lines changed

5 files changed

+144
-53
lines changed

RELEASE_NOTES.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313

1414
## New Features
1515

16-
* A new feature "unify running intervals" has been added to the `Dispatcher.new_running_state_event_receiver` method. Using it, you can automatically merge & unify consecutive and overlapping dispatch start/stop events of the same type. E.g. dispatch `A` starting at 10:10 and ending at 10:30 and dispatch `B` starts at 10:30 until 11:00, with the feature enabled this would in total trigger one start event, one reconfigure event at 10:30 and one stop event at 11:00.
16+
* A new feature "merger strategy" (`MergeByType`, `MergeByTypeTarget`) has been added to the `Dispatcher.new_running_state_event_receiver` method. Using it, you can automatically merge & unify consecutive and overlapping dispatch start/stop events of the same type. E.g. dispatch `A` starting at 10:10 and ending at 10:30 and dispatch `B` starts at 10:30 until 11:00, with the feature enabled this would in total trigger one start event, one reconfigure event at 10:30 and one stop event at 11:00.
1717

1818
* The SDK dependency was widened to allow versions up to (excluding) v1.0.0-rc1600.
1919

src/frequenz/dispatch/__init__.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
1616
"""
1717

18+
from ._bg_service import MergeByType, MergeByTypeTarget, MergeStrategy
1819
from ._dispatch import Dispatch
1920
from ._dispatcher import Dispatcher
2021
from ._event import Created, Deleted, DispatchEvent, Updated
@@ -29,4 +30,7 @@
2930
"Dispatch",
3031
"DispatchManagingActor",
3132
"DispatchUpdate",
33+
"MergeStrategy", # To allow for user strategies
34+
"MergeByType",
35+
"MergeByTypeTarget",
3236
]

src/frequenz/dispatch/_bg_service.py

Lines changed: 91 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -3,18 +3,24 @@
33

44
"""The dispatch background service."""
55

6+
from __future__ import annotations
7+
68
import asyncio
79
import logging
10+
from abc import ABC, abstractmethod
11+
from collections.abc import ValuesView
812
from dataclasses import dataclass, field
913
from datetime import datetime, timedelta, timezone
1014
from heapq import heappop, heappush
15+
from typing import Callable
1116

1217
import grpc.aio
1318
from frequenz.channels import Broadcast, Receiver, select, selected_from
1419
from frequenz.channels.timer import SkipMissedAndResync, Timer
1520
from frequenz.client.dispatch import Client
1621
from frequenz.client.dispatch.types import Event
1722
from frequenz.sdk.actor import BackgroundService
23+
from typing_extensions import override
1824

1925
from ._dispatch import Dispatch
2026
from ._event import Created, Deleted, DispatchEvent, Updated
@@ -23,6 +29,76 @@
2329
"""The logger for this module."""
2430

2531

32+
class MergeStrategy(ABC):
33+
"""Base class for strategies to merge running intervals.
34+
35+
All strategies will be initialized with the dispatch scheduler.
36+
"""
37+
38+
def __init__(self, scheduler: DispatchScheduler) -> None:
39+
"""Initialize the strategy.
40+
41+
Args:
42+
scheduler: The dispatch scheduler.
43+
"""
44+
self._scheduler = scheduler
45+
46+
@property
47+
@abstractmethod
48+
def filter(self) -> Callable[[Dispatch], bool]:
49+
"""Get a filter function for dispatches.
50+
51+
Returns:
52+
A filter function.
53+
"""
54+
55+
56+
class MergeByType(MergeStrategy):
57+
"""Merge running intervals based on the dispatch type."""
58+
59+
@property
60+
@override
61+
def filter(self) -> Callable[[Dispatch], bool]:
62+
"""Get a filter function for dispatches."""
63+
return self._filter_func
64+
65+
def criteria(self, previous_dispatch: Dispatch, new_dispatch: Dispatch) -> bool:
66+
"""Define the criteria for checking other running dispatches."""
67+
return previous_dispatch.type == new_dispatch.type
68+
69+
def _filter_func(self, new_dispatch: Dispatch) -> bool:
70+
"""Filter dispatches based on the merge strategy.
71+
72+
Keeps start events.
73+
Keeps stop events only if no other dispatches matching the
74+
strategy's criteria are running.
75+
"""
76+
if new_dispatch.started:
77+
return True
78+
79+
# pylint: disable=protected-access
80+
other_dispatches_running = any(
81+
dispatch.started
82+
for dispatch in self._scheduler.dispatches
83+
if self.criteria(dispatch, new_dispatch)
84+
)
85+
# pylint: enable=protected-access
86+
87+
return not other_dispatches_running
88+
89+
90+
class MergeByTypeTarget(MergeByType):
91+
"""Merge running intervals based on the dispatch type and target."""
92+
93+
@override
94+
def criteria(self, previous_dispatch: Dispatch, new_dispatch: Dispatch) -> bool:
95+
"""Define the criteria for checking other running dispatches."""
96+
return (
97+
previous_dispatch.type == new_dispatch.type
98+
and previous_dispatch.target == new_dispatch.target
99+
)
100+
101+
26102
# pylint: disable=too-many-instance-attributes
27103
class DispatchScheduler(BackgroundService):
28104
"""Dispatch background service.
@@ -104,6 +180,11 @@ def __init__(
104180
always at index 0.
105181
"""
106182

183+
@property
184+
def dispatches(self) -> ValuesView[Dispatch]:
185+
"""All currently cached dispatches."""
186+
return self._dispatches.values()
187+
107188
# pylint: disable=redefined-builtin
108189
def new_lifecycle_events_receiver(self, type: str) -> Receiver[DispatchEvent]:
109190
"""Create a new receiver for lifecycle events.
@@ -119,54 +200,34 @@ def new_lifecycle_events_receiver(self, type: str) -> Receiver[DispatchEvent]:
119200
)
120201

121202
async def new_running_state_event_receiver(
122-
self, type: str, *, unify_running_intervals: bool = True
203+
self, type: str, *, merge_strategy: type[MergeStrategy] | None = None
123204
) -> Receiver[Dispatch]:
124205
"""Create a new receiver for running state events of the specified type.
125206
126-
If `unify_running_intervals` is True, running intervals from multiple
127-
dispatches of the same type are considered as one continuous running
128-
period. In this mode, any stop events are ignored as long as at least
129-
one dispatch remains active.
207+
`merge_strategy` can be one of `MergeByType` or `MergeByTypeTarget`.
208+
If set, running intervals from multiple dispatches will be merged,
209+
depending on the chosen strategy.
210+
When merging, stop events are ignored as long as at least one
211+
merge-criteria-matching dispatch remains active.
130212
131213
Args:
132214
type: The type of events to receive.
133-
unify_running_intervals: Whether to unify running intervals.
134-
215+
merge_strategy: The merge strategy to use.
135216
Returns:
136217
A new receiver for running state status.
137218
"""
138-
# Find all matching dispatches based on the type and collect them
139219
dispatches = [
140220
dispatch for dispatch in self._dispatches.values() if dispatch.type == type
141221
]
142222

143-
# Create receiver with enough capacity to hold all matching dispatches
144223
receiver = self._running_state_status_channel.new_receiver(
145224
limit=max(1, len(dispatches))
146225
).filter(lambda dispatch: dispatch.type == type)
147226

148-
if unify_running_intervals:
149-
150-
def _is_type_still_running(new_dispatch: Dispatch) -> bool:
151-
"""Merge time windows of running dispatches.
152-
153-
Any event that would cause a stop is filtered if at least one
154-
dispatch of the same type is running.
155-
"""
156-
if new_dispatch.started:
157-
return True
158-
159-
other_dispatches_running = any(
160-
dispatch.started
161-
for dispatch in self._dispatches.values()
162-
if dispatch.type == type
163-
)
164-
# If no other dispatches are running, we can allow the stop event
165-
return not other_dispatches_running
166-
167-
receiver = receiver.filter(_is_type_still_running)
227+
if merge_strategy:
228+
instance = merge_strategy(self)
229+
receiver = receiver.filter(instance.filter)
168230

169-
# Send all matching dispatches to the receiver
170231
for dispatch in dispatches:
171232
await self._send_running_state_change(dispatch)
172233

src/frequenz/dispatch/_dispatcher.py

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
from frequenz.channels import Receiver
88
from frequenz.client.dispatch import Client
99

10-
from ._bg_service import DispatchScheduler
10+
from ._bg_service import DispatchScheduler, MergeStrategy
1111
from ._dispatch import Dispatch
1212
from ._event import DispatchEvent
1313

@@ -200,7 +200,10 @@ def new_lifecycle_events_receiver(
200200
return self._bg_service.new_lifecycle_events_receiver(dispatch_type)
201201

202202
async def new_running_state_event_receiver(
203-
self, dispatch_type: str, *, unify_running_intervals: bool = True
203+
self,
204+
dispatch_type: str,
205+
*,
206+
merge_strategy: type[MergeStrategy] | None = None,
204207
) -> Receiver[Dispatch]:
205208
"""Return running state event receiver.
206209
@@ -228,18 +231,21 @@ async def new_running_state_event_receiver(
228231
- The payload changed
229232
- The dispatch was deleted
230233
231-
If `unify_running_intervals` is True, running intervals from multiple
232-
dispatches of the same type are considered as one continuous running
233-
period. In this mode, any stop events are ignored as long as at least
234-
one dispatch remains active.
234+
If `unify_running_intervals` is set, running intervals from multiple
235+
dispatches of the same type/type&target (depending on the chosen
236+
strategy) are considered as one continuous running
237+
period.
238+
In this mode, stop events are ignored as long as at least one (criteria
239+
matching) dispatch remains active.
235240
236241
Args:
237242
dispatch_type: The type of the dispatch to listen for.
238-
unify_running_intervals: Whether to unify running intervals.
243+
merge_strategy: The type of the strategy to merge running intervals.
244+
One of `MergeByType` or `MergeByTypeTarget`.
239245
240246
Returns:
241247
A new receiver for dispatches whose running status changed.
242248
"""
243249
return await self._bg_service.new_running_state_event_receiver(
244-
dispatch_type, unify_running_intervals=unify_running_intervals
250+
dispatch_type, merge_strategy=merge_strategy
245251
)

0 commit comments

Comments
 (0)