Skip to content

Commit f67039e

Browse files
committed
Take custom constructors in FormulaEngine for creating quantity objects
This commit also updates the FormulaBuilders to take custom constructors as arguments. FormulaEngines were previously taking the type of the quantity, with which, it could use only the default constructors to create objects. Because support for using default constructors is going away, FormulaEngines and FormulaBuilders need to know the exact constructor to use for creating output quantities. Signed-off-by: Sahas Subramanian <[email protected]>
1 parent b55b559 commit f67039e

File tree

6 files changed

+63
-62
lines changed

6 files changed

+63
-62
lines changed

src/frequenz/sdk/timeseries/_formula_engine/_formula_engine.py

Lines changed: 39 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
from datetime import datetime
1313
from math import isinf, isnan
1414
from typing import (
15+
Callable,
1516
Dict,
1617
Generic,
1718
List,
@@ -63,23 +64,23 @@ def __init__(
6364
name: str,
6465
steps: List[FormulaStep],
6566
metric_fetchers: Dict[str, MetricFetcher[QuantityT]],
66-
output_type: Type[QuantityT],
67+
create_method: Callable[[float], QuantityT],
6768
) -> None:
6869
"""Create a `FormulaEngine` instance.
6970
7071
Args:
7172
name: A name for the formula.
7273
steps: Steps for the engine to execute, in post-fix order.
7374
metric_fetchers: Fetchers for each metric stream the formula depends on.
74-
output_type: A type object to generate the output `Sample` with. If the
75-
formula is for generating power values, this would be `Power`, for
76-
example.
75+
create_method: A method to generate the output `Sample` value with. If the
76+
formula is for generating power values, this would be
77+
`Power.from_watts`, for example.
7778
"""
7879
self._name = name
7980
self._steps = steps
8081
self._metric_fetchers: Dict[str, MetricFetcher[QuantityT]] = metric_fetchers
8182
self._first_run = True
82-
self._output_type: type[QuantityT] = output_type
83+
self._create_method: Callable[[float], QuantityT] = create_method
8384

8485
async def _synchronize_metric_timestamps(
8586
self, metrics: Set[asyncio.Task[Optional[Sample[QuantityT]]]]
@@ -172,7 +173,7 @@ async def apply(self) -> Sample[QuantityT]:
172173
if isnan(res) or isinf(res):
173174
return Sample(metric_ts, None)
174175

175-
return Sample(metric_ts, self._output_type(res))
176+
return Sample(metric_ts, self._create_method(res))
176177

177178

178179
_CompositionType = Union[
@@ -218,7 +219,7 @@ class _ComposableFormulaEngine(
218219
):
219220
"""A base class for formula engines."""
220221

221-
_output_type: Type[QuantityT]
222+
_create_method: Callable[[float], QuantityT]
222223
_higher_order_builder: Type[_GenericHigherOrderBuilder]
223224
_task: asyncio.Task[None] | None = None
224225

@@ -242,7 +243,7 @@ def __add__(
242243
A formula builder that can take further expressions, or can be built
243244
into a formula engine.
244245
"""
245-
return self._higher_order_builder(self, self._output_type) + other # type: ignore
246+
return self._higher_order_builder(self, self._create_method) + other # type: ignore
246247

247248
def __sub__(
248249
self, other: _GenericEngine | _GenericHigherOrderBuilder
@@ -257,7 +258,7 @@ def __sub__(
257258
A formula builder that can take further expressions, or can be built
258259
into a formula engine.
259260
"""
260-
return self._higher_order_builder(self, self._output_type) - other # type: ignore
261+
return self._higher_order_builder(self, self._create_method) - other # type: ignore
261262

262263
def __mul__(
263264
self, other: _GenericEngine | _GenericHigherOrderBuilder
@@ -272,7 +273,7 @@ def __mul__(
272273
A formula builder that can take further expressions, or can be built
273274
into a formula engine.
274275
"""
275-
return self._higher_order_builder(self, self._output_type) * other # type: ignore
276+
return self._higher_order_builder(self, self._create_method) * other # type: ignore
276277

277278
def __truediv__(
278279
self, other: _GenericEngine | _GenericHigherOrderBuilder
@@ -287,7 +288,7 @@ def __truediv__(
287288
A formula builder that can take further expressions, or can be built
288289
into a formula engine.
289290
"""
290-
return self._higher_order_builder(self, self._output_type) / other # type: ignore
291+
return self._higher_order_builder(self, self._create_method) / other # type: ignore
291292

292293

293294
class FormulaEngine(
@@ -307,28 +308,28 @@ class FormulaEngine(
307308
def __init__(
308309
self,
309310
builder: FormulaBuilder[QuantityT],
310-
output_type: Type[QuantityT],
311+
create_method: Callable[[float], QuantityT],
311312
) -> None:
312313
"""Create a `FormulaEngine` instance.
313314
314315
Args:
315316
builder: A `FormulaBuilder` instance to get the formula steps and metric
316317
fetchers from.
317-
output_type: A type object to generate the output `Sample` with. If the
318-
formula is for generating power values, this would be `Power`, for
319-
example.
318+
create_method: A method to generate the output `Sample` value with. If the
319+
formula is for generating power values, this would be
320+
`Power.from_watts`, for example.
320321
"""
321322
self._higher_order_builder = HigherOrderFormulaBuilder
322323
self._name: str = builder.name
323324
self._builder: FormulaBuilder[QuantityT] = builder
324-
self._output_type: Type[QuantityT] = output_type # type: ignore
325+
self._create_method = create_method
325326
self._channel: Broadcast[Sample[QuantityT]] = Broadcast(self._name)
326327

327328
async def _run(self) -> None:
328329
await self._builder.subscribe()
329330
steps, metric_fetchers = self._builder.finalize()
330331
evaluator = FormulaEvaluator[QuantityT](
331-
self._name, steps, metric_fetchers, self._output_type
332+
self._name, steps, metric_fetchers, self._create_method
332333
)
333334
sender = self._channel.new_sender()
334335
while True:
@@ -378,7 +379,7 @@ class FormulaEngine3Phase(
378379
def __init__(
379380
self,
380381
name: str,
381-
output_type: Type[QuantityT],
382+
create_method: Callable[[float], QuantityT],
382383
phase_streams: Tuple[
383384
FormulaEngine[QuantityT],
384385
FormulaEngine[QuantityT],
@@ -389,14 +390,14 @@ def __init__(
389390
390391
Args:
391392
name: A name for the formula.
392-
output_type: A type object to generate the output `Sample` with. If the
393-
formula is for generating power values, this would be `Power`, for
394-
example.
393+
create_method: A method to generate the output `Sample` value with. If the
394+
formula is for generating power values, this would be
395+
`Power.from_watts`, for example.
395396
phase_streams: output streams of formula engines running per-phase formulas.
396397
"""
397398
self._higher_order_builder = HigherOrderFormulaBuilder3Phase
398399
self._name: str = name
399-
self._output_type = output_type
400+
self._create_method = create_method
400401
self._channel: Broadcast[Sample3Phase[QuantityT]] = Broadcast(self._name)
401402
self._task: asyncio.Task[None] | None = None
402403
self._streams: tuple[
@@ -474,17 +475,17 @@ class FormulaBuilder(Generic[QuantityT]):
474475
add the values and return the result.
475476
"""
476477

477-
def __init__(self, name: str, output_type: Type[QuantityT]) -> None:
478+
def __init__(self, name: str, create_method: Callable[[float], QuantityT]) -> None:
478479
"""Create a `FormulaBuilder` instance.
479480
480481
Args:
481482
name: A name for the formula being built.
482-
output_type: A type object to generate the output `Sample` with. If the
483-
formula is for generating power values, this would be `Power`, for
484-
example.
483+
create_method: A method to generate the output `Sample` value with. If the
484+
formula is for generating power values, this would be
485+
`Power.from_watts`, for example.
485486
"""
486487
self._name = name
487-
self._output_type: Type[QuantityT] = output_type
488+
self._create_method: Callable[[float], QuantityT] = create_method
488489
self._build_stack: List[FormulaStep] = []
489490
self._steps: List[FormulaStep] = []
490491
self._metric_fetchers: Dict[str, MetricFetcher[QuantityT]] = {}
@@ -648,7 +649,7 @@ def build(self) -> FormulaEngine[QuantityT]:
648649
A `FormulaEngine` instance.
649650
"""
650651
self.finalize()
651-
return FormulaEngine(self, output_type=self._output_type)
652+
return FormulaEngine(self, create_method=self._create_method)
652653

653654

654655
class _BaseHOFormulaBuilder(ABC, Generic[QuantityT]):
@@ -657,16 +658,16 @@ class _BaseHOFormulaBuilder(ABC, Generic[QuantityT]):
657658
def __init__(
658659
self,
659660
engine: FormulaEngine[QuantityT] | FormulaEngine3Phase[QuantityT],
660-
output_type: Type[QuantityT],
661+
create_method: Callable[[float], QuantityT],
661662
) -> None:
662663
"""Create a `GenericHigherOrderFormulaBuilder` instance.
663664
664665
Args:
665666
engine: A first input stream to create a builder with, so that python
666667
operators `+, -, *, /` can be used directly on newly created instances.
667-
output_type: A type object to generate the output `Sample` with. If the
668-
formula is for generating power values, this would be `Power`, for
669-
example.
668+
create_method: A method to generate the output `Sample` value with. If the
669+
formula is for generating power values, this would be
670+
`Power.from_watts`, for example.
670671
"""
671672
self._steps: deque[
672673
tuple[
@@ -675,7 +676,7 @@ def __init__(
675676
]
676677
] = deque()
677678
self._steps.append((TokenType.COMPONENT_METRIC, engine))
678-
self._output_type: Type[QuantityT] = output_type
679+
self._create_method: Callable[[float], QuantityT] = create_method
679680

680681
@overload
681682
def _push(
@@ -854,7 +855,7 @@ def build(
854855
Returns:
855856
A `FormulaEngine` instance.
856857
"""
857-
builder = FormulaBuilder(name, self._output_type)
858+
builder = FormulaBuilder(name, self._create_method)
858859
for typ, value in self._steps:
859860
if typ == TokenType.COMPONENT_METRIC:
860861
assert isinstance(value, FormulaEngine)
@@ -888,9 +889,9 @@ def build(
888889
A `FormulaEngine3Phase` instance.
889890
"""
890891
builders = [
891-
FormulaBuilder(name, self._output_type),
892-
FormulaBuilder(name, self._output_type),
893-
FormulaBuilder(name, self._output_type),
892+
FormulaBuilder(name, self._create_method),
893+
FormulaBuilder(name, self._create_method),
894+
FormulaBuilder(name, self._create_method),
894895
]
895896
for typ, value in self._steps:
896897
if typ == TokenType.COMPONENT_METRIC:
@@ -909,7 +910,7 @@ def build(
909910
builders[phase].push_oper(value)
910911
return FormulaEngine3Phase(
911912
name,
912-
self._output_type,
913+
self._create_method,
913914
(
914915
builders[0].build(),
915916
builders[1].build(),

src/frequenz/sdk/timeseries/_formula_engine/_formula_generators/_formula_generator.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
from collections import abc
1111
from dataclasses import dataclass
1212
from enum import Enum
13-
from typing import Generic, Type
13+
from typing import Callable, Generic
1414

1515
from frequenz.channels import Sender
1616

@@ -88,15 +88,15 @@ def _get_builder(
8888
self,
8989
name: str,
9090
component_metric_id: ComponentMetricId,
91-
output_type: Type[QuantityT],
91+
create_method: Callable[[float], QuantityT],
9292
) -> ResampledFormulaBuilder[QuantityT]:
9393
builder = ResampledFormulaBuilder(
9494
self._namespace,
9595
name,
9696
self._channel_registry,
9797
self._resampler_subscription_sender,
9898
component_metric_id,
99-
output_type,
99+
create_method,
100100
)
101101
return builder
102102

src/frequenz/sdk/timeseries/_formula_engine/_resampled_formula_builder.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66

77
from __future__ import annotations
88

9-
from typing import Generic, Type
9+
from typing import Callable, Generic
1010

1111
from frequenz.channels import Receiver, Sender
1212

@@ -28,7 +28,7 @@ def __init__( # pylint: disable=too-many-arguments
2828
channel_registry: ChannelRegistry,
2929
resampler_subscription_sender: Sender[ComponentMetricRequest],
3030
metric_id: ComponentMetricId,
31-
output_type: Type[QuantityT],
31+
create_method: Callable[[float], QuantityT],
3232
) -> None:
3333
"""Create a `ResampledFormulaBuilder` instance.
3434
@@ -41,16 +41,16 @@ def __init__( # pylint: disable=too-many-arguments
4141
resampler_subscription_sender: A sender to send metric requests to the
4242
resampling actor.
4343
metric_id: A metric ID to fetch for all components in this formula.
44-
output_type: A type object to generate the output `Sample` with. If the
45-
formula is for generating power values, this would be `Power`, for
46-
example.
44+
create_method: A method to generate the output `Sample` value with. If the
45+
formula is for generating power values, this would be
46+
`Power.from_watts`, for example.
4747
"""
4848
self._channel_registry = channel_registry
4949
self._resampler_subscription_sender = resampler_subscription_sender
5050
self._namespace = namespace
5151
self._metric_id = metric_id
5252
self._resampler_requests: list[ComponentMetricRequest] = []
53-
super().__init__(formula_name, output_type) # type: ignore[arg-type]
53+
super().__init__(formula_name, create_method) # type: ignore[arg-type]
5454

5555
def _get_resampled_receiver(
5656
self, component_id: int, metric_id: ComponentMetricId

tests/timeseries/_formula_engine/test_formula_composition.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ async def test_formula_composition( # pylint: disable=too-many-locals
3535
logical_meter._namespace, # pylint: disable=protected-access
3636
4,
3737
ComponentMetricId.ACTIVE_POWER,
38-
Power,
38+
Power.from_watts,
3939
)
4040
grid_power_recv = logical_meter.grid_power.new_receiver()
4141
battery_power_recv = battery_pool.power.new_receiver()

tests/timeseries/_formula_engine/utils.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
from __future__ import annotations
77

88
from math import isclose
9-
from typing import Type
9+
from typing import Callable
1010

1111
from frequenz.channels import Receiver
1212

@@ -21,7 +21,7 @@ def get_resampled_stream(
2121
namespace: str,
2222
comp_id: int,
2323
metric_id: ComponentMetricId,
24-
output_type: Type[QuantityT],
24+
create_method: Callable[[float], QuantityT],
2525
) -> Receiver[Sample[QuantityT]]:
2626
"""Return the resampled data stream for the given component."""
2727
# Create a `FormulaBuilder` instance, just in order to reuse its
@@ -34,7 +34,7 @@ def get_resampled_stream(
3434
_data_pipeline._get()._channel_registry,
3535
_data_pipeline._get()._resampling_request_sender(),
3636
metric_id,
37-
output_type,
37+
create_method,
3838
)
3939
# Resampled data is always `Quantity` type, so we need to convert it to the desired
4040
# output type.
@@ -44,7 +44,7 @@ def get_resampled_stream(
4444
).map(
4545
lambda sample: Sample(
4646
sample.timestamp,
47-
None if sample.value is None else output_type(sample.value.base_value),
47+
None if sample.value is None else create_method(sample.value.base_value),
4848
)
4949
)
5050
# pylint: enable=protected-access

0 commit comments

Comments
 (0)