Skip to content

Commit 571ff78

Browse files
Add from_receiver method to FormulaEngine (#489)
2 parents 76de40d + 5383893 commit 571ff78

File tree

2 files changed

+80
-0
lines changed

2 files changed

+80
-0
lines changed

src/frequenz/sdk/timeseries/_formula_engine/_formula_engine.py

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -325,6 +325,62 @@ def __init__(
325325
self._create_method = create_method
326326
self._channel: Broadcast[Sample[QuantityT]] = Broadcast(self._name)
327327

328+
@classmethod
329+
def from_receiver(
330+
cls,
331+
name: str,
332+
receiver: Receiver[Sample[QuantityT]],
333+
create_method: Callable[[float], QuantityT],
334+
nones_are_zeros: bool = False,
335+
) -> FormulaEngine[QuantityT]:
336+
"""
337+
Create a formula engine from a receiver.
338+
339+
Can be used to compose a formula engine with a receiver. When composing
340+
the new engine with other engines, make sure that receiver gets data
341+
from the same resampler and that the `create_method`s match.
342+
343+
Example:
344+
```python
345+
from frequenz.sdk import microgrid
346+
from frequenz.sdk.timeseries import Power
347+
348+
async def run() -> None:
349+
producer_power_engine = microgrid.logical_meter().producer_power
350+
consumer_power_recv = (
351+
microgrid.logical_meter().consumer_power.new_receiver()
352+
)
353+
354+
excess_power_recv = (
355+
(
356+
producer_power_engine
357+
+ FormulaEngine.from_receiver(
358+
"consumer power",
359+
consumer_power_recv,
360+
Power.from_watts,
361+
)
362+
)
363+
.build("excess power")
364+
.new_receiver()
365+
)
366+
367+
asyncio.run(run())
368+
```
369+
370+
Args:
371+
receiver: A receiver that streams `Sample`s.
372+
name: A name for the formula engine.
373+
create_method: A method to generate the output `Sample` value with,
374+
e.g. `Power.from_watts`.
375+
nones_are_zeros: If `True`, `None` values in the receiver are treated as 0.
376+
377+
Returns:
378+
A formula engine that streams the `Sample`s from the receiver.
379+
"""
380+
builder = FormulaBuilder(name, create_method)
381+
builder.push_metric(name, receiver, nones_are_zeros=nones_are_zeros)
382+
return cls(builder, create_method)
383+
328384
async def _run(self) -> None:
329385
await self._builder.subscribe()
330386
steps, metric_fetchers = self._builder.finalize()

tests/timeseries/test_formula_engine.py

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -776,3 +776,27 @@ async def test_types(self) -> None:
776776
result = await results_rx.receive()
777777
assert result is not None and result.value is not None
778778
assert result.value.as_watts() == 160.0
779+
780+
781+
class TestFromReceiver:
782+
"""Test creating a formula engine from a receiver."""
783+
784+
async def test_from_receiver(self) -> None:
785+
"""Test creating a formula engine from a receiver."""
786+
channel = Broadcast[Sample[Power]]("channel_1")
787+
sender = channel.new_sender()
788+
789+
builder = FormulaBuilder("test_from_receiver", create_method=Power.from_watts)
790+
builder.push_metric("channel_1", channel.new_receiver(), False)
791+
engine = builder.build()
792+
793+
engine_from_receiver = FormulaEngine.from_receiver(
794+
"test_from_receiver", engine.new_receiver(), create_method=Power.from_watts
795+
)
796+
797+
results_rx = engine_from_receiver.new_receiver()
798+
799+
await sender.send(Sample(datetime.now(), Power.from_watts(10.0)))
800+
result = await results_rx.receive()
801+
assert result is not None and result.value is not None
802+
assert result.value.as_watts() == 10.0

0 commit comments

Comments
 (0)