diff --git a/src/frequenz/dispatch/_managing_actor.py b/src/frequenz/dispatch/_managing_actor.py index 6ed4e57..5179ae5 100644 --- a/src/frequenz/dispatch/_managing_actor.py +++ b/src/frequenz/dispatch/_managing_actor.py @@ -7,7 +7,7 @@ from dataclasses import dataclass from typing import Any, Set -from frequenz.channels import Receiver, Sender +from frequenz.channels import Broadcast, Receiver from frequenz.client.dispatch.types import TargetComponents from frequenz.sdk.actor import Actor @@ -85,9 +85,6 @@ async def run(): key=key ) - # Create update channel to receive dispatch update events pre-start and mid-run - dispatch_updates_channel = Broadcast[DispatchUpdate](name="dispatch_updates_channel") - # Start actor and give it an dispatch updates channel receiver my_actor = MyActor(dispatch_updates_channel.new_receiver()) @@ -106,26 +103,45 @@ async def run(): def __init__( self, - actor: Actor | Set[Actor], - dispatch_type: str, running_status_receiver: Receiver[Dispatch], - updates_sender: Sender[DispatchUpdate] | None = None, + actor: Actor | Set[Actor] | None = None, ) -> None: """Initialize the dispatch handler. Args: - actor: A set of actors or a single actor to manage. - dispatch_type: The type of dispatches to handle. running_status_receiver: The receiver for dispatch running status changes. - updates_sender: The sender for dispatch events + actor: Optional, but should be set later in set_actor(). A set of + actors or a single actor to manage. """ super().__init__() self._dispatch_rx = running_status_receiver - self._actors: frozenset[Actor] = frozenset( - [actor] if isinstance(actor, Actor) else actor + self._actors: frozenset[Actor] = ( + frozenset() + if actor is None + else frozenset([actor] if isinstance(actor, Actor) else actor) ) - self._dispatch_type = dispatch_type - self._updates_sender = updates_sender + self._updates_channel = Broadcast[DispatchUpdate]( + name="dispatch_updates_channel", resend_latest=True + ) + self._updates_sender = self._updates_channel.new_sender() + + def set_actor(self, actor: Actor | Set[Actor]) -> None: + """Set the actor to manage. + + Args: + actor: A set of actors or a single actor to manage. + """ + self._actors = ( + frozenset([actor]) if isinstance(actor, Actor) else frozenset(actor) + ) + + def new_receiver(self) -> Receiver[DispatchUpdate]: + """Create a new receiver for dispatch updates. + + Returns: + A new receiver for dispatch updates. + """ + return self._updates_channel.new_receiver() def _start_actors(self) -> None: """Start all actors.""" @@ -158,10 +174,6 @@ async def _handle_dispatch(self, dispatch: Dispatch) -> None: Args: dispatch: The dispatch to handle. """ - if dispatch.type != self._dispatch_type: - _logger.debug("Ignoring dispatch %s", dispatch.id) - return - if dispatch.started: if self._updates_sender is not None: _logger.info("Updated by dispatch %s", dispatch.id)