Skip to content

Commit 4be4a3b

Browse files
Add a run() method to run/wait for actors completion (#200)
2 parents 9ae09c7 + dbea183 commit 4be4a3b

File tree

9 files changed

+207
-36
lines changed

9 files changed

+207
-36
lines changed

RELEASE_NOTES.md

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,17 +2,17 @@
22

33
## Summary
44

5+
<!-- Here goes a general summary of what this release is about -->
6+
57
## Upgrading
68

9+
<!-- Here goes notes on how to upgrade from previous versions, including deprecations and what they should be replaced with -->
10+
711
## New Features
812

9-
* A new class `OrderedRingBuffer` is now available, providing a sorted ring buffer of datetime-value pairs with tracking of any values that have not yet been written.
10-
* Add logical meter formula for EV power.
11-
* A `MovingWindow` class has been added that consumes a data stream from a logical meter and updates an `OrderedRingBuffer`.
12-
* Add EVChargerPool implementation. It has only streaming state changes for ev chargers, now.
13-
* Add 3-phase current formulas: `3-phase grid_current` and `3-phase ev_charger_current` to the LogicalMeter.
1413
* A new class `SerializableRingbuffer` is now available, extending the `OrderedRingBuffer` class with the ability to load & dump the data to disk.
14+
* Add the `run(*actors)` function for running and synchronizing the execution of actors. This new function simplifies the way actors are managed on the client side, allowing for a cleaner and more streamlined approach. Users/apps can now run actors simply by calling run(actor1, actor2, actor3...) without the need to manually call join() and deal with linting errors.
1515

1616
## Bug Fixes
1717

18-
* Add COMPONENT_STATE_DISCHARGING as valid state for the inverter. DISCHARGING state was missing by mistake and this caused the power distributor to error out if the inverter is already discharging.
18+
<!-- Here goes notable bug fixes that are worth a special mention or explanation -->

examples/power_distribution.py

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,13 @@
1818

1919
from frequenz.channels import Bidirectional, Broadcast, Receiver, Sender
2020

21-
from frequenz.sdk import microgrid
21+
from frequenz.sdk import actor, microgrid
2222
from frequenz.sdk.actor import (
2323
ChannelRegistry,
2424
ComponentMetricRequest,
2525
ComponentMetricsResamplingActor,
2626
DataSourcingActor,
2727
ResamplerConfig,
28-
actor,
2928
)
3029
from frequenz.sdk.actor.power_distributing import (
3130
PowerDistributingActor,
@@ -42,7 +41,7 @@
4241
PORT = 61060
4342

4443

45-
@actor
44+
@actor.actor
4645
class DecisionMakingActor:
4746
"""Actor that receives set receives power for given batteries."""
4847

@@ -112,7 +111,7 @@ async def run(self) -> None:
112111
_logger.info("Set power with %d succeed.", power_to_set)
113112

114113

115-
@actor
114+
@actor.actor
116115
class DataCollectingActor:
117116
"""Actor that makes decisions about how much to charge/discharge batteries."""
118117

@@ -227,10 +226,7 @@ async def run() -> None:
227226
active_power_data=await logical_meter.grid_power(),
228227
)
229228

230-
# pylint: disable=no-member
231-
await service_actor.join() # type: ignore[attr-defined]
232-
await client_actor.join() # type: ignore[attr-defined]
233-
await power_distributor.join() # type: ignore[attr-defined]
229+
await actor.run(service_actor, client_actor, power_distributor)
234230

235231

236232
asyncio.run(run())

src/frequenz/sdk/actor/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
from ._data_sourcing import ComponentMetricRequest, DataSourcingActor
1010
from ._decorator import actor
1111
from ._resampling import ComponentMetricsResamplingActor
12+
from ._run_utils import run
1213

1314
__all__ = [
1415
"ChannelRegistry",
@@ -18,4 +19,5 @@
1819
"DataSourcingActor",
1920
"ResamplerConfig",
2021
"actor",
22+
"run",
2123
]

src/frequenz/sdk/actor/_decorator.py

Lines changed: 12 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -201,25 +201,23 @@ async def _start_actor(self) -> None:
201201
"""
202202
logger.debug("Starting actor: %s", cls.__name__)
203203
number_of_restarts = 0
204-
while True:
204+
while (
205+
self.restart_limit is None or number_of_restarts <= self.restart_limit
206+
):
207+
if number_of_restarts > 0:
208+
logger.info("Restarting actor: %s", cls.__name__)
209+
205210
try:
206211
await super().run()
207212
except asyncio.CancelledError:
208213
logger.debug("Cancelling actor: %s", cls.__name__)
209214
raise
210-
except Exception as err: # pylint: disable=broad-except
211-
logger.exception(
212-
"Actor (%s) crashed with error: %s", cls.__name__, err
213-
)
214-
if (
215-
self.restart_limit is None
216-
or number_of_restarts < self.restart_limit
217-
):
218-
number_of_restarts += 1
219-
logger.info("Restarting actor: %s", cls.__name__)
220-
else:
221-
logger.info("Shutting down actor: %s", cls.__name__)
222-
break
215+
except Exception: # pylint: disable=broad-except
216+
logger.exception("Actor (%s) crashed", cls.__name__)
217+
finally:
218+
number_of_restarts += 1
219+
220+
logger.info("Shutting down actor: %s", cls.__name__)
223221

224222
async def _stop(self) -> None:
225223
"""Stop an running actor."""
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
# License: MIT
2+
# Copyright © 2023 Frequenz Energy-as-a-Service GmbH
3+
4+
"""Utility functions to run and synchronize the execution of actors."""
5+
6+
7+
import asyncio
8+
import logging
9+
from typing import Any
10+
11+
from ._decorator import BaseActor
12+
13+
_logger = logging.getLogger(__name__)
14+
15+
16+
async def run(*actors: Any) -> None:
17+
"""Await the completion of all actors.
18+
19+
Args:
20+
actors: the actors to be awaited.
21+
22+
Raises:
23+
AssertionError: if any of the actors is not an instance of BaseActor.
24+
"""
25+
# Check that each actor is an instance of BaseActor at runtime,
26+
# due to the indirection created by the actor decorator.
27+
for actor in actors:
28+
assert isinstance(actor, BaseActor), f"{actor} is not an instance of BaseActor"
29+
30+
pending_tasks = set()
31+
for actor in actors:
32+
pending_tasks.add(asyncio.create_task(actor.join(), name=str(actor)))
33+
34+
# Currently the actor decorator manages the life-cycle of the actor tasks
35+
while pending_tasks:
36+
done_tasks, pending_tasks = await asyncio.wait(
37+
pending_tasks, return_when=asyncio.FIRST_COMPLETED
38+
)
39+
40+
# This should always be only one task, but we handle many for extra safety
41+
for task in done_tasks:
42+
# Cancellation needs to be checked first, otherwise the other methods
43+
# could raise a CancelledError
44+
if task.cancelled():
45+
_logger.info("The actor %s was cancelled", task.get_name())
46+
elif exception := task.exception():
47+
_logger.error(
48+
"The actor %s was finished due to an uncaught exception",
49+
task.get_name(),
50+
exc_info=exception,
51+
)
52+
else:
53+
_logger.info("The actor %s finished normally", task.get_name())

tests/actor/test_decorator.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
from frequenz.channels import Broadcast, Receiver, Sender
66
from frequenz.channels.util import Select
77

8-
from frequenz.sdk.actor import actor
8+
from frequenz.sdk.actor import actor, run
99

1010

1111
@actor
@@ -111,5 +111,4 @@ async def test_actor_does_not_restart() -> None:
111111
)
112112

113113
await channel.new_sender().send(1)
114-
# pylint: disable=no-member
115-
await _faulty_actor.join() # type: ignore
114+
await run(_faulty_actor)

tests/actor/test_resampling.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,14 @@
22
# Copyright © 2022 Frequenz Energy-as-a-Service GmbH
33

44
"""Frequenz Python SDK resampling example."""
5+
import asyncio
56
import dataclasses
67
from datetime import datetime, timezone
78
from typing import Iterator
89

910
import async_solipsism
1011
import pytest
1112
import time_machine
12-
from async_solipsism.socket import asyncio
1313
from frequenz.channels import Broadcast
1414

1515
from frequenz.sdk.actor import (
@@ -25,8 +25,9 @@
2525
#
2626

2727

28-
@pytest.fixture(autouse=True)
29-
def fake_loop() -> Iterator[async_solipsism.EventLoop]:
28+
# Setting 'autouse' has no effect as this method replaces the event loop for all tests in the file.
29+
@pytest.fixture()
30+
def event_loop() -> Iterator[async_solipsism.EventLoop]:
3031
"""Replace the loop with one that doesn't interact with the outside world."""
3132
loop = async_solipsism.EventLoop()
3233
yield loop

tests/actor/test_run_utils.py

Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
# License: MIT
2+
# Copyright © 2023 Frequenz Energy-as-a-Service GmbH
3+
4+
"""Simple tests for the actor runner."""
5+
6+
import asyncio
7+
import time
8+
from typing import Iterator
9+
10+
import async_solipsism
11+
import pytest
12+
import time_machine
13+
14+
from frequenz.sdk.actor import actor, run
15+
16+
17+
# Setting 'autouse' has no effect as this method replaces the event loop for all tests in the file.
18+
@pytest.fixture()
19+
def event_loop() -> Iterator[async_solipsism.EventLoop]:
20+
"""Replace the loop with one that doesn't interact with the outside world."""
21+
loop = async_solipsism.EventLoop()
22+
yield loop
23+
loop.close()
24+
25+
26+
@pytest.fixture
27+
def fake_time() -> Iterator[time_machine.Coordinates]:
28+
"""Replace real time with a time machine that doesn't automatically tick."""
29+
with time_machine.travel(0, tick=False) as traveller:
30+
yield traveller
31+
32+
33+
@actor
34+
class FaultyActor:
35+
"""A test faulty actor."""
36+
37+
def __init__(self, name: str) -> None:
38+
"""Initialize the faulty actor.
39+
40+
Args:
41+
name: the name of the faulty actor.
42+
"""
43+
self.name = name
44+
self.is_cancelled = False
45+
46+
async def run(self) -> None:
47+
"""Run the faulty actor.
48+
49+
Raises:
50+
CancelledError: the exception causes the actor to be cancelled
51+
"""
52+
self.is_cancelled = True
53+
raise asyncio.CancelledError(f"Faulty Actor {self.name} failed")
54+
55+
56+
@actor
57+
class SleepyActor:
58+
"""A test actor that sleeps a short time."""
59+
60+
def __init__(self, name: str, sleep_duration: float) -> None:
61+
"""Initialize the sleepy actor.
62+
63+
Args:
64+
name: the name of the sleepy actor.
65+
sleep_duration: the virtual duration to sleep while running.
66+
"""
67+
self.name = name
68+
self.sleep_duration = sleep_duration
69+
self.is_joined = False
70+
71+
async def run(self) -> None:
72+
"""Run the sleepy actor."""
73+
while time.time() < self.sleep_duration:
74+
await asyncio.sleep(0.1)
75+
76+
self.is_joined = True
77+
78+
79+
# pylint: disable=redefined-outer-name
80+
async def test_all_actors_done(fake_time: time_machine.Coordinates) -> None:
81+
"""Test the completion of all actors."""
82+
83+
sleepy_actor_1 = SleepyActor("sleepy_actor_1", sleep_duration=1.0)
84+
sleepy_actor_2 = SleepyActor("sleepy_actor_2", sleep_duration=2.0)
85+
86+
test_task = asyncio.create_task(run(sleepy_actor_1, sleepy_actor_2))
87+
88+
sleep_duration = time.time()
89+
90+
assert sleep_duration == 0
91+
assert sleepy_actor_1.is_joined is False
92+
assert sleepy_actor_2.is_joined is False
93+
94+
while not test_task.done():
95+
if sleep_duration < 1:
96+
assert sleepy_actor_1.is_joined is False
97+
assert sleepy_actor_2.is_joined is False
98+
elif sleep_duration < 2:
99+
assert sleepy_actor_1.is_joined is True
100+
assert sleepy_actor_2.is_joined is False
101+
elif sleep_duration == 2:
102+
assert sleepy_actor_1.is_joined is True
103+
assert sleepy_actor_2.is_joined is True
104+
105+
fake_time.shift(0.5)
106+
sleep_duration = time.time()
107+
await asyncio.sleep(1)
108+
109+
assert sleepy_actor_1.is_joined
110+
assert sleepy_actor_2.is_joined
111+
112+
113+
async def test_actors_cancelled() -> None:
114+
"""Test the completion of actors being cancelled."""
115+
116+
faulty_actors = [FaultyActor(f"faulty_actor_{idx}") for idx in range(5)]
117+
118+
await asyncio.wait_for(run(*faulty_actors), timeout=1.0)
119+
120+
for faulty_actor in faulty_actors:
121+
assert faulty_actor.is_cancelled

tests/timeseries/test_resampling.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,9 @@
3737
# pylint: disable=too-many-locals,redefined-outer-name
3838

3939

40-
@pytest.fixture(autouse=True)
41-
def fake_loop() -> Iterator[async_solipsism.EventLoop]:
40+
# Setting 'autouse' has no effect as this method replaces the event loop for all tests in the file.
41+
@pytest.fixture()
42+
def event_loop() -> Iterator[async_solipsism.EventLoop]:
4243
"""Replace the loop with one that doesn't interact with the outside world."""
4344
loop = async_solipsism.EventLoop()
4445
yield loop

0 commit comments

Comments
 (0)