diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md
index 02a8a5c72..013d88460 100644
--- a/RELEASE_NOTES.md
+++ b/RELEASE_NOTES.md
@@ -14,6 +14,8 @@
 
 - Stop fallback formulas when primary formula starts working again.
 
+- Add `RestartActorException` to `frequenz.sdk.actor`. Actors can raise it to restart their `_run()` method if necessary.
+
 ## Bug Fixes
 
 - Fixed bug with formulas raising exception when stopped.
diff --git a/src/frequenz/sdk/actor/__init__.py b/src/frequenz/sdk/actor/__init__.py
index d0b00f323..70df508b1 100644
--- a/src/frequenz/sdk/actor/__init__.py
+++ b/src/frequenz/sdk/actor/__init__.py
@@ -598,12 +598,13 @@ async def main() -> None:  # (6)!
 """
 
 from ..timeseries._resampling import ResamplerConfig
-from ._actor import Actor
+from ._actor import Actor, RestartActorException
 from ._background_service import BackgroundService
 from ._run_utils import run
 
 __all__ = [
     "Actor",
     "BackgroundService",
     "ResamplerConfig",
+    "RestartActorException",
     "run",
diff --git a/src/frequenz/sdk/actor/_actor.py b/src/frequenz/sdk/actor/_actor.py
index e62c2b638..c86a0579e 100644
--- a/src/frequenz/sdk/actor/_actor.py
+++ b/src/frequenz/sdk/actor/_actor.py
@@ -13,6 +13,10 @@
 _logger = logging.getLogger(__name__)
 
 
+class RestartActorException(Exception):
+    """Exception raised when an actor should be restarted."""
+
+
 class Actor(BackgroundService, abc.ABC):
     """A primitive unit of computation that runs autonomously.
 
@@ -58,19 +62,14 @@ def start(self) -> None:
     async def _run(self) -> None:
         """Run this actor's logic."""
 
-    async def _delay_if_restart(self, iteration: int) -> None:
-        """Delay the restart of this actor's n'th iteration.
-
-        Args:
-            iteration: The current iteration of the restart.
-        """
+    async def _delay(self) -> None:
+        """Delay the restart of this actor."""
         # NB: I think it makes sense (in the future) to think about deminishing returns
         # the longer the actor has been running.
         # Not just for the restart-delay but actually for the n_restarts counter as well.
