diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index 6d2e4d19e..1b7b0fb9a 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -14,4 +14,4 @@ ## Bug Fixes - +- Fixed issue where actors would restart instead of stopping when exceptions occurred during cancellation. Actors now properly stop and surface the unhandled exception. \ No newline at end of file diff --git a/src/frequenz/sdk/actor/_actor.py b/src/frequenz/sdk/actor/_actor.py index e62c2b638..337b00613 100644 --- a/src/frequenz/sdk/actor/_actor.py +++ b/src/frequenz/sdk/actor/_actor.py @@ -44,6 +44,15 @@ class Actor(BackgroundService, abc.ABC): This is mostly used for testing purposes and shouldn't be set in production. """ + def __init__(self, *, name: str | None = None) -> None: + """Create actor instance. + + Args: + name: The name of this background service. + """ + super().__init__(name=name) + self._is_cancelled = False + def start(self) -> None: """Start this actor. @@ -51,6 +60,8 @@ def start(self) -> None: """ if self.is_running: return + + self._is_cancelled = False self._tasks.clear() self._tasks.add(asyncio.create_task(self._run_loop())) @@ -94,6 +105,17 @@ async def _run_loop(self) -> None: _logger.info("Actor %s: Cancelled.", self) raise except Exception: # pylint: disable=broad-except + if self._is_cancelled: + # If actor was cancelled, but any tasks have failed with an exception + # other than asyncio.CancelledError, those exceptions are combined + # in an ExceptionGroup or BaseExceptionGroup. + # We have to handle that case separately to stop actor instead + # of restarting it. + _logger.exception( + "Actor %s: Raised an unhandled exception during stop.", self + ) + break + _logger.exception("Actor %s: Raised an unhandled exception.", self) limit_str = "∞" if self._restart_limit is None else self._restart_limit limit_str = f"({n_restarts}/{limit_str})" @@ -113,3 +135,14 @@ async def _run_loop(self) -> None: break _logger.info("Actor %s: Stopped.", self) + + def cancel(self, msg: str | None = None) -> None: + """Cancel actor. + + Cancelled actor can't be started again. + + Args: + msg: The message to be passed to the tasks being cancelled. + """ + self._is_cancelled = True + return super().cancel(msg) diff --git a/tests/actor/test_actor.py b/tests/actor/test_actor.py index 05a9f5f26..3025ff24d 100644 --- a/tests/actor/test_actor.py +++ b/tests/actor/test_actor.py @@ -53,6 +53,31 @@ async def _run(self) -> None: print(f"{self} done") +class RaiseExceptionOnCancelActor(BaseTestActor): + """Actor that raises exception during stop.""" + + def __init__( + self, *, name: str | None = None, recv: Receiver[int], sender: Sender[int] + ) -> None: + """Create an instance.""" + super().__init__(name=name) + self._recv = recv + self._sender = sender + + async def _run(self) -> None: + """Start the actor and raise exception after receiving CancelledError.""" + self.inc_restart_count() + if BaseTestActor.restart_count == 1: + # This actor should not restart + # If it does we just return to avoid infinite await on `stop` + return + try: + async for msg in self._recv: + await self._sender.send(msg) + except asyncio.CancelledError as exc: + raise RuntimeError("Actor should stop.") from exc + + class RaiseExceptionActor(BaseTestActor): """A faulty actor that raises an Exception as soon as it receives a message.""" @@ -333,3 +358,45 @@ async def cancel_actor() -> None: (*RUN_INFO, "Actor EchoActor[EchoActor]: Cancelled while running."), (*RUN_INFO, "All 1 actor(s) finished."), ] + + +async def test_actor_stop_if_error_was_raised_during_cancel( + actor_auto_restart_once: None, # pylint: disable=unused-argument + caplog: pytest.LogCaptureFixture, +) -> None: + """If actor raises exception during cancellation it should stop. + + And throw unhandled exception to the user.. + """ + caplog.set_level("DEBUG", logger="frequenz.sdk.actor._actor") + caplog.set_level("DEBUG", logger="frequenz.sdk.actor._run_utils") + + input_chan: Broadcast[int] = Broadcast(name="TestChannel") + + echo_chan: Broadcast[int] = Broadcast(name="echo output") + echo_rx = echo_chan.new_receiver() + + actor = RaiseExceptionOnCancelActor( + name="test", + recv=input_chan.new_receiver(), + sender=echo_chan.new_sender(), + ) + + # Start actor and make sure it is running + actor.start() + await input_chan.new_sender().send(5) + msg = await echo_rx.receive() + assert msg == 5 + assert actor.is_running is True + + await actor.stop() + assert actor.restart_count == 0 + + assert caplog.record_tuples == [ + (*ACTOR_INFO, "Actor RaiseExceptionOnCancelActor[test]: Started."), + ( + *ACTOR_ERROR, + "Actor RaiseExceptionOnCancelActor[test]: Raised an unhandled exception during stop.", + ), + (*ACTOR_INFO, "Actor RaiseExceptionOnCancelActor[test]: Stopped."), + ]