Skip to content

Commit 7c8f2ce

Browse files
Split the formula execution logic from the formula building logic (#96)
Splits the `FormulaEngine` class into: - `FormulaBuilder` - `FormulaEngine` The `FormulaBuilder` class is used to build formulas, and has a `build` method that returns a `FormulaEngine` instance. Closes #82
2 parents 458047f + dec3ea8 commit 7c8f2ce

File tree

5 files changed

+120
-97
lines changed

5 files changed

+120
-97
lines changed

src/frequenz/sdk/timeseries/logical_meter/_formula_engine.py

Lines changed: 103 additions & 81 deletions
Original file line numberDiff line numberDiff line change
@@ -35,94 +35,23 @@
3535
class FormulaEngine:
3636
"""A post-fix formula engine that operates on `Sample` receivers.
3737
38-
Operators and metrics need to be pushed into the engine in in-fix order, and they
39-
get rearranged into post-fix in the engine. This is done using the [Shunting yard
40-
algorithm](https://en.wikipedia.org/wiki/Shunting_yard_algorithm).
41-
42-
Example:
43-
To create an engine that adds the latest entries from two receivers, the
44-
following calls need to be made:
45-
46-
```python
47-
engine = FormulaEngine()
48-
engine.push_metric("metric_1", receiver_1)
49-
engine.push_oper("+")
50-
engine.push_metric("metric_2", receiver_2)
51-
engine.finalize()
52-
```
53-
54-
and then every call to `engine.apply()` would fetch a value from each receiver,
55-
add the values and return the result.
38+
Use the `FormulaBuilder` to create `FormulaEngine` instances.
5639
"""
5740

5841
def __init__(
59-
self,
60-
) -> None:
61-
"""Create a `FormulaEngine` instance."""
62-
self._steps: List[FormulaStep] = []
63-
self._build_stack: List[FormulaStep] = []
64-
self._metric_fetchers: Dict[str, MetricFetcher] = {}
65-
self._first_run = True
66-
67-
def push_oper(self, oper: str) -> None:
68-
"""Push an operator into the engine.
69-
70-
Args:
71-
oper: One of these strings - "+", "-", "*", "/", "(", ")"
72-
"""
73-
if self._build_stack and oper != "(":
74-
op_prec = _operator_precedence[oper]
75-
while self._build_stack:
76-
prev_step = self._build_stack[-1]
77-
if op_prec <= _operator_precedence[repr(prev_step)]:
78-
break
79-
if oper == ")" and repr(prev_step) == "(":
80-
self._build_stack.pop()
81-
break
82-
if repr(prev_step) == "(":
83-
break
84-
self._steps.append(prev_step)
85-
self._build_stack.pop()
86-
87-
if oper == "+":
88-
self._build_stack.append(Adder())
89-
elif oper == "-":
90-
self._build_stack.append(Subtractor())
91-
elif oper == "*":
92-
self._build_stack.append(Multiplier())
93-
elif oper == "/":
94-
self._build_stack.append(Divider())
95-
elif oper == "(":
96-
self._build_stack.append(OpenParen())
97-
98-
def push_metric(
99-
self,
100-
name: str,
101-
data_stream: Receiver[Sample],
102-
nones_are_zeros: bool,
42+
self, steps: List[FormulaStep], metric_fetchers: Dict[str, MetricFetcher]
10343
) -> None:
104-
"""Push a metric receiver into the engine.
44+
"""Create a `FormulaEngine` instance.
10545
10646
Args:
107-
name: A name for the metric.
108-
data_stream: A receiver to fetch this metric from.
109-
nones_are_zeros: Whether to treat None values from the stream as 0s. If
110-
False, the returned value will be a None.
111-
"""
112-
fetcher = self._metric_fetchers.setdefault(
113-
name, MetricFetcher(name, data_stream, nones_are_zeros)
114-
)
115-
self._steps.append(fetcher)
116-
117-
def finalize(self) -> None:
118-
"""Finalize the formula engine.
119-
120-
This function must be called before calls to `apply` can be made.
47+
steps: Steps for the engine to execute, in post-fix order.
48+
metric_fetchers: Fetchers for each metric stream the formula depends on.
12149
"""
122-
while self._build_stack:
123-
self._steps.append(self._build_stack.pop())
50+
self._steps = steps
51+
self._metric_fetchers = metric_fetchers
52+
self._first_run = True
12453

125-
async def synchronize_metric_timestamps(
54+
async def _synchronize_metric_timestamps(
12655
self, metrics: Set[asyncio.Task[Optional[Sample]]]
12756
) -> datetime:
12857
"""Synchronize the metric streams.
@@ -191,7 +120,7 @@ async def apply(self) -> Sample:
191120
raise RuntimeError("Some resampled metrics didn't arrive")
192121

193122
if self._first_run:
194-
metric_ts = await self.synchronize_metric_timestamps(ready_metrics)
123+
metric_ts = await self._synchronize_metric_timestamps(ready_metrics)
195124
else:
196125
res = next(iter(ready_metrics)).result()
197126
assert res is not None
@@ -206,3 +135,96 @@ async def apply(self) -> Sample:
206135
raise RuntimeError("Formula application failed.")
207136

208137
return Sample(metric_ts, eval_stack[0])
138+
139+
140+
class FormulaBuilder:
141+
"""Builds a post-fix formula engine that operates on `Sample` receivers.
142+
143+
Operators and metrics need to be pushed in in-fix order, and they get rearranged
144+
into post-fix order. This is done using the [Shunting yard
145+
algorithm](https://en.wikipedia.org/wiki/Shunting_yard_algorithm).
146+
147+
Example:
148+
To create an engine that adds the latest entries from two receivers, the
149+
following calls need to be made:
150+
151+
```python
152+
builder = FormulaBuilder()
153+
builder.push_metric("metric_1", receiver_1)
154+
builder.push_oper("+")
155+
builder.push_metric("metric_2", receiver_2)
156+
engine = builder.build()
157+
```
158+
159+
and then every call to `engine.apply()` would fetch a value from each receiver,
160+
add the values and return the result.
161+
"""
162+
163+
def __init__(
164+
self,
165+
) -> None:
166+
"""Create a `FormulaBuilder` instance."""
167+
self._build_stack: List[FormulaStep] = []
168+
self._steps: List[FormulaStep] = []
169+
self._metric_fetchers: Dict[str, MetricFetcher] = {}
170+
171+
def push_oper(self, oper: str) -> None:
172+
"""Push an operator into the engine.
173+
174+
Args:
175+
oper: One of these strings - "+", "-", "*", "/", "(", ")"
176+
"""
177+
if self._build_stack and oper != "(":
178+
op_prec = _operator_precedence[oper]
179+
while self._build_stack:
180+
prev_step = self._build_stack[-1]
181+
if op_prec <= _operator_precedence[repr(prev_step)]:
182+
break
183+
if oper == ")" and repr(prev_step) == "(":
184+
self._build_stack.pop()
185+
break
186+
if repr(prev_step) == "(":
187+
break
188+
self._steps.append(prev_step)
189+
self._build_stack.pop()
190+
191+
if oper == "+":
192+
self._build_stack.append(Adder())
193+
elif oper == "-":
194+
self._build_stack.append(Subtractor())
195+
elif oper == "*":
196+
self._build_stack.append(Multiplier())
197+
elif oper == "/":
198+
self._build_stack.append(Divider())
199+
elif oper == "(":
200+
self._build_stack.append(OpenParen())
201+
202+
def push_metric(
203+
self,
204+
name: str,
205+
data_stream: Receiver[Sample],
206+
nones_are_zeros: bool,
207+
) -> None:
208+
"""Push a metric receiver into the engine.
209+
210+
Args:
211+
name: A name for the metric.
212+
data_stream: A receiver to fetch this metric from.
213+
nones_are_zeros: Whether to treat None values from the stream as 0s. If
214+
False, the returned value will be a None.
215+
"""
216+
fetcher = self._metric_fetchers.setdefault(
217+
name, MetricFetcher(name, data_stream, nones_are_zeros)
218+
)
219+
self._steps.append(fetcher)
220+
221+
def build(self) -> FormulaEngine:
222+
"""Finalize and build the formula engine.
223+
224+
Returns:
225+
A `FormulaEngine` instance.
226+
"""
227+
while self._build_stack:
228+
self._steps.append(self._build_stack.pop())
229+
230+
return FormulaEngine(self._steps, self._metric_fetchers)

src/frequenz/sdk/timeseries/logical_meter/_logical_meter.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,8 @@
1212
from ...actor import ChannelRegistry, ComponentMetricRequest
1313
from ...microgrid.component import ComponentMetricId
1414
from .._sample import Sample
15-
from ._formula_builder import FormulaBuilder
1615
from ._formula_engine import FormulaEngine
16+
from ._resampled_formula_builder import ResampledFormulaBuilder
1717

1818

1919
class LogicalMeter:
@@ -54,7 +54,7 @@ def __init__(
5454
async def _engine_from_formula_string(
5555
self, formula: str, metric_id: ComponentMetricId, nones_are_zeros: bool
5656
) -> FormulaEngine:
57-
builder = FormulaBuilder(
57+
builder = ResampledFormulaBuilder(
5858
self._namespace,
5959
self._channel_registry,
6060
self._resampler_subscription_sender,

src/frequenz/sdk/timeseries/logical_meter/_formula_builder.py renamed to src/frequenz/sdk/timeseries/logical_meter/_resampled_formula_builder.py

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,11 @@
99
from ...actor import ChannelRegistry, ComponentMetricRequest
1010
from ...microgrid.component import ComponentMetricId
1111
from .._sample import Sample
12-
from ._formula_engine import FormulaEngine
12+
from ._formula_engine import FormulaBuilder, FormulaEngine
1313
from ._tokenizer import Tokenizer, TokenType
1414

1515

16-
class FormulaBuilder:
16+
class ResampledFormulaBuilder:
1717
"""Provides a way to build a FormulaEngine from resampled data streams."""
1818

1919
def __init__(
@@ -23,7 +23,7 @@ def __init__(
2323
resampler_subscription_sender: Sender[ComponentMetricRequest],
2424
metric_id: ComponentMetricId,
2525
) -> None:
26-
"""Create a `FormulaBuilder` instance.
26+
"""Create a `ResampledFormulaBuilder` instance.
2727
2828
Args:
2929
namespace: The unique namespace to allow reuse of streams in the data
@@ -37,14 +37,14 @@ def __init__(
3737
self._channel_registry = channel_registry
3838
self._resampler_subscription_sender = resampler_subscription_sender
3939
self._namespace = namespace
40-
self._formula = FormulaEngine()
40+
self._formula = FormulaBuilder()
4141
self._metric_id = metric_id
4242

4343
async def _get_resampled_receiver(self, component_id: int) -> Receiver[Sample]:
4444
"""Get a receiver with the resampled data for the given component id.
4545
4646
This receiver would contain data for the `metric_id` specified when creating the
47-
`FormulaBuilder` instance.
47+
`ResampledFormulaBuilder` instance.
4848
4949
Args:
5050
component_id: The component id for which to get a resampled data receiver.
@@ -105,5 +105,4 @@ async def from_string(
105105
else:
106106
raise ValueError(f"Unknown token type: {token}")
107107

108-
self._formula.finalize()
109-
return self._formula
108+
return self._formula.build()

tests/timeseries/test_formula_engine.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
from frequenz.channels import Broadcast
1010

1111
from frequenz.sdk.timeseries import Sample
12-
from frequenz.sdk.timeseries.logical_meter._formula_engine import FormulaEngine
12+
from frequenz.sdk.timeseries.logical_meter._formula_engine import FormulaBuilder
1313
from frequenz.sdk.timeseries.logical_meter._tokenizer import Token, Tokenizer, TokenType
1414

1515

@@ -63,19 +63,19 @@ async def run_test(
6363
nones_are_zeros: bool = False,
6464
) -> None:
6565
channels: Dict[str, Broadcast[Sample]] = {}
66-
engine = FormulaEngine()
66+
builder = FormulaBuilder()
6767
for token in Tokenizer(formula):
6868
if token.type == TokenType.COMPONENT_METRIC:
6969
if token.value not in channels:
7070
channels[token.value] = Broadcast(token.value)
71-
engine.push_metric(
71+
builder.push_metric(
7272
f"#{token.value}",
7373
channels[token.value].new_receiver(),
7474
nones_are_zeros,
7575
)
7676
elif token.type == TokenType.OPER:
77-
engine.push_oper(token.value)
78-
engine.finalize()
77+
builder.push_oper(token.value)
78+
engine = builder.build()
7979

8080
assert repr(engine._steps) == postfix
8181

tests/timeseries/test_logical_meter.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,9 @@
2525
)
2626
from frequenz.sdk.microgrid.component import ComponentMetricId
2727
from frequenz.sdk.timeseries.logical_meter import LogicalMeter
28-
from frequenz.sdk.timeseries.logical_meter._formula_builder import FormulaBuilder
28+
from frequenz.sdk.timeseries.logical_meter._resampled_formula_builder import (
29+
ResampledFormulaBuilder,
30+
)
2931
from tests.microgrid import mock_api
3032

3133

@@ -149,7 +151,7 @@ async def test_1(self, mocker: MockerFixture) -> None:
149151
# `_get_resampled_receiver` function implementation.
150152

151153
# pylint: disable=protected-access
152-
builder = FormulaBuilder(
154+
builder = ResampledFormulaBuilder(
153155
logical_meter._namespace,
154156
channel_registry,
155157
request_sender,

0 commit comments

Comments
 (0)