Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions RELEASE_NOTES.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

- Stop fallback formulas when primary formula starts working again.

- Add `RestartActorException` to `frequenz.sdk.actor`. An actor can raise it to restart its `_run()` method if necessary.

## Bug Fixes

- Fixed bug with formulas raising exception when stopped.
Expand Down
3 changes: 2 additions & 1 deletion src/frequenz/sdk/actor/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -598,12 +598,13 @@ async def main() -> None: # (6)!
"""

from ..timeseries._resampling import ResamplerConfig
from ._actor import Actor
from ._actor import Actor, RestartActorException
from ._background_service import BackgroundService
from ._run_utils import run

__all__ = [
"Actor",
"RestartActorException",
"BackgroundService",
"ResamplerConfig",
"run",
Expand Down
40 changes: 24 additions & 16 deletions src/frequenz/sdk/actor/_actor.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@
_logger = logging.getLogger(__name__)


class RestartActorException(Exception):
    """Exception raised when an actor should be restarted.

    An actor can raise this exception from its `_run()` method to ask for
    `_run()` to be called again. The actor's run loop handles it separately
    from other exceptions: it logs the restart at info level and restarts
    right away, without waiting for the restart delay and without counting
    the restart as an error.
    """


class Actor(BackgroundService, abc.ABC):
"""A primitive unit of computation that runs autonomously.

Expand Down Expand Up @@ -58,19 +62,14 @@ def start(self) -> None:
async def _run(self) -> None:
"""Run this actor's logic."""

async def _delay_if_restart(self, iteration: int) -> None:
"""Delay the restart of this actor's n'th iteration.

Args:
iteration: The current iteration of the restart.
"""
async def _delay(self) -> None:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe make it a bit more explicit?

Suggested change
async def _delay(self) -> None:
async def _delay_restart(self) -> None:

"""Delay the restart of this actor."""
# NB: I think it makes sense (in the future) to think about diminishing returns
# the longer the actor has been running.
# Not just for the restart-delay but actually for the n_restarts counter as well.
if iteration > 0:
delay = self.RESTART_DELAY.total_seconds()
_logger.info("Actor %s: Waiting %s seconds...", self, delay)
await asyncio.sleep(delay)
delay = self.RESTART_DELAY.total_seconds()
_logger.info("Actor %s: Waiting %s seconds...", self, delay)
await asyncio.sleep(delay)

