Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion RELEASE_NOTES.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,4 @@

## Bug Fixes

<!-- Here goes notable bug fixes that are worth a special mention or explanation -->
- Fixed an issue where actors would restart instead of stopping when exceptions occurred during cancellation. Actors now properly stop and surface the unhandled exception.
33 changes: 33 additions & 0 deletions src/frequenz/sdk/actor/_actor.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,24 @@ class Actor(BackgroundService, abc.ABC):
This is mostly used for testing purposes and shouldn't be set in production.
"""

def __init__(self, *, name: str | None = None) -> None:
    """Initialize the actor.

    Args:
        name: The name of this background service.
    """
    super().__init__(name=name)
    # Set to True by `cancel()` and reset by `start()`. `_run_loop()` checks
    # it so that an exception raised while the actor is being cancelled stops
    # the actor instead of triggering a restart.
    self._is_cancelled: bool = False
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can't we save the main actor task instead and use self._main_task.cancelled() as a single source of truth?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I checked, and in that case `self._main_task.cancelled()` returns `False`, because the task was not cancelled successfully.


def start(self) -> None:
    """Start this actor.

    Calling this method while the actor is already running has no effect.
    """
    if self.is_running:
        return

    # A fresh run begins in the "not cancelled" state, so an exception raised
    # by the new run is handled by the normal restart logic of the run loop.
    self._is_cancelled = False

    # Forget the tasks of any previous run before registering the new one.
    self._tasks.clear()
    run_loop_task = asyncio.create_task(self._run_loop())
    self._tasks.add(run_loop_task)

Expand Down Expand Up @@ -94,6 +105,17 @@ async def _run_loop(self) -> None:
_logger.info("Actor %s: Cancelled.", self)
raise
except Exception: # pylint: disable=broad-except
if self._is_cancelled:
# If actor was cancelled, but any tasks have failed with an exception
# other than asyncio.CancelledError, those exceptions are combined
# in an ExceptionGroup or BaseExceptionGroup.
# We have to handle that case separately to stop actor instead
# of restarting it.
_logger.exception(
"Actor %s: Raised an unhandled exception during stop.", self
)
break

_logger.exception("Actor %s: Raised an unhandled exception.", self)
limit_str = "∞" if self._restart_limit is None else self._restart_limit
limit_str = f"({n_restarts}/{limit_str})"
Expand All @@ -113,3 +135,14 @@ async def _run_loop(self) -> None:
break

_logger.info("Actor %s: Stopped.", self)

def cancel(self, msg: str | None = None) -> None:
    """Cancel this actor.

    A cancelled actor can't be started again.

    Args:
        msg: The message to be passed to the tasks being cancelled.
    """
    # NOTE(review): `start()` resets this flag, which suggests a restart after
    # cancellation may be possible; confirm the "can't be started again"
    # statement above against `BackgroundService`.
    #
    # Record the cancellation *before* delegating, so the run loop can tell an
    # exception raised during cancellation apart from an ordinary failure.
    self._is_cancelled = True
    super().cancel(msg)
67 changes: 67 additions & 0 deletions tests/actor/test_actor.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,31 @@ async def _run(self) -> None:
print(f"{self} done")


class RaiseExceptionOnCancelActor(BaseTestActor):
    """Test actor that raises a `RuntimeError` when it is cancelled."""

    def __init__(
        self, *, name: str | None = None, recv: Receiver[int], sender: Sender[int]
    ) -> None:
        """Create an instance.

        Args:
            name: The name of this actor.
            recv: The receiver whose messages are forwarded.
            sender: The sender the received messages are forwarded to.
        """
        super().__init__(name=name)
        self._recv = recv
        self._sender = sender

    async def _run(self) -> None:
        """Echo received messages; raise `RuntimeError` when cancelled.

        Raises:
            RuntimeError: When the run is cancelled, chained from the
                original `asyncio.CancelledError`.
        """
        self.inc_restart_count()
        if BaseTestActor.restart_count == 1:
            # This actor is not supposed to be restarted. If it is restarted
            # anyway, return right away so that awaiting `stop` can't hang
            # forever.
            return
        try:
            async for value in self._recv:
                await self._sender.send(value)
        except asyncio.CancelledError as cancelled:
            # Deliberately misbehave: answer cancellation with another error.
            raise RuntimeError("Actor should stop.") from cancelled
Comment on lines +77 to +78
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure we need to support this; it feels wrong. If a task is cancelled, it should not raise some other exception. Cancellation should always be supported and do nothing if it was already cancelled.

Copy link
Contributor Author

@ela-kotulska-frequenz ela-kotulska-frequenz May 26, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if a task is cancelled, it should not raise some other exception.

Yes, but bugs happen and anything can raise an exception. The problem is that in this case we restart the actor and `await actor.stop()` never ends... :/



class RaiseExceptionActor(BaseTestActor):
"""A faulty actor that raises an Exception as soon as it receives a message."""

Expand Down Expand Up @@ -333,3 +358,45 @@ async def cancel_actor() -> None:
(*RUN_INFO, "Actor EchoActor[EchoActor]: Cancelled while running."),
(*RUN_INFO, "All 1 actor(s) finished."),
]


async def test_actor_stop_if_error_was_raised_during_cancel(
    actor_auto_restart_once: None,  # pylint: disable=unused-argument
    caplog: pytest.LogCaptureFixture,
) -> None:
    """Check that an actor raising an exception while being cancelled stops.

    The actor must not be restarted, and the unhandled exception must be
    reported to the user.
    """
    for logger_name in (
        "frequenz.sdk.actor._actor",
        "frequenz.sdk.actor._run_utils",
    ):
        caplog.set_level("DEBUG", logger=logger_name)

    source_chan: Broadcast[int] = Broadcast(name="TestChannel")

    sink_chan: Broadcast[int] = Broadcast(name="echo output")
    sink_rx = sink_chan.new_receiver()

    actor = RaiseExceptionOnCancelActor(
        name="test",
        recv=source_chan.new_receiver(),
        sender=sink_chan.new_sender(),
    )

    # Start the actor and prove it is running by echoing one message through it.
    actor.start()
    await source_chan.new_sender().send(5)
    echoed = await sink_rx.receive()
    assert echoed == 5
    assert actor.is_running is True

    # Stopping cancels the actor, which answers with a `RuntimeError`.
    await actor.stop()
    assert actor.restart_count == 0

    expected_records = [
        (*ACTOR_INFO, "Actor RaiseExceptionOnCancelActor[test]: Started."),
        (
            *ACTOR_ERROR,
            "Actor RaiseExceptionOnCancelActor[test]: Raised an unhandled exception during stop.",
        ),
        (*ACTOR_INFO, "Actor RaiseExceptionOnCancelActor[test]: Stopped."),
    ]
    assert caplog.record_tuples == expected_records
Loading