Skip to content

Commit d5b5994

Browse files
authored
Rewrite the resampler class to be more reusable (#101)
First we introduce a timeseries `Source` (async iterator of `Sample`) and `Sink` (an async function taking a `Sample`) types. The idea is for all classes in the `timeseries` package to be based on these types. On advantage of using a function as a `Sink` (instead of a channel) is that for cases where we don't really need a channel (like if we only need to store samples in a buffer), there is no added overhead (peak shaving for example could benefit from this). The `GroupResampler` and `Resampler` classes are not very reusable, or at least require quite a bit of work from users to use them and they need to take care of handling timers and calling methods at the right time for them to be useful. The new `Resampler` class has its own task(s) to handle all the resampling, users only need to subscribe to new timeseries by passing a timeseries `Source` and `Sink`. The `Sink` will be called each time a new sample is produced. A new `frequenz.sdk.timeseries.resampling` module is added because there are more classes only used by the `Resampler` that might not be needed by anyone just wanting to use `frequenz.sdk.timeseries`. This PR also introduces the use of [`async-solipsism`](https://pypi.org/project/async-solipsism/) (see #70) for testing, so tests using timers can be mocked in a way that they don't really need to wait for timers and time comparisons can be done precisely. It works pretty well in combination with [`time_machine`](https://pypi.org/project/time-machine/). Fixes #56.
2 parents 4a2dac8 + 1fe75cc commit d5b5994

22 files changed

+1304
-600
lines changed

RELEASE_NOTES.md

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,28 @@
66

77
## Upgrading
88

9-
<!-- Here goes notes on how to upgrade from previous versions, including deprecations and what they should be replaced with -->
9+
- `frequenz.sdk.timseries`:
10+
- The resample classes in the `frequenz.sdk.timseries` were removed. Use the new `frequenz.sdk.timseries.resampling.Resampler` class instead.
11+
- The `ResamplingFunction` was moved to the new module `frequenz.sdk.timseries.resampling`.
12+
13+
- `frequenz.sdk.actor.ComponentMetricsResamplingActor`:
14+
- The constructor now requires to pass all arguments as keywords.
15+
- The following constructor arguments were renamed to make them more clear:
16+
- `subscription_sender` -> `data_sourcing_request_sender`
17+
- `subscription_receiver` -> `resampling_request_receiver`
18+
1019

1120
## New Features
1221

13-
<!-- Here goes the main new features and examples or instructions on how to use them -->
22+
- New in `frequenz.sdk.timeseries`:
23+
24+
- `Source` and `Sink` types to work generically with streaming timeseries.
25+
26+
- New `frequenz.sdk.timeseries.resampling` module with:
27+
- `Resampler` class that works on timseries `Source` and `Sink`.
28+
- `ResamplingFunction` (moved from `frequenz.sdk.timeseries`).
29+
- `ResamplingError` and `SourceStoppedError` exceptions.
30+
- `average` function (the default resampling function).
1431

1532
## Bug Fixes
1633

examples/resampling.py

Lines changed: 59 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,10 @@
44
"""Frequenz Python SDK resampling example."""
55

66
import asyncio
7+
from datetime import datetime, timezone
78

89
from frequenz.channels import Broadcast
9-
from frequenz.channels.util import MergeNamed
10+
from frequenz.channels.util import Merge
1011

1112
from frequenz.sdk import microgrid
1213
from frequenz.sdk.actor import (
@@ -16,11 +17,29 @@
1617
DataSourcingActor,
1718
)
1819
from frequenz.sdk.microgrid.component import ComponentCategory, ComponentMetricId
20+
from frequenz.sdk.timeseries import Sample
21+
from frequenz.sdk.timeseries._resampling import Resampler, Sink, Source
1922

2023
HOST = "microgrid.sandbox.api.frequenz.io"
2124
PORT = 61060
2225

2326

27+
async def _calculate_average(source: Source, sink: Sink) -> None:
28+
avg: float = 0.0
29+
count: int = 0
30+
async for sample in source:
31+
print(f"Received sample to average at {sample.timestamp}: {sample.value}")
32+
count += 1
33+
if sample.value is None:
34+
continue
35+
avg = avg * (count - 1) / count + sample.value / count
36+
await sink(Sample(datetime.now(timezone.utc), avg))
37+
38+
39+
async def _print_sample(sample: Sample) -> None:
40+
print(f"\nResampled average at {sample.timestamp}: {sample.value}\n")
41+
42+
2443
async def run() -> None: # pylint: disable=too-many-locals
2544
"""Run main functions that initializes and creates everything."""
2645
await microgrid.initialize(HOST, PORT)
@@ -32,9 +51,9 @@ async def run() -> None: # pylint: disable=too-many-locals
3251
data_source_request_sender = data_source_request_channel.new_sender()
3352
data_source_request_receiver = data_source_request_channel.new_receiver()
3453

35-
resampling_actor_request_channel = Broadcast[ComponentMetricRequest]("resample")
36-
resampling_actor_request_sender = resampling_actor_request_channel.new_sender()
37-
resampling_actor_request_receiver = resampling_actor_request_channel.new_receiver()
54+
resampling_request_channel = Broadcast[ComponentMetricRequest]("resample")
55+
resampling_request_sender = resampling_request_channel.new_sender()
56+
resampling_request_receiver = resampling_request_channel.new_receiver()
3857

3958
# Instantiate a data sourcing actor
4059
_data_sourcing_actor = DataSourcingActor(
@@ -44,9 +63,9 @@ async def run() -> None: # pylint: disable=too-many-locals
4463
# Instantiate a resampling actor
4564
_resampling_actor = ComponentMetricsResamplingActor(
4665
channel_registry=channel_registry,
47-
subscription_sender=data_source_request_sender,
48-
subscription_receiver=resampling_actor_request_receiver,
49-
resampling_period_s=1.0,
66+
data_sourcing_request_sender=data_source_request_sender,
67+
resampling_request_receiver=resampling_request_receiver,
68+
resampling_period_s=1,
5069
)
5170

5271
components = await microgrid.get().api_client.components()
@@ -56,7 +75,9 @@ async def run() -> None: # pylint: disable=too-many-locals
5675
if comp.category == ComponentCategory.BATTERY
5776
]
5877

59-
# Create subscription requests for each time series id
78+
print(f"Found {len(battery_ids)} batteries: {battery_ids}")
79+
80+
# Create subscription requests for each battery's SoC
6081
subscription_requests = [
6182
ComponentMetricRequest(
6283
namespace="resampling",
@@ -69,24 +90,41 @@ async def run() -> None: # pylint: disable=too-many-locals
6990

7091
# Send the subscription requests
7192
await asyncio.gather(
72-
*[
73-
resampling_actor_request_sender.send(request)
74-
for request in subscription_requests
75-
]
93+
*[resampling_request_sender.send(request) for request in subscription_requests]
7694
)
7795

7896
# Merge sample receivers for each subscription into one receiver
79-
merged_receiver = MergeNamed(
80-
**{
81-
req.get_channel_name(): channel_registry.new_receiver(
82-
req.get_channel_name()
83-
)
97+
merged_receiver = Merge(
98+
*[
99+
channel_registry.new_receiver(req.get_channel_name())
84100
for req in subscription_requests
85-
}
101+
]
86102
)
87103

88-
async for channel_name, msg in merged_receiver:
89-
print(f"{channel_name}: {msg}")
104+
# Create a channel to calculate an average for all the data
105+
average_chan = Broadcast[Sample]("average")
106+
107+
second_stage_resampler = Resampler(resampling_period_s=3.0)
108+
second_stage_resampler.add_timeseries(average_chan.new_receiver(), _print_sample)
109+
110+
average_sender = average_chan.new_sender()
111+
# Needed until channels Senders raises exceptions on errors
112+
async def sink_adapter(sample: Sample) -> None:
113+
assert await average_sender.send(sample)
114+
115+
print("Starting...")
116+
117+
try:
118+
# This will run until it is interrupted (with Ctrl-C for example)
119+
await asyncio.gather(
120+
_calculate_average(merged_receiver, sink_adapter),
121+
second_stage_resampler.resample(),
122+
)
123+
finally:
124+
await second_stage_resampler.stop()
90125

91126

92-
asyncio.run(run())
127+
try:
128+
asyncio.run(run())
129+
except KeyboardInterrupt:
130+
print("Bye!")

noxfile.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,14 @@
1313

1414
FMT_DEPS = ["black", "isort"]
1515
DOCSTRING_DEPS = ["pydocstyle", "darglint"]
16-
PYTEST_DEPS = ["pytest", "pytest-cov", "pytest-mock", "pytest-asyncio", "time-machine"]
16+
PYTEST_DEPS = [
17+
"pytest",
18+
"pytest-cov",
19+
"pytest-mock",
20+
"pytest-asyncio",
21+
"time-machine",
22+
"async-solipsism",
23+
]
1724
MYPY_DEPS = ["mypy", "pandas-stubs", "grpc-stubs"]
1825

1926

pyproject.toml

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -100,10 +100,13 @@ src_paths = ["src", "examples", "tests"]
100100
asyncio_mode = "auto"
101101
required_plugins = [ "pytest-asyncio", "pytest-mock" ]
102102

103-
[[tools.mypy.overrides]]
104103
[[tool.mypy.overrides]]
105104
module = [
106105
"grpc.aio",
107106
"grpc.aio.*"
108107
]
109108
ignore_missing_imports = true
109+
110+
[[tool.mypy.overrides]]
111+
module = "async_solipsism"
112+
ignore_missing_imports = true

0 commit comments

Comments
 (0)