Skip to content

Commit ba1bec5

Browse files
committed
Make Samples generic over Quantity types
Signed-off-by: Sahas Subramanian <[email protected]>
1 parent 63d7c92 commit ba1bec5

File tree

20 files changed

+107
-88
lines changed

20 files changed

+107
-88
lines changed

benchmarks/timeseries/periodic_feature_extractor.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@
3030
async def init_feature_extractor(period: int) -> PeriodicFeatureExtractor:
3131
"""Initialize the PeriodicFeatureExtractor class."""
3232
# We only need the moving window to initialize the PeriodicFeatureExtractor class.
33-
lm_chan = Broadcast[Sample]("lm_net_power")
33+
lm_chan = Broadcast[Sample[Quantity]]("lm_net_power")
3434
moving_window = MovingWindow(
3535
timedelta(seconds=1), lm_chan.new_receiver(), timedelta(seconds=1)
3636
)

benchmarks/timeseries/resampling.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717

1818

1919
def nop( # pylint: disable=unused-argument
20-
samples: Sequence[Sample],
20+
samples: Sequence[Sample[Quantity]],
2121
resampler_config: ResamplerConfig,
2222
source_properties: SourceProperties,
2323
) -> float:

examples/power_distribution.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
from frequenz.sdk.actor import ResamplerConfig
2323
from frequenz.sdk.actor.power_distributing import Result, Success
2424
from frequenz.sdk.timeseries import Sample
25+
from frequenz.sdk.timeseries._quantities import Quantity
2526

2627
_logger = logging.getLogger(__name__)
2728
HOST = "microgrid.sandbox.api.frequenz.io" # it should be the host name.
@@ -97,7 +98,7 @@ class DataCollectingActor:
9798
def __init__(
9899
self,
99100
request_channel: Sender[List[float]],
100-
active_power_data: Receiver[Sample],
101+
active_power_data: Receiver[Sample[Quantity]],
101102
) -> None:
102103
"""Create actor instance.
103104

examples/resampling.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ async def _calculate_average(source: Source, sink: Sink) -> None:
3838
await sink(Sample(datetime.now(timezone.utc), Quantity(avg)))
3939

4040

41-
async def _print_sample(sample: Sample) -> None:
41+
async def _print_sample(sample: Sample[Quantity]) -> None:
4242
print(f"\nResampled average at {sample.timestamp}: {sample.value}\n")
4343

4444

@@ -106,7 +106,7 @@ async def run() -> None: # pylint: disable=too-many-locals
106106
)
107107

108108
# Create a channel to calculate an average for all the data
109-
average_chan = Broadcast[Sample]("average")
109+
average_chan = Broadcast[Sample[Quantity]]("average")
110110

111111
second_stage_resampler = Resampler(
112112
ResamplerConfig(resampling_period=timedelta(seconds=3.0))
@@ -118,7 +118,7 @@ async def run() -> None: # pylint: disable=too-many-locals
118118
average_sender = average_chan.new_sender()
119119

120120
# Needed until channels Senders raises exceptions on errors
121-
async def sink_adapter(sample: Sample) -> None:
121+
async def sink_adapter(sample: Sample[Quantity]) -> None:
122122
await average_sender.send(sample)
123123

124124
print("Starting...")

src/frequenz/sdk/actor/_data_sourcing/microgrid_api_source.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -305,7 +305,7 @@ def _get_metric_senders(
305305
self,
306306
category: ComponentCategory,
307307
requests: Dict[ComponentMetricId, List[ComponentMetricRequest]],
308-
) -> List[Tuple[Callable[[Any], float], List[Sender[Sample]]]]:
308+
) -> List[Tuple[Callable[[Any], float], List[Sender[Sample[Quantity]]]]]:
309309
"""Get channel senders from the channel registry for each requested metric.
310310
311311
Args:

src/frequenz/sdk/actor/_resampling.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313

1414
from .._internal._asyncio import cancel_and_await
1515
from ..timeseries import Sample
16+
from ..timeseries._quantities import Quantity
1617
from ..timeseries._resampling import Resampler, ResamplerConfig, ResamplingError
1718
from ._channel_registry import ChannelRegistry
1819
from ._data_sourcing import ComponentMetricRequest
@@ -80,7 +81,7 @@ async def _subscribe(self, request: ComponentMetricRequest) -> None:
8081
# exceptions to report errors.
8182
sender = self._channel_registry.new_sender(request.get_channel_name())
8283

83-
async def sink_adapter(sample: Sample) -> None:
84+
async def sink_adapter(sample: Sample[Quantity]) -> None:
8485
await sender.send(sample)
8586

8687
self._resampler.add_timeseries(request_channel_name, receiver, sink_adapter)

src/frequenz/sdk/timeseries/_base_types.py

