Skip to content

Commit 102f57a

Browse files
committed
Add a FormulaEngine for applying formulas on streams
Signed-off-by: Sahas Subramanian <[email protected]>
1 parent 14a0a63 commit 102f57a

File tree

6 files changed

+838
-0
lines changed

6 files changed

+838
-0
lines changed
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
# License: MIT
2+
# Copyright © 2022 Frequenz Energy-as-a-Service GmbH
3+
4+
"""A logical meter for calculating high level metrics for a microgrid."""
5+
6+
from .logical_meter import LogicalMeter
7+
8+
# Names exported by a star-import of this package: only `LogicalMeter`, which is
# re-exported from the `.logical_meter` submodule.
__all__ = ["LogicalMeter"]
Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
# License: MIT
2+
# Copyright © 2022 Frequenz Energy-as-a-Service GmbH
3+
4+
"""A builder for creating formula engines that operate on resampled component metrics."""
5+
6+
7+
from frequenz.channels import Receiver, Sender
8+
9+
from ...actor import ChannelRegistry, ComponentMetricRequest
10+
from ...microgrid.component import ComponentMetricId
11+
from .._sample import Sample
12+
from ._formula_engine import FormulaEngine
13+
from ._tokenizer import Tokenizer, TokenType
14+
15+
16+
class FormulaBuilder:
    """Assemble a `FormulaEngine` whose inputs are resampled component metric streams.

    The builder owns a single `FormulaEngine`, created in the constructor.  Every
    metric or operator pushed through the builder (directly or via `from_string`)
    goes into that one engine.
    """

    def __init__(
        self,
        namespace: str,
        channel_registry: ChannelRegistry,
        resampler_subscription_sender: Sender[ComponentMetricRequest],
        metric_id: ComponentMetricId,
    ) -> None:
        """Create a `FormulaBuilder` instance.

        Args:
            namespace: The unique namespace to allow reuse of streams in the data
                pipeline.
            channel_registry: The channel registry instance shared with the
                resampling and the data sourcing actors.
            resampler_subscription_sender: A sender to send metric requests to the
                resampling actor.
            metric_id: A metric ID to fetch for all components in this formula.
        """
        self._namespace = namespace
        self._metric_id = metric_id
        self._channel_registry = channel_registry
        self._resampler_subscription_sender = resampler_subscription_sender
        # The engine that all pushed metrics and operators end up in.
        self._formula = FormulaEngine()

    async def _get_resampled_receiver(self, component_id: int) -> Receiver[Sample]:
        """Subscribe to resampled data for a component and return a receiver for it.

        A `ComponentMetricRequest` for the builder's namespace and `metric_id` is
        sent to the resampling actor, and a new receiver is then created on the
        channel named by that request.

        Args:
            component_id: The component id for which to get a resampled data
                receiver.

        Returns:
            A receiver to stream resampled data for the given component id.
        """
        metric_request = ComponentMetricRequest(
            self._namespace, component_id, self._metric_id, None
        )
        # Subscribe first, then attach to the channel the request maps to.
        await self._resampler_subscription_sender.send(metric_request)
        channel_name = metric_request.get_channel_name()
        return self._channel_registry.new_receiver(channel_name)

    async def push_component_metric(self, component_id: int) -> None:
        """Push a resampled component metric stream to the formula engine.

        The metric is registered in the engine under the name `#<component_id>`.

        Args:
            component_id: The component id for which to push a metric fetcher.
        """
        metric_name = f"#{component_id}"
        data_stream = await self._get_resampled_receiver(component_id)
        self._formula.push_metric(metric_name, data_stream)

    async def from_string(self, formula: str) -> FormulaEngine:
        """Construct a `FormulaEngine` from the given formula string.

        Formulas can have Component IDs that are preceded by a pound symbol ("#"),
        and these operators: +, -, *, /, (, ).

        For example, the input string: "#20 + #5" is a formula for adding metrics
        from two components with ids 20 and 5.

        Args:
            formula: A string formula.

        Returns:
            A FormulaEngine instance corresponding to the given formula.

        Raises:
            ValueError: when there is an unknown token type.
        """
        for token in Tokenizer(formula):
            kind = token.type
            if kind == TokenType.OPER:
                self._formula.push_oper(token.value)
                continue
            if kind == TokenType.COMPONENT_METRIC:
                await self.push_component_metric(int(token.value))
                continue
            raise ValueError(f"Unknown token type: {token}")

        engine = self._formula
        engine.finalize()
        return engine
