Skip to content

Commit 9b1af27

Browse files
committed
Add unify merge filter for dispatches
Signed-off-by: Mathias L. Baumann <[email protected]>
1 parent 1c88c58 commit 9b1af27

File tree

3 files changed

+269
-5
lines changed

3 files changed

+269
-5
lines changed

src/frequenz/dispatch/_bg_service.py

Lines changed: 31 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -102,11 +102,19 @@ def new_lifecycle_events_receiver(self, type: str) -> Receiver[DispatchEvent]:
102102
lambda event: event.dispatch.type == type
103103
)
104104

105-
async def new_running_state_event_receiver(self, type: str) -> Receiver[Dispatch]:
106-
"""Create a new receiver for running state events.
105+
async def new_running_state_event_receiver(
106+
self, type: str, unify_running_intervals: bool = True
107+
) -> Receiver[Dispatch]:
108+
"""Create a new receiver for running state events of the specified type.
109+
110+
If `unify_running_intervals` is True, running intervals from multiple
111+
dispatches of the same type are considered as one continuous running
112+
period. In this mode, any stop events are ignored as long as at least
113+
one dispatch remains active.
107114
108115
Args:
109116
type: The type of events to receive.
117+
unify_running_intervals: Whether to unify running intervals.
110118
111119
Returns:
112120
A new receiver for running state status.
@@ -121,6 +129,27 @@ async def new_running_state_event_receiver(self, type: str) -> Receiver[Dispatch
121129
limit=max(1, len(dispatches))
122130
).filter(lambda dispatch: dispatch.type == type)
123131

132+
if unify_running_intervals:
133+
134+
def _is_type_still_running(new_dispatch: Dispatch) -> bool:
135+
"""Merge time windows of running dispatches.
136+
137+
Any event that would cause a stop is filtered if at least one
138+
dispatch of the same type is running.
139+
"""
140+
if new_dispatch.started:
141+
return True
142+
143+
other_dispatches_running = any(
144+
dispatch.started
145+
for dispatch in self._dispatches.values()
146+
if dispatch.type == type
147+
)
148+
# If no other dispatches are running, we can allow the stop event
149+
return not other_dispatches_running
150+
151+
receiver = receiver.filter(_is_type_still_running)
152+
124153
# Send all matching dispatches to the receiver
125154
for dispatch in dispatches:
126155
await self._send_running_state_change(dispatch)

