Skip to content

Commit 89f9260

Browse files
committed
Implement merge strategy based on TYPE+TARGET
Signed-off-by: Mathias L. Baumann <[email protected]>
1 parent e7fd295 commit 89f9260

File tree

4 files changed

+98
-39
lines changed

4 files changed

+98
-39
lines changed

src/frequenz/dispatch/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
1616
"""
1717

18+
from ._bg_service import UnifyStrategy
1819
from ._dispatch import Dispatch
1920
from ._dispatcher import Dispatcher
2021
from ._event import Created, Deleted, DispatchEvent, Updated
@@ -29,4 +30,5 @@
2930
"Dispatch",
3031
"DispatchManagingActor",
3132
"DispatchUpdate",
33+
"UnifyStrategy",
3234
]

src/frequenz/dispatch/_bg_service.py

Lines changed: 69 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,9 @@
77
import logging
88
from dataclasses import dataclass, field
99
from datetime import datetime, timedelta, timezone
10+
from enum import Enum, auto
1011
from heapq import heappop, heappush
12+
from typing import Callable
1113

1214
import grpc.aio
1315
from frequenz.channels import Broadcast, Receiver, select, selected_from
@@ -23,6 +25,16 @@
2325
"""The logger for this module."""
2426

2527

28+
class UnifyStrategy(Enum):
29+
"""The strategy to unify running intervals."""
30+
31+
TYPE = auto()
32+
"""Unify running intervals based on the dispatch type."""
33+
34+
TYPE_TARGET = auto()
35+
"""Unify running intervals based on the dispatch type and target."""
36+
37+
2638
# pylint: disable=too-many-instance-attributes
2739
class DispatchScheduler(BackgroundService):
2840
"""Dispatch background service.
@@ -119,54 +131,87 @@ def new_lifecycle_events_receiver(self, type: str) -> Receiver[DispatchEvent]:
119131
)
120132

121133
async def new_running_state_event_receiver(
122-
self, type: str, *, unify_running_intervals: bool = True
134+
self, type: str, *, unify_running_intervals: UnifyStrategy | None = None
123135
) -> Receiver[Dispatch]:
124136
"""Create a new receiver for running state events of the specified type.
125137
126-
If `unify_running_intervals` is True, running intervals from multiple
127-
dispatches of the same type are considered as one continuous running
128-
period. In this mode, any stop events are ignored as long as at least
129-
one dispatch remains active.
138+
If `unify_running_intervals` is set, running intervals from multiple
139+
dispatches of the same type/type&target (depending on the chosen
140+
strategy) are considered as one continuous running
141+
period.
142+
In this mode, stop events are ignored as long as at least one (criteria
143+
matching) dispatch remains active.
130144
131145
Args:
132146
type: The type of events to receive.
133-
unify_running_intervals: Whether to unify running intervals.
134-
147+
unify_running_intervals: The strategy to unify running intervals.
135148
Returns:
136149
A new receiver for running state status.
137150
"""
138-
# Find all matching dispatches based on the type and collect them
139151
dispatches = [
140152
dispatch for dispatch in self._dispatches.values() if dispatch.type == type
141153
]
142154

143-
# Create receiver with enough capacity to hold all matching dispatches
144155
receiver = self._running_state_status_channel.new_receiver(
145156
limit=max(1, len(dispatches))
146157
).filter(lambda dispatch: dispatch.type == type)
147158

148159
if unify_running_intervals:
149160

