Skip to content

Commit 5009232

Browse files
Add RestartActorException
An actor should be able to restart its `_run` method, for example to update a formula and start listening on a new receiver. Restarting an actor this way should not print an exception.
1 parent bcfbd5b commit 5009232

File tree

4 files changed

+126
-13
lines changed

4 files changed

+126
-13
lines changed

RELEASE_NOTES.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@
1414

1515
- Stop fallback formulas when primary formula starts working again.
1616

17+
- Add `RestartActorException` to `frequenz.sdk.actor`. An actor can raise it to restart its `_run` method if necessary.
18+
1719
## Bug Fixes
1820

1921
- Fixed bug with formulas raising exception when stopped.

src/frequenz/sdk/actor/__init__.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -598,12 +598,13 @@ async def main() -> None: # (6)!
598598
"""
599599

600600
from ..timeseries._resampling import ResamplerConfig
601-
from ._actor import Actor
601+
from ._actor import Actor, RestartActorException
602602
from ._background_service import BackgroundService
603603
from ._run_utils import run
604604

605605
__all__ = [
606606
"Actor",
607+
"RestartActorException",
607608
"BackgroundService",
608609
"ResamplerConfig",
609610
"run",

src/frequenz/sdk/actor/_actor.py

Lines changed: 19 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,12 @@
1313
_logger = logging.getLogger(__name__)
1414

1515

16+
class RestartActorException(Exception):
17+
"""Exception raised when an actor should be restarted."""
18+
19+
pass
20+
21+
1622
class Actor(BackgroundService, abc.ABC):
1723
"""A primitive unit of computation that runs autonomously.
1824
@@ -58,19 +64,14 @@ def start(self) -> None:
5864
async def _run(self) -> None:
5965
"""Run this actor's logic."""
6066

61-
async def _delay_if_restart(self, iteration: int) -> None:
62-
"""Delay the restart of this actor's n'th iteration.
63-
64-
Args:
65-
iteration: The current iteration of the restart.
66-
"""
67+
async def _delay(self) -> None:
68+
"""Delay the restart of this actor."""
6769
# NB: I think it makes sense (in the future) to think about diminishing returns
6870
# the longer the actor has been running.
6971
# Not just for the restart-delay but actually for the n_restarts counter as well.
70-
if iteration > 0:
71-
delay = self.RESTART_DELAY.total_seconds()
72-
_logger.info("Actor %s: Waiting %s seconds...", self, delay)
73-
await asyncio.sleep(delay)
72+
delay = self.RESTART_DELAY.total_seconds()
73+
_logger.info("Actor %s: Waiting %s seconds...", self, delay)
74+
await asyncio.sleep(delay)
7475

7576
async def _run_loop(self) -> None:
7677
"""Run the actor's task continuously, managing restarts, cancellation, and termination.
@@ -85,15 +86,22 @@ async def _run_loop(self) -> None:
8586
"""
8687
_logger.info("Actor %s: Started.", self)
8788
n_restarts = 0
89+
should_delay = False
8890
while True:
8991
try:
90-
await self._delay_if_restart(n_restarts)
92+
if should_delay:
93+
await self._delay()
94+
should_delay = False
9195
await self._run()
9296
_logger.info("Actor %s: _run() returned without error.", self)
9397
except asyncio.CancelledError:
9498
_logger.info("Actor %s: Cancelled.", self)
9599
raise
100+
except RestartActorException:
101+
_logger.info("Actor %s: Restarting.", self)
102+
continue
96103
except Exception: # pylint: disable=broad-except
104+
should_delay = True
97105
_logger.exception("Actor %s: Raised an unhandled exception.", self)
98106
limit_str = "∞" if self._restart_limit is None else self._restart_limit
99107
limit_str = f"({n_restarts}/{limit_str})"

tests/actor/test_actor.py

Lines changed: 103 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
import pytest
1010
from frequenz.channels import Broadcast, Receiver, Sender, select, selected_from
1111

12-
from frequenz.sdk.actor import Actor, run
12+
from frequenz.sdk.actor import Actor, run, RestartActorException
1313

1414
from ..conftest import actor_restart_limit
1515

@@ -78,6 +78,31 @@ async def _run(self) -> None:
7878
print(f"{self} done (should not happen)")
7979

8080