Lines changed: 19 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -6,16 +6,16 @@
66
import functools
77
from dataclasses import dataclass
88
from datetime import datetime, timezone
9-
from typing import Callable, Iterator, Self, overload
9+
from typing import Callable, Generic, Iterator, Self, overload
1010

11-
from ._quantities import Quantity
11+
from ._quantities import QuantityT
1212

1313
UNIX_EPOCH = datetime.fromtimestamp(0.0, tz=timezone.utc)
1414
"""The UNIX epoch (in UTC)."""
1515

1616

1717
@dataclass(frozen=True, order=True)
18-
class Sample:
18+
class Sample(Generic[QuantityT]):
1919
"""A measurement taken at a particular point in time.
2020
2121
The `value` could be `None` if a component is malfunctioning or data is
@@ -26,12 +26,12 @@ class Sample:
2626
timestamp: datetime
2727
"""The time when this sample was generated."""
2828

29-
value: Quantity | None = None
29+
value: QuantityT | None = None
3030
"""The value of this sample."""
3131

3232

3333
@dataclass(frozen=True)
34-
class Sample3Phase:
34+
class Sample3Phase(Generic[QuantityT]):
3535
"""A 3-phase measurement made at a particular point in time.
3636
3737
Each of the `value` fields could be `None` if a component is malfunctioning
@@ -42,16 +42,16 @@ class Sample3Phase:
4242

4343
timestamp: datetime
4444
"""The time when this sample was generated."""
45-
value_p1: Quantity | None
45+
value_p1: QuantityT | None
4646
"""The value of the 1st phase in this sample."""
4747

48-
value_p2: Quantity | None
48+
value_p2: QuantityT | None
4949
"""The value of the 2nd phase in this sample."""
5050

51-
value_p3: Quantity | None
51+
value_p3: QuantityT | None
5252
"""The value of the 3rd phase in this sample."""
5353

54-
def __iter__(self) -> Iterator[Quantity | None]:
54+
def __iter__(self) -> Iterator[QuantityT | None]:
5555
"""Return an iterator that yields values from each of the phases.
5656
5757
Yields:
@@ -62,14 +62,14 @@ def __iter__(self) -> Iterator[Quantity | None]:
6262
yield self.value_p3
6363

6464
@overload
65-
def max(self, default: Quantity) -> Quantity:
65+
def max(self, default: QuantityT) -> QuantityT:
6666
...
6767

6868
@overload
69-
def max(self, default: None = None) -> Quantity | None:
69+
def max(self, default: None = None) -> QuantityT | None:
7070
...
7171

72-
def max(self, default: Quantity | None = None) -> Quantity | None:
72+
def max(self, default: QuantityT | None = None) -> QuantityT | None:
7373
"""Return the max value among all phases, or default if they are all `None`.
7474
7575
Args:
@@ -80,21 +80,21 @@ def max(self, default: Quantity | None = None) -> Quantity | None:
8080
"""
8181
if not any(self):
8282
return default
83-
value: Quantity = functools.reduce(
83+
value: QuantityT = functools.reduce(
8484
lambda x, y: x if x > y else y,
8585
filter(None, self),
8686
)
8787
return value
8888

8989
@overload
90-
def min(self, default: Quantity) -> Quantity:
90+
def min(self, default: QuantityT) -> QuantityT:
9191
...
9292

9393
@overload
94-
def min(self, default: None = None) -> Quantity | None:
94+
def min(self, default: None = None) -> QuantityT | None:
9595
...
9696

97-
def min(self, default: Quantity | None = None) -> Quantity | None:
97+
def min(self, default: QuantityT | None = None) -> QuantityT | None:
9898
"""Return the min value among all phases, or default if they are all `None`.
9999
100100
Args:
@@ -105,16 +105,16 @@ def min(self, default: Quantity | None = None) -> Quantity | None:
105105
"""
106106
if not any(self):
107107
return default
108-
value: Quantity = functools.reduce(
108+
value: QuantityT = functools.reduce(
109109
lambda x, y: x if x < y else y,
110110
filter(None, self),
111111
)
112112
return value
113113

114114
def map(
115115
self,
116-
function: Callable[[Quantity], Quantity],
117-
default: Quantity | None = None,
116+
function: Callable[[QuantityT], QuantityT],
117+
default: QuantityT | None = None,
118118
) -> Self:
119119
"""Apply the given function on each of the phase values and return the result.
120120

