|
15 | 15 | class Actor(BackgroundService, abc.ABC): |
16 | 16 | """A primitive unit of computation that runs autonomously. |
17 | 17 |
|
18 | | - From [Wikipedia](https://en.wikipedia.org/wiki/Actor_model), an actor is: |
19 | | -
|
20 | | - > [...] the basic building block of concurrent computation. In response to |
21 | | - > a message it receives, an actor can: make local decisions, create more actors, |
22 | | - > send more messages, and determine how to respond to the next message received. |
23 | | - > Actors may modify their own private state, but can only affect each other |
24 | | - > indirectly through messaging (removing the need for lock-based synchronization). |
25 | | -
|
26 | | - [Channels](https://github.com/frequenz-floss/frequenz-channels-python/) can be used |
27 | | - to implement communication between actors, as shown in the examples below. |
28 | | -
|
29 | 18 | To implement an actor, subclasses must implement the `_run()` method, which should |
30 | 19 | run the actor's logic. The `_run()` method is called by the base class when the |
31 | 20 | actor is started, and is expected to run until the actor is stopped. |
32 | 21 |
|
33 | | - If an unhandled exception is raised in the `_run()` method, the actor will be |
34 | | - restarted automatically. Unhandled [`BaseException`][]s will cause the actor to stop |
35 | | - immediately and will be re-raised. |
36 | | -
|
37 | | - !!! warning |
38 | | -
|
39 | | - As actors manage [`asyncio.Task`][] objects, a reference to them must be held |
40 | | - for as long as the actor is expected to be running, otherwise its tasks will be |
41 | | - cancelled and the actor will stop. For more information, please refer to the |
42 | | - [Python `asyncio` |
43 | | - documentation](https://docs.python.org/3/library/asyncio-task.html#asyncio.create_task). |
44 | | -
|
45 | | - Example: Example of an actor receiving from two receivers |
46 | | -
|
47 | | - ```python |
48 | | - from frequenz.channels import Broadcast, Receiver, Sender |
49 | | - from frequenz.channels.util import select, selected_from |
50 | | -
|
51 | | - class EchoActor(Actor): |
52 | | - def __init__( |
53 | | - self, |
54 | | - recv1: Receiver[bool], |
55 | | - recv2: Receiver[bool], |
56 | | - output: Sender[bool], |
57 | | - ) -> None: |
58 | | - super().__init__() |
59 | | - self._recv1 = recv1 |
60 | | - self._recv2 = recv2 |
61 | | - self._output = output |
62 | | -
|
63 | | - async def _run(self) -> None: |
64 | | - async for selected in select(self._recv1, self._recv2): |
65 | | - if selected_from(selected, self._recv1): |
66 | | - await self._output.send(selected.value) |
67 | | - elif selected_from(selected, self._recv1): |
68 | | - await self._output.send(selected.value) |
69 | | - else: |
70 | | - assert False, "Unknown selected channel" |
71 | | -
|
72 | | -
|
73 | | - input_channel_1 = Broadcast[bool]("input_channel_1") |
74 | | - input_channel_2 = Broadcast[bool]("input_channel_2") |
75 | | - input_channel_2_sender = input_channel_2.new_sender() |
76 | | -
|
77 | | - echo_channel = Broadcast[bool]("EchoChannel") |
78 | | - echo_receiver = echo_channel.new_receiver() |
79 | | -
|
80 | | - async with EchoActor( |
81 | | - input_channel_1.new_receiver(), |
82 | | - input_channel_2.new_receiver(), |
83 | | - echo_channel.new_sender(), |
84 | | - ): |
85 | | - await input_channel_2_sender.send(True) |
86 | | - print(await echo_receiver.receive()) |
87 | | - ``` |
88 | | -
|
89 | | - Example: Example of composing two actors |
90 | | -
|
91 | | - ```python |
92 | | - from frequenz.channels import Broadcast, Receiver, Sender |
93 | | -
|
94 | | - class Actor1(Actor): |
95 | | - def __init__( |
96 | | - self, |
97 | | - recv: Receiver[bool], |
98 | | - output: Sender[bool], |
99 | | - ) -> None: |
100 | | - super().__init__() |
101 | | - self._recv = recv |
102 | | - self._output = output |
103 | | -
|
104 | | - async def _run(self) -> None: |
105 | | - async for msg in self._recv: |
106 | | - await self._output.send(msg) |
107 | | -
|
108 | | -
|
109 | | - class Actor2(Actor): |
110 | | - def __init__( |
111 | | - self, |
112 | | - recv: Receiver[bool], |
113 | | - output: Sender[bool], |
114 | | - ) -> None: |
115 | | - super().__init__() |
116 | | - self._recv = recv |
117 | | - self._output = output |
118 | | -
|
119 | | - async def _run(self) -> None: |
120 | | - async for msg in self._recv: |
121 | | - await self._output.send(msg) |
122 | | -
|
123 | | - input_channel: Broadcast[bool] = Broadcast("Input to Actor1") |
124 | | - middle_channel: Broadcast[bool] = Broadcast("Actor1 -> Actor2 stream") |
125 | | - output_channel: Broadcast[bool] = Broadcast("Actor2 output") |
126 | | -
|
127 | | - input_sender = input_channel.new_sender() |
128 | | - output_receiver = output_channel.new_receiver() |
129 | | -
|
130 | | - async with ( |
131 | | - Actor1(input_channel.new_receiver(), middle_channel.new_sender()), |
132 | | - Actor2(middle_channel.new_receiver(), output_channel.new_sender()), |
133 | | - ): |
134 | | - await input_sender.send(True) |
135 | | - print(await output_receiver.receive()) |
136 | | - ``` |
137 | 22 | """ |
138 | 23 |
|
139 | 24 | _restart_limit: int | None = None |
|
0 commit comments