Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions RELEASE_NOTES.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,7 @@
## Bug Fixes

- Fixed bug with formulas raising exception when stopped.

- Fix a bug that raised `CancelledError` when actor was started with `frequenz.sdk.actor.run` and stopped.

- Stop catching `BaseException` in `frequenz.sdk.actor.run`. Only `CancelledError` and `Exception` are caught now.
17 changes: 10 additions & 7 deletions src/frequenz/sdk/actor/_run_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,18 +38,21 @@ async def run(*actors: Actor) -> None:
done_tasks, pending_tasks = await asyncio.wait(
pending_tasks, return_when=asyncio.FIRST_COMPLETED
)

# This should always be only one task, but we handle many for extra safety
for task in done_tasks:
# Cancellation needs to be checked first, otherwise the other methods
# could raise a CancelledError
if task.cancelled():
# BackgroundService returns a BaseExceptionGroup containing multiple
# exceptions. The 'task.result()' statement raises these exceptions,
# and 'except*' is used to handle them as a group. If the task raises
# multiple different exceptions, 'except*' will be invoked multiple times,
# once for each exception group.
try:
task.result()
except* asyncio.CancelledError:
_logger.info("Actor %s: Cancelled while running.", task.get_name())
elif exception := task.exception():
_logger.error(
except* Exception: # pylint: disable=broad-exception-caught
_logger.exception(
"Actor %s: Raised an exception while running.",
task.get_name(),
exc_info=exception,
)
else:
_logger.info("Actor %s: Finished normally.", task.get_name())
Expand Down
62 changes: 1 addition & 61 deletions tests/actor/test_actor.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,31 +78,6 @@ async def _run(self) -> None:
print(f"{self} done (should not happen)")


class RaiseBaseExceptionActor(BaseTestActor):
"""A faulty actor that raises a BaseException as soon as it receives a message."""

def __init__(
self,
recv: Receiver[int],
) -> None:
"""Create an instance.

Args:
recv: A channel receiver for int data.
"""
super().__init__(name="test")
self._recv = recv

async def _run(self) -> None:
"""Start the actor and crash upon receiving a message."""
print(f"{self} started")
self.inc_restart_count()
async for _ in self._recv:
print(f"{self} is about to crash")
raise MyBaseException("This is a test")
print(f"{self} done (should not happen)")


ACTOR_INFO = ("frequenz.sdk.actor._actor", 20)
ACTOR_ERROR = ("frequenz.sdk.actor._actor", 40)
RUN_INFO = ("frequenz.sdk.actor._run_utils", 20)
Expand Down Expand Up @@ -309,41 +284,6 @@ async def test_does_not_restart_on_normal_exit(
]


async def test_does_not_restart_on_base_exception(
actor_auto_restart_once: None, # pylint: disable=unused-argument
caplog: pytest.LogCaptureFixture,
) -> None:
"""Create a faulty actor and expect it not to restart because it raises a base exception."""
caplog.set_level("DEBUG", logger="frequenz.sdk.actor._actor")
caplog.set_level("DEBUG", logger="frequenz.sdk.actor._run_utils")

channel: Broadcast[int] = Broadcast(name="channel")

actor = RaiseBaseExceptionActor(channel.new_receiver())

async with asyncio.timeout(1.0):
await channel.new_sender().send(1)
# We can't use pytest.raises() here because known BaseExceptions are handled
# specially by pytest.
try:
await run(actor)
except MyBaseException as error:
assert str(error) == "This is a test"

assert BaseTestActor.restart_count == 0
assert caplog.record_tuples == [
(*RUN_INFO, "Starting 1 actor(s)..."),
(*RUN_INFO, "Actor RaiseBaseExceptionActor[test]: Starting..."),
(*ACTOR_INFO, "Actor RaiseBaseExceptionActor[test]: Started."),
(*ACTOR_ERROR, "Actor RaiseBaseExceptionActor[test]: Raised a BaseException."),
(
*RUN_ERROR,
"Actor RaiseBaseExceptionActor[test]: Raised an exception while running.",
),
(*RUN_INFO, "All 1 actor(s) finished."),
]


async def test_does_not_restart_if_cancelled(
actor_auto_restart_once: None, # pylint: disable=unused-argument
caplog: pytest.LogCaptureFixture,
Expand Down Expand Up @@ -390,6 +330,6 @@ async def cancel_actor() -> None:
(*RUN_INFO, "Actor EchoActor[EchoActor]: Starting..."),
(*ACTOR_INFO, "Actor EchoActor[EchoActor]: Started."),
(*ACTOR_INFO, "Actor EchoActor[EchoActor]: Cancelled."),
(*RUN_ERROR, "Actor EchoActor[EchoActor]: Raised an exception while running."),
(*RUN_INFO, "Actor EchoActor[EchoActor]: Cancelled while running."),
(*RUN_INFO, "All 1 actor(s) finished."),
]
Loading