Skip to content

Commit 304035c

Browse files
Add RestartActorException
Actor should be able to restart `_run` method for example to update formula and start listening on new receiver. Restarting actor should not print exception. Signed-off-by: Elzbieta Kotulska <[email protected]>
1 parent bcfbd5b commit 304035c

File tree

4 files changed

+124
-13
lines changed

4 files changed

+124
-13
lines changed

RELEASE_NOTES.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@
1414

1515
- Stop fallback formulas when primary formula starts working again.
1616

17+
- Add `RestartActorException` to `frequnez.sdk.actor`. Actor can use it to restart `run` method if necessary.
18+
1719
## Bug Fixes
1820

1921
- Fixed bug with formulas raising exception when stopped.

src/frequenz/sdk/actor/__init__.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -598,12 +598,13 @@ async def main() -> None: # (6)!
598598
"""
599599

600600
from ..timeseries._resampling import ResamplerConfig
601-
from ._actor import Actor
601+
from ._actor import Actor, RestartActorException
602602
from ._background_service import BackgroundService
603603
from ._run_utils import run
604604

605605
__all__ = [
606606
"Actor",
607+
"RestartActorException",
607608
"BackgroundService",
608609
"ResamplerConfig",
609610
"run",

src/frequenz/sdk/actor/_actor.py

Lines changed: 17 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,10 @@
1313
_logger = logging.getLogger(__name__)
1414

1515

16+
class RestartActorException(Exception):
17+
"""Exception raised when an actor should be restarted."""
18+
19+
1620
class Actor(BackgroundService, abc.ABC):
1721
"""A primitive unit of computation that runs autonomously.
1822
@@ -58,19 +62,14 @@ def start(self) -> None:
5862
async def _run(self) -> None:
5963
"""Run this actor's logic."""
6064

61-
async def _delay_if_restart(self, iteration: int) -> None:
62-
"""Delay the restart of this actor's n'th iteration.
63-
64-
Args:
65-
iteration: The current iteration of the restart.
66-
"""
65+
async def _delay(self) -> None:
66+
"""Delay the restart of this actor."""
6767
# NB: I think it makes sense (in the future) to think about deminishing returns
6868
# the longer the actor has been running.
6969
# Not just for the restart-delay but actually for the n_restarts counter as well.
70-
if iteration > 0:
71-
delay = self.RESTART_DELAY.total_seconds()
72-
_logger.info("Actor %s: Waiting %s seconds...", self, delay)
73-
await asyncio.sleep(delay)
70+
delay = self.RESTART_DELAY.total_seconds()
71+
_logger.info("Actor %s: Waiting %s seconds...", self, delay)
72+
await asyncio.sleep(delay)
7473

7574
async def _run_loop(self) -> None:
7675
"""Run the actor's task continuously, managing restarts, cancellation, and termination.
@@ -85,15 +84,22 @@ async def _run_loop(self) -> None:
8584
"""
8685
_logger.info("Actor %s: Started.", self)
8786
n_restarts = 0
87+
should_delay = False
8888
while True:
8989
try:
90-
await self._delay_if_restart(n_restarts)
90+
if should_delay:
91+
await self._delay()
92+
should_delay = False
9193
await self._run()
9294
_logger.info("Actor %s: _run() returned without error.", self)
9395
except asyncio.CancelledError:
9496
_logger.info("Actor %s: Cancelled.", self)
9597
raise
98+
except RestartActorException:
99+
_logger.info("Actor %s: Restarting.", self)
100+
continue
96101
except Exception: # pylint: disable=broad-except
102+
should_delay = True
97103
_logger.exception("Actor %s: Raised an unhandled exception.", self)
98104
limit_str = "∞" if self._restart_limit is None else self._restart_limit
99105
limit_str = f"({n_restarts}/{limit_str})"

tests/actor/test_actor.py

Lines changed: 103 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
import pytest
1010
from frequenz.channels import Broadcast, Receiver, Sender, select, selected_from
1111

12-
from frequenz.sdk.actor import Actor, run
12+
from frequenz.sdk.actor import Actor, RestartActorException, run
1313

1414
from ..conftest import actor_restart_limit
1515

@@ -78,6 +78,32 @@ async def _run(self) -> None:
7878
print(f"{self} done (should not happen)")
7979

8080

