Skip to content

Commit b4ffa2a

Browse files
committed
Add a 3-phase formula engine implementation
It is a simple implementation that takes the outputs of 3 single-phase formula engines and packages received samples into `Sample3Phase` objects. Also add 3-phase channel/receiver/builder specializations using the newly introduced `FormulaEngine3Phase`. Signed-off-by: Sahas Subramanian <[email protected]>
1 parent eee8025 commit b4ffa2a

File tree

1 file changed

+138
-8
lines changed

1 file changed

+138
-8
lines changed

src/frequenz/sdk/timeseries/logical_meter/_formula_engine.py

Lines changed: 138 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
from frequenz.channels._broadcast import Receiver as BroadcastReceiver
2020

2121
from ..._internal.asyncio import cancel_and_await
22-
from .. import Sample
22+
from .. import Sample, Sample3Phase
2323
from ._formula_steps import (
2424
Adder,
2525
Averager,
@@ -222,6 +222,72 @@ def new_receiver(
222222
return self._channel.new_receiver(name, max_size)
223223

224224

225+
class FormulaEngine3Phase:
226+
"""
227+
The FormulaEngine evaluates formulas and streams the results.
228+
229+
Use the `FormulaBuilder` to create `FormulaEngine` instances.
230+
"""
231+
232+
def __init__(
233+
self,
234+
name: str,
235+
phase_streams: Tuple[FormulaReceiver, FormulaReceiver, FormulaReceiver],
236+
) -> None:
237+
"""Create a `FormulaEngine` instance.
238+
239+
Args:
240+
name: A name for the formula.
241+
phase_streams: output streams of formula engines running per-phase formulas.
242+
"""
243+
self._name = name
244+
self._channel = FormulaChannel3Phase(self._name, self)
245+
self._task = None
246+
self._streams = phase_streams
247+
248+
async def _run(self) -> None:
249+
sender = self._channel.new_sender()
250+
while True:
251+
try:
252+
phase_1 = await self._streams[0].receive()
253+
phase_2 = await self._streams[1].receive()
254+
phase_3 = await self._streams[2].receive()
255+
msg = Sample3Phase(
256+
phase_1.timestamp,
257+
phase_1.value,
258+
phase_2.value,
259+
phase_3.value,
260+
)
261+
except asyncio.CancelledError:
262+
logger.exception("FormulaEngine task cancelled: %s", self._name)
263+
break
264+
else:
265+
await sender.send(msg)
266+
267+
async def _stop(self) -> None:
268+
"""Stop a running formula engine."""
269+
if self._task is None:
270+
return
271+
await cancel_and_await(self._task)
272+
273+
def new_receiver(
274+
self, name: Optional[str] = None, max_size: int = 50
275+
) -> FormulaReceiver3Phase:
276+
"""Create a new receiver that streams the output of the formula engine.
277+
278+
Args:
279+
name: An optional name for the receiver.
280+
max_size: The size of the receiver's buffer.
281+
282+
Returns:
283+
A receiver that streams output `Sample`s from the formula engine.
284+
"""
285+
if self._task is None:
286+
self._task = asyncio.create_task(self._run())
287+
288+
return self._channel.new_receiver(name, max_size)
289+
290+
225291
class FormulaBuilder:
226292
"""Builds a post-fix formula engine that operates on `Sample` receivers.
227293
@@ -332,11 +398,19 @@ def build(self) -> FormulaEngine:
332398
return FormulaEngine(self._name, self._steps, self._metric_fetchers)
333399

334400

335-
_GenericSample = TypeVar("_GenericSample")
336-
_GenericEngine = TypeVar("_GenericEngine")
337-
_GenericFormulaChannel = TypeVar("_GenericFormulaChannel")
338-
_GenericFormulaReceiver = TypeVar("_GenericFormulaReceiver")
339-
_GenericHOFormulaBuilder = TypeVar("_GenericHOFormulaBuilder")
401+
_GenericSample = TypeVar("_GenericSample", Sample, Sample3Phase)
402+
_GenericEngine = TypeVar("_GenericEngine", FormulaEngine, FormulaEngine3Phase)
403+
_GenericFormulaChannel = TypeVar(
404+
"_GenericFormulaChannel", "FormulaChannel", "FormulaChannel3Phase"
405+
)
406+
_GenericFormulaReceiver = TypeVar(
407+
"_GenericFormulaReceiver", "FormulaReceiver", "FormulaReceiver3Phase"
408+
)
409+
_GenericHOFormulaBuilder = TypeVar(
410+
"_GenericHOFormulaBuilder",
411+
"HigherOrderFormulaBuilder",
412+
"HigherOrderFormulaBuilder3Phase",
413+
)
340414

341415

342416
class _BaseFormulaChannel(
@@ -346,7 +420,7 @@ class _BaseFormulaChannel(
346420
):
347421
"""A broadcast channel implementation for use with formulas."""
348422

349-
ReceiverType: Type[FormulaReceiver]
423+
ReceiverType: Type[FormulaReceiver | FormulaReceiver3Phase]
350424

351425
def __init__(
352426
self, name: str, engine: _GenericEngine, resend_latest: bool = False
@@ -408,7 +482,7 @@ class _BaseFormulaReceiver(
408482
formulas.
409483
"""
410484

411-
BuilderType: Type[HigherOrderFormulaBuilder]
485+
BuilderType: Type[HigherOrderFormulaBuilder | HigherOrderFormulaBuilder3Phase]
412486

413487
def __init__(
414488
self,
@@ -676,13 +750,69 @@ def build(self, name: str, nones_are_zeros: bool = False) -> FormulaEngine:
676750
return self._engine
677751

678752

753+
class HigherOrderFormulaBuilder3Phase(
754+
_BaseHOFormulaBuilder["FormulaReceiver3Phase", Sample3Phase, FormulaEngine3Phase]
755+
):
756+
"""A specialization of the _BaseHOFormulaBuilder for `FormulaReceiver3Phase`."""
757+
758+
def build(self, name: str, nones_are_zeros: bool = False) -> FormulaEngine3Phase:
759+
"""Build a `FormulaEngine3Phase` instance from the builder.
760+
761+
Args:
762+
name: A name for the newly generated formula.
763+
nones_are_zeros: whether `None` values in any of the input streams should be
764+
treated as zeros.
765+
766+
Returns:
767+
A `FormulaEngine3Phase` instance.
768+
"""
769+
builders = [FormulaBuilder(name), FormulaBuilder(name), FormulaBuilder(name)]
770+
for step in self._steps:
771+
if step[0] == TokenType.COMPONENT_METRIC:
772+
assert isinstance(step[1], FormulaReceiver3Phase)
773+
for phase in range(3):
774+
builders[phase].push_metric(
775+
f"{step[1].name}-{phase+1}",
776+
step[1] # pylint: disable=protected-access
777+
.engine._streams[phase]
778+
.clone(),
779+
nones_are_zeros,
780+
)
781+
step[1]._deactivate() # pylint: disable=protected-access
782+
elif step[0] == TokenType.OPER:
783+
assert isinstance(step[1], str)
784+
for phase in range(3):
785+
builders[phase].push_oper(step[1])
786+
self._engine = FormulaEngine3Phase(
787+
name,
788+
(
789+
builders[0].build().new_receiver(),
790+
builders[1].build().new_receiver(),
791+
builders[2].build().new_receiver(),
792+
),
793+
)
794+
return self._engine
795+
796+
679797
class FormulaReceiver(_BaseFormulaReceiver[Sample, FormulaEngine]):
680798
"""A specialization of the _BaseFormulaChannel for `Sample` objects."""
681799

682800
BuilderType = HigherOrderFormulaBuilder
683801

684802

803+
class FormulaReceiver3Phase(_BaseFormulaReceiver[Sample3Phase, FormulaEngine3Phase]):
804+
"""A specialization of the _BaseFormulaChannel for `Sample3Phase` objects."""
805+
806+
BuilderType = HigherOrderFormulaBuilder3Phase
807+
808+
685809
class FormulaChannel(_BaseFormulaChannel[Sample, FormulaEngine]):
686810
"""A specialization of the _BaseFormulaChannel for `Sample` objects."""
687811

688812
ReceiverType = FormulaReceiver
813+
814+
815+
class FormulaChannel3Phase(_BaseFormulaChannel[Sample3Phase, FormulaEngine3Phase]):
816+
"""A specialization of the _BaseFormulaChannel for `Sample3Phase` objects."""
817+
818+
ReceiverType = FormulaReceiver3Phase

0 commit comments

Comments
 (0)