Skip to content

Commit 63d7c92

Browse files
committed
Make Samples hold Quantity instances instead of float
This makes the transition to a generic `Sample` type easier, which is done in a subsequent commit. This commit also introduces the `output_type` parameter to formula builders and formula engines. The new parameter is used to determine which type of quantity instance a formula engine would produce. Signed-off-by: Sahas Subramanian <[email protected]>
1 parent de33b03 commit 63d7c92

38 files changed

+437
-259
lines changed

benchmarks/timeseries/periodic_feature_extractor.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
from numpy.typing import NDArray
2525

2626
from frequenz.sdk.timeseries import MovingWindow, PeriodicFeatureExtractor, Sample
27+
from frequenz.sdk.timeseries._quantities import Quantity
2728

2829

2930
async def init_feature_extractor(period: int) -> PeriodicFeatureExtractor:
@@ -34,7 +35,7 @@ async def init_feature_extractor(period: int) -> PeriodicFeatureExtractor:
3435
timedelta(seconds=1), lm_chan.new_receiver(), timedelta(seconds=1)
3536
)
3637

37-
await lm_chan.new_sender().send(Sample(datetime.now(tz=timezone.utc), 0))
38+
await lm_chan.new_sender().send(Sample(datetime.now(tz=timezone.utc), Quantity(0)))
3839

3940
# Initialize the PeriodicFeatureExtractor class with a period of period seconds.
4041
# This works since the sampling period is set to 1 second.

benchmarks/timeseries/resampling.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
from typing import Sequence
99

1010
from frequenz.sdk.timeseries import Sample
11+
from frequenz.sdk.timeseries._quantities import Quantity
1112
from frequenz.sdk.timeseries._resampling import (
1213
ResamplerConfig,
1314
SourceProperties,
@@ -44,7 +45,7 @@ def _do_work() -> None:
4445
for _n_resample in range(resamples):
4546
for _n_sample in range(samples):
4647
now = now + timedelta(seconds=1 / samples)
47-
helper.add_sample(Sample(now, 0.0))
48+
helper.add_sample(Sample(now, Quantity(0.0)))
4849
helper.resample(now)
4950

5051
print(timeit(_do_work, number=5))

benchmarks/timeseries/ringbuffer_memusage.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import numpy as np
1313

1414
from frequenz.sdk.timeseries import Sample
15+
from frequenz.sdk.timeseries._quantities import Quantity
1516
from frequenz.sdk.timeseries._ringbuffer import OrderedRingBuffer
1617

1718
FIVE_MINUTES = timedelta(minutes=5)
@@ -75,7 +76,7 @@ def main(ringbuffer_len: int, iterations: int, gap_size: int) -> None:
7576
datetime.fromtimestamp(
7677
200 + i * FIVE_MINUTES.total_seconds(), tz=timezone.utc
7778
),
78-
i,
79+
Quantity(i),
7980
)
8081
)
8182

benchmarks/timeseries/ringbuffer_serialization.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515

1616
import frequenz.sdk.timeseries._ringbuffer as rb
1717
from frequenz.sdk.timeseries import Sample
18+
from frequenz.sdk.timeseries._quantities import Quantity
1819

1920
FILE_NAME = "ringbuffer.pkl"
2021
FIVE_MINUTES = timedelta(minutes=5)
@@ -72,7 +73,7 @@ def main() -> None:
7273
datetime.fromtimestamp(
7374
200 + i * FIVE_MINUTES.total_seconds(), tz=timezone.utc
7475
),
75-
i,
76+
Quantity(i),
7677
)
7778
)
7879

examples/power_distribution.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,9 @@ async def run(self) -> None:
126126
if (datetime.now(timezone.utc) - time_data).total_seconds() > 30:
127127
_logger.error("Active power data are stale")
128128
continue
129-
queue.put_nowait(active_power.value)
129+
queue.put_nowait(
130+
None if not active_power.value else active_power.value.base_value
131+
)
130132

131133
await self._request_channel.send(list(queue.queue))
132134

examples/resampling.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
from frequenz.sdk.microgrid import connection_manager
2020
from frequenz.sdk.microgrid.component import ComponentCategory, ComponentMetricId
2121
from frequenz.sdk.timeseries import Sample
22+
from frequenz.sdk.timeseries._quantities import Quantity
2223
from frequenz.sdk.timeseries._resampling import Resampler, ResamplerConfig, Sink, Source
2324

