Skip to content

Commit 46d7006

Browse files
committed
Rewrite merging strategy design
- Implement merge strategy based on TYPE+TARGET Signed-off-by: Mathias L. Baumann <[email protected]>
1 parent e7fd295 commit 46d7006

File tree

5 files changed

+137
-52
lines changed

5 files changed

+137
-52
lines changed

RELEASE_NOTES.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212

1313
## New Features
1414

15-
* A new feature "unify running intervals" has been added to the `Dispatcher.new_running_state_event_receiver` method. Using it, you can automatically merge & unify consecutive and overlapping dispatch start/stop events of the same type. E.g. dispatch `A` starting at 10:10 and ending at 10:30 and dispatch `B` starts at 10:30 until 11:00, with the feature enabled this would in total trigger one start event, one reconfigure event at 10:30 and one stop event at 11:00.
15+
* A new feature "merger strategy" (`MergeByType`, `MergeByTypeTarget`) has been added to the `Dispatcher.new_running_state_event_receiver` method. Using it, you can automatically merge & unify consecutive and overlapping dispatch start/stop events of the same type. E.g. dispatch `A` starting at 10:10 and ending at 10:30 and dispatch `B` starts at 10:30 until 11:00, with the feature enabled this would in total trigger one start event, one reconfigure event at 10:30 and one stop event at 11:00.
1616

1717
## Bug Fixes
1818

src/frequenz/dispatch/__init__.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
1616
"""
1717

18+
from ._bg_service import MergeByType, MergeByTypeTarget, _MergeStrategy
1819
from ._dispatch import Dispatch
1920
from ._dispatcher import Dispatcher
2021
from ._event import Created, Deleted, DispatchEvent, Updated
@@ -29,4 +30,7 @@
2930
"Dispatch",
3031
"DispatchManagingActor",
3132
"DispatchUpdate",
33+
"_MergeStrategy", # To allow for user strategies
34+
"MergeByType",
35+
"MergeByTypeTarget",
3236
]

src/frequenz/dispatch/_bg_service.py

Lines changed: 84 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,16 @@
33

44
"""The dispatch background service."""
55

6+
from __future__ import annotations
7+
68
import asyncio
79
import logging
10+
from abc import ABC, abstractmethod
811
from dataclasses import dataclass, field
912
from datetime import datetime, timedelta, timezone
13+
from enum import Enum, auto
1014
from heapq import heappop, heappush
15+
from typing import Callable
1116

1217
import grpc.aio
1318
from frequenz.channels import Broadcast, Receiver, select, selected_from
@@ -23,6 +28,75 @@
2328
"""The logger for this module."""
2429

2530

31+
class _MergeStrategy(ABC):
32+
"""Base class for strategies to merge running intervals."""
33+
34+
@abstractmethod
35+
def _get_filter_function(
36+
self,
37+
scheduler: DispatchScheduler,
38+
) -> Callable[[Dispatch], bool]:
39+
"""Get a filter function for dispatches.
40+
41+
Args:
42+
scheduler: The dispatch scheduler.
43+
44+
Returns:
45+
A filter function.
46+
"""
47+
48+
49+
class MergeByType(_MergeStrategy):
50+
"""Merge running intervals based on the dispatch type."""
51+
52+
def __init__(self) -> None:
53+
"""Initialize the strategy."""
54+
self._scheduler: DispatchScheduler
55+
self._new_dispatch: Dispatch
56+
57+
def _get_filter_function(
58+
self, scheduler: DispatchScheduler
59+
) -> Callable[[Dispatch], bool]:
60+
"""Get a filter function for dispatches."""
61+
self._scheduler = scheduler
62+
return self._filter_func
63+
64+
def _criteria(self, dispatch: Dispatch) -> bool:
65+
"""Define the criteria for checking other running dispatches."""
66+
return dispatch.type == self._new_dispatch.type
67+
68+
def _filter_func(self, new_dispatch: Dispatch) -> bool:
69+
"""Filter dispatches based on the merge strategy.
70+
71+
Keeps start events.
72+
Keeps stop events only if no other dispatches matching the
73+
strategy's criteria are running.
74+
"""
75+
if new_dispatch.started:
76+
return True
77+
78+
self._new_dispatch = new_dispatch
79+
80+
other_dispatches_running = any(
81+
dispatch.started
82+
for dispatch in self._scheduler._dispatches.values()
83+
if self._criteria(dispatch)
84+
)
85+
86+
return not other_dispatches_running
87+
88+
89+
class MergeByTypeTarget(MergeByType):
90+
"""Merge running intervals based on the dispatch type and target."""
91+
92+
def _criteria(self, dispatch: Dispatch) -> bool:
93+
"""Define the criteria for checking other running dispatches."""
94+
return (
95+
dispatch.type == self._new_dispatch.type
96+
and dispatch.target == self._new_dispatch.target
97+
)
98+
99+
26100
# pylint: disable=too-many-instance-attributes
27101
class DispatchScheduler(BackgroundService):
28102
"""Dispatch background service.
@@ -119,54 +193,34 @@ def new_lifecycle_events_receiver(self, type: str) -> Receiver[DispatchEvent]:
119193
)
120194

