From 0ff0cdf64693191a894066af91be2f44deb60554 Mon Sep 17 00:00:00 2001 From: "Mathias L. Baumann" Date: Mon, 14 Apr 2025 11:15:44 +0200 Subject: [PATCH 1/3] Fix that every actor instance receives every update for their type Signed-off-by: Mathias L. Baumann --- RELEASE_NOTES.md | 2 +- src/frequenz/dispatch/_actor_dispatcher.py | 50 ++++++---- tests/test_mananging_actor.py | 111 ++++++++++++++++++++- 3 files changed, 142 insertions(+), 21 deletions(-) diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index d4546f4..fef33bb 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -14,4 +14,4 @@ ## Bug Fixes - +* ActorDispatcher: Fix that every instance of the same type receives every dispatch update diff --git a/src/frequenz/dispatch/_actor_dispatcher.py b/src/frequenz/dispatch/_actor_dispatcher.py index e97a434..6e0e793 100644 --- a/src/frequenz/dispatch/_actor_dispatcher.py +++ b/src/frequenz/dispatch/_actor_dispatcher.py @@ -10,7 +10,7 @@ from datetime import timedelta from typing import Any, Awaitable -from frequenz.channels import Broadcast, Receiver, select +from frequenz.channels import Broadcast, Receiver, Sender, select from frequenz.client.dispatch.types import TargetComponents from frequenz.sdk.actor import Actor, BackgroundService @@ -189,6 +189,19 @@ async def _retry_after_delay(self, dispatch: Dispatch) -> None: _logger.info("Retrying dispatch %s now", dispatch.id) await self._sender.send(dispatch) + @dataclass(frozen=True, kw_only=True) + class ActorAndChannel: + """Actor and its sender.""" + + actor: Actor + """The actor.""" + + channel: Broadcast[DispatchInfo] + """The channel for dispatch updates.""" + + sender: Sender[DispatchInfo] + """The sender for dispatch updates.""" + def __init__( # pylint: disable=too-many-arguments, too-many-positional-arguments self, actor_factory: Callable[ @@ -215,11 +228,8 @@ def __init__( # pylint: disable=too-many-arguments, too-many-positional-argumen self._dispatch_rx = running_status_receiver self._actor_factory = actor_factory - self._actors: dict[int, Actor] = {} - self._updates_channel = Broadcast[DispatchInfo]( - name="dispatch_updates_channel", resend_latest=True - ) - self._updates_sender = self._updates_channel.new_sender() + self._actors: dict[int, ActorDispatcher.ActorAndChannel] = {} + self._retrier = ActorDispatcher.FailedDispatchesRetrier(retry_interval) def start(self) -> None: @@ -236,24 +246,25 @@ async def _start_actor(self, dispatch: Dispatch) -> None: ) identity = self._dispatch_identity(dispatch) - actor: Actor | None = self._actors.get(identity) + actor_and_channel = self._actors.get(identity) - if actor: - sent_str = "" - if self._updates_sender is not None: - sent_str = ", sent a dispatch update instead of creating a new actor" - await self._updates_sender.send(dispatch_update) + if actor_and_channel: + await actor_and_channel.sender.send(dispatch_update) _logger.info( - "Actor for dispatch type %r is already running%s", + "Actor for dispatch type %r is already running, " + "sent a dispatch update instead of creating a new actor", dispatch.type, - sent_str, ) else: try: _logger.info("Starting actor for dispatch type %r", dispatch.type) + channel = Broadcast[DispatchInfo]( + name=f"dispatch_updates_channel_instance={identity}", + resend_latest=True, + ) actor = await self._actor_factory( dispatch_update, - self._updates_channel.new_receiver(limit=1, warn_on_overflow=False), + channel.new_receiver(limit=1, warn_on_overflow=False), ) actor.start() @@ -267,7 +278,9 @@ async def _start_actor(self, dispatch: Dispatch) -> None: self._retrier.retry(dispatch) else: # No exception occurred, so we can add the actor to the list - self._actors[identity] = actor + self._actors[identity] = ActorDispatcher.ActorAndChannel( + actor=actor, channel=channel, sender=channel.new_sender() + ) async def _stop_actor(self, stopping_dispatch: Dispatch, msg: str) -> None: """Stop all actors. @@ -278,8 +291,9 @@ async def _stop_actor(self, stopping_dispatch: Dispatch, msg: str) -> None: """ identity = self._dispatch_identity(stopping_dispatch) - if actor := self._actors.pop(identity, None): - await actor.stop(msg) + if actor_and_channel := self._actors.pop(identity, None): + await actor_and_channel.actor.stop(msg) + await actor_and_channel.channel.close() else: _logger.warning( "Actor for dispatch type %r is not running", stopping_dispatch.type diff --git a/tests/test_mananging_actor.py b/tests/test_mananging_actor.py index 4ac2c3d..e8b3639 100644 --- a/tests/test_mananging_actor.py +++ b/tests/test_mananging_actor.py @@ -15,7 +15,7 @@ import time_machine from frequenz.channels import Broadcast, Receiver, Sender from frequenz.client.dispatch import recurrence -from frequenz.client.dispatch.recurrence import Frequency +from frequenz.client.dispatch.recurrence import Frequency, RecurrenceRule from frequenz.client.dispatch.test.client import FakeClient from frequenz.client.dispatch.test.generator import DispatchGenerator from frequenz.sdk.actor import Actor @@ -104,9 +104,17 @@ def actor(self, identity: int) -> MockActor: """Return the actor.""" # pylint: disable=protected-access assert identity in self.actors_service._actors - return cast(MockActor, self.actors_service._actors[identity]) + return cast(MockActor, self.actors_service._actors[identity].actor) # pylint: enable=protected-access + def is_running(self, identity: int) -> bool: + """Return whether the actor is running.""" + # pylint: disable-next=protected-access + if identity not in self.actors_service._actors: + return False + + return self.actor(identity).is_running + @fixture async def test_env() -> AsyncIterator[_TestEnv]: @@ -383,3 +391,102 @@ async def new_mock_receiver( # Check if actor instance is created assert identity(dispatch) in actor_manager._actors + + +async def test_actor_dispatcher_update_isolation( + test_env: _TestEnv, + fake_time: time_machine.Coordinates, +) -> None: + """Test that updates for one dispatch don't affect other actors of the same type.""" + dispatch_type = "ISOLATION_TEST" + start_time = _now() + duration = timedelta(minutes=5) + + # Create first dispatch + dispatch1_spec = replace( + test_env.generator.generate_dispatch(), + id=101, # Unique ID + type=dispatch_type, + active=True, + dry_run=False, + start_time=start_time + timedelta(seconds=1), # Stagger start slightly + duration=duration, + payload={"instance": 1}, + recurrence=RecurrenceRule(), + ) + dispatch1 = Dispatch(dispatch1_spec) + + # Create second dispatch of the same type, different ID + dispatch2_spec = replace( + test_env.generator.generate_dispatch(), + id=102, # Unique ID + type=dispatch_type, # Same type + active=True, + dry_run=False, + start_time=start_time + timedelta(seconds=2), # Stagger start slightly + duration=duration, + payload={"instance": 2}, + recurrence=RecurrenceRule(), + ) + dispatch2 = Dispatch(dispatch2_spec) + + # Send dispatch 1 to start actor 1 + # print(f"Sending dispatch 1: {dispatch1}") + await test_env.running_status_sender.send(dispatch1) + fake_time.shift(timedelta(seconds=1.1)) # Move time past dispatch1 start + await asyncio.sleep(0.1) # Allow actor to start + + assert test_env.is_running(101), "Actor 1 should be running" + actor1 = test_env.actor(101) + assert actor1 is not None + # pylint: disable-next=protected-access + assert actor1.initial_dispatch._src.id == 101 + assert actor1.initial_dispatch.options == {"instance": 1} + assert not test_env.is_running(102), "Actor 2 should not be running yet" + + # Send dispatch 2 to start actor 2 + # print(f"Sending dispatch 2: {dispatch2}") + await test_env.running_status_sender.send(dispatch2) + fake_time.shift(timedelta(seconds=1)) # Move time past dispatch2 start + await asyncio.sleep(0.1) # Allow actor to start + + assert test_env.actor(101).is_running, "Actor 1 should still be running" + assert test_env.actor(102).is_running, "Actor 2 should now be running" + actor2 = test_env.actor(102) + assert actor2 is not None + # pylint: disable-next=protected-access + assert actor2.initial_dispatch._src.id == 102 + assert actor2.initial_dispatch.options == {"instance": 2} + + # Now, send an update to stop dispatch 1 + dispatch1_stop = Dispatch( + replace(dispatch1_spec, duration=timedelta(seconds=1), active=False) + ) + # print(f"Sending stop for dispatch 1: {dispatch1_stop}") + await test_env.running_status_sender.send(dispatch1_stop) + await asyncio.sleep(0.1) # Allow ActorDispatcher to process the stop + + # THE CORE ASSERTION: Actor 1 should stop, Actor 2 should remain running + # pylint: disable=protected-access + assert ( + 101 not in test_env.actors_service._actors + ), "Actor 1 should have been removed" + # pylint: enable=protected-access + assert ( + test_env.actor(102).is_running is True + ), "Actor 2 should be running after Actor 1 stopped" + # Double check actor1 object state if needed (though removal is stronger check) + # assert not actor1.is_running + + # Cleanup: Stop Actor 2 + dispatch2_stop = Dispatch(replace(dispatch2_spec, active=False)) + # print(f"Sending stop for dispatch 2: {dispatch2_stop}") + await test_env.running_status_sender.send(dispatch2_stop) + await asyncio.sleep(0.1) # Allow ActorDispatcher to process the stop + + # pylint: disable=protected-access + assert ( + 102 not in test_env.actors_service._actors + ), "Actor 2 should have been removed" + # pylint: enable=protected-access + assert not test_env.is_running(102), "Actor 2 should be stopped" From 25536a6f6015b6e52514da2799c434b6f5686b12 Mon Sep 17 00:00:00 2001 From: "Mathias L. Baumann" Date: Mon, 14 Apr 2025 11:28:38 +0200 Subject: [PATCH 2/3] Fix typo in test file name Signed-off-by: Mathias L. Baumann --- tests/{test_mananging_actor.py => test_managing_actor.py} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename tests/{test_mananging_actor.py => test_managing_actor.py} (100%) diff --git a/tests/test_mananging_actor.py b/tests/test_managing_actor.py similarity index 100% rename from tests/test_mananging_actor.py rename to tests/test_managing_actor.py From 8ec74c41542f8e81d0026673015c2f285f5d5bee Mon Sep 17 00:00:00 2001 From: "Mathias L. Baumann" Date: Mon, 14 Apr 2025 16:09:12 +0200 Subject: [PATCH 3/3] Update RELEASE_NOTES Signed-off-by: Mathias L. Baumann --- RELEASE_NOTES.md | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index fef33bb..9c6a924 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -14,4 +14,5 @@ ## Bug Fixes -* ActorDispatcher: Fix that every instance of the same type receives every dispatch update +* ActorDispatcher: Fix that every actor instance wrongly received all updates for their dispatch type. This is only relevant to you if your actor has more than one running instance at any time. +