Skip to content

Commit 573a71a

Browse files
committed
Add Actor abstract base class
This class will replace the `@actor` decorator and it inherits from `BackgroundService`, providing a consistent interface. The main differences with the `@actor` decorator are: * It doesn't start automatically, `start()` needs to be called to start an actor. * The method to implement the main logic was renamed from `run()` to `_run()`, as it is not intended to be run externally. * Actors can have an optional `name` (useful for debugging/logging purposes). * The actor will only be restarted if an unhandled `Exception` is raised by `_run()`. It will not be restarted if the `_run()` method finishes normally. If an unhandled `BaseException` is raised instead, it will be re-raised. For normal cancellation the `_run()` method should handle `asyncio.CancelledError` if the cancellation shouldn't be propagated (this is the same as with the decorator). * The `_stop()` method is public (`stop()`) and will `cancel()` and `await` for the task to finish, catching the `asyncio.CancelledError`. * The `join()` is removed an the actor can be awaited directly (like `await actor`. * For deterministic cleanup, actors can now be used as async context managers. Signed-off-by: Leandro Lucarella <[email protected]>
1 parent 2cd0b79 commit 573a71a

File tree

2 files changed

+205
-0
lines changed

2 files changed

+205
-0
lines changed

src/frequenz/sdk/actor/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
"""A base class for creating simple composable actors."""
55

66
from ..timeseries._resampling import ResamplerConfig
7+
from ._actor import Actor
78
from ._background_service import BackgroundService
89
from ._channel_registry import ChannelRegistry
910
from ._config_managing import ConfigManagingActor
@@ -13,6 +14,7 @@
1314
from ._run_utils import run
1415

1516
__all__ = [
17+
"Actor",
1618
"BackgroundService",
1719
"ChannelRegistry",
1820
"ComponentMetricRequest",

src/frequenz/sdk/actor/_actor.py

Lines changed: 203 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,203 @@
1+
# License: MIT
2+
# Copyright © 2023 Frequenz Energy-as-a-Service GmbH
3+
4+
"""Actor model implementation."""
5+
6+
import abc
7+
import asyncio
8+
import logging
9+
10+
from ._background_service import BackgroundService
11+
12+
_logger = logging.getLogger(__name__)
13+
14+
15+
class Actor(BackgroundService, abc.ABC):
16+
"""A primitive unit of computation that runs autonomously.
17+
18+
From [Wikipedia](https://en.wikipedia.org/wiki/Actor_model), an actor is:
19+
20+
> [...] the basic building block of concurrent computation. In response to
21+
> a message it receives, an actor can: make local decisions, create more actors,
22+
> send more messages, and determine how to respond to the next message received.
23+
> Actors may modify their own private state, but can only affect each other
24+
> indirectly through messaging (removing the need for lock-based synchronization).
25+
26+
[Channels](https://github.com/frequenz-floss/frequenz-channels-python/) can be used
27+
to implement communication between actors, as shown in the examples below.
28+
29+
To implement an actor, subclasses must implement the `_run()` method, which should
30+
run the actor's logic. The `_run()` method is called by the base class when the
31+
actor is started, and is expected to run until the actor is stopped.
32+
33+
If an unhandled exception is raised in the `_run()` method, the actor will be
34+
restarted automatically. Unhandled [`BaseException`][]s will cause the actor to stop
35+
immediately and will be re-raised.
36+
37+
!!! warning
38+
39+
As actors manage [`asyncio.Task`][] objects, a reference to them must be held
40+
for as long as the actor is expected to be running, otherwise its tasks will be
41+
cancelled and the actor will stop. For more information, please refer to the
42+
[Python `asyncio`
43+
documentation](https://docs.python.org/3/library/asyncio-task.html#asyncio.create_task).
44+
45+
Example: Example of an actor receiving from two receivers
46+
47+
```python
48+
from frequenz.channels import Broadcast, Receiver, Sender
49+
from frequenz.channels.util import select, selected_from
50+
51+
class EchoActor(Actor):
52+
def __init__(
53+
self,
54+
recv1: Receiver[bool],
55+
recv2: Receiver[bool],
56+
output: Sender[bool],
57+
) -> None:
58+
super().__init__()
59+
self._recv1 = recv1
60+
self._recv2 = recv2
61+
self._output = output
62+
63+
async def _run(self) -> None:
64+
async for selected in select(self._recv1, self._recv2):
65+
if selected_from(selected, self._recv1):
66+
await self._output.send(selected.value)
67+
elif selected_from(selected, self._recv1):
68+
await self._output.send(selected.value)
69+
else:
70+
assert False, "Unknown selected channel"
71+
72+
73+
input_channel_1 = Broadcast[bool]("input_channel_1")
74+
input_channel_2 = Broadcast[bool]("input_channel_2")
75+
input_channel_2_sender = input_channel_2.new_sender()
76+
77+
echo_channel = Broadcast[bool]("EchoChannel")
78+
echo_receiver = echo_channel.new_receiver()
79+
80+
async with EchoActor(
81+
input_channel_1.new_receiver(),
82+
input_channel_2.new_receiver(),
83+
echo_channel.new_sender(),
84+
):
85+
await input_channel_2_sender.send(True)
86+
print(await echo_receiver.receive())
87+
```
88+
89+
Example: Example of composing two actors
90+
91+
```python
92+
from frequenz.channels import Broadcast, Receiver, Sender
93+
94+
class Actor1(Actor):
95+
def __init__(
96+
self,
97+
recv: Receiver[bool],
98+
output: Sender[bool],
99+
) -> None:
100+
super().__init__()
101+
self._recv = recv
102+
self._output = output
103+
104+
async def _run(self) -> None:
105+
async for msg in self._recv:
106+
await self._output.send(msg)
107+
108+
109+
class Actor2(Actor):
110+
def __init__(
111+
self,
112+
recv: Receiver[bool],
113+
output: Sender[bool],
114+
) -> None:
115+
super().__init__()
116+
self._recv = recv
117+
self._output = output
118+
119+
async def _run(self) -> None:
120+
async for msg in self._recv:
121+
await self._output.send(msg)
122+
123+
input_channel: Broadcast[bool] = Broadcast("Input to Actor1")
124+
middle_channel: Broadcast[bool] = Broadcast("Actor1 -> Actor2 stream")
125+
output_channel: Broadcast[bool] = Broadcast("Actor2 output")
126+
127+
input_sender = input_channel.new_sender()
128+
output_receiver = output_channel.new_receiver()
129+
130+
async with (
131+
Actor1(input_channel.new_receiver(), middle_channel.new_sender()),
132+
Actor2(middle_channel.new_receiver(), output_channel.new_sender()),
133+
):
134+
await input_sender.send(True)
135+
print(await output_receiver.receive())
136+
```
137+
"""
138+
139+
_restart_limit: int | None = None
140+
"""The number of times actors can be restarted when they are stopped by unhandled exceptions.
141+
142+
If this is bigger than 0 or `None`, the actor will be restarted when there is an
143+
unhanded exception in the `_run()` method.
144+
145+
If `None`, the actor will be restarted an unlimited number of times.
146+
147+
!!! note
148+
149+
This is mostly used for testing purposes and shouldn't be set in production.
150+
"""
151+
152+
async def start(self) -> None:
153+
"""Start this actor.
154+
155+
If this actor is already running, this method does nothing.
156+
"""
157+
if self.is_running:
158+
return
159+
self._tasks.clear()
160+
self._tasks.add(asyncio.create_task(self._run_loop()))
161+
162+
@abc.abstractmethod
163+
async def _run(self) -> None:
164+
"""Run this actor's logic."""
165+
166+
async def _run_loop(self) -> None:
167+
"""Run this actor's task in a loop until `_restart_limit` is reached.
168+
169+
Raises:
170+
asyncio.CancelledError: If this actor's `_run()` gets cancelled.
171+
Exception: If this actor's `_run()` raises any other `Exception` and reached
172+
the maximum number of restarts.
173+
BaseException: If this actor's `_run()` raises any other `BaseException`.
174+
"""
175+
_logger.info("Actor %s: Starting...", self)
176+
n_restarts = 0
177+
while True:
178+
try:
179+
await self._run()
180+
_logger.info("Actor %s: _run() returned without error.", self)
181+
except asyncio.CancelledError:
182+
_logger.info("Actor %s: Cancelled.", self)
183+
raise
184+
except Exception: # pylint: disable=broad-except
185+
_logger.exception("Actor %s: Raised an unhandled exception.", self)
186+
limit_str = "∞" if self._restart_limit is None else self._restart_limit
187+
limit_str = f"({n_restarts}/{limit_str})"
188+
if self._restart_limit is None or n_restarts < self._restart_limit:
189+
n_restarts += 1
190+
_logger.info("Actor %s: Restarting %s...", self._name, limit_str)
191+
continue
192+
_logger.info(
193+
"Actor %s: Maximum restarts attempted %s, bailing out...",
194+
self,
195+
limit_str,
196+
)
197+
raise
198+
except BaseException: # pylint: disable=broad-except
199+
_logger.exception("Actor %s: Raised a BaseException.", self)
200+
raise
201+
break
202+
203+
_logger.info("Actor %s: Stopped.", self)

0 commit comments

Comments
 (0)