Skip to content

Commit 3e7c2fe

Browse files
authored
Implement BackgroundService and new Actor class (#564)
`BackgroundService` is a new abstract base class can be used to write other classes that runs one or more tasks in the background. It provides a consistent API to start and stop these services and also takes care of the handling of the background tasks. It can also work as an `async` context manager, giving the service a deterministic lifetime and guaranteed cleanup. The new `Actor` class brings quite a few new improvements over the old `@actor` decorator. These are the main differences: * It doesn't start automatically, `start()` needs to be called to start an actor. * The method to implement the main logic was renamed from `run()` to `_run()`, as it is not intended to be run externally. * Actors can have an optional `name` (useful for debugging/logging purposes) and `loop` (if the actor should run in a loop different from the currently running loop). * The actor will only be restarted if an unhandled `Exception` is raised by `_run()`. It will not be restarted if the `_run()` method finishes normally. If an unhandled `BaseException` is raised instead, it will be re-raised. For normal cancellation the `_run()` method should handle `asyncio.CancelledError` if the cancellation shouldn't be propagated (this is the same as with the decorator). * The `_stop()` method is public (`stop()`) and will `cancel()` and `await` for the task to finish, catching the `asyncio.CancelledError`. * The `join()` method is renamed to `wait()`, but they can also be awaited directly ( `await actor`). * For deterministic cleanup, actors can now be used as `async` context managers. The base actors (`ConfigManagingActor`, `ComponentMetricsResamplingActor`, `DataSourcingActor`, `PowerDistributingActor`) now inherit from the new `Actor` class, as well as the `MovingWindow`. Fixes #240, fixes #45, fixes #196.
2 parents e48110d + cf12100 commit 3e7c2fe

31 files changed

+1890
-1124
lines changed

RELEASE_NOTES.md

Lines changed: 90 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2,29 +2,110 @@
22

33
## Summary
44

5-
<!-- Here goes a general summary of what this release is about -->
5+
This release replaces the `@actor` decorator with a new `Actor` class.
66

77
## Upgrading
88

9+
910
- The `frequenz.sdk.power` package contained the power distribution algorithm, which is for internal use in the sdk, and is no longer part of the public API.
1011

1112
- `PowerDistributingActor`'s result type `OutOfBound` has been renamed to `OutOfBounds`, and its member variable `bound` has been renamed to `bounds`.
1213

14+
- The `@actor` decorator was replaced by the new `Actor` class. The main differences between the new class and the old decorator are:
15+
16+
* It doesn't start automatically, `start()` needs to be called to start an actor (using the `frequenz.sdk.actor.run()` function is recommended).
17+
* The method to implement the main logic was renamed from `run()` to `_run()`, as it is not intended to be run externally.
18+
* Actors can have an optional `name` (useful for debugging/logging purposes).
19+
* The actor will only be restarted if an unhandled `Exception` is raised by `_run()`. It will not be restarted if the `_run()` method finishes normally. If an unhandled `BaseException` is raised instead, it will be re-raised. For normal cancellation the `_run()` method should handle `asyncio.CancelledError` if the cancellation shouldn't be propagated (this is the same as with the decorator).
20+
* The `_stop()` method is public (`stop()`) and will `cancel()` and `await` for the task to finish, catching the `asyncio.CancelledError`.
21+
* The `join()` method is renamed to `wait()`, but they can also be awaited directly ( `await actor`).
22+
* For deterministic cleanup, actors can now be used as `async` context managers.
23+
24+
Most actors can be migrated following these steps:
25+
26+
1. Remove the decorator
27+
2. Add `Actor` as a base class
28+
3. Rename `run()` to `_run()`
29+
4. Forward the `name` argument (optional but recommended)
30+
31+
For example, this old actor:
32+
33+
```python
34+
from frequenz.sdk.actor import actor
35+
36+
@actor
37+
class TheActor:
38+
def __init__(self, actor_args) -> None:
39+
# init code
40+
41+
def run(self) -> None:
42+
# run code
43+
```
44+
45+
Can be migrated as:
46+
47+
```python
48+
import asyncio
49+
from frequenz.sdk.actor import Actor
50+
51+
class TheActor(Actor):
52+
def __init__(self, actor_args,
53+
*,
54+
name: str | None = None,
55+
) -> None:
56+
super().__init__(name=name)
57+
# init code
58+
59+
def _run(self) -> None:
60+
# run code
61+
```
62+
63+
Then you can instantiate all your actors first and then run them using:
64+
65+
```python
66+
from frequenz.sdk.actor import run
67+
# Init code
68+
actor = TheActor()
69+
other_actor = OtherActor()
70+
# more setup
71+
await run(actor, other_actor) # Start and await for all the actors
72+
```
73+
74+
- The `MovingWindow` is now a `BackgroundService`, so it needs to be started manually with `await window.start()`. It is recommended to use it as an `async` context manager if possible though:
75+
76+
```python
77+
async with MovingWindow(...) as window:
78+
# The moving windows is started here
79+
use(window)
80+
# The moving window is stopped here
81+
```
82+
83+
- The base actors (`ConfigManagingActor`, `ComponentMetricsResamplingActor`, `DataSourcingActor`, `PowerDistributingActor`) now inherit from the new `Actor` class, if you are using them directly, you need to start them manually with `await actor.start()` and you might need to do some other adjustments.
84+
1385
## New Features
1486

1587
- DFS for compentent graph
1688

89+
- `BackgroundService`: This new abstract base class can be used to write other classes that runs one or more tasks in the background. It provides a consistent API to start and stop these services and also takes care of the handling of the background tasks. It can also work as an `async` context manager, giving the service a deterministic lifetime and guaranteed cleanup.
90+
91+
All classes spawning tasks that are expected to run for an indeterminate amount of time are likely good candidates to use this as a base class.
92+
93+
- `Actor`: This new class inherits from `BackgroundService` and it replaces the `@actor` decorator.
94+
1795
## Bug Fixes
1896

1997
- Fixes a bug in the ring buffer updating the end timestamp of gaps when they are outdated.
20-
- Properly handles PV configurations with no or only some meters before the PV
21-
component.
22-
So far we only had configurations like this: Meter -> Inverter -> PV. However
23-
the scenario with Inverter -> PV is also possible and now handled correctly.
98+
99+
- Properly handles PV configurations with no or only some meters before the PV component.
100+
101+
So far we only had configurations like this: `Meter -> Inverter -> PV`. However the scenario with `Inverter -> PV` is also possible and now handled correctly.
102+
24103
- Fix `consumer_power()` not working certain configurations.
25-
In microgrids without consumers and no main meter, the formula
26-
would never return any values.
27-
- Fix `pv_power` not working in setups with 2 grid meters by using a new
28-
reliable function to search for components in the components graph
104+
105+
In microgrids without consumers and no main meter, the formula would never return any values.
106+
107+
- Fix `pv_power` not working in setups with 2 grid meters by using a new reliable function to search for components in the components graph
108+
29109
- Fix `consumer_power` similar to `pv_power`
110+
30111
- Zero value requests received by the `PowerDistributingActor` will now always be accepted, even when there are non-zero exclusion bounds.

benchmarks/power_distribution/power_distributor.py

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -108,19 +108,16 @@ async def run_test( # pylint: disable=too-many-locals
108108
power_request_channel = Broadcast[Request]("power-request")
109109
battery_status_channel = Broadcast[BatteryStatus]("battery-status")
110110
channel_registry = ChannelRegistry(name="power_distributor")
111-
distributor = PowerDistributingActor(
111+
async with PowerDistributingActor(
112112
channel_registry=channel_registry,
113113
requests_receiver=power_request_channel.new_receiver(),
114114
battery_status_sender=battery_status_channel.new_sender(),
115-
)
116-
117-
tasks: List[Coroutine[Any, Any, List[Result]]] = []
118-
tasks.append(send_requests(batteries, num_requests))
119-
120-
result = await asyncio.gather(*tasks)
121-
exec_time = timeit.default_timer() - start
115+
):
116+
tasks: List[Coroutine[Any, Any, List[Result]]] = []
117+
tasks.append(send_requests(batteries, num_requests))
122118

123-
await distributor._stop() # type: ignore # pylint: disable=no-member, protected-access
119+
result = await asyncio.gather(*tasks)
120+
exec_time = timeit.default_timer() - start
124121

125122
summary = parse_result(result)
126123
summary["num_requests"] = num_requests

benchmarks/timeseries/benchmark_datasourcing.py

Lines changed: 16 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -113,23 +113,22 @@ async def consume(channel: Receiver[Any]) -> None:
113113
await request_sender.send(request)
114114
consume_tasks.append(asyncio.create_task(consume(recv_channel)))
115115

116-
DataSourcingActor(request_receiver, channel_registry)
117-
118-
await asyncio.gather(*consume_tasks)
119-
120-
time_taken = perf_counter() - start_time
121-
122-
await mock_grid.cleanup()
123-
124-
print(f"Samples Sent: {samples_sent}, time taken: {time_taken}")
125-
print(f"Samples per second: {samples_sent / time_taken}")
126-
print(
127-
"Expected samples: "
128-
f"{num_expected_messages}, missing: {num_expected_messages - samples_sent}"
129-
)
130-
print(
131-
f"Missing per EVC: {(num_expected_messages - samples_sent) / num_ev_chargers}"
132-
)
116+
async with DataSourcingActor(request_receiver, channel_registry):
117+
await asyncio.gather(*consume_tasks)
118+
119+
time_taken = perf_counter() - start_time
120+
121+
await mock_grid.cleanup()
122+
123+
print(f"Samples Sent: {samples_sent}, time taken: {time_taken}")
124+
print(f"Samples per second: {samples_sent / time_taken}")
125+
print(
126+
"Expected samples: "
127+
f"{num_expected_messages}, missing: {num_expected_messages - samples_sent}"
128+
)
129+
print(
130+
f"Missing per EVC: {(num_expected_messages - samples_sent) / num_ev_chargers}"
131+
)
133132

134133

135134
def parse_args() -> Tuple[int, int, bool]:

benchmarks/timeseries/periodic_feature_extractor.py

Lines changed: 30 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@
1212
from __future__ import annotations
1313

1414
import asyncio
15+
import collections.abc
16+
import contextlib
1517
import logging
1618
from datetime import datetime, timedelta, timezone
1719
from functools import partial
@@ -27,19 +29,23 @@
2729
from frequenz.sdk.timeseries._quantities import Quantity
2830

2931

30-
async def init_feature_extractor(period: int) -> PeriodicFeatureExtractor:
32+
@contextlib.asynccontextmanager
33+
async def init_feature_extractor(
34+
period: int,
35+
) -> collections.abc.AsyncIterator[PeriodicFeatureExtractor]:
3136
"""Initialize the PeriodicFeatureExtractor class."""
3237
# We only need the moving window to initialize the PeriodicFeatureExtractor class.
3338
lm_chan = Broadcast[Sample[Quantity]]("lm_net_power")
34-
moving_window = MovingWindow(
39+
async with MovingWindow(
3540
timedelta(seconds=1), lm_chan.new_receiver(), timedelta(seconds=1)
36-
)
37-
38-
await lm_chan.new_sender().send(Sample(datetime.now(tz=timezone.utc), Quantity(0)))
41+
) as moving_window:
42+
await lm_chan.new_sender().send(
43+
Sample(datetime.now(tz=timezone.utc), Quantity(0))
44+
)
3945

40-
# Initialize the PeriodicFeatureExtractor class with a period of period seconds.
41-
# This works since the sampling period is set to 1 second.
42-
return PeriodicFeatureExtractor(moving_window, timedelta(seconds=period))
46+
# Initialize the PeriodicFeatureExtractor class with a period of period seconds.
47+
# This works since the sampling period is set to 1 second.
48+
yield PeriodicFeatureExtractor(moving_window, timedelta(seconds=period))
4349

4450

4551
def _calculate_avg_window(
@@ -211,22 +217,22 @@ async def main() -> None:
211217

212218
# create a random ndarray with 29 days -5 seconds of data
213219
days_29_s = 29 * DAY_S
214-
feature_extractor = await init_feature_extractor(10)
215-
data = rng.standard_normal(days_29_s)
216-
run_benchmark(data, 4, feature_extractor)
217-
218-
days_29_s = 29 * DAY_S + 3
219-
data = rng.standard_normal(days_29_s)
220-
run_benchmark(data, 4, feature_extractor)
221-
222-
# create a random ndarray with 29 days +5 seconds of data
223-
data = rng.standard_normal(29 * DAY_S + 5)
224-
225-
feature_extractor = await init_feature_extractor(7 * DAY_S)
226-
# TEST one day window and 6 days distance. COPY (Case 3)
227-
run_benchmark(data, DAY_S, feature_extractor)
228-
# benchmark one day window and 6 days distance. NO COPY (Case 1)
229-
run_benchmark(data[: 28 * DAY_S], DAY_S, feature_extractor)
220+
async with init_feature_extractor(10) as feature_extractor:
221+
data = rng.standard_normal(days_29_s)
222+
run_benchmark(data, 4, feature_extractor)
223+
224+
days_29_s = 29 * DAY_S + 3
225+
data = rng.standard_normal(days_29_s)
226+
run_benchmark(data, 4, feature_extractor)
227+
228+
# create a random ndarray with 29 days +5 seconds of data
229+
data = rng.standard_normal(29 * DAY_S + 5)
230+
231+
async with init_feature_extractor(7 * DAY_S) as feature_extractor:
232+
# TEST one day window and 6 days distance. COPY (Case 3)
233+
run_benchmark(data, DAY_S, feature_extractor)
234+
# benchmark one day window and 6 days distance. NO COPY (Case 1)
235+
run_benchmark(data[: 28 * DAY_S], DAY_S, feature_extractor)
230236

231237

232238
logging.basicConfig(level=logging.DEBUG)

src/frequenz/sdk/actor/__init__.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,20 +4,22 @@
44
"""A base class for creating simple composable actors."""
55

66
from ..timeseries._resampling import ResamplerConfig
7+
from ._actor import Actor
8+
from ._background_service import BackgroundService
79
from ._channel_registry import ChannelRegistry
810
from ._config_managing import ConfigManagingActor
911
from ._data_sourcing import ComponentMetricRequest, DataSourcingActor
10-
from ._decorator import actor
1112
from ._resampling import ComponentMetricsResamplingActor
1213
from ._run_utils import run
1314

1415
__all__ = [
16+
"Actor",
17+
"BackgroundService",
1518
"ChannelRegistry",
1619
"ComponentMetricRequest",
1720
"ComponentMetricsResamplingActor",
1821
"ConfigManagingActor",
1922
"DataSourcingActor",
2023
"ResamplerConfig",
21-
"actor",
2224
"run",
2325
]

0 commit comments

Comments
 (0)