Skip to content

Commit 4736a75

Browse files
committed
Redesign managing actor
This way we can always send a status update to a new receiver Signed-off-by: Mathias L. Baumann <[email protected]>
1 parent 1c88c58 commit 4736a75

File tree

1 file changed

+30
-18
lines changed

1 file changed

+30
-18
lines changed

src/frequenz/dispatch/_managing_actor.py

Lines changed: 30 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
from dataclasses import dataclass
88
from typing import Any, Set
99

10-
from frequenz.channels import Receiver, Sender
10+
from frequenz.channels import Broadcast, Receiver
1111
from frequenz.client.dispatch.types import TargetComponents
1212
from frequenz.sdk.actor import Actor
1313

@@ -85,9 +85,6 @@ async def run():
8585
key=key
8686
)
8787
88-
# Create update channel to receive dispatch update events pre-start and mid-run
89-
dispatch_updates_channel = Broadcast[DispatchUpdate](name="dispatch_updates_channel")
90-
9188
# Start actor and give it an dispatch updates channel receiver
9289
my_actor = MyActor(dispatch_updates_channel.new_receiver())
9390
@@ -106,26 +103,45 @@ async def run():
106103

107104
def __init__(
108105
self,
109-
actor: Actor | Set[Actor],
110-
dispatch_type: str,
111106
running_status_receiver: Receiver[Dispatch],
112-
updates_sender: Sender[DispatchUpdate] | None = None,
107+
actor: Actor | Set[Actor] | None = None,
113108
) -> None:
114109
"""Initialize the dispatch handler.
115110
116111
Args:
117-
actor: A set of actors or a single actor to manage.
118-
dispatch_type: The type of dispatches to handle.
119112
running_status_receiver: The receiver for dispatch running status changes.
120-
updates_sender: The sender for dispatch events
113+
actor: Optional, but should be set later in set_actor(). A set of
114+
actors or a single actor to manage.
121115
"""
122116
super().__init__()
123117
self._dispatch_rx = running_status_receiver
124-
self._actors: frozenset[Actor] = frozenset(
125-
[actor] if isinstance(actor, Actor) else actor
118+
self._actors: frozenset[Actor] = (
119+
frozenset()
120+
if actor is None
121+
else frozenset([actor] if isinstance(actor, Actor) else actor)
126122
)
127-
self._dispatch_type = dispatch_type
128-
self._updates_sender = updates_sender
123+
self._updates_channel = Broadcast[DispatchUpdate](
124+
name="dispatch_updates_channel", resend_latest=True
125+
)
126+
self._updates_sender = self._updates_channel.new_sender()
127+
128+
def set_actor(self, actor: Actor | Set[Actor]) -> None:
129+
"""Set the actor to manage.
130+
131+
Args:
132+
actor: A set of actors or a single actor to manage.
133+
"""
134+
self._actors = (
135+
frozenset([actor]) if isinstance(actor, Actor) else frozenset(actor)
136+
)
137+
138+
def new_receiver(self) -> Receiver[DispatchUpdate]:
139+
"""Create a new receiver for dispatch updates.
140+
141+
Returns:
142+
A new receiver for dispatch updates.
143+
"""
144+
return self._updates_channel.new_receiver()
129145

130146
def _start_actors(self) -> None:
131147
"""Start all actors."""
@@ -158,10 +174,6 @@ async def _handle_dispatch(self, dispatch: Dispatch) -> None:
158174
Args:
159175
dispatch: The dispatch to handle.
160176
"""
161-
if dispatch.type != self._dispatch_type:
162-
_logger.debug("Ignoring dispatch %s", dispatch.id)
163-
return
164-
165177
if dispatch.started:
166178
if self._updates_sender is not None:
167179
_logger.info("Updated by dispatch %s", dispatch.id)

0 commit comments

Comments
 (0)