Skip to content

Commit eee8025

Browse files
committed
Extract formula eval logic from FormulaEngine into a separate class
... called `FormulaEvaluator`, so that it can be reused in the engine for 3-phase formulas. Signed-off-by: Sahas Subramanian <[email protected]>
1 parent 42e8eed commit eee8025

File tree

2 files changed

+40
-12
lines changed

2 files changed

+40
-12
lines changed

src/frequenz/sdk/timeseries/logical_meter/_formula_engine.py

Lines changed: 30 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -44,11 +44,8 @@
4444
}
4545

4646

47-
class FormulaEngine:
48-
"""A post-fix formula engine that operates on `Sample` receivers.
49-
50-
Use the `FormulaBuilder` to create `FormulaEngine` instances.
51-
"""
47+
class FormulaEvaluator:
48+
"""A post-fix formula evaluator that operates on `Sample` receivers."""
5249

5350
def __init__(
5451
self,
@@ -67,8 +64,6 @@ def __init__(
6764
self._steps = steps
6865
self._metric_fetchers = metric_fetchers
6966
self._first_run = True
70-
self._channel = FormulaChannel(self._name, self)
71-
self._task = None
7267

7368
async def _synchronize_metric_timestamps(
7469
self, metrics: Set[asyncio.Task[Optional[Sample]]]
@@ -117,7 +112,7 @@ async def _synchronize_metric_timestamps(
117112
self._first_run = False
118113
return latest_ts
119114

120-
async def _apply(self) -> Sample:
115+
async def apply(self) -> Sample:
121116
"""Fetch the latest metrics, apply the formula once and return the result.
122117
123118
Returns:
@@ -162,11 +157,37 @@ async def _apply(self) -> Sample:
162157

163158
return Sample(metric_ts, res)
164159

160+
161+
class FormulaEngine:
162+
"""
163+
The FormulaEngine evaluates formulas and streams the results.
164+
165+
Use the `FormulaBuilder` to create `FormulaEngine` instances.
166+
"""
167+
168+
def __init__(
169+
self,
170+
name: str,
171+
steps: List[FormulaStep],
172+
metric_fetchers: Dict[str, MetricFetcher],
173+
) -> None:
174+
"""Create a `FormulaEngine` instance.
175+
176+
Args:
177+
name: A name for the formula.
178+
steps: Steps for the engine to execute, in post-fix order.
179+
metric_fetchers: Fetchers for each metric stream the formula depends on.
180+
"""
181+
self._name = name
182+
self._channel = FormulaChannel(self._name, self)
183+
self._task = None
184+
self._evaluator = FormulaEvaluator(name, steps, metric_fetchers)
185+
165186
async def _run(self) -> None:
166187
sender = self._channel.new_sender()
167188
while True:
168189
try:
169-
msg = await self._apply()
190+
msg = await self._evaluator.apply()
170191
except asyncio.CancelledError:
171192
logger.exception("FormulaEngine task cancelled: %s", self._name)
172193
break

tests/timeseries/test_formula_engine.py

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,10 @@ async def run_test(
6767
builder.push_oper(token.value)
6868
engine = builder.build()
6969

70-
assert repr(engine._steps) == postfix # pylint: disable=protected-access
70+
assert (
71+
repr(engine._evaluator._steps) # pylint: disable=protected-access
72+
== postfix
73+
)
7174

7275
now = datetime.now()
7376
tests_passed = 0
@@ -81,7 +84,9 @@ async def run_test(
8184
]
8285
)
8386
)
84-
next_val = await engine._apply() # pylint: disable=protected-access
87+
next_val = (
88+
await engine._evaluator.apply() # pylint: disable=protected-access
89+
)
8590
assert (next_val).value == io_output
8691
tests_passed += 1
8792
await engine._stop() # pylint: disable=protected-access
@@ -575,7 +580,9 @@ async def run_test(
575580
]
576581
)
577582
)
578-
next_val = await engine._apply() # pylint: disable=protected-access
583+
next_val = (
584+
await engine._evaluator.apply() # pylint: disable=protected-access
585+
)
579586
assert (next_val).value == io_output
580587
tests_passed += 1
581588
await engine._stop() # pylint: disable=protected-access

0 commit comments

Comments
 (0)