Skip to content

Commit 5c44da2

Browse files
authored
Add Quantity types and update formulas to produce typed Samples (#422)
2 parents 4ac6286 + 22a47b6 commit 5c44da2

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

48 files changed

+1773
-493
lines changed

benchmarks/power_distribution/power_distributor.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
)
2727
from frequenz.sdk.microgrid import connection_manager
2828
from frequenz.sdk.microgrid.component import Component, ComponentCategory
29+
from frequenz.sdk.timeseries._quantities import Power
2930

3031
HOST = "microgrid.sandbox.api.frequenz.io"
3132
PORT = 61060
@@ -49,7 +50,7 @@ async def send_requests(batteries: Set[int], request_num: int) -> List[Result]:
4950
results_rx = battery_pool.power_distribution_results()
5051
result: List[Result] = []
5152
for _ in range(request_num):
52-
await battery_pool.set_power(float(random.randrange(100000, 1000000)))
53+
await battery_pool.set_power(Power(float(random.randrange(100000, 1000000))))
5354
try:
5455
output = await asyncio.wait_for(results_rx.receive(), timeout=3)
5556
if output is None:

benchmarks/timeseries/periodic_feature_extractor.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,17 +24,18 @@
2424
from numpy.typing import NDArray
2525

2626
from frequenz.sdk.timeseries import MovingWindow, PeriodicFeatureExtractor, Sample
27+
from frequenz.sdk.timeseries._quantities import Quantity
2728

2829

2930
async def init_feature_extractor(period: int) -> PeriodicFeatureExtractor:
3031
"""Initialize the PeriodicFeatureExtractor class."""
3132
# We only need the moving window to initialize the PeriodicFeatureExtractor class.
32-
lm_chan = Broadcast[Sample]("lm_net_power")
33+
lm_chan = Broadcast[Sample[Quantity]]("lm_net_power")
3334
moving_window = MovingWindow(
3435
timedelta(seconds=1), lm_chan.new_receiver(), timedelta(seconds=1)
3536
)
3637

37-
await lm_chan.new_sender().send(Sample(datetime.now(tz=timezone.utc), 0))
38+
await lm_chan.new_sender().send(Sample(datetime.now(tz=timezone.utc), Quantity(0)))
3839

3940
# Initialize the PeriodicFeatureExtractor class with a period of period seconds.
4041
# This works since the sampling period is set to 1 second.

benchmarks/timeseries/resampling.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
from typing import Sequence
99

1010
from frequenz.sdk.timeseries import Sample
11+
from frequenz.sdk.timeseries._quantities import Quantity
1112
from frequenz.sdk.timeseries._resampling import (
1213
ResamplerConfig,
1314
SourceProperties,
@@ -16,7 +17,7 @@
1617

1718

1819
def nop( # pylint: disable=unused-argument
19-
samples: Sequence[Sample],
20+
samples: Sequence[Sample[Quantity]],
2021
resampler_config: ResamplerConfig,
2122
source_properties: SourceProperties,
2223
) -> float:
@@ -44,7 +45,7 @@ def _do_work() -> None:
4445
for _n_resample in range(resamples):
4546
for _n_sample in range(samples):
4647
now = now + timedelta(seconds=1 / samples)
47-
helper.add_sample(Sample(now, 0.0))
48+
helper.add_sample(Sample(now, Quantity(0.0)))
4849
helper.resample(now)
4950

5051
print(timeit(_do_work, number=5))

benchmarks/timeseries/ringbuffer_memusage.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import numpy as np
1313

1414
from frequenz.sdk.timeseries import Sample
15+
from frequenz.sdk.timeseries._quantities import Quantity
1516
from frequenz.sdk.timeseries._ringbuffer import OrderedRingBuffer
1617

1718
FIVE_MINUTES = timedelta(minutes=5)
@@ -75,7 +76,7 @@ def main(ringbuffer_len: int, iterations: int, gap_size: int) -> None:
7576
datetime.fromtimestamp(
7677
200 + i * FIVE_MINUTES.total_seconds(), tz=timezone.utc
7778
),
78-
i,
79+
Quantity(i),
7980
)
8081
)
8182

benchmarks/timeseries/ringbuffer_serialization.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515

1616
import frequenz.sdk.timeseries._ringbuffer as rb
1717
from frequenz.sdk.timeseries import Sample
18+
from frequenz.sdk.timeseries._quantities import Quantity
1819

1920
FILE_NAME = "ringbuffer.pkl"
2021
FIVE_MINUTES = timedelta(minutes=5)
@@ -72,7 +73,7 @@ def main() -> None:
7273
datetime.fromtimestamp(
7374
200 + i * FIVE_MINUTES.total_seconds(), tz=timezone.utc
7475
),
75-
i,
76+
Quantity(i),
7677
)
7778
)
7879

examples/power_distribution.py

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
from frequenz.sdk.actor import ResamplerConfig
2323
from frequenz.sdk.actor.power_distributing import Result, Success
2424
from frequenz.sdk.timeseries import Sample
25+
from frequenz.sdk.timeseries._quantities import Power
2526

2627
_logger = logging.getLogger(__name__)
2728
HOST = "microgrid.sandbox.api.frequenz.io" # it should be the host name.
@@ -61,13 +62,13 @@ async def run(self) -> None:
6162

6263
avg_power = sum(request) / len(request)
6364
_logger.debug("Avg power %d", avg_power)
64-
power_to_set: float
65+
power_to_set: Power
6566
if avg_power > 30000:
6667
# Charge
67-
power_to_set = 10000.0
68+
power_to_set = Power(10000.0)
6869
else:
6970
# Discharge
70-
power_to_set = -10000.0
71+
power_to_set = Power(-10000.0)
7172