src/frequenz/dispatch/_dispatcher.py

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -200,7 +200,7 @@ def new_lifecycle_events_receiver(
200200
return self._bg_service.new_lifecycle_events_receiver(dispatch_type)
201201

202202
async def new_running_state_event_receiver(
203-
self, dispatch_type: str
203+
self, dispatch_type: str, unify_running_intervals: bool = True
204204
) -> Receiver[Dispatch]:
205205
"""Return running state event receiver.
206206
@@ -228,10 +228,18 @@ async def new_running_state_event_receiver(
228228
- The payload changed
229229
- The dispatch was deleted
230230
231+
If `unify_running_intervals` is True, running intervals from multiple
232+
dispatches of the same type are considered as one continuous running
233+
period. In this mode, any stop events are ignored as long as at least
234+
one dispatch remains active.
235+
231236
Args:
232237
dispatch_type: The type of the dispatch to listen for.
238+
unify_running_intervals: Whether to unify running intervals.
233239
234240
Returns:
235241
A new receiver for dispatches whose running status changed.
236242
"""
237-
return await self._bg_service.new_running_state_event_receiver(dispatch_type)
243+
return await self._bg_service.new_running_state_event_receiver(
244+
dispatch_type, unify_running_intervals
245+
)

tests/test_frequenz_dispatch.py

Lines changed: 228 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ async def test_env() -> AsyncIterator[TestEnv]:
7777
service=service,
7878
lifecycle_events=service.new_lifecycle_events_receiver("TEST_TYPE"),
7979
running_state_change=await service.new_running_state_event_receiver(
80-
"TEST_TYPE"
80+
"TEST_TYPE", unify_running_intervals=False
8181
),
8282
client=client,
8383
microgrid_id=microgrid_id,
@@ -371,6 +371,8 @@ async def test_dispatch_schedule(
371371
done_dispatch = await test_env.running_state_change.receive()
372372
assert done_dispatch == dispatch
373373

374+
await asyncio.sleep(1)
375+
374376

375377
async def test_dispatch_inf_duration_updated_to_finite_and_continues(
376378
test_env: TestEnv,
@@ -459,6 +461,8 @@ async def test_dispatch_new_but_finished(
459461

460462
assert await test_env.running_state_change.receive() == new_dispatch
461463

464+
await asyncio.sleep(1)
465+
462466

463467
async def test_notification_on_actor_start(
464468
test_env: TestEnv,
@@ -500,3 +504,226 @@ async def test_notification_on_actor_start(
500504
# Expect notification of the running dispatch being ready to run
501505
ready_dispatch = await test_env.running_state_change.receive()
502506
assert ready_dispatch.started
507+
508+
509+
async def test_multiple_dispatches_unify_running_intervals(
510+
fake_time: time_machine.Coordinates,
511+
generator: DispatchGenerator,
512+
) -> None:
513+
"""Test that multiple dispatches are merged into a single running interval."""
514+
microgrid_id = randint(1, 100)
515+
client = FakeClient()
516+
service = DispatchScheduler(
517+
microgrid_id=microgrid_id,
518+
client=client,
519+
)
520+
service.start()
521+
522+
receiver = await service.new_running_state_event_receiver(
523+
"TEST_TYPE", unify_running_intervals=True
524+
)
525+
526+
# Create two overlapping dispatches
527+
dispatch1 = replace(
528+
generator.generate_dispatch(),
529+
active=True,
530+
duration=timedelta(seconds=30),
531+
start_time=_now() + timedelta(seconds=5),
532+
recurrence=RecurrenceRule(),
533+
type="TEST_TYPE",
534+
)
535+
dispatch2 = replace(
536+
generator.generate_dispatch(),
537+
active=True,
538+
duration=timedelta(seconds=10),
539+
start_time=_now() + timedelta(seconds=10), # starts after dispatch1
540+
recurrence=RecurrenceRule(),
541+
type="TEST_TYPE",
542+
)
543+
lifecycle_events = service.new_lifecycle_events_receiver("TEST_TYPE")
544+
545+
await client.create(**to_create_params(microgrid_id, dispatch1))
546+
await client.create(**to_create_params(microgrid_id, dispatch2))
547+
548+
# Wait for both to be registered
549+
await lifecycle_events.receive()
550+
await lifecycle_events.receive()
551+
552+
# Move time forward to start both dispatches
553+
fake_time.shift(timedelta(seconds=15))
554+
await asyncio.sleep(1)
555+
556+
started1 = await receiver.receive()
557+
started2 = await receiver.receive()
558+
559+
assert started1.started
560+
assert started2.started
561+
562+
# Stop dispatch2 first, but unify_running_intervals=True means as long as dispatch1 runs,
563+
# we do not send a stop event
564+
await client.update(
565+
microgrid_id=microgrid_id, dispatch_id=started2.id, new_fields={"active": False}
566+
)
567+
fake_time.shift(timedelta(seconds=5))
568+
await asyncio.sleep(1)
569+
570+
# Now stop dispatch1 as well
571+
fake_time.shift(timedelta(seconds=15))
572+
await asyncio.sleep(1)
573+
574+
# Now we expect a single stop event for the merged window
575+
stopped = await receiver.receive()
576+
assert not stopped.started
577+
578+
await service.stop()
579+
580+
581+
async def test_multiple_dispatches_sequential_intervals_unify(
582+
fake_time: time_machine.Coordinates,
583+
generator: DispatchGenerator,
584+
) -> None:
585+
"""Test that multiple dispatches are merged into a single running interval.
586+
587+
Even if dispatches don't overlap but are consecutive,
588+
unify_running_intervals=True should treat them as continuous if any event tries to stop.
589+
"""
590+
microgrid_id = randint(1, 100)
591+
client = FakeClient()
592+
service = DispatchScheduler(microgrid_id=microgrid_id, client=client)
593+
service.start()
594+
595+
receiver = await service.new_running_state_event_receiver(
596+
"TEST_TYPE", unify_running_intervals=True
597+
)
598+
599+
dispatch1 = replace(
600+
generator.generate_dispatch(),
601+
active=True,
602+
duration=timedelta(seconds=5),
603+
start_time=_now() + timedelta(seconds=5),
604+
recurrence=RecurrenceRule(),
605+
type="TEST_TYPE",
606+
)
607+
assert dispatch1.duration is not None
608+
dispatch2 = replace(
609+
generator.generate_dispatch(),
610+
active=True,
611+
duration=timedelta(seconds=5),
612+
start_time=dispatch1.start_time + dispatch1.duration,
613+
recurrence=RecurrenceRule(),
614+
type="TEST_TYPE",
615+
)
616+
lifecycle = service.new_lifecycle_events_receiver("TEST_TYPE")
617+
618+
await client.create(**to_create_params(microgrid_id, dispatch1))
619+
await client.create(**to_create_params(microgrid_id, dispatch2))
620+
621+
# Consume lifecycle events
622+
await lifecycle.receive()
623+
await lifecycle.receive()
624+
625+
fake_time.shift(timedelta(seconds=11))
626+
await asyncio.sleep(1)
627+
started1 = await receiver.receive()
628+
assert started1.started
629+
630+
# Wait for the second dispatch to start
631+
fake_time.shift(timedelta(seconds=3))
632+
await asyncio.sleep(1)
633+
started2 = await receiver.receive()
634+
assert started2.started
635+
636+
# Now stop the second dispatch
637+
fake_time.shift(timedelta(seconds=5))
638+
await asyncio.sleep(1)
639+
stopped = await receiver.receive()
640+
assert not stopped.started
641+
642+
await service.stop()
643+
await asyncio.sleep(1)
644+
645+
646+
async def test_at_least_one_running_filter(
647+
fake_time: time_machine.Coordinates,
648+
generator: DispatchGenerator,
649+
) -> None:
650+
"""Test scenarios directly tied to the _at_least_one_running logic."""
651+
microgrid_id = randint(1, 100)
652+
client = FakeClient()
653+
service = DispatchScheduler(microgrid_id=microgrid_id, client=client)
654+
service.start()
655+
656+
# unify_running_intervals is True, so we use merged intervals
657+
receiver = await service.new_running_state_event_receiver(
658+
"TEST_TYPE", unify_running_intervals=True
659+
)
660+
661+
# Single dispatch that starts and stops normally
662+
dispatch = replace(
663+
generator.generate_dispatch(),
664+
active=True,
665+
duration=timedelta(seconds=10),
666+
start_time=_now() + timedelta(seconds=5),
667+
recurrence=RecurrenceRule(),
668+
type="TEST_TYPE",
669+
)
670+
lifecycle = service.new_lifecycle_events_receiver("TEST_TYPE")
671+
await client.create(**to_create_params(microgrid_id, dispatch))
672+
await lifecycle.receive()
673+
674+
# Move time so it starts
675+
fake_time.shift(timedelta(seconds=6))
676+
await asyncio.sleep(1)
677+
started = await receiver.receive()
678+
assert started.started
679+
680+
# Now stop it
681+
await client.update(
682+
microgrid_id=microgrid_id, dispatch_id=started.id, new_fields={"active": False}
683+
)
684+
fake_time.shift(timedelta(seconds=2))
685+
await asyncio.sleep(1)
686+
stopped = await receiver.receive()
687+
assert not stopped.started
688+
689+
# Now test scenario with multiple dispatches: one never starts, one starts and stops
690+
dispatch_a = replace(
691+
generator.generate_dispatch(),
692+
active=False,
693+
duration=timedelta(seconds=10),
694+
start_time=_now() + timedelta(seconds=50),
695+
recurrence=RecurrenceRule(),
696+
type="TEST_TYPE",
697+
)
698+
dispatch_b = replace(
699+
generator.generate_dispatch(),
700+
active=True,
701+
duration=timedelta(seconds=5),
702+
start_time=_now() + timedelta(seconds=5),
703+
recurrence=RecurrenceRule(),
704+
type="TEST_TYPE",
705+
)
706+
await client.create(**to_create_params(microgrid_id, dispatch_a))
707+
await client.create(**to_create_params(microgrid_id, dispatch_b))
708+
lifecycle = service.new_lifecycle_events_receiver("TEST_TYPE")
709+
await lifecycle.receive()
710+
await lifecycle.receive()
711+
712+
fake_time.shift(timedelta(seconds=6))
713+
await asyncio.sleep(1)
714+
started_b = await receiver.receive()
715+
assert started_b.started
716+
717+
# Stop dispatch_b before dispatch_a ever becomes active
718+
await client.update(
719+
microgrid_id=microgrid_id,
720+
dispatch_id=started_b.id,
721+
new_fields={"active": False},
722+
)
723+
fake_time.shift(timedelta(seconds=2))
724+
await asyncio.sleep(1)
725+
stopped_b = await receiver.receive()
726+
assert not stopped_b.started
727+
728+
# Since dispatch_a never started, no merging logic needed here.
729+
await service.stop()

0 commit comments

Comments
 (0)