|
2 | 2 | # Copyright © 2022 Frequenz Energy-as-a-Service GmbH |
3 | 3 |
|
4 | 4 | """Simple test for the BaseActor.""" |
| 5 | + |
| 6 | +import asyncio |
| 7 | + |
| 8 | +import pytest |
5 | 9 | from frequenz.channels import Broadcast, Receiver, Sender |
6 | 10 | from frequenz.channels.util import select, selected_from |
7 | 11 |
|
8 | 12 | from frequenz.sdk.actor import Actor, run |
9 | 13 |
|
| 14 | +from ..conftest import actor_restart_limit |
| 15 | + |
| 16 | + |
| 17 | +class MyBaseException(BaseException): |
| 18 | + """A base exception for testing purposes.""" |
| 19 | + |
| 20 | + |
| 21 | +class BaseTestActor(Actor): |
| 22 | + """A base actor for testing purposes.""" |
| 23 | + |
| 24 | + restart_count: int = -1 |
| 25 | + |
| 26 | + def inc_restart_count(self) -> None: |
| 27 | + """Increment the restart count.""" |
| 28 | + BaseTestActor.restart_count += 1 |
| 29 | + |
| 30 | + @classmethod |
| 31 | + def reset_restart_count(cls) -> None: |
| 32 | + """Reset the restart count.""" |
| 33 | + cls.restart_count = -1 |
| 34 | + |
| 35 | + |
| 36 | +@pytest.fixture(autouse=True) |
| 37 | +def reset_restart_count() -> None: |
| 38 | + """Reset the restart count before each test.""" |
| 39 | + BaseTestActor.reset_restart_count() |
| 40 | + |
| 41 | + |
| 42 | +class NopActor(BaseTestActor): |
| 43 | + """An actor that does nothing.""" |
| 44 | + |
| 45 | + def __init__(self) -> None: |
| 46 | + """Create an instance.""" |
| 47 | + super().__init__(name="test") |
| 48 | + |
| 49 | + async def _run(self) -> None: |
| 50 | + """Start the actor and crash upon receiving a message""" |
| 51 | + print(f"{self} started") |
| 52 | + self.inc_restart_count() |
| 53 | + print(f"{self} done") |
10 | 54 |
|
11 | | -class FaultyActor(Actor): |
12 | | - """A faulty actor that crashes as soon as it receives a message.""" |
| 55 | + |
| 56 | +class RaiseExceptionActor(BaseTestActor): |
| 57 | + """A faulty actor that raises an Exception as soon as it receives a message.""" |
13 | 58 |
|
14 | 59 | def __init__( |
15 | 60 | self, |
16 | | - name: str, |
17 | 61 | recv: Receiver[int], |
18 | 62 | ) -> None: |
19 | | - """Create an instance of `FaultyActor`. |
| 63 | + """Create an instance. |
20 | 64 |
|
21 | 65 | Args: |
22 | | - name: Name of the actor. |
23 | 66 | recv: A channel receiver for int data. |
24 | 67 | """ |
25 | | - super().__init__(name=name) |
| 68 | + super().__init__(name="test") |
26 | 69 | self._recv = recv |
27 | 70 |
|
28 | 71 | async def _run(self) -> None: |
29 | 72 | """Start the actor and crash upon receiving a message""" |
| 73 | + print(f"{self} started") |
| 74 | + self.inc_restart_count() |
30 | 75 | async for msg in self._recv: |
| 76 | + print(f"{self} is about to crash") |
31 | 77 | _ = msg / 0 |
| 78 | + print(f"{self} done (should not happen)") |
| 79 | + |
| 80 | + |
| 81 | +class RaiseBaseExceptionActor(BaseTestActor): |
| 82 | + """A faulty actor that raises a BaseException as soon as it receives a message.""" |
| 83 | + |
| 84 | + def __init__( |
| 85 | + self, |
| 86 | + recv: Receiver[int], |
| 87 | + ) -> None: |
| 88 | + """Create an instance. |
| 89 | +
|
| 90 | + Args: |
| 91 | + recv: A channel receiver for int data. |
| 92 | + """ |
| 93 | + super().__init__(name="test") |
| 94 | + self._recv = recv |
| 95 | + |
| 96 | + async def _run(self) -> None: |
| 97 | + """Start the actor and crash upon receiving a message""" |
| 98 | + print(f"{self} started") |
| 99 | + self.inc_restart_count() |
| 100 | + async for _ in self._recv: |
| 101 | + print(f"{self} is about to crash") |
| 102 | + raise MyBaseException("This is a test") |
| 103 | + print(f"{self} done (should not happen)") |
| 104 | + |
32 | 105 |
|
| 106 | +ACTOR_INFO = ("frequenz.sdk.actor._actor", 20) |
| 107 | +ACTOR_ERROR = ("frequenz.sdk.actor._actor", 40) |
| 108 | +RUN_INFO = ("frequenz.sdk.actor._run_utils", 20) |
| 109 | +RUN_ERROR = ("frequenz.sdk.actor._run_utils", 40) |
33 | 110 |
|
34 | | -class EchoActor(Actor): |
| 111 | + |
| 112 | +class EchoActor(BaseTestActor): |
35 | 113 | """An echo actor that whatever it receives into the output channel.""" |
36 | 114 |
|
37 | 115 | def __init__( |
@@ -59,53 +137,194 @@ async def _run(self) -> None: |
59 | 137 | Args: |
60 | 138 | output (Sender[OT]): A channel sender, to send actor's results to. |
61 | 139 | """ |
| 140 | + print(f"{self} started") |
| 141 | + self.inc_restart_count() |
62 | 142 |
|
63 | 143 | channel_1 = self._recv1 |
64 | 144 | channel_2 = self._recv2 |
65 | 145 |
|
66 | 146 | async for selected in select(channel_1, channel_2): |
| 147 | + print(f"{self} received message {selected.value!r}") |
67 | 148 | if selected_from(selected, channel_1): |
| 149 | + print(f"{self} sending message received from channel_1") |
68 | 150 | await self._output.send(selected.value) |
69 | 151 | elif selected_from(selected, channel_2): |
| 152 | + print(f"{self} sending message received from channel_2") |
70 | 153 | await self._output.send(selected.value) |
71 | 154 |
|
| 155 | + print(f"{self} done (should not happen)") |
| 156 | + |
72 | 157 |
|
73 | | -async def test_basic_actor() -> None: |
| 158 | +async def test_basic_actor(caplog: pytest.LogCaptureFixture) -> None: |
74 | 159 | """Initialize the TestActor send a message and wait for the response.""" |
| 160 | + caplog.set_level("DEBUG", logger="frequenz.sdk.actor._actor") |
75 | 161 |
|
76 | 162 | input_chan_1: Broadcast[bool] = Broadcast("TestChannel1") |
77 | 163 | input_chan_2: Broadcast[bool] = Broadcast("TestChannel2") |
78 | 164 |
|
79 | 165 | echo_chan: Broadcast[bool] = Broadcast("echo output") |
| 166 | + echo_rx = echo_chan.new_receiver() |
80 | 167 |
|
81 | 168 | async with EchoActor( |
82 | 169 | "EchoActor", |
83 | 170 | input_chan_1.new_receiver(), |
84 | 171 | input_chan_2.new_receiver(), |
85 | 172 | echo_chan.new_sender(), |
86 | | - ): |
87 | | - echo_rx = echo_chan.new_receiver() |
| 173 | + ) as actor: |
| 174 | + assert actor.is_running is True |
| 175 | + original_tasks = set(actor.tasks) |
88 | 176 |
|
89 | | - await input_chan_1.new_sender().send(True) |
| 177 | + # Start is a no-op if already started |
| 178 | + await actor.start() |
| 179 | + assert actor.is_running is True |
| 180 | + assert original_tasks == set(actor.tasks) |
90 | 181 |
|
| 182 | + await input_chan_1.new_sender().send(True) |
91 | 183 | msg = await echo_rx.receive() |
92 | 184 | assert msg is True |
93 | 185 |
|
94 | 186 | await input_chan_2.new_sender().send(False) |
95 | | - |
96 | 187 | msg = await echo_rx.receive() |
97 | 188 | assert msg is False |
98 | 189 |
|
| 190 | + assert actor.is_running is True |
| 191 | + |
| 192 | + assert actor.is_running is False |
| 193 | + assert BaseTestActor.restart_count == 0 |
| 194 | + assert caplog.record_tuples == [ |
| 195 | + (*ACTOR_INFO, "Actor EchoActor[EchoActor]: Starting..."), |
| 196 | + (*ACTOR_INFO, "Actor EchoActor[EchoActor]: Cancelled."), |
| 197 | + ] |
| 198 | + |
| 199 | + |
| 200 | +@pytest.mark.parametrize("restart_limit", [0, 1, 2, 10]) |
| 201 | +async def test_restart_on_unhandled_exception( |
| 202 | + restart_limit: int, caplog: pytest.LogCaptureFixture |
| 203 | +) -> None: |
| 204 | + """Create a faulty actor and expect it to restart because it raises an exception. |
| 205 | +
|
| 206 | + Also test this works with different restart limits. |
99 | 207 |
|
100 | | -async def test_actor_does_not_restart() -> None: |
101 | | - """Create a faulty actor and expect it to crash and stop running""" |
| 208 | + Args: |
| 209 | + restart_limit: The restart limit to use. |
| 210 | + """ |
| 211 | + caplog.set_level("DEBUG", logger="frequenz.sdk.actor._actor") |
| 212 | + caplog.set_level("DEBUG", logger="frequenz.sdk.actor._run_utils") |
102 | 213 |
|
103 | 214 | channel: Broadcast[int] = Broadcast("channel") |
104 | 215 |
|
105 | | - _faulty_actor = FaultyActor( |
106 | | - "FaultyActor", |
107 | | - channel.new_receiver(), |
| 216 | + async with asyncio.timeout(2.0): |
| 217 | + with actor_restart_limit(restart_limit): |
| 218 | + actor = RaiseExceptionActor( |
| 219 | + channel.new_receiver(), |
| 220 | + ) |
| 221 | + for i in range(restart_limit + 1): |
| 222 | + await channel.new_sender().send(i) |
| 223 | + |
| 224 | + await run(actor) |
| 225 | + |
| 226 | + assert actor.is_running is False |
| 227 | + assert BaseTestActor.restart_count == restart_limit |
| 228 | + expected_log = [ |
| 229 | + (*RUN_INFO, "Starting 1 actor(s)..."), |
| 230 | + (*ACTOR_INFO, "Actor RaiseExceptionActor[test]: Starting..."), |
| 231 | + ] |
| 232 | + for i in range(restart_limit): |
| 233 | + expected_log.extend( |
| 234 | + [ |
| 235 | + ( |
| 236 | + *ACTOR_ERROR, |
| 237 | + "Actor RaiseExceptionActor[test]: Raised an unhandled exception.", |
| 238 | + ), |
| 239 | + ( |
| 240 | + *ACTOR_INFO, |
| 241 | + f"Actor test: Restarting ({i}/{restart_limit})...", |
| 242 | + ), |
| 243 | + ] |
| 244 | + ) |
| 245 | + expected_log.extend( |
| 246 | + [ |
| 247 | + ( |
| 248 | + *ACTOR_ERROR, |
| 249 | + "Actor RaiseExceptionActor[test]: Raised an unhandled exception.", |
| 250 | + ), |
| 251 | + ( |
| 252 | + *ACTOR_INFO, |
| 253 | + "Actor RaiseExceptionActor[test]: Maximum restarts attempted " |
| 254 | + f"({restart_limit}/{restart_limit}), bailing out...", |
| 255 | + ), |
| 256 | + ( |
| 257 | + *RUN_INFO, |
| 258 | + "Actor RaiseExceptionActor[test]: Started normally.", |
| 259 | + ), |
| 260 | + ( |
| 261 | + *RUN_ERROR, |
| 262 | + "Actor RaiseExceptionActor[test]: Raised an exception while running.", |
| 263 | + ), |
| 264 | + (*RUN_INFO, "All 1 actor(s) finished."), |
| 265 | + ] |
108 | 266 | ) |
| 267 | + assert caplog.record_tuples == expected_log |
| 268 | + |
| 269 | + |
| 270 | +async def test_does_not_restart_on_normal_exit( |
| 271 | + actor_auto_restart_once: None, # pylint: disable=unused-argument |
| 272 | + caplog: pytest.LogCaptureFixture, |
| 273 | +) -> None: |
| 274 | + """Create an actor that exists normally and expect it to not be restarted.""" |
| 275 | + caplog.set_level("DEBUG", logger="frequenz.sdk.actor._actor") |
| 276 | + caplog.set_level("DEBUG", logger="frequenz.sdk.actor._run_utils") |
| 277 | + |
| 278 | + channel: Broadcast[int] = Broadcast("channel") |
| 279 | + |
| 280 | + actor = NopActor() |
| 281 | + |
| 282 | + async with asyncio.timeout(1.0): |
| 283 | + await channel.new_sender().send(1) |
| 284 | + await run(actor) |
| 285 | + |
| 286 | + assert BaseTestActor.restart_count == 0 |
| 287 | + assert caplog.record_tuples == [ |
| 288 | + (*RUN_INFO, "Starting 1 actor(s)..."), |
| 289 | + (*ACTOR_INFO, "Actor NopActor[test]: Starting..."), |
| 290 | + (*ACTOR_INFO, "Actor NopActor[test]: _run() returned without error."), |
| 291 | + (*ACTOR_INFO, "Actor NopActor[test]: Stopped."), |
| 292 | + (*RUN_INFO, "Actor NopActor[test]: Started normally."), |
| 293 | + (*RUN_INFO, "Actor NopActor[test]: Finished normally."), |
| 294 | + (*RUN_INFO, "All 1 actor(s) finished."), |
| 295 | + ] |
| 296 | + |
| 297 | + |
| 298 | +async def test_does_not_restart_on_base_exception( |
| 299 | + actor_auto_restart_once: None, # pylint: disable=unused-argument |
| 300 | + caplog: pytest.LogCaptureFixture, |
| 301 | +) -> None: |
| 302 | + """Create a faulty actor and expect it not to restart because it raises a base exception.""" |
| 303 | + caplog.set_level("DEBUG", logger="frequenz.sdk.actor._actor") |
| 304 | + caplog.set_level("DEBUG", logger="frequenz.sdk.actor._run_utils") |
| 305 | + |
| 306 | + channel: Broadcast[int] = Broadcast("channel") |
| 307 | + |
| 308 | + actor = RaiseBaseExceptionActor(channel.new_receiver()) |
| 309 | + |
| 310 | + async with asyncio.timeout(1.0): |
| 311 | + await channel.new_sender().send(1) |
| 312 | + # We can't use pytest.raises() here because known BaseExceptions are handled |
| 313 | + # specially by pytest. |
| 314 | + try: |
| 315 | + await run(actor) |
| 316 | + except MyBaseException as error: |
| 317 | + assert str(error) == "This is a test" |
109 | 318 |
|
110 | | - await channel.new_sender().send(1) |
111 | | - await run(_faulty_actor) |
| 319 | + assert BaseTestActor.restart_count == 0 |
| 320 | + assert caplog.record_tuples == [ |
| 321 | + (*RUN_INFO, "Starting 1 actor(s)..."), |
| 322 | + (*ACTOR_INFO, "Actor RaiseBaseExceptionActor[test]: Starting..."), |
| 323 | + (*ACTOR_ERROR, "Actor RaiseBaseExceptionActor[test]: Raised a BaseException."), |
| 324 | + (*RUN_INFO, "Actor RaiseBaseExceptionActor[test]: Started normally."), |
| 325 | + ( |
| 326 | + *RUN_ERROR, |
| 327 | + "Actor RaiseBaseExceptionActor[test]: Raised an exception while running.", |
| 328 | + ), |
| 329 | + (*RUN_INFO, "All 1 actor(s) finished."), |
| 330 | + ] |
0 commit comments