150-
def _is_type_still_running(new_dispatch: Dispatch) -> bool:
151-
"""Merge time windows of running dispatches.
161+
def is_still_running_filter(
162+
unify_strategy: UnifyStrategy,
163+
) -> Callable[[Dispatch], bool]:
164+
"""Create a filter function based on the provided UnifyStrategy.
165+
166+
Args:
167+
unify_strategy: The strategy to use for unifying running intervals.
152168
153-
Any event that would cause a stop is filtered if at least one
154-
dispatch of the same type is running.
169+
Returns:
170+
A callable that takes a Dispatch and returns True if it
171+
should be kept, False otherwise.
155172
"""
156-
if new_dispatch.started:
157-
return True
158173

159-
other_dispatches_running = any(
160-
dispatch.started
161-
for dispatch in self._dispatches.values()
162-
if dispatch.type == type
163-
)
164-
# If no other dispatches are running, we can allow the stop event
165-
return not other_dispatches_running
174+
def filter_func(new_dispatch: Dispatch) -> bool:
175+
"""Filter dispatches based on the provided unification strategy.
176+
177+
Keeps start events.
178+
Keeps stop events only if no other dispatches matching the
179+
strategy's criteria are running.
180+
"""
181+
if new_dispatch.started:
182+
return True
183+
184+
# Define the criteria for checking other running dispatches
185+
# based on strategy
186+
if unify_strategy == UnifyStrategy.TYPE:
187+
# criteria = lambda dispatch: dispatch.type == type
188+
def criteria(dispatch: Dispatch) -> bool:
189+
return dispatch.type == type
190+
191+
elif unify_strategy == UnifyStrategy.TYPE_TARGET:
192+
193+
def criteria(dispatch: Dispatch) -> bool:
194+
return (
195+
dispatch.type == type
196+
and dispatch.target == new_dispatch.target
197+
)
198+
199+
else: # Default case (should not normally happen)
200+
assert False, f"Unknown unify strategy: {unify_strategy}"
201+
202+
other_dispatches_running = any(
203+
dispatch.started
204+
for dispatch in self._dispatches.values()
205+
if criteria(dispatch)
206+
)
207+
208+
return not other_dispatches_running
209+
210+
return filter_func
166211

167-
receiver = receiver.filter(_is_type_still_running)
212+
receiver = receiver.filter(is_still_running_filter(unify_running_intervals))
213+
# End of unify_running_intervals block
168214

169-
# Send all matching dispatches to the receiver
170215
for dispatch in dispatches:
171216
await self._send_running_state_change(dispatch)
172217

src/frequenz/dispatch/_dispatcher.py

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
from frequenz.channels import Receiver
88
from frequenz.client.dispatch import Client
99

10-
from ._bg_service import DispatchScheduler
10+
from ._bg_service import DispatchScheduler, UnifyStrategy
1111
from ._dispatch import Dispatch
1212
from ._event import DispatchEvent
1313

@@ -200,7 +200,10 @@ def new_lifecycle_events_receiver(
200200
return self._bg_service.new_lifecycle_events_receiver(dispatch_type)
201201

202202
async def new_running_state_event_receiver(
203-
self, dispatch_type: str, *, unify_running_intervals: bool = True
203+
self,
204+
dispatch_type: str,
205+
*,
206+
unify_running_intervals: UnifyStrategy | None = None,
204207
) -> Receiver[Dispatch]:
205208
"""Return running state event receiver.
206209
@@ -228,14 +231,16 @@ async def new_running_state_event_receiver(
228231
- The payload changed
229232
- The dispatch was deleted
230233
231-
If `unify_running_intervals` is True, running intervals from multiple
232-
dispatches of the same type are considered as one continuous running
233-
period. In this mode, any stop events are ignored as long as at least
234-
one dispatch remains active.
234+
If `unify_running_intervals` is set, running intervals from multiple
235+
dispatches of the same type/type&target (depending on the chosen
236+
strategy) are considered as one continuous running
237+
period.
238+
In this mode, stop events are ignored as long as at least one (criteria
239+
matching) dispatch remains active.
235240
236241
Args:
237242
dispatch_type: The type of the dispatch to listen for.
238-
unify_running_intervals: Whether to unify running intervals.
243+
unify_running_intervals: The strategy to unify running intervals.
239244
240245
Returns:
241246
A new receiver for dispatches whose running status changed.

tests/test_frequenz_dispatch.py

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,14 @@
1818
from frequenz.client.dispatch.types import Dispatch as BaseDispatch
1919
from pytest import fixture
2020

21-
from frequenz.dispatch import Created, Deleted, Dispatch, DispatchEvent, Updated
21+
from frequenz.dispatch import (
22+
Created,
23+
Deleted,
24+
Dispatch,
25+
DispatchEvent,
26+
UnifyStrategy,
27+
Updated,
28+
)
2229
from frequenz.dispatch._bg_service import DispatchScheduler
2330

2431

@@ -77,7 +84,7 @@ async def test_env() -> AsyncIterator[TestEnv]:
7784
service=service,
7885
lifecycle_events=service.new_lifecycle_events_receiver("TEST_TYPE"),
7986
running_state_change=await service.new_running_state_event_receiver(
80-
"TEST_TYPE", unify_running_intervals=False
87+
"TEST_TYPE", unify_running_intervals=None
8188
),
8289
client=client,
8390
microgrid_id=microgrid_id,
@@ -520,7 +527,7 @@ async def test_multiple_dispatches_unify_running_intervals(
520527
service.start()
521528

522529
receiver = await service.new_running_state_event_receiver(
523-
"TEST_TYPE", unify_running_intervals=True
530+
"TEST_TYPE", unify_running_intervals=UnifyStrategy.TYPE
524531
)
525532

526533
# Create two overlapping dispatches
@@ -559,7 +566,7 @@ async def test_multiple_dispatches_unify_running_intervals(
559566
assert started1.started
560567
assert started2.started
561568

562-
# Stop dispatch2 first, but unify_running_intervals=True means as long as dispatch1 runs,
569+
# Stop dispatch2 first, but unify_running_intervals=TYPE means as long as dispatch1 runs,
563570
# we do not send a stop event
564571
await client.update(
565572
microgrid_id=microgrid_id, dispatch_id=started2.id, new_fields={"active": False}
@@ -585,15 +592,15 @@ async def test_multiple_dispatches_sequential_intervals_unify(
585592
"""Test that multiple dispatches are merged into a single running interval.
586593
587594
Even if dispatches don't overlap but are consecutive,
588-
unify_running_intervals=True should treat them as continuous if any event tries to stop.
595+
unify_running_intervals=TPYE should treat them as continuous if any event tries to stop.
589596
"""
590597
microgrid_id = randint(1, 100)
591598
client = FakeClient()
592599
service = DispatchScheduler(microgrid_id=microgrid_id, client=client)
593600
service.start()
594601

595602
receiver = await service.new_running_state_event_receiver(
596-
"TEST_TYPE", unify_running_intervals=True
603+
"TEST_TYPE", unify_running_intervals=UnifyStrategy.TYPE
597604
)
598605

599606
dispatch1 = replace(
@@ -653,9 +660,9 @@ async def test_at_least_one_running_filter(
653660
service = DispatchScheduler(microgrid_id=microgrid_id, client=client)
654661
service.start()
655662

656-
# unify_running_intervals is True, so we use merged intervals
663+
# unify_running_intervals is TYPE, so we use merged intervals
657664
receiver = await service.new_running_state_event_receiver(
658-
"TEST_TYPE", unify_running_intervals=True
665+
"TEST_TYPE", unify_running_intervals=UnifyStrategy.TYPE
659666
)
660667

661668
# Single dispatch that starts and stops normally

0 commit comments

Comments
 (0)