121195
async def new_running_state_event_receiver(
122-
self, type: str, *, unify_running_intervals: bool = True
196+
self, type: str, *, merge_strategy: _MergeStrategy | None = None
123197
) -> Receiver[Dispatch]:
124198
"""Create a new receiver for running state events of the specified type.
125199
126-
If `unify_running_intervals` is True, running intervals from multiple
127-
dispatches of the same type are considered as one continuous running
128-
period. In this mode, any stop events are ignored as long as at least
129-
one dispatch remains active.
200+
If `unify_running_intervals` is set, running intervals from multiple
201+
dispatches of the same type/type&target (depending on the chosen
202+
strategy) are considered as one continuous running
203+
period.
204+
In this mode, stop events are ignored as long as at least one (criteria
205+
matching) dispatch remains active.
130206
131207
Args:
132208
type: The type of events to receive.
133-
unify_running_intervals: Whether to unify running intervals.
134-
209+
unify_running_intervals: The strategy to unify running intervals.
135210
Returns:
136211
A new receiver for running state status.
137212
"""
138-
# Find all matching dispatches based on the type and collect them
139213
dispatches = [
140214
dispatch for dispatch in self._dispatches.values() if dispatch.type == type
141215
]
142216

143-
# Create receiver with enough capacity to hold all matching dispatches
144217
receiver = self._running_state_status_channel.new_receiver(
145218
limit=max(1, len(dispatches))
146219
).filter(lambda dispatch: dispatch.type == type)
147220

148-
if unify_running_intervals:
149-
150-
def _is_type_still_running(new_dispatch: Dispatch) -> bool:
151-
"""Merge time windows of running dispatches.
152-
153-
Any event that would cause a stop is filtered if at least one
154-
dispatch of the same type is running.
155-
"""
156-
if new_dispatch.started:
157-
return True
158-
159-
other_dispatches_running = any(
160-
dispatch.started
161-
for dispatch in self._dispatches.values()
162-
if dispatch.type == type
163-
)
164-
# If no other dispatches are running, we can allow the stop event
165-
return not other_dispatches_running
166-
167-
receiver = receiver.filter(_is_type_still_running)
221+
if merge_strategy:
222+
receiver = receiver.filter(merge_strategy._get_filter_function(self))
168223

169-
# Send all matching dispatches to the receiver
170224
for dispatch in dispatches:
171225
await self._send_running_state_change(dispatch)
172226

src/frequenz/dispatch/_dispatcher.py

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
from frequenz.channels import Receiver
88
from frequenz.client.dispatch import Client
99

10-
from ._bg_service import DispatchScheduler
10+
from ._bg_service import DispatchScheduler, _MergeStrategy
1111
from ._dispatch import Dispatch
1212
from ._event import DispatchEvent
1313

@@ -200,7 +200,10 @@ def new_lifecycle_events_receiver(
200200
return self._bg_service.new_lifecycle_events_receiver(dispatch_type)
201201

202202
async def new_running_state_event_receiver(
203-
self, dispatch_type: str, *, unify_running_intervals: bool = True
203+
self,
204+
dispatch_type: str,
205+
*,
206+
merge_strategy: _MergeStrategy | None = None,
204207
) -> Receiver[Dispatch]:
205208
"""Return running state event receiver.
206209
@@ -228,18 +231,21 @@ async def new_running_state_event_receiver(
228231
- The payload changed
229232
- The dispatch was deleted
230233
231-
If `unify_running_intervals` is True, running intervals from multiple
232-
dispatches of the same type are considered as one continuous running
233-
period. In this mode, any stop events are ignored as long as at least
234-
one dispatch remains active.
234+
If `unify_running_intervals` is set, running intervals from multiple
235+
dispatches of the same type/type&target (depending on the chosen
236+
strategy) are considered as one continuous running
237+
period.
238+
In this mode, stop events are ignored as long as at least one (criteria
239+
matching) dispatch remains active.
235240
236241
Args:
237242
dispatch_type: The type of the dispatch to listen for.
238-
unify_running_intervals: Whether to unify running intervals.
243+
merge_strategy: The strategy to merge running intervals. One of
244+
`MergeByType` or `MergeByTypeTarget`.
239245
240246
Returns:
241247
A new receiver for dispatches whose running status changed.
242248
"""
243249
return await self._bg_service.new_running_state_event_receiver(
244-
dispatch_type, unify_running_intervals=unify_running_intervals
250+
dispatch_type, merge_strategy=merge_strategy
245251
)

0 commit comments

Comments
 (0)