From 0609b7a6d8022c2a902d5f3ae5b075a29b7accb0 Mon Sep 17 00:00:00 2001 From: Elzbieta Kotulska Date: Thu, 22 May 2025 11:09:21 +0200 Subject: [PATCH 1/2] Add test that fails because of actor bug When an actor throws an exception during cancellation, it restarts rather than stopping as expected. Signed-off-by: Elzbieta Kotulska --- tests/actor/test_actor.py | 58 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 58 insertions(+) diff --git a/tests/actor/test_actor.py b/tests/actor/test_actor.py index 05a9f5f26..930ffaf9e 100644 --- a/tests/actor/test_actor.py +++ b/tests/actor/test_actor.py @@ -53,6 +53,31 @@ async def _run(self) -> None: print(f"{self} done") +class RaiseExceptionOnCancelActor(BaseTestActor): + """Actor that raises exception during stop.""" + + def __init__( + self, *, name: str | None = None, recv: Receiver[int], sender: Sender[int] + ) -> None: + """Create an instance.""" + super().__init__(name=name) + self._recv = recv + self._sender = sender + + async def _run(self) -> None: + """Start the actor and raise exception after receiving CancelledError.""" + self.inc_restart_count() + if BaseTestActor.restart_count == 1: + # This actor should not restart + # If it does we just return to avoid infinite await on `stop` + return + try: + async for msg in self._recv: + await self._sender.send(msg) + except asyncio.CancelledError as exc: + raise RuntimeError("Actor should stop.") from exc + + class RaiseExceptionActor(BaseTestActor): """A faulty actor that raises an Exception as soon as it receives a message.""" @@ -333,3 +358,36 @@ async def cancel_actor() -> None: (*RUN_INFO, "Actor EchoActor[EchoActor]: Cancelled while running."), (*RUN_INFO, "All 1 actor(s) finished."), ] + + +async def test_actor_stop_if_error_was_raised_during_cancel( + actor_auto_restart_once: None, # pylint: disable=unused-argument + caplog: pytest.LogCaptureFixture, +) -> None: + """If actor raises exception during cancellation it should stop. 
+ + And throw unhandled exception to the user. + """ + caplog.set_level("DEBUG", logger="frequenz.sdk.actor._actor") + caplog.set_level("DEBUG", logger="frequenz.sdk.actor._run_utils") + + input_chan: Broadcast[int] = Broadcast(name="TestChannel") + + echo_chan: Broadcast[int] = Broadcast(name="echo output") + echo_rx = echo_chan.new_receiver() + + actor = RaiseExceptionOnCancelActor( + name="test", + recv=input_chan.new_receiver(), + sender=echo_chan.new_sender(), + ) + + # Start actor and make sure it is running + actor.start() + await input_chan.new_sender().send(5) + msg = await echo_rx.receive() + assert msg == 5 + assert actor.is_running is True + + await actor.stop() + assert actor.restart_count == 0 From 02e30d7d197640f475c5c064c34a02eb0fa759dc Mon Sep 17 00:00:00 2001 From: Elzbieta Kotulska Date: Thu, 22 May 2025 11:28:13 +0200 Subject: [PATCH 2/2] Fix issue with actor restarting instead of stopping When an actor threw an exception during cancellation, it restarted rather than stopping as expected. Now it stops and raises an unhandled exception. Signed-off-by: Elzbieta Kotulska --- RELEASE_NOTES.md | 2 +- src/frequenz/sdk/actor/_actor.py | 33 ++++++++++++++++++++++++++++++++ tests/actor/test_actor.py | 9 +++++++++ 3 files changed, 43 insertions(+), 1 deletion(-) diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index 6d2e4d19e..1b7b0fb9a 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -14,4 +14,4 @@ ## Bug Fixes - +- Fixed issue where actors would restart instead of stopping when exceptions occurred during cancellation. Actors now properly stop and surface the unhandled exception. \ No newline at end of file diff --git a/src/frequenz/sdk/actor/_actor.py b/src/frequenz/sdk/actor/_actor.py index e62c2b638..337b00613 100644 --- a/src/frequenz/sdk/actor/_actor.py +++ b/src/frequenz/sdk/actor/_actor.py @@ -44,6 +44,15 @@ class Actor(BackgroundService, abc.ABC): This is mostly used for testing purposes and shouldn't be set in production.
""" + def __init__(self, *, name: str | None = None) -> None: + """Create actor instance. + + Args: + name: The name of this background service. + """ + super().__init__(name=name) + self._is_cancelled = False + def start(self) -> None: """Start this actor. @@ -51,6 +60,8 @@ def start(self) -> None: """ if self.is_running: return + + self._is_cancelled = False self._tasks.clear() self._tasks.add(asyncio.create_task(self._run_loop())) @@ -94,6 +105,17 @@ async def _run_loop(self) -> None: _logger.info("Actor %s: Cancelled.", self) raise except Exception: # pylint: disable=broad-except + if self._is_cancelled: + # If actor was cancelled, but any tasks have failed with an exception + # other than asyncio.CancelledError, those exceptions are combined + # in an ExceptionGroup or BaseExceptionGroup. + # We have to handle that case separately to stop actor instead + # of restarting it. + _logger.exception( + "Actor %s: Raised an unhandled exception during stop.", self + ) + break + _logger.exception("Actor %s: Raised an unhandled exception.", self) limit_str = "∞" if self._restart_limit is None else self._restart_limit limit_str = f"({n_restarts}/{limit_str})" @@ -113,3 +135,14 @@ async def _run_loop(self) -> None: break _logger.info("Actor %s: Stopped.", self) + + def cancel(self, msg: str | None = None) -> None: + """Cancel actor. + + A cancelled actor is not restarted automatically; call `start()` to run it again. + + Args: + msg: The message to be passed to the tasks being cancelled.
+ """ + self._is_cancelled = True + return super().cancel(msg) diff --git a/tests/actor/test_actor.py b/tests/actor/test_actor.py index 930ffaf9e..3025ff24d 100644 --- a/tests/actor/test_actor.py +++ b/tests/actor/test_actor.py @@ -391,3 +391,12 @@ async def test_actor_stop_if_error_was_raised_during_cancel( await actor.stop() assert actor.restart_count == 0 + + assert caplog.record_tuples == [ + (*ACTOR_INFO, "Actor RaiseExceptionOnCancelActor[test]: Started."), + ( + *ACTOR_ERROR, + "Actor RaiseExceptionOnCancelActor[test]: Raised an unhandled exception during stop.", + ), + (*ACTOR_INFO, "Actor RaiseExceptionOnCancelActor[test]: Stopped."), + ]