-
Notifications
You must be signed in to change notification settings - Fork 20
Fix issue with actor restarting instead of stopping #1223
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -53,6 +53,31 @@ async def _run(self) -> None: | |
| print(f"{self} done") | ||
|
|
||
|
|
||
| class RaiseExceptionOnCancelActor(BaseTestActor): | ||
| """Actor that raises exception during stop.""" | ||
|
|
||
| def __init__( | ||
| self, *, name: str | None = None, recv: Receiver[int], sender: Sender[int] | ||
| ) -> None: | ||
| """Create an instance.""" | ||
| super().__init__(name=name) | ||
| self._recv = recv | ||
| self._sender = sender | ||
|
|
||
| async def _run(self) -> None: | ||
| """Start the actor and raise exception after receiving CancelledError.""" | ||
| self.inc_restart_count() | ||
| if BaseTestActor.restart_count == 1: | ||
| # This actor should not restart | ||
| # If it does we just return to avoid infinite await on `stop` | ||
| return | ||
| try: | ||
| async for msg in self._recv: | ||
| await self._sender.send(msg) | ||
| except asyncio.CancelledError as exc: | ||
| raise RuntimeError("Actor should stop.") from exc | ||
|
Comment on lines
+77
to
+78
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm not sure we need to support this, this feels wrong, if a task is cancelled, it should not raise some other exception. Cancellation should always be supported and do nothing if it was already cancelled. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Yes but bugs happens and anything can raise exception. The problem is in this case we restart the actor and |
||
|
|
||
|
|
||
| class RaiseExceptionActor(BaseTestActor): | ||
| """A faulty actor that raises an Exception as soon as it receives a message.""" | ||
|
|
||
|
|
@@ -333,3 +358,45 @@ async def cancel_actor() -> None: | |
| (*RUN_INFO, "Actor EchoActor[EchoActor]: Cancelled while running."), | ||
| (*RUN_INFO, "All 1 actor(s) finished."), | ||
| ] | ||
|
|
||
|
|
||
| async def test_actor_stop_if_error_was_raised_during_cancel( | ||
| actor_auto_restart_once: None, # pylint: disable=unused-argument | ||
| caplog: pytest.LogCaptureFixture, | ||
| ) -> None: | ||
| """If actor raises exception during cancellation it should stop. | ||
|
|
||
| And throw unhandled exception to the user.. | ||
| """ | ||
| caplog.set_level("DEBUG", logger="frequenz.sdk.actor._actor") | ||
| caplog.set_level("DEBUG", logger="frequenz.sdk.actor._run_utils") | ||
|
|
||
| input_chan: Broadcast[int] = Broadcast(name="TestChannel") | ||
|
|
||
| echo_chan: Broadcast[int] = Broadcast(name="echo output") | ||
| echo_rx = echo_chan.new_receiver() | ||
|
|
||
| actor = RaiseExceptionOnCancelActor( | ||
| name="test", | ||
| recv=input_chan.new_receiver(), | ||
| sender=echo_chan.new_sender(), | ||
| ) | ||
|
|
||
| # Start actor and make sure it is running | ||
| actor.start() | ||
| await input_chan.new_sender().send(5) | ||
| msg = await echo_rx.receive() | ||
| assert msg == 5 | ||
| assert actor.is_running is True | ||
|
|
||
| await actor.stop() | ||
| assert actor.restart_count == 0 | ||
ela-kotulska-frequenz marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
| assert caplog.record_tuples == [ | ||
| (*ACTOR_INFO, "Actor RaiseExceptionOnCancelActor[test]: Started."), | ||
| ( | ||
| *ACTOR_ERROR, | ||
| "Actor RaiseExceptionOnCancelActor[test]: Raised an unhandled exception during stop.", | ||
| ), | ||
| (*ACTOR_INFO, "Actor RaiseExceptionOnCancelActor[test]: Stopped."), | ||
| ] | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can't we save the main actor task instead and use
self._main_task.cancelled()as a single source of truth?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Checked and in that case
self._main_task.cancelled()returns False, because task was not cancelled successfully.