src/frequenz/sdk/timeseries/_formula_engine/_formula_engine.py

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ def __init__(
8282
self._output_type = output_type
8383

8484
async def _synchronize_metric_timestamps(
85-
self, metrics: Set[asyncio.Task[Optional[Sample]]]
85+
self, metrics: Set[asyncio.Task[Optional[Sample[Quantity]]]]
8686
) -> datetime:
8787
"""Synchronize the metric streams.
8888
@@ -129,7 +129,7 @@ async def _synchronize_metric_timestamps(
129129
self._first_run = False
130130
return latest_ts
131131

132-
async def apply(self) -> Sample:
132+
async def apply(self) -> Sample[Quantity]:
133133
"""Fetch the latest metrics, apply the formula once and return the result.
134134
135135
Returns:
@@ -304,7 +304,7 @@ def __init__(
304304
self._name: str = builder.name
305305
self._builder = builder
306306
self._output_type = output_type
307-
self._channel = Broadcast[Sample](self._name)
307+
self._channel = Broadcast[Sample[Quantity]](self._name)
308308

309309
async def _run(self) -> None:
310310
await self._builder.subscribe()
@@ -328,7 +328,7 @@ async def _run(self) -> None:
328328

329329
def new_receiver(
330330
self, name: Optional[str] = None, max_size: int = 50
331-
) -> Receiver[Sample]:
331+
) -> Receiver[Sample[Quantity]]:
332332
"""Create a new receiver that streams the output of the formula engine.
333333
334334
Args:
@@ -371,7 +371,7 @@ def __init__(
371371
self._higher_order_builder = HigherOrderFormulaBuilder3Phase
372372
self._name: str = name
373373
self._output_type = output_type
374-
self._channel = Broadcast[Sample3Phase](self._name)
374+
self._channel = Broadcast[Sample3Phase[Quantity]](self._name)
375375
self._task: asyncio.Task[None] | None = None
376376
self._streams: tuple[
377377
FormulaEngine, FormulaEngine, FormulaEngine
@@ -402,7 +402,7 @@ async def _run(self) -> None:
402402

403403
def new_receiver(
404404
self, name: Optional[str] = None, max_size: int = 50
405-
) -> Receiver[Sample3Phase]:
405+
) -> Receiver[Sample3Phase[Quantity]]:
406406
"""Create a new receiver that streams the output of the formula engine.
407407
408408
Args:
@@ -493,7 +493,7 @@ def push_oper(self, oper: str) -> None:
493493
def push_metric(
494494
self,
495495
name: str,
496-
data_stream: Receiver[Sample],
496+
data_stream: Receiver[Sample[Quantity]],
497497
nones_are_zeros: bool,
498498
) -> None:
499499
"""Push a metric receiver into the engine.
@@ -562,7 +562,9 @@ def push_clipper(self, min_value: float | None, max_value: float | None) -> None
562562
"""
563563
self._steps.append(Clipper(min_value, max_value))
564564

565-
def push_average(self, metrics: List[Tuple[str, Receiver[Sample], bool]]) -> None:
565+
def push_average(
566+
self, metrics: List[Tuple[str, Receiver[Sample[Quantity]], bool]]
567+
) -> None:
566568
"""Push an average calculator into the engine.
567569
568570
Args:

src/frequenz/sdk/timeseries/_formula_engine/_formula_steps.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
from frequenz.channels import Receiver
1212

1313
from .. import Sample
14+
from .._quantities import Quantity
1415
from ._exceptions import FormulaEngineError
1516

1617

@@ -263,7 +264,7 @@ class MetricFetcher(FormulaStep):
263264
"""A formula step for fetching a value from a metric Receiver."""
264265

265266
def __init__(
266-
self, name: str, stream: Receiver[Sample], nones_are_zeros: bool
267+
self, name: str, stream: Receiver[Sample[Quantity]], nones_are_zeros: bool
267268
) -> None:
268269
"""Create a `MetricFetcher` instance.
269270
@@ -274,10 +275,10 @@ def __init__(
274275
"""
275276
self._name = name
276277
self._stream = stream
277-
self._next_value: Optional[Sample] = None
278+
self._next_value: Optional[Sample[Quantity]] = None
278279
self._nones_are_zeros = nones_are_zeros
279280

280-
async def fetch_next(self) -> Optional[Sample]:
281+
async def fetch_next(self) -> Optional[Sample[Quantity]]:
281282
"""Fetch the next value from the stream.
282283
283284
To be called before each call to `apply`.
@@ -289,7 +290,7 @@ async def fetch_next(self) -> Optional[Sample]:
289290
return self._next_value
290291

291292
@property
292-
def value(self) -> Optional[Sample]:
293+
def value(self) -> Optional[Sample[Quantity]]:
293294
"""Get the next value in the stream.
294295
295296
Returns:

src/frequenz/sdk/timeseries/_formula_engine/_resampled_formula_builder.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ def __init__( # pylint: disable=too-many-arguments
5454

5555
def _get_resampled_receiver(
5656
self, component_id: int, metric_id: ComponentMetricId
57-
) -> Receiver[Sample]:
57+
) -> Receiver[Sample[Quantity]]:
5858
"""Get a receiver with the resampled data for the given component id.
5959
6060
Args:

0 commit comments

Comments
 (0)