2425
HOST = "microgrid.sandbox.api.frequenz.io"
@@ -33,8 +34,8 @@ async def _calculate_average(source: Source, sink: Sink) -> None:
3334
count += 1
3435
if sample.value is None:
3536
continue
36-
avg = avg * (count - 1) / count + sample.value / count
37-
await sink(Sample(datetime.now(timezone.utc), avg))
37+
avg = avg * (count - 1) / count + sample.value.base_value / count
38+
await sink(Sample(datetime.now(timezone.utc), Quantity(avg)))
3839

3940

4041
async def _print_sample(sample: Sample) -> None:

src/frequenz/sdk/actor/_data_sourcing/microgrid_api_source.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
MeterData,
2222
)
2323
from ...timeseries import Sample
24+
from ...timeseries._quantities import Quantity
2425
from .._channel_registry import ChannelRegistry
2526

2627

@@ -355,7 +356,9 @@ def process_msg(data: Any) -> None:
355356
tasks = []
356357
for extractor, senders in stream_senders:
357358
for sender in senders:
358-
tasks.append(sender.send(Sample(data.timestamp, extractor(data))))
359+
tasks.append(
360+
sender.send(Sample(data.timestamp, Quantity(extractor(data))))
361+
)
359362
asyncio.gather(*tasks)
360363
nonlocal pending_messages
361364
pending_messages -= 1

src/frequenz/sdk/timeseries/_base_types.py

Lines changed: 18 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@
88
from datetime import datetime, timezone
99
from typing import Callable, Iterator, Self, overload
1010

11+
from ._quantities import Quantity
12+
1113
UNIX_EPOCH = datetime.fromtimestamp(0.0, tz=timezone.utc)
1214
"""The UNIX epoch (in UTC)."""
1315

@@ -24,7 +26,7 @@ class Sample:
2426
timestamp: datetime
2527
"""The time when this sample was generated."""
2628

27-
value: float | None = None
29+
value: Quantity | None = None
2830
"""The value of this sample."""
2931

3032

@@ -40,16 +42,16 @@ class Sample3Phase:
4042

4143
timestamp: datetime
4244
"""The time when this sample was generated."""
43-
value_p1: float | None
45+
value_p1: Quantity | None
4446
"""The value of the 1st phase in this sample."""
4547

46-
value_p2: float | None
48+
value_p2: Quantity | None
4749
"""The value of the 2nd phase in this sample."""
4850

49-
value_p3: float | None
51+
value_p3: Quantity | None
5052
"""The value of the 3rd phase in this sample."""
5153

52-
def __iter__(self) -> Iterator[float | None]:
54+
def __iter__(self) -> Iterator[Quantity | None]:
5355
"""Return an iterator that yields values from each of the phases.
5456
5557
Yields:
@@ -60,14 +62,14 @@ def __iter__(self) -> Iterator[float | None]:
6062
yield self.value_p3
6163

6264
@overload
63-
def max(self, default: float) -> float:
65+
def max(self, default: Quantity) -> Quantity:
6466
...
6567

6668
@overload
67-
def max(self, default: None = None) -> float | None:
69+
def max(self, default: None = None) -> Quantity | None:
6870
...
6971

70-
def max(self, default: float | None = None) -> float | None:
72+
def max(self, default: Quantity | None = None) -> Quantity | None:
7173
"""Return the max value among all phases, or default if they are all `None`.
7274
7375
Args:
@@ -78,21 +80,21 @@ def max(self, default: float | None = None) -> float | None:
7880
"""
7981
if not any(self):
8082
return default
81-
value: float = functools.reduce(
83+
value: Quantity = functools.reduce(
8284
lambda x, y: x if x > y else y,
8385
filter(None, self),
8486
)
8587
return value
8688

8789
@overload
88-
def min(self, default: float) -> float:
90+
def min(self, default: Quantity) -> Quantity:
8991
...
9092

9193
@overload
92-
def min(self, default: None = None) -> float | None:
94+
def min(self, default: None = None) -> Quantity | None:
9395
...
9496

95-
def min(self, default: float | None = None) -> float | None:
97+
def min(self, default: Quantity | None = None) -> Quantity | None:
9698
"""Return the min value among all phases, or default if they are all `None`.
9799
98100
Args:
@@ -103,14 +105,16 @@ def min(self, default: float | None = None) -> float | None:
103105
"""
104106
if not any(self):
105107
return default
106-
value: float = functools.reduce(
108+
value: Quantity = functools.reduce(
107109
lambda x, y: x if x < y else y,
108110
filter(None, self),
109111
)
110112
return value
111113

112114
def map(
113-
self, function: Callable[[float], float], default: float | None = None
115+
self,
116+
function: Callable[[Quantity], Quantity],
117+
default: Quantity | None = None,
114118
) -> Self:
115119
"""Apply the given function on each of the phase values and return the result.
116120

0 commit comments

Comments
 (0)