async def _run_loop(self) -> None:
"""Run the actor's task continuously, managing restarts, cancellation, and termination.
Expand All @@ -84,22 +83,31 @@ async def _run_loop(self) -> None:
BaseException: If the actor's `_run()` method raises any base exception.
"""
_logger.info("Actor %s: Started.", self)
n_restarts = 0
n_errors = 0
should_delay = False
while True:
try:
await self._delay_if_restart(n_restarts)
if should_delay:
await self._delay()
should_delay = False
await self._run()
_logger.info("Actor %s: _run() returned without error.", self)
except asyncio.CancelledError:
_logger.info("Actor %s: Cancelled.", self)
raise
except RestartActorException:
_logger.info("Actor %s: Restarting.", self)
continue
except Exception: # pylint: disable=broad-except
should_delay = True
_logger.exception("Actor %s: Raised an unhandled exception.", self)
limit_str = "∞" if self._restart_limit is None else self._restart_limit
limit_str = f"({n_restarts}/{limit_str})"
if self._restart_limit is None or n_restarts < self._restart_limit:
n_restarts += 1
_logger.info("Actor %s: Restarting %s...", self, limit_str)
limit_str = f"({n_errors}/{limit_str})"
if self._restart_limit is None or n_errors < self._restart_limit:
n_errors += 1
_logger.info(
"Actor %s: Restarting, error count %s...", self, limit_str
)
continue
_logger.info(
"Actor %s: Maximum restarts attempted %s, bailing out...",
Expand Down
107 changes: 105 additions & 2 deletions tests/actor/test_actor.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import pytest
from frequenz.channels import Broadcast, Receiver, Sender, select, selected_from

from frequenz.sdk.actor import Actor, run
from frequenz.sdk.actor import Actor, RestartActorException, run

from ..conftest import actor_restart_limit

Expand Down Expand Up @@ -78,6 +78,32 @@ async def _run(self) -> None:
print(f"{self} done (should not happen)")


class RaiseRestartExceptionActor(BaseTestActor):
    """Actor that raises a RestartActorException as soon as it receives a message."""

    def __init__(self, recv: Receiver[int], raise_count: int) -> None:
        """Create an instance.

        Args:
            recv: A channel receiver for int data.
            raise_count: How many times to raise `RestartActorException`
                before leaving the receive loop and returning normally.
        """
        super().__init__(name="test")
        self._recv = recv
        self._raise_count = raise_count

    async def _run(self) -> None:
        """Request a restart on each received message until the budget is spent.

        Every invocation bumps the shared restart counter first. While
        `raise_count` is still positive, the first received message
        decrements it and raises `RestartActorException`. Once it reaches
        zero, the first received message ends the loop and the method
        returns normally.

        Raises:
            RestartActorException: When a message arrives and there are
                raises left.
        """
        print(f"{self} started")
        self.inc_restart_count()
        async for _ in self._recv:
            if self._raise_count <= 0:
                # No raises left: stop consuming and finish without error.
                break
            self._raise_count -= 1
            raise RestartActorException("Actor should restarts")
        print(f"{self} done")


ACTOR_INFO = ("frequenz.sdk.actor._actor", 20)
ACTOR_ERROR = ("frequenz.sdk.actor._actor", 40)
RUN_INFO = ("frequenz.sdk.actor._run_utils", 20)
Expand Down Expand Up @@ -220,7 +246,8 @@ async def test_restart_on_unhandled_exception(
),
(
*ACTOR_INFO,
f"Actor RaiseExceptionActor[test]: Restarting ({i}/{restart_limit})...",
"Actor RaiseExceptionActor[test]: "
f"Restarting, error count ({i}/{restart_limit})...",
),
(
*ACTOR_INFO,
Expand Down Expand Up @@ -256,6 +283,82 @@ async def test_restart_on_unhandled_exception(
assert filtered_logs == expected_log


@pytest.mark.parametrize("restart_num", [1])
async def test_restart_on_restart_exception(
    restart_num: int, caplog: pytest.LogCaptureFixture
) -> None:
    """Check that an actor raising `RestartActorException` is restarted.

    The actor raises `RestartActorException` `restart_num` times and then
    returns normally. The test expects one "Restarting." log entry per raise,
    followed by a normal shutdown, all within a timeout that leaves no room
    for restart delays.

    Args:
        restart_num: How many times the actor raises `RestartActorException`.
        caplog: The log capture fixture.
    """
    relevant_loggers = {"frequenz.sdk.actor._actor", "frequenz.sdk.actor._run_utils"}
    for logger in relevant_loggers:
        caplog.set_level("DEBUG", logger=logger)

    channel: Broadcast[int] = Broadcast(name="channel")

    # We need some timeout, 1 second for each restart should be enough.
    # There should be no restart delay.
    expected_wait_time = timedelta(seconds=restart_num + 1.0)
    async with asyncio.timeout(expected_wait_time.total_seconds()):
        actor = RaiseRestartExceptionActor(
            channel.new_receiver(),
            raise_count=restart_num,
        )
        # One message per expected raise, plus one that lets the actor finish.
        sender = channel.new_sender()
        for i in range(restart_num + 1):
            await sender.send(i)

        await run(actor)
        await actor.wait()

    assert actor.is_running is False
    assert BaseTestActor.restart_count == restart_num
    expected_log = [
        (*RUN_INFO, "Starting 1 actor(s)..."),
        (*RUN_INFO, "Actor RaiseRestartExceptionActor[test]: Starting..."),
        (*ACTOR_INFO, "Actor RaiseRestartExceptionActor[test]: Started."),
    ]
    # Each raised RestartActorException produces exactly one "Restarting." entry.
    expected_log.extend(
        [
            (
                *ACTOR_INFO,
                "Actor RaiseRestartExceptionActor[test]: Restarting.",
            ),
        ]
        * restart_num
    )
    expected_log.extend(
        [
            (
                *ACTOR_INFO,
                "Actor RaiseRestartExceptionActor[test]: _run() returned without error.",
            ),
            (
                *ACTOR_INFO,
                "Actor RaiseRestartExceptionActor[test]: Stopped.",
            ),
            (
                *RUN_INFO,
                "Actor RaiseRestartExceptionActor[test]: Finished normally.",
            ),
            (
                *RUN_INFO,
                "All 1 actor(s) finished.",
            ),
        ]
    )
    print("caplog.record_tuples:", caplog.record_tuples)
    # This is an ugly hack. There seem to be some issues with asyncio and caplog, maybe
    # pytest-asyncio, that reports some pending tasks from an unrelated test when tested
    # inside QEMU (suggesting also some timing issue when things run very slow).
    filtered_logs = [r for r in caplog.record_tuples if r[0] in relevant_loggers]
    print("filtered_logs:", filtered_logs)
    print("expected_log:", expected_log)
    assert filtered_logs == expected_log


async def test_does_not_restart_on_normal_exit(
actor_auto_restart_once: None, # pylint: disable=unused-argument
caplog: pytest.LogCaptureFixture,
Expand Down
Loading