Skip to content

Commit 0a65574

Browse files
Add from_receiver method to FormulaEngine
When composing two formula engines it's not possible to access the individual elements. In such a case one can first alter the individual streams before composing. But altering is only supported for channel receivers while composing only works on formula engines. This method can be used to construct a formula engine from a receiver even after the receivers samples have been manipulated. Signed-off-by: Matthias Wende <[email protected]>
1 parent 0e884b7 commit 0a65574

File tree

1 file changed

+56
-0
lines changed

1 file changed

+56
-0
lines changed

src/frequenz/sdk/timeseries/_formula_engine/_formula_engine.py

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -325,6 +325,62 @@ def __init__(
325325
self._create_method = create_method
326326
self._channel: Broadcast[Sample[QuantityT]] = Broadcast(self._name)
327327

328+
@classmethod
329+
def from_receiver(
330+
cls,
331+
name: str,
332+
receiver: Receiver[Sample[QuantityT]],
333+
create_method: Callable[[float], QuantityT],
334+
nones_are_zeros: bool = False,
335+
) -> FormulaEngine[QuantityT]:
336+
"""
337+
Create a formula engine from a receiver.
338+
339+
Can be used to compose a formula engine with a receiver. When composing
340+
the new engine with other engines, make sure that receiver gets data
341+
from the same resampler and that the `create_method`s match.
342+
343+
Example:
344+
```python
345+
from frequenz.sdk import microgrid
346+
from frequenz.sdk.timeseries import Power
347+
348+
async def run() -> None:
349+
producer_power_engine = microgrid.logical_meter().producer_power
350+
consumer_power_recv = (
351+
microgrid.logical_meter().consumer_power.new_receiver()
352+
)
353+
354+
excess_power_recv = (
355+
(
356+
producer_power_engine
357+
+ FormulaEngine.from_receiver(
358+
"consumer power",
359+
consumer_power_recv,
360+
Power.from_watts,
361+
)
362+
)
363+
.build("excess power")
364+
.new_receiver()
365+
)
366+
367+
asyncio.run(run())
368+
```
369+
370+
Args:
371+
receiver: A receiver that streams `Sample`s.
372+
name: A name for the formula engine.
373+
create_method: A method to generate the output `Sample` value with,
374+
e.g. `Power.from_watts`.
375+
nones_are_zeros: If `True`, `None` values in the receiver are treated as 0.
376+
377+
Returns:
378+
A formula engine that streams the `Sample`s from the receiver.
379+
"""
380+
builder = FormulaBuilder(name, create_method)
381+
builder.push_metric(name, receiver, nones_are_zeros=nones_are_zeros)
382+
return cls(builder, create_method)
383+
328384
async def _run(self) -> None:
329385
await self._builder.subscribe()
330386
steps, metric_fetchers = self._builder.finalize()

0 commit comments

Comments
 (0)