81+
class RaiseRestartExceptionActor(BaseTestActor):
82+
"""A faulty actor that raises an Exception as soon as it receives a message."""
83+
84+
def __init__(self, recv: Receiver[int], raise_count: int) -> None:
85+
"""Create an instance.
86+
87+
Args:
88+
recv: A channel receiver for int data.
89+
"""
90+
super().__init__(name="test")
91+
self._recv = recv
92+
self._raise_count = raise_count
93+
94+
async def _run(self) -> None:
95+
"""Start the actor and crash upon receiving a message."""
96+
print(f"{self} started")
97+
self.inc_restart_count()
98+
async for _ in self._recv:
99+
if self._raise_count <= 0:
100+
break
101+
self._raise_count -= 1
102+
raise RestartActorException("Actor should restarts")
103+
print(f"{self} done")
104+
105+
81106
ACTOR_INFO = ("frequenz.sdk.actor._actor", 20)
82107
ACTOR_ERROR = ("frequenz.sdk.actor._actor", 40)
83108
RUN_INFO = ("frequenz.sdk.actor._run_utils", 20)
@@ -256,6 +281,83 @@ async def test_restart_on_unhandled_exception(
256281
assert filtered_logs == expected_log
257282

258283

284+
@pytest.mark.parametrize("restart_num", [0, 1, 2, 10])
285+
async def test_restart_on_restart_exception(
286+
restart_num: int, caplog: pytest.LogCaptureFixture
287+
) -> None:
288+
"""Create a faulty actor and expect it to restart because it raises an exception.
289+
290+
Also test this works with different restart limits.
291+
292+
Args:
293+
restart_num: The number of times the actor raises `RestartActorException` and is restarted.
294+
caplog: The log capture fixture.
295+
"""
296+
relevant_loggers = {"frequenz.sdk.actor._actor", "frequenz.sdk.actor._run_utils"}
297+
for logger in relevant_loggers:
298+
caplog.set_level("DEBUG", logger=logger)
299+
300+
channel: Broadcast[int] = Broadcast(name="channel")
301+
302+
# We need some timeout, this sounds reasonable.
303+
expected_wait_time = timedelta(
304+
seconds=restart_num * RaiseExceptionActor.RESTART_DELAY.total_seconds()
305+
)
306+
async with asyncio.timeout(expected_wait_time.total_seconds()):
307+
actor = RaiseRestartExceptionActor(
308+
channel.new_receiver(),
309+
raise_count=restart_num,
310+
)
311+
for i in range(restart_num + 1):
312+
await channel.new_sender().send(i)
313+
314+
await run(actor)
315+
await actor.wait()
316+
317+
assert actor.is_running is False
318+
assert BaseTestActor.restart_count == restart_num
319+
expected_log = [
320+
(*RUN_INFO, "Starting 1 actor(s)..."),
321+
(*RUN_INFO, "Actor RaiseRestartExceptionActor[test]: Starting..."),
322+
(*ACTOR_INFO, "Actor RaiseRestartExceptionActor[test]: Started."),
323+
]
324+
for i in range(restart_num):
325+
expected_log.append(
326+
(
327+
*ACTOR_INFO,
328+
"Actor RaiseRestartExceptionActor[test]: Restarting.",
329+
),
330+
)
331+
expected_log.extend(
332+
[
333+
(
334+
*ACTOR_INFO,
335+
"Actor RaiseRestartExceptionActor[test]: _run() returned without error.",
336+
),
337+
(
338+
*ACTOR_INFO,
339+
"Actor RaiseRestartExceptionActor[test]: Stopped.",
340+
),
341+
(
342+
*RUN_INFO,
343+
"Actor RaiseRestartExceptionActor[test]: Finished normally.",
344+
),
345+
(
346+
*RUN_INFO,
347+
"All 1 actor(s) finished.",
348+
),
349+
]
350+
)
351+
print("caplog.record_tuples:", caplog.record_tuples)
352+
# This is an ugly hack. There seem to be some issues with asyncio and caplog, maybe
353+
# pytest-asyncio, that reports some pending tasks from an unrelated test when tested
354+
# inside QEMU (suggesting also some timing issue when things run very slow).
355+
filtered_logs = [r for r in caplog.record_tuples if r[0] in relevant_loggers]
356+
print("filtered_logs:", filtered_logs)
357+
print("expected_log:", expected_log)
358+
assert filtered_logs == expected_log
359+
360+
259361
async def test_does_not_restart_on_normal_exit(
260362
actor_auto_restart_once: None, # pylint: disable=unused-argument
261363
caplog: pytest.LogCaptureFixture,

0 commit comments

Comments
 (0)