7273
await battery_pool.set_power(power_to_set)
7374
try:
@@ -97,7 +98,7 @@ class DataCollectingActor:
9798
def __init__(
9899
self,
99100
request_channel: Sender[List[float]],
100-
active_power_data: Receiver[Sample],
101+
active_power_data: Receiver[Sample[Power]],
101102
) -> None:
102103
"""Create actor instance.
103104
@@ -126,7 +127,9 @@ async def run(self) -> None:
126127
if (datetime.now(timezone.utc) - time_data).total_seconds() > 30:
127128
_logger.error("Active power data are stale")
128129
continue
129-
queue.put_nowait(active_power.value)
130+
queue.put_nowait(
131+
None if not active_power.value else active_power.value.base_value
132+
)
130133

131134
await self._request_channel.send(list(queue.queue))
132135

examples/resampling.py

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
from frequenz.sdk.microgrid import connection_manager
2020
from frequenz.sdk.microgrid.component import ComponentCategory, ComponentMetricId
2121
from frequenz.sdk.timeseries import Sample
22+
from frequenz.sdk.timeseries._quantities import Quantity
2223
from frequenz.sdk.timeseries._resampling import Resampler, ResamplerConfig, Sink, Source
2324

2425
HOST = "microgrid.sandbox.api.frequenz.io"
@@ -33,11 +34,11 @@ async def _calculate_average(source: Source, sink: Sink) -> None:
3334
count += 1
3435
if sample.value is None:
3536
continue
36-
avg = avg * (count - 1) / count + sample.value / count
37-
await sink(Sample(datetime.now(timezone.utc), avg))
37+
avg = avg * (count - 1) / count + sample.value.base_value / count
38+
await sink(Sample(datetime.now(timezone.utc), Quantity(avg)))
3839

3940

40-
async def _print_sample(sample: Sample) -> None:
41+
async def _print_sample(sample: Sample[Quantity]) -> None:
4142
print(f"\nResampled average at {sample.timestamp}: {sample.value}\n")
4243

4344

@@ -105,7 +106,7 @@ async def run() -> None: # pylint: disable=too-many-locals
105106
)
106107

107108
# Create a channel to calculate an average for all the data
108-
average_chan = Broadcast[Sample]("average")
109+
average_chan = Broadcast[Sample[Quantity]]("average")
109110

110111
second_stage_resampler = Resampler(
111112
ResamplerConfig(resampling_period=timedelta(seconds=3.0))
@@ -117,7 +118,7 @@ async def run() -> None: # pylint: disable=too-many-locals
117118
average_sender = average_chan.new_sender()
118119

119120
# Needed until channels Senders raises exceptions on errors
120-
async def sink_adapter(sample: Sample) -> None:
121+
async def sink_adapter(sample: Sample[Quantity]) -> None:
121122
await average_sender.send(sample)
122123

123124
print("Starting...")

pyproject.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,9 @@ disable = [
113113
# disabled because it conflicts with isort
114114
"wrong-import-order",
115115
"ungrouped-imports",
116+
# pylint's unsubscriptable check is buggy and is not needed because
117+
# it is a type-check, for which we already have mypy.
118+
"unsubscriptable-object",
116119
]
117120

118121
[tool.pylint.design]

src/frequenz/sdk/actor/_data_sourcing/microgrid_api_source.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
MeterData,
2222
)
2323
from ...timeseries import Sample
24+
from ...timeseries._quantities import Quantity
2425
from .._channel_registry import ChannelRegistry
2526

2627

@@ -304,7 +305,7 @@ def _get_metric_senders(
304305
self,
305306
category: ComponentCategory,
306307
requests: Dict[ComponentMetricId, List[ComponentMetricRequest]],
307-
) -> List[Tuple[Callable[[Any], float], List[Sender[Sample]]]]:
308+
) -> List[Tuple[Callable[[Any], float], List[Sender[Sample[Quantity]]]]]:
308309
"""Get channel senders from the channel registry for each requested metric.
309310
310311
Args:
@@ -355,7 +356,9 @@ def process_msg(data: Any) -> None:
355356
tasks = []
356357
for extractor, senders in stream_senders:
357358
for sender in senders:
358-
tasks.append(sender.send(Sample(data.timestamp, extractor(data))))
359+
tasks.append(
360+
sender.send(Sample(data.timestamp, Quantity(extractor(data))))
361+
)
359362
asyncio.gather(*tasks)
360363
nonlocal pending_messages
361364
pending_messages -= 1

src/frequenz/sdk/actor/_resampling.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313

1414
from .._internal._asyncio import cancel_and_await
1515
from ..timeseries import Sample
16+
from ..timeseries._quantities import Quantity
1617
from ..timeseries._resampling import Resampler, ResamplerConfig, ResamplingError
1718
from ._channel_registry import ChannelRegistry
1819
from ._data_sourcing import ComponentMetricRequest
@@ -80,7 +81,7 @@ async def _subscribe(self, request: ComponentMetricRequest) -> None:
8081
# exceptions to report errors.
8182
sender = self._channel_registry.new_sender(request.get_channel_name())
8283

83-
async def sink_adapter(sample: Sample) -> None:
84+
async def sink_adapter(sample: Sample[Quantity]) -> None:
8485
await sender.send(sample)
8586

8687
self._resampler.add_timeseries(request_channel_name, receiver, sink_adapter)

0 commit comments

Comments
 (0)