Lines changed: 201 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,201 @@
1+
# License: MIT
2+
# Copyright © 2022 Frequenz Energy-as-a-Service GmbH
3+
4+
"""A formula engine that can apply formulas on streaming data."""
5+
6+
from __future__ import annotations
7+
8+
import asyncio
9+
from datetime import datetime
10+
from typing import Dict, List, Optional, Set
11+
12+
from frequenz.channels import Receiver
13+
14+
from .._sample import Sample
15+
from ._formula_steps import (
16+
Adder,
17+
Divider,
18+
FormulaStep,
19+
MetricFetcher,
20+
Multiplier,
21+
OpenParen,
22+
Subtractor,
23+
)
24+
25+
_operator_precedence = {
26+
"(": 0,
27+
"/": 1,
28+
"*": 2,
29+
"-": 3,
30+
"+": 4,
31+
")": 5,
32+
}
33+
34+
35+
class FormulaEngine:
    """A post-fix formula engine that operates on `Sample` receivers.

    Operators and metrics need to be pushed into the engine in in-fix order, and they
    get rearranged into post-fix in the engine. This is done using the [Shunting yard
    algorithm](https://en.wikipedia.org/wiki/Shunting_yard_algorithm).

    Example:
        To create an engine that adds the latest entries from two receivers, the
        following calls need to be made:

        ```python
        engine = FormulaEngine()
        engine.push_metric("metric_1", receiver_1)
        engine.push_oper("+")
        engine.push_metric("metric_2", receiver_2)
        engine.finalize()
        ```

        and then every call to `engine.apply()` would fetch a value from each receiver,
        add the values and return the result.
    """

    def __init__(self) -> None:
        """Create a `FormulaEngine` instance."""
        # Steps in post-fix order; this is what `apply` executes.
        self._steps: List[FormulaStep] = []
        # Operator stack used only while the formula is being built.
        self._build_stack: List[FormulaStep] = []
        # One fetcher per distinct metric name.
        self._metric_fetchers: Dict[str, MetricFetcher] = {}
        # True until the input streams have been synchronized once.
        self._first_run = True

    def push_oper(self, oper: str) -> None:
        """Push an operator into the engine.

        Before the operator is stacked, operators already on the build stack are
        moved to the output for as long as their rank is not greater than the rank
        of `oper`.  Operators of equal rank are moved too, which keeps chains like
        `a - b - c` and `a / b / c` left-associative.  A ")" moves everything up to
        the nearest "(" and then discards that "(".

        Args:
            oper: One of these strings - "+", "-", "*", "/", "(", ")"
        """
        if self._build_stack and oper != "(":
            op_prec = _operator_precedence[oper]
            while self._build_stack:
                prev_step = self._build_stack[-1]
                # Steps on the build stack are identified by their `repr`, which
                # is looked up in the precedence table.
                prev_oper = repr(prev_step)
                # Strict comparison: an operator of the same rank must be emitted
                # first, otherwise "-" and "/" would become right-associative.
                if op_prec < _operator_precedence[prev_oper]:
                    break
                if oper == ")" and prev_oper == "(":
                    self._build_stack.pop()
                    break
                if prev_oper == "(":
                    break
                self._steps.append(prev_step)
                self._build_stack.pop()

        if oper == "+":
            self._build_stack.append(Adder())
        elif oper == "-":
            self._build_stack.append(Subtractor())
        elif oper == "*":
            self._build_stack.append(Multiplier())
        elif oper == "/":
            self._build_stack.append(Divider())
        elif oper == "(":
            self._build_stack.append(OpenParen())

    def push_metric(self, name: str, data_stream: Receiver[Sample]) -> None:
        """Push a metric receiver into the engine.

        If a metric with the same name was pushed before, the existing fetcher is
        reused and `data_stream` is not used.

        Args:
            name: A name for the metric.
            data_stream: A receiver to fetch this metric from.
        """
        fetcher = self._metric_fetchers.get(name)
        if fetcher is None:
            # Only build a fetcher for names that are not registered yet.
            fetcher = MetricFetcher(name, data_stream)
            self._metric_fetchers[name] = fetcher
        self._steps.append(fetcher)

    def finalize(self) -> None:
        """Finalize the formula engine.

        Moves every operator still on the build stack to the end of the step list.

        This function must be called before calls to `apply` can be made.
        """
        while self._build_stack:
            self._steps.append(self._build_stack.pop())

    async def synchronize_metric_timestamps(
        self, metrics: Set[asyncio.Task[Optional[Sample]]]
    ) -> datetime:
        """Synchronize the metric streams.

        For synchronised streams like data from the `ComponentMetricsResamplingActor`,
        a call to this function is required only once, before the first set of
        inputs are fetched.

        Every metric whose first sample is older than the newest first sample is
        fetched again until its timestamp catches up.

        Args:
            metrics: The finished tasks from the first `fetch_next` calls to all the
                `MetricFetcher`s.  Each task's name must be the name of its metric.

        Returns:
            The timestamp of the latest metric value.

        Raises:
            RuntimeError: when some streams have no value, or when the synchronization
                of timestamps fails.
        """
        # Keyed by metric name, so that several metrics sharing the same (lagging)
        # timestamp are all tracked and re-fetched.
        first_timestamps: Dict[str, datetime] = {}
        for metric in metrics:
            result = metric.result()
            name = metric.get_name()
            if result is None:
                raise RuntimeError(f"Stream closed for component: {name}")
            first_timestamps[name] = result.timestamp
        latest_ts = max(first_timestamps.values())

        # fetch the metrics with non-latest timestamps again until we have the values
        # for the same ts for all metrics.
        for name, metric_ts in first_timestamps.items():
            if metric_ts == latest_ts:
                continue
            fetcher = self._metric_fetchers[name]
            while metric_ts < latest_ts:
                next_val = await fetcher.fetch_next()
                if next_val is None:
                    raise RuntimeError(f"Stream closed for component: {name}")
                metric_ts = next_val.timestamp
            if metric_ts > latest_ts:
                # The stream skipped past the target timestamp.
                raise RuntimeError(
                    "Unable to synchronize timestamps of resampled metrics"
                )
        self._first_run = False
        return latest_ts

    async def apply(self) -> Sample:
        """Fetch the latest metrics, apply the formula once and return the result.

        The next value of every pushed metric is fetched concurrently.  On the first
        call the streams are additionally synchronized to a common timestamp.

        Returns:
            The result of the formula, stamped with the timestamp of the fetched
            samples.

        Raises:
            RuntimeError: if no metrics were pushed, if some samples didn't arrive,
                or if formula application failed.
        """
        if not self._metric_fetchers:
            # `asyncio.wait` rejects an empty collection; fail with the documented
            # error type instead.
            raise RuntimeError("No metrics were pushed into the formula engine")

        eval_stack: List[float] = []
        ready_metrics, pending = await asyncio.wait(
            [
                asyncio.create_task(fetcher.fetch_next(), name=name)
                for name, fetcher in self._metric_fetchers.items()
            ],
            return_when=asyncio.ALL_COMPLETED,
        )

        if pending or any(res.result() is None for res in ready_metrics):
            raise RuntimeError("Some resampled metrics didn't arrive")

        if self._first_run:
            metric_ts = await self.synchronize_metric_timestamps(ready_metrics)
        else:
            # After the first run, any one of the fetched samples provides the
            # timestamp for the result.
            res = next(iter(ready_metrics)).result()
            assert res is not None
            metric_ts = res.timestamp

        for step in self._steps:
            step.apply(eval_stack)

        # if all steps were applied and the formula was correct, there should only be a
        # single value in the evaluation stack, and that would be the formula result.
        if len(eval_stack) != 1:
            raise RuntimeError("Formula application failed.")

        return Sample(metric_ts, eval_stack[0])

0 commit comments

Comments
 (0)