81+
class RaiseRestartExceptionActor(BaseTestActor):
82+
"""Actor that raises an RestartActorException as soon as it receives a message."""
83+
84+
def __init__(self, recv: Receiver[int], raise_count: int) -> None:
85+
"""Create an instance.
86+
87+
Args:
88+
recv: A channel receiver for int data.
89+
raise_count: How many time raise RestartActorException
90+
"""
91+
super().__init__(name="test")
92+
self._recv = recv
93+
self._raise_count = raise_count
94+
95+
async def _run(self) -> None:
96+
"""Start the actor and crash upon receiving a message."""
97+
print(f"{self} started")
98+
self.inc_restart_count()
99+
async for _ in self._recv:
100+
if self._raise_count <= 0:
101+
break
102+
self._raise_count -= 1
103+
raise RestartActorException("Actor should restarts")
104+
print(f"{self} done")
105+
106+
81107
ACTOR_INFO = ("frequenz.sdk.actor._actor", 20)
82108
ACTOR_ERROR = ("frequenz.sdk.actor._actor", 40)
83109
RUN_INFO = ("frequenz.sdk.actor._run_utils", 20)
@@ -256,6 +282,82 @@ async def test_restart_on_unhandled_exception(
256282
assert filtered_logs == expected_log
257283

258284

285+
@pytest.mark.parametrize("restart_num", [1])
286+
async def test_restart_on_restart_exception(
287+
restart_num: int, caplog: pytest.LogCaptureFixture
288+
) -> None:
289+
"""Create a faulty actor and expect it to restart because it raises an exception.
290+
291+
Also test this works with different restart limits.
292+
293+
Args:
294+
restart_num: The restart limit to use.
295+
caplog: The log capture fixture.
296+
"""
297+
relevant_loggers = {"frequenz.sdk.actor._actor", "frequenz.sdk.actor._run_utils"}
298+
for logger in relevant_loggers:
299+
caplog.set_level("DEBUG", logger=logger)
300+
301+
channel: Broadcast[int] = Broadcast(name="channel")
302+
303+
# We need some timeout, 1 second for each restart should be enough.
304+
# There should be no restart delay.
305+
expected_wait_time = timedelta(seconds=restart_num + 1.0)
306+
async with asyncio.timeout(expected_wait_time.total_seconds()):
307+
actor = RaiseRestartExceptionActor(
308+
channel.new_receiver(),
309+
raise_count=restart_num,
310+
)
311+
for i in range(restart_num + 1):
312+
await channel.new_sender().send(i)
313+
314+
await run(actor)
315+
await actor.wait()
316+
317+
assert actor.is_running is False
318+
assert BaseTestActor.restart_count == restart_num
319+
expected_log = [
320+
(*RUN_INFO, "Starting 1 actor(s)..."),
321+
(*RUN_INFO, "Actor RaiseRestartExceptionActor[test]: Starting..."),
322+
(*ACTOR_INFO, "Actor RaiseRestartExceptionActor[test]: Started."),
323+
]
324+
for i in range(restart_num):
325+
expected_log.append(
326+
(
327+
*ACTOR_INFO,
328+
"Actor RaiseRestartExceptionActor[test]: Restarting.",
329+
),
330+
)
331+
expected_log.extend(
332+
[
333+
(
334+
*ACTOR_INFO,
335+
"Actor RaiseRestartExceptionActor[test]: _run() returned without error.",
336+
),
337+
(
338+
*ACTOR_INFO,
339+
"Actor RaiseRestartExceptionActor[test]: Stopped.",
340+
),
341+
(
342+
*RUN_INFO,
343+
"Actor RaiseRestartExceptionActor[test]: Finished normally.",
344+
),
345+
(
346+
*RUN_INFO,
347+
"All 1 actor(s) finished.",
348+
),
349+
]
350+
)
351+
print("caplog.record_tuples:", caplog.record_tuples)
352+
# This is an ugly hack. There seem to be some issues with asyncio and caplog, maybe
353+
# pytest-asyncio, that reports some pending tasks from an unrelated test when tested
354+
# inside QEMU (suggesting also some timing issue when things run very slow).
355+
filtered_logs = [r for r in caplog.record_tuples if r[0] in relevant_loggers]
356+
print("filtered_logs:", filtered_logs)
357+
print("expected_log:", expected_log)
358+
assert filtered_logs == expected_log
359+
360+
259361
async def test_does_not_restart_on_normal_exit(
260362
actor_auto_restart_once: None, # pylint: disable=unused-argument
261363
caplog: pytest.LogCaptureFixture,

0 commit comments

Comments
 (0)