-
Notifications
You must be signed in to change notification settings - Fork 6
Support multiple target actors for one type of dispatch #104
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
27252a8
df2f8a1
5e7bfb9
e4fea21
df88828
2c29ab7
bd75c0f
07a077e
5ddb903
484c90b
94b8efe
f3b4146
e0c5d8d
23cc65f
9747b15
7d43537
6c34157
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -41,7 +41,7 @@ class ActorDispatcher(BackgroundService): | |
| import os | ||
| import asyncio | ||
| from typing import override | ||
| from frequenz.dispatch import Dispatcher, DispatchManagingActor, DispatchInfo | ||
| from frequenz.dispatch import Dispatcher, ActorDispatcher, DispatchInfo | ||
| from frequenz.client.dispatch.types import TargetComponents | ||
| from frequenz.client.common.microgrid.components import ComponentCategory | ||
| from frequenz.channels import Receiver, Broadcast, select, selected_from | ||
|
|
@@ -125,7 +125,7 @@ async def main(): | |
|
|
||
| status_receiver = dispatcher.new_running_state_event_receiver("EXAMPLE_TYPE") | ||
|
|
||
| managing_actor = DispatchManagingActor( | ||
| managing_actor = ActorDispatcher( | ||
| actor_factory=MyActor.new_with_dispatch, | ||
| running_status_receiver=status_receiver, | ||
| ) | ||
|
|
@@ -138,18 +138,25 @@ def __init__( | |
| self, | ||
| actor_factory: Callable[[DispatchInfo, Receiver[DispatchInfo]], Actor], | ||
| running_status_receiver: Receiver[Dispatch], | ||
| dispatch_identity: Callable[[Dispatch], int] | None = None, | ||
| ) -> None: | ||
| """Initialize the dispatch handler. | ||
|
|
||
| Args: | ||
| actor_factory: A callable that creates an actor with some initial dispatch | ||
| information. | ||
| running_status_receiver: The receiver for dispatch running status changes. | ||
| dispatch_identity: A function to identify to which actor a dispatch refers. | ||
| By default, it uses the dispatch ID. | ||
| """ | ||
| super().__init__() | ||
| self._dispatch_identity: Callable[[Dispatch], int] = ( | ||
| dispatch_identity if dispatch_identity else lambda d: d.id | ||
| ) | ||
|
|
||
| self._dispatch_rx = running_status_receiver | ||
| self._actor_factory = actor_factory | ||
| self._actor: Actor | None = None | ||
| self._actors: dict[int, Actor] = {} | ||
| self._updates_channel = Broadcast[DispatchInfo]( | ||
| name="dispatch_updates_channel", resend_latest=True | ||
| ) | ||
|
|
@@ -167,22 +174,36 @@ async def _start_actor(self, dispatch: Dispatch) -> None: | |
| options=dispatch.payload, | ||
| ) | ||
|
|
||
| if self._actor: | ||
| actor: Actor | None = self._actors.get(self._dispatch_identity(dispatch)) | ||
|
|
||
| if actor: | ||
| sent_str = "" | ||
| if self._updates_sender is not None: | ||
| sent_str = ", sent a dispatch update instead of creating a new actor" | ||
| await self._updates_sender.send(dispatch_update) | ||
| _logger.warning( | ||
| _logger.info( | ||
| "Actor for dispatch type %r is already running%s", | ||
| dispatch.type, | ||
| sent_str, | ||
| ) | ||
| else: | ||
| _logger.info("Starting actor for dispatch type %r", dispatch.type) | ||
| self._actor = self._actor_factory( | ||
| dispatch_update, self._updates_channel.new_receiver() | ||
| ) | ||
| self._actor.start() | ||
| try: | ||
| _logger.info("Starting actor for dispatch type %r", dispatch.type) | ||
| actor = self._actor_factory( | ||
| dispatch_update, | ||
| self._updates_channel.new_receiver(limit=1, warn_on_overflow=False), | ||
| ) | ||
| self._actors[self._dispatch_identity(dispatch)] = actor | ||
|
|
||
| actor.start() | ||
|
|
||
| except Exception as e: # pylint: disable=broad-except | ||
| _logger.error( | ||
| "Failed to start actor for dispatch type %r: %s", | ||
| dispatch.type, | ||
| e, | ||
| exc_info=True, | ||
| ) | ||
|
Comment on lines
+190
to
+206
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Not for this PR, as it is a separate feature that can be implemented separately, but I think it might be good to do the starting in a background task and retrying forever like the edge app "dispatch subsystem" does (maybe with the new dispatcher we can even get rid of the dispatch subsystem). Not 100% sure, as doing this is making maybe too many assumptions about what clients want, but maybe it could also be opt-in/out by passing an option like |
||
|
|
||
| async def _stop_actor(self, stopping_dispatch: Dispatch, msg: str) -> None: | ||
| """Stop all actors. | ||
|
|
@@ -191,13 +212,12 @@ async def _stop_actor(self, stopping_dispatch: Dispatch, msg: str) -> None: | |
| stopping_dispatch: The dispatch that is stopping the actor. | ||
| msg: The message to be passed to the actors being stopped. | ||
| """ | ||
| if self._actor is None: | ||
| if actor := self._actors.pop(self._dispatch_identity(stopping_dispatch), None): | ||
| await actor.stop(msg) | ||
| else: | ||
| _logger.warning( | ||
| "Actor for dispatch type %r is not running", stopping_dispatch.type | ||
| ) | ||
| else: | ||
| await self._actor.stop(msg) | ||
| self._actor = None | ||
|
|
||
| async def _run(self) -> None: | ||
| """Wait for dispatches and handle them.""" | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I didn't get this change, the commit message says " Don't warn for changing dispatch parameters: It's a normal action ", does this mean that
_start_actoris called every time a dispatch is updated? If so, I would even make this log a debug or remove it completely, because it seems misleading, as it was never the intention to start the actor if it was just a dispatch update.If there is a way to tell if this is called because a dispatch just started or was updated, then maybe we can log more meaningful messages.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I mean, there is a way, but it's not available to the actor_dispatcher (without restructuring).
The idea is that the dispatch instance tells us the desired state, no matter the previous state, so from that perspectives it doesn't matter whether it was an update or a new dispatch, both cases should do both, start or update a running actor instance..
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would make this a debug then, because it is completely normal and probably not very useful to get that info when things are running. But not hung to block this PR on.