-        if iteration > 0:
-            delay = self.RESTART_DELAY.total_seconds()
-            _logger.info("Actor %s: Waiting %s seconds...", self, delay)
-            await asyncio.sleep(delay)
+        delay = self.RESTART_DELAY.total_seconds()
+        _logger.info("Actor %s: Waiting %s seconds...", self, delay)
+        await asyncio.sleep(delay)
 
     async def _run_loop(self) -> None:
         """Run the actor's task continuously, managing restarts, cancellation, and termination.
@@ -84,22 +83,31 @@ async def _run_loop(self) -> None:
             BaseException: If the actor's `_run()` method raises any base exception.
""" _logger.info("Actor %s: Started.", self) - n_restarts = 0 + n_errors = 0 + should_delay = False while True: try: - await self._delay_if_restart(n_restarts) + if should_delay: + await self._delay() + should_delay = False await self._run() _logger.info("Actor %s: _run() returned without error.", self) except asyncio.CancelledError: _logger.info("Actor %s: Cancelled.", self) raise + except RestartActorException: + _logger.info("Actor %s: Restarting.", self) + continue except Exception: # pylint: disable=broad-except + should_delay = True _logger.exception("Actor %s: Raised an unhandled exception.", self) limit_str = "∞" if self._restart_limit is None else self._restart_limit - limit_str = f"({n_restarts}/{limit_str})" - if self._restart_limit is None or n_restarts < self._restart_limit: - n_restarts += 1 - _logger.info("Actor %s: Restarting %s...", self, limit_str) + limit_str = f"({n_errors}/{limit_str})" + if self._restart_limit is None or n_errors < self._restart_limit: + n_errors += 1 + _logger.info( + "Actor %s: Restarting, error count %s...", self, limit_str + ) continue _logger.info( "Actor %s: Maximum restarts attempted %s, bailing out...", diff --git a/tests/actor/test_actor.py b/tests/actor/test_actor.py index 05a9f5f26..e61dc2b5c 100644 --- a/tests/actor/test_actor.py +++ b/tests/actor/test_actor.py @@ -9,7 +9,7 @@ import pytest from frequenz.channels import Broadcast, Receiver, Sender, select, selected_from -from frequenz.sdk.actor import Actor, run +from frequenz.sdk.actor import Actor, RestartActorException, run from ..conftest import actor_restart_limit @@ -78,6 +78,32 @@ async def _run(self) -> None: print(f"{self} done (should not happen)") +class RaiseRestartExceptionActor(BaseTestActor): + """Actor that raises an RestartActorException as soon as it receives a message.""" + + def __init__(self, recv: Receiver[int], raise_count: int) -> None: + """Create an instance. + + Args: + recv: A channel receiver for int data. + raise_count: How many time raise RestartActorException + """ + super().__init__(name="test") + self._recv = recv + self._raise_count = raise_count + + async def _run(self) -> None: + """Start the actor and crash upon receiving a message.""" + print(f"{self} started") + self.inc_restart_count() + async for _ in self._recv: + if self._raise_count <= 0: + break + self._raise_count -= 1 + raise RestartActorException("Actor should restarts") + print(f"{self} done") + + ACTOR_INFO = ("frequenz.sdk.actor._actor", 20) ACTOR_ERROR = ("frequenz.sdk.actor._actor", 40) RUN_INFO = ("frequenz.sdk.actor._run_utils", 20) @@ -220,7 +246,8 @@ async def test_restart_on_unhandled_exception( ), ( *ACTOR_INFO, - f"Actor RaiseExceptionActor[test]: Restarting ({i}/{restart_limit})...", + "Actor RaiseExceptionActor[test]: " + f"Restarting, error count ({i}/{restart_limit})...", ), ( *ACTOR_INFO, @@ -256,6 +283,82 @@ async def test_restart_on_unhandled_exception( assert filtered_logs == expected_log +@pytest.mark.parametrize("restart_num", [1]) +async def test_restart_on_restart_exception( + restart_num: int, caplog: pytest.LogCaptureFixture +) -> None: + """Create a faulty actor and expect it to restart because it raises an exception. + + Also test this works with different restart limits. + + Args: + restart_num: The restart limit to use. + caplog: The log capture fixture. 
+ """ + relevant_loggers = {"frequenz.sdk.actor._actor", "frequenz.sdk.actor._run_utils"} + for logger in relevant_loggers: + caplog.set_level("DEBUG", logger=logger) + + channel: Broadcast[int] = Broadcast(name="channel") + + # We need some timeout, 1 second for each restart should be enough. + # There should be no restart delay. + expected_wait_time = timedelta(seconds=restart_num + 1.0) + async with asyncio.timeout(expected_wait_time.total_seconds()): + actor = RaiseRestartExceptionActor( + channel.new_receiver(), + raise_count=restart_num, + ) + for i in range(restart_num + 1): + await channel.new_sender().send(i) + + await run(actor) + await actor.wait() + + assert actor.is_running is False + assert BaseTestActor.restart_count == restart_num + expected_log = [ + (*RUN_INFO, "Starting 1 actor(s)..."), + (*RUN_INFO, "Actor RaiseRestartExceptionActor[test]: Starting..."), + (*ACTOR_INFO, "Actor RaiseRestartExceptionActor[test]: Started."), + ] + for i in range(restart_num): + expected_log.append( + ( + *ACTOR_INFO, + "Actor RaiseRestartExceptionActor[test]: Restarting.", + ), + ) + expected_log.extend( + [ + ( + *ACTOR_INFO, + "Actor RaiseRestartExceptionActor[test]: _run() returned without error.", + ), + ( + *ACTOR_INFO, + "Actor RaiseRestartExceptionActor[test]: Stopped.", + ), + ( + *RUN_INFO, + "Actor RaiseRestartExceptionActor[test]: Finished normally.", + ), + ( + *RUN_INFO, + "All 1 actor(s) finished.", + ), + ] + ) + print("caplog.record_tuples:", caplog.record_tuples) + # This is an ugly hack. There seem to be some issues with asyncio and caplog, maybe + # pytest-asyncio, that reports some pending tasks from an unrelated test when tested + # inside QEMU (suggesting also some timing issue when things run very slow). + filtered_logs = [r for r in caplog.record_tuples if r[0] in relevant_loggers] + print("filtered_logs:", filtered_logs) + print("expected_log:", expected_log) + assert filtered_logs == expected_log + + async def test_does_not_restart_on_normal_exit( actor_auto_restart_once: None, # pylint: disable=unused-argument caplog: pytest.LogCaptureFixture,