Skip to content

Commit 432b3cb

Browse files
committed
DispatchManagingActor: Support starting/stopping of multiple actors
Signed-off-by: Mathias L. Baumann <[email protected]>
1 parent 0b671ed commit 432b3cb

File tree

1 file changed

+23
-21
lines changed

1 file changed

+23
-21
lines changed

src/frequenz/dispatch/_managing_actor.py

Lines changed: 23 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55

66
import logging
77
from dataclasses import dataclass
8-
from typing import Any
8+
from typing import Any, Set
99

1010
from frequenz.channels import Receiver, Sender
1111
from frequenz.client.dispatch.types import ComponentSelector
@@ -93,55 +93,57 @@ async def run():
9393
9494
status_receiver = dispatcher.running_status_change.new_receiver()
9595
96-
dispatch_runner = DispatchManagingActor(
96+
managing_actor = DispatchManagingActor(
9797
actor=my_actor,
9898
dispatch_type="EXAMPLE",
9999
running_status_receiver=status_receiver,
100100
updates_sender=dispatch_updates_channel.new_sender(),
101101
)
102102
103-
await asyncio.gather(dispatcher.start(), dispatch_runner.start())
103+
await asyncio.gather(dispatcher.start(), managing_actor.start())
104104
```
105105
"""
106106

107107
def __init__(
108108
self,
109-
actor: Actor,
109+
actor: Actor | Set[Actor],
110110
dispatch_type: str,
111111
running_status_receiver: Receiver[Dispatch],
112112
updates_sender: Sender[DispatchUpdate] | None = None,
113113
) -> None:
114114
"""Initialize the dispatch handler.
115115
116116
Args:
117-
actor: The actor to manage.
117+
actor: A set of actors or a single actor to manage.
118118
dispatch_type: The type of dispatches to handle.
119119
running_status_receiver: The receiver for dispatch running status changes.
120120
updates_sender: The sender for dispatch events
121121
"""
122122
super().__init__()
123123
self._dispatch_rx = running_status_receiver
124-
self._actor = actor
124+
self._actors = frozenset([actor] if isinstance(actor, Actor) else actor)
125125
self._dispatch_type = dispatch_type
126126
self._updates_sender = updates_sender
127127

128-
def _start_actor(self) -> None:
129-
"""Start the actor."""
130-
if self._actor.is_running:
131-
_logger.warning("Actor %s is already running", self._actor.name)
132-
else:
133-
self._actor.start()
128+
def _start_actors(self) -> None:
129+
"""Start all actors."""
130+
for actor in self._actors:
131+
if actor.is_running:
132+
_logger.warning("Actor %s is already running", actor.name)
133+
else:
134+
actor.start()
134135

135-
async def _stop_actor(self, msg: str) -> None:
136-
"""Stop the actor.
136+
async def _stop_actors(self, msg: str) -> None:
137+
"""Stop all actors.
137138
138139
Args:
139-
msg: The message to be passed to the actor being stopped.
140+
msg: The message to be passed to the actors being stopped.
140141
"""
141-
if self._actor.is_running:
142-
await self._actor.stop(msg)
143-
else:
144-
_logger.warning("Actor %s is not running", self._actor.name)
142+
for actor in self._actors:
143+
if actor.is_running:
144+
await actor.stop(msg)
145+
else:
146+
_logger.warning("Actor %s is not running", actor.name)
145147

146148
async def _run(self) -> None:
147149
"""Wait for dispatches and handle them."""
@@ -158,7 +160,7 @@ async def _handle_dispatch(self, dispatch: Dispatch) -> None:
158160
match running:
159161
case RunningState.STOPPED:
160162
_logger.info("Stopped by dispatch %s", dispatch.id)
161-
await self._stop_actor("Dispatch stopped")
163+
await self._stop_actors("Dispatch stopped")
162164
case RunningState.RUNNING:
163165
if self._updates_sender is not None:
164166
_logger.info("Updated by dispatch %s", dispatch.id)
@@ -171,7 +173,7 @@ async def _handle_dispatch(self, dispatch: Dispatch) -> None:
171173
)
172174

173175
_logger.info("Started by dispatch %s", dispatch.id)
174-
self._start_actor()
176+
self._start_actors()
175177
case RunningState.DIFFERENT_TYPE:
176178
_logger.debug(
177179
"Unknown dispatch! Ignoring dispatch of type %s", dispatch.type

0 commit comments

Comments
 (0)