Commit 81de64a

Add a restart delay to actors to avoid spam-restarting of buggy actors (#741)
Closes #732. I've decided to go for the hard-coded 2.0-second delay for now. Instead of hard-coding an exponential backoff, I very much like the idea of allowing the actor to be configured at construction time with a retry strategy (as suggested by @shms), so the developer of that actor can decide what's best for that particular actor. This can then default to a fixed-delay retry strategy.
2 parents 3562004 + 24c7ede commit 81de64a
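
The retry-strategy idea described in the commit message could look roughly like the sketch below. This is only a hypothetical illustration, not code from this commit or from the SDK; the names `RetryStrategy`, `FixedDelay`, and `ExponentialBackoff` are invented for the example. `FixedDelay` mirrors the behaviour this commit hard-codes; `ExponentialBackoff` is one possible alternative.

```python
# Hypothetical sketch of a pluggable retry strategy; none of these names exist in the SDK.
from abc import ABC, abstractmethod
from datetime import timedelta


class RetryStrategy(ABC):
    """Decides how long an actor should wait before its next restart."""

    @abstractmethod
    def delay(self, iteration: int) -> timedelta:
        """Return the delay to apply before restart number `iteration`."""


class FixedDelay(RetryStrategy):
    """Wait the same amount of time before every restart (the hard-coded behaviour)."""

    def __init__(self, delay: timedelta = timedelta(seconds=2)) -> None:
        self._delay = delay

    def delay(self, iteration: int) -> timedelta:
        return self._delay


class ExponentialBackoff(RetryStrategy):
    """Double the delay on every restart, up to a maximum."""

    def __init__(
        self,
        initial: timedelta = timedelta(seconds=2),
        maximum: timedelta = timedelta(minutes=1),
    ) -> None:
        self._initial = initial
        self._maximum = maximum

    def delay(self, iteration: int) -> timedelta:
        # Cap the exponent so the computed value stays well within timedelta's range.
        seconds = self._initial.total_seconds() * (2 ** min(iteration, 32))
        return min(timedelta(seconds=seconds), self._maximum)
```

An actor constructor could then accept something like `retry_strategy: RetryStrategy = FixedDelay()` and call `retry_strategy.delay(n_restarts)` where the current code sleeps for the hard-coded `RESTART_DELAY`.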

File tree (3 files changed: +35 −1 lines)

- RELEASE_NOTES.md (+2)
- src/frequenz/sdk/actor/_actor.py (+19)
- tests/actor/test_actor.py (+14 −1)

RELEASE_NOTES.md (2 additions, 0 deletions)

```diff
@@ -25,6 +25,8 @@ This version ships an experimental version of the **Power Manager**, adds prelim
 
 - Move `microgrid.ComponentGraph` class to `microgrid.component_graph.ComponentGraph`, exposing only the high level interface functions through the `microgrid` package.
 
+- An actor that is crashing will no longer instantly restart but induce an artificial delay to avoid potential spam-restarting.
+
 ## New Features
 
 - New and improved documentation.
```

src/frequenz/sdk/actor/_actor.py (19 additions, 0 deletions)

```diff
@@ -6,6 +6,7 @@
 import abc
 import asyncio
 import logging
+from datetime import timedelta
 
 from ._background_service import BackgroundService
 
@@ -27,6 +28,9 @@ class Actor(BackgroundService, abc.ABC):
     comprehensive guide on how to use and implement actors properly.
     """
 
+    RESTART_DELAY: timedelta = timedelta(seconds=2)
+    """The delay to wait between restarts of this actor."""
+
     _restart_limit: int | None = None
     """The number of times actors can be restarted when they are stopped by unhandled exceptions.
 
@@ -54,6 +58,20 @@ def start(self) -> None:
     async def _run(self) -> None:
         """Run this actor's logic."""
 
+    async def _delay_if_restart(self, iteration: int) -> None:
+        """Delay the restart of this actor's n'th iteration.
+
+        Args:
+            iteration: The current iteration of the restart.
+        """
+        # NB: I think it makes sense (in the future) to think about diminishing returns
+        # the longer the actor has been running.
+        # Not just for the restart-delay but actually for the n_restarts counter as well.
+        if iteration > 0:
+            delay = self.RESTART_DELAY.total_seconds()
+            _logger.info("Actor %s: Waiting %s seconds...", self, delay)
+            await asyncio.sleep(delay)
+
     async def _run_loop(self) -> None:
         """Run this actor's task in a loop until `_restart_limit` is reached.
 
@@ -67,6 +85,7 @@ async def _run_loop(self) -> None:
         n_restarts = 0
         while True:
             try:
+                await self._delay_if_restart(n_restarts)
                 await self._run()
                 _logger.info("Actor %s: _run() returned without error.", self)
             except asyncio.CancelledError:
```
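
Because `_delay_if_restart()` looks up `self.RESTART_DELAY` at runtime, a concrete actor should be able to tune the pause between its restarts by overriding the class attribute. A minimal sketch, assuming the public import path `frequenz.sdk.actor.Actor`; `SleepyActor` is a made-up example, not part of the SDK:

```python
# Hypothetical example, not part of this commit: override the class attribute
# RESTART_DELAY to change how long this actor waits between restarts.
from datetime import timedelta

from frequenz.sdk.actor import Actor


class SleepyActor(Actor):
    """An actor that waits 10 seconds between restarts instead of the default 2."""

    RESTART_DELAY = timedelta(seconds=10)

    async def _run(self) -> None:
        # Crash immediately so the restart (and therefore the delay) kicks in.
        raise RuntimeError("boom")
```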

tests/actor/test_actor.py (14 additions, 1 deletion)

```diff
@@ -4,6 +4,7 @@
 """Simple test for the BaseActor."""
 
 import asyncio
+from datetime import timedelta
 
 import pytest
 from frequenz.channels import Broadcast, Receiver, Sender
@@ -211,7 +212,12 @@ async def test_restart_on_unhandled_exception(
 
     channel: Broadcast[int] = Broadcast("channel")
 
-    async with asyncio.timeout(2.0):
+    # NB: We're adding 1.0s to the timeout to account for the time it takes to
+    # run, crash, and restart the actor.
+    expected_wait_time = timedelta(
+        seconds=restart_limit * RaiseExceptionActor.RESTART_DELAY.total_seconds() + 1.0
+    )
+    async with asyncio.timeout(expected_wait_time.total_seconds()):
         with actor_restart_limit(restart_limit):
             actor = RaiseExceptionActor(
                 channel.new_receiver(),
@@ -229,6 +235,7 @@ async def test_restart_on_unhandled_exception(
         (*RUN_INFO, "Actor RaiseExceptionActor[test]: Starting..."),
         (*ACTOR_INFO, "Actor RaiseExceptionActor[test]: Started."),
     ]
+    restart_delay = Actor.RESTART_DELAY.total_seconds()
     for i in range(restart_limit):
         expected_log.extend(
             [
@@ -240,6 +247,10 @@ async def test_restart_on_unhandled_exception(
                     *ACTOR_INFO,
                     f"Actor test: Restarting ({i}/{restart_limit})...",
                 ),
+                (
+                    *ACTOR_INFO,
+                    f"Actor RaiseExceptionActor[test]: Waiting {restart_delay} seconds...",
+                ),
             ]
         )
     expected_log.extend(
@@ -260,6 +271,8 @@ async def test_restart_on_unhandled_exception(
             (*RUN_INFO, "All 1 actor(s) finished."),
         ]
     )
+    print("expected_log:", expected_log)
+    print("caplog.record_tuples:", caplog.record_tuples)
     assert caplog.record_tuples == expected_log
```
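
To make the timeout arithmetic in this test concrete: with a hypothetical `restart_limit` of 2 (the actual parametrization is not shown in this diff), the computed timeout would be 2 × 2.0 s + 1.0 s = 5.0 s.

```python
# Worked example of the test's timeout arithmetic, assuming restart_limit == 2.
from datetime import timedelta

RESTART_DELAY = timedelta(seconds=2)  # Actor.RESTART_DELAY introduced by this commit
restart_limit = 2

expected_wait_time = timedelta(
    seconds=restart_limit * RESTART_DELAY.total_seconds() + 1.0
)
assert expected_wait_time == timedelta(seconds=5)
```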

0 commit comments
