Skip to content

Commit 7079533

Browse files
committed
Add second stage resampling to the example
Signed-off-by: Leandro Lucarella <[email protected]>
1 parent 1068725 commit 7079533

File tree

1 file changed

+53
-12
lines changed

1 file changed

+53
-12
lines changed

examples/resampling.py

Lines changed: 53 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,10 @@
44
"""Frequenz Python SDK resampling example."""
55

66
import asyncio
7+
from datetime import datetime, timezone
78

89
from frequenz.channels import Broadcast
9-
from frequenz.channels.util import MergeNamed
10+
from frequenz.channels.util import Merge
1011

1112
from frequenz.sdk import microgrid
1213
from frequenz.sdk.actor import (
@@ -16,11 +17,29 @@
1617
DataSourcingActor,
1718
)
1819
from frequenz.sdk.microgrid.component import ComponentCategory, ComponentMetricId
20+
from frequenz.sdk.timeseries import Sample, Sink, Source
21+
from frequenz.sdk.timeseries.resampling import Resampler
1922

2023
HOST = "microgrid.sandbox.api.frequenz.io"
2124
PORT = 61060
2225

2326

27+
async def _calculate_average(source: Source, sink: Sink) -> None:
28+
avg: float = 0.0
29+
count: int = 0
30+
async for sample in source:
31+
print(f"Received sample to average at {sample.timestamp}: {sample.value}")
32+
count += 1
33+
if sample.value is None:
34+
continue
35+
avg = avg * (count - 1) / count + sample.value / count
36+
await sink(Sample(datetime.now(timezone.utc), avg))
37+
38+
39+
async def _print_sample(sample: Sample) -> None:
40+
print(f"\nResampled average at {sample.timestamp}: {sample.value}\n")
41+
42+
2443
async def run() -> None: # pylint: disable=too-many-locals
2544
"""Run main functions that initializes and creates everything."""
2645
await microgrid.initialize(HOST, PORT)
@@ -46,7 +65,7 @@ async def run() -> None: # pylint: disable=too-many-locals
4665
channel_registry=channel_registry,
4766
data_sourcing_request_sender=data_source_request_sender,
4867
resampling_request_receiver=resampling_request_receiver,
49-
resampling_period_s=1.0,
68+
resampling_period_s=1,
5069
)
5170

5271
components = await microgrid.get().api_client.components()
@@ -56,7 +75,9 @@ async def run() -> None: # pylint: disable=too-many-locals
5675
if comp.category == ComponentCategory.BATTERY
5776
]
5877

59-
# Create subscription requests for each time series id
78+
print(f"Found {len(battery_ids)} batteries: {battery_ids}")
79+
80+
# Create subscription requests for each battery's SoC
6081
subscription_requests = [
6182
ComponentMetricRequest(
6283
namespace="resampling",
@@ -73,17 +94,37 @@ async def run() -> None: # pylint: disable=too-many-locals
7394
)
7495

7596
# Merge sample receivers for each subscription into one receiver
76-
merged_receiver = MergeNamed(
77-
**{
78-
req.get_channel_name(): channel_registry.new_receiver(
79-
req.get_channel_name()
80-
)
97+
merged_receiver = Merge(
98+
*[
99+
channel_registry.new_receiver(req.get_channel_name())
81100
for req in subscription_requests
82-
}
101+
]
83102
)
84103

85-
async for channel_name, msg in merged_receiver:
86-
print(f"{channel_name}: {msg}")
104+
# Create a channel to calculate an average for all the data
105+
average_chan = Broadcast[Sample]("average")
106+
107+
second_stage_resampler = Resampler(resampling_period_s=3.0)
108+
second_stage_resampler.add_timeseries(average_chan.new_receiver(), _print_sample)
109+
110+
average_sender = average_chan.new_sender()
111+
# Needed until channels Senders raises exceptions on errors
112+
async def sink_adapter(sample: Sample) -> None:
113+
assert await average_sender.send(sample)
114+
115+
print("Starting...")
116+
117+
try:
118+
# This will run until it is interrupted (with Ctrl-C for example)
119+
await asyncio.gather(
120+
_calculate_average(merged_receiver, sink_adapter),
121+
second_stage_resampler.resample(),
122+
)
123+
finally:
124+
await second_stage_resampler.stop()
87125

88126

89-
asyncio.run(run())
127+
try:
128+
asyncio.run(run())
129+
except KeyboardInterrupt:
130+
print("Bye!")

0 commit comments

Comments
 (0)