Skip to content

Commit 5edd490

Browse files
committed
Add method to re-send dispatch states after startup
Signed-off-by: Mathias L. Baumann <[email protected]>
1 parent 21e7591 commit 5edd490

File tree

4 files changed

+74
-1
lines changed

4 files changed

+74
-1
lines changed

RELEASE_NOTES.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010

1111
## New Features
1212

13-
<!-- Here goes the main new features and examples or instructions on how to use them -->
13+
* Added method `Dispatcher.resend_current_running_states()` to resend the current dispatch running status.
1414

1515
## Bug Fixes
1616

src/frequenz/dispatch/_dispatcher.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -213,6 +213,17 @@ async def start(self) -> None:
213213
"""Start the actor."""
214214
self._actor.start()
215215

216+
async def resend_current_running_states(self, dispatch_type: str) -> None:
217+
"""Resend the current running states of all dispatches of a given type.
218+
219+
Warning: Usually you don't need to call this method. It is only useful
220+
when you need to recover the current running state of your actor.
221+
222+
Args:
223+
dispatch_type: The type of dispatches to resend.
224+
"""
225+
await self._actor.resend_current_running_states(dispatch_type)
226+
216227
@property
217228
def client(self) -> Client:
218229
"""Return the client."""

src/frequenz/dispatch/actor.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,23 @@ def __init__(
8484
always at index 0.
8585
"""
8686

87+
async def resend_current_running_states(self, dispatch_type: str) -> None:
88+
"""Trigger a resend of all running states.
89+
90+
Causes the current state to be sent out again on the running state
91+
change channel.
92+
93+
Args:
94+
dispatch_type: The type of dispatches to resend.
95+
"""
96+
_logger.info(
97+
"Resending current running states for dispatch type %s", dispatch_type
98+
)
99+
for dispatch in self._dispatches.values():
100+
if dispatch.type == dispatch_type:
101+
_logger.debug("Resending dispatch %s", dispatch)
102+
await self._send_running_state_change(dispatch)
103+
87104
async def _run(self) -> None:
88105
"""Run the actor."""
89106
_logger.info("Starting dispatch actor for microgrid %s", self._microgrid_id)

tests/test_frequenz_dispatch.py

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -498,3 +498,48 @@ async def test_notification_on_actor_start(
498498
# Expect notification of the running dispatch being ready to run
499499
ready_dispatch = await actor_env.running_state_change.receive()
500500
assert ready_dispatch.running(running_dispatch.type) == RunningState.RUNNING
501+
502+
503+
async def test_notification_on_resend(
504+
actor_env: ActorTestEnv,
505+
generator: DispatchGenerator,
506+
) -> None:
507+
"""Test that the correct notifications are sent when resending dispatches."""
508+
# Generate a dispatch that is already running
509+
running_dispatch = generator.generate_dispatch()
510+
running_dispatch = replace(
511+
running_dispatch,
512+
active=True,
513+
duration=timedelta(seconds=10),
514+
start_time=_now() - timedelta(seconds=5),
515+
recurrence=RecurrenceRule(),
516+
type="I_SHOULD_RUN",
517+
)
518+
# Generate a dispatch that is not running
519+
stopped_dispatch = generator.generate_dispatch()
520+
stopped_dispatch = replace(
521+
stopped_dispatch,
522+
active=False,
523+
duration=timedelta(seconds=5),
524+
start_time=_now() - timedelta(seconds=5),
525+
recurrence=RecurrenceRule(),
526+
type="I_SHOULD_NOT_RUN",
527+
)
528+
await actor_env.actor.stop()
529+
530+
# Create the dispatches
531+
actor_env.client.set_dispatches(
532+
actor_env.microgrid_id, [running_dispatch, stopped_dispatch]
533+
)
534+
535+
# Start the actor
536+
actor_env.actor.start()
537+
# Resend the stopped dispatch, but expect no notification
538+
await actor_env.actor.resend_current_running_states(stopped_dispatch.type)
539+
# Resend the running dispatch
540+
await actor_env.actor.resend_current_running_states(running_dispatch.type)
541+
542+
# Expect notification of the running dispatch being ready to run
543+
ready_dispatch = await actor_env.running_state_change.receive()
544+
assert ready_dispatch.running(running_dispatch.type) == RunningState.RUNNING
545+
assert ready_dispatch.type == running_dispatch.type

0 commit comments

Comments
 (0)