
Commit 56a4377

Implement a parser for string formulas
Signed-off-by: Sahas Subramanian <[email protected]>
1 parent 7e2ad89 commit 56a4377

1 file changed: 216 additions, 0 deletions

@@ -0,0 +1,216 @@
# License: MIT
# Copyright © 2025 Frequenz Energy-as-a-Service GmbH

"""Parser for formulas."""

from __future__ import annotations

from collections.abc import Callable
from typing import Generic, cast

from frequenz.client.common.microgrid.components import ComponentId

from frequenz.sdk.timeseries._base_types import QuantityT

from . import _ast, _token
from ._formula import Formula
from ._functions import Function
from ._lexer import Lexer
from ._peekable import Peekable
from ._resampled_stream_fetcher import ResampledStreamFetcher


def parse(
    *,
    name: str,
    formula: str,
    telemetry_fetcher: ResampledStreamFetcher,
    create_method: Callable[[float], QuantityT],
) -> Formula[QuantityT]:
    """Parse a formula string into an AST.

    Args:
        name: The name of the formula.
        formula: The formula string to parse.
        telemetry_fetcher: The telemetry fetcher to get component streams.
        create_method: A method to create the corresponding QuantityT from a
            float, based on the metric.

    Returns:
        The parsed formula AST.
    """
    return _Parser(
        name=name,
        formula=formula,
        telemetry_fetcher=telemetry_fetcher,
        create_method=create_method,
    ).parse()


class _Parser(Generic[QuantityT]):
    def __init__(
        self,
        *,
        name: str,
        formula: str,
        telemetry_fetcher: ResampledStreamFetcher,
        create_method: Callable[[float], QuantityT],
    ):
        """Initialize the parser."""
        self._name: str = name
        self._lexer: Peekable[_token.Token] = Peekable(Lexer(formula))
        self._telemetry_fetcher: ResampledStreamFetcher = telemetry_fetcher
        self._components: list[_ast.TelemetryStream[QuantityT]] = []
        self._create_method: Callable[[float], QuantityT] = create_method

    def _parse_term(self) -> _ast.Node | None:
        factor = self._parse_factor()
        if factor is None:
            return None

        token: _token.Token | None = self._lexer.peek()
        while token is not None and isinstance(token, (_token.Plus, _token.Minus)):
            token = next(self._lexer)
            next_factor = self._parse_factor()

            if next_factor is None:
                raise ValueError(
                    f"Expected factor after operator at span: {token.span}"
                )

            if isinstance(token, _token.Plus):
                factor = _ast.Add(span=token.span, left=factor, right=next_factor)
            elif isinstance(token, _token.Minus):
                factor = _ast.Sub(span=token.span, left=factor, right=next_factor)

            token = self._lexer.peek()

        return factor

    def _parse_factor(self) -> _ast.Node | None:
        unary = self._parse_unary()

        if unary is None:
            return None

        token: _token.Token | None = self._lexer.peek()
        while token is not None and isinstance(token, (_token.Mul, _token.Div)):
            token = next(self._lexer)
            next_unary = self._parse_unary()
            if next_unary is None:
                raise ValueError(f"Expected unary after operator at span: {token.span}")

            if isinstance(token, _token.Mul):
                unary = _ast.Mul(span=token.span, left=unary, right=next_unary)
            elif isinstance(token, _token.Div):
                unary = _ast.Div(span=token.span, left=unary, right=next_unary)

            token = self._lexer.peek()

        return unary

    def _parse_unary(self) -> _ast.Node | None:
        token: _token.Token | None = self._lexer.peek()
        if token is not None and isinstance(token, _token.Minus):
            token = next(self._lexer)
            primary: _ast.Node | None = self._parse_primary()
            if primary is None:
                raise ValueError(
                    f"Expected primary expression after unary '-' at position {token.span}"
                )

            zero_const = _ast.Constant(span=token.span, value=0.0)
            return _ast.Sub(span=token.span, left=zero_const, right=primary)

        return self._parse_primary()

    def _parse_bracketed(self) -> _ast.Node | None:
        oparen = next(self._lexer)  # consume '('
        assert isinstance(oparen, _token.OpenParen)

        expr: _ast.Node | None = self._parse_term()
        if expr is None:
            raise ValueError(f"Expected expression after '(' at position {oparen.span}")

        token: _token.Token | None = self._lexer.peek()
        if token is None or not isinstance(token, _token.CloseParen):
            raise ValueError(f"Expected ')' after expression at position {expr.span}")

        _ = next(self._lexer)  # consume ')'

        return expr

    def _parse_function_call(self) -> _ast.Node | None:
        fn_name: _token.Token = next(self._lexer)
        args: list[_ast.Node] = []

        token: _token.Token | None = self._lexer.peek()
        if token is None or not isinstance(token, _token.OpenParen):
            raise ValueError(
                f"Expected '(' after function name at position {fn_name.span}"
            )

        _ = next(self._lexer)  # consume '('
        while True:
            arg = self._parse_term()
            if arg is None:
                raise ValueError(
                    f"Expected argument in function call at position {fn_name.span}"
                )
            args.append(arg)

            token = self._lexer.peek()
            if token is not None and isinstance(token, _token.Comma):
                _ = next(self._lexer)  # consume ','
                continue
            if token is not None and isinstance(token, _token.CloseParen):
                _ = next(self._lexer)  # consume ')'
                break
            raise ValueError(
                f"Expected ',' or ')' in function call at position {fn_name.span}"
            )

        return _ast.FunCall(
            span=fn_name.span,
            function=Function.from_string(fn_name.value),
            args=args,
        )

    def _parse_primary(self) -> _ast.Node | None:
        token: _token.Token | None = self._lexer.peek()
        if token is None:
            return None

        if isinstance(token, _token.Component):
            _ = next(self._lexer)  # consume token
            comp = _ast.TelemetryStream(
                span=token.span,
                source=f"#{token.id}",
                stream=self._telemetry_fetcher.fetch_stream(ComponentId(int(token.id))),
            )
            self._components.append(cast(_ast.TelemetryStream[QuantityT], comp))
            return comp

        if isinstance(token, _token.Number):
            _ = next(self._lexer)
            return _ast.Constant(span=token.span, value=float(token.value))

        if isinstance(token, _token.OpenParen):
            return self._parse_bracketed()

        if isinstance(token, _token.Symbol):
            return self._parse_function_call()

        return None

    def parse(self) -> Formula[QuantityT]:
        expr = self._parse_term()
        if expr is None:
            raise ValueError("Empty formula.")
        return Formula(
            name=self._name,
            root=expr,
            create_method=self._create_method,
            streams=self._components,
            metric_fetcher=self._telemetry_fetcher,
        )
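
Read as a grammar, the recursive-descent methods in this commit correspond roughly to the following productions (a sketch inferred from the code, not documentation shipped with the commit):

    # Grammar implied by the parser methods (inferred sketch):
    #
    #   term    ::= factor (("+" | "-") factor)*
    #   factor  ::= unary (("*" | "/") unary)*
    #   unary   ::= "-" primary | primary
    #   primary ::= component | number | "(" term ")"
    #             | symbol "(" term ("," term)* ")"
    #
    # Unary minus is lowered to `0.0 - primary`, and every component reference
    # is recorded in `self._components` so the resulting Formula knows which
    # telemetry streams it reads.

A minimal, hypothetical call site, assuming components are written as `#<id>` (as suggested by `source=f"#{token.id}"`), that `min` is a function accepted by `Function.from_string`, and that a `ResampledStreamFetcher` instance and a `float -> QuantityT` constructor such as `Power.from_watts` are available from the surrounding SDK (none of these appear in this diff):

    formula = parse(
        name="pv_plus_battery",
        formula="#7 + #12 - min(#7, 100.0)",  # hypothetical formula string
        telemetry_fetcher=fetcher,  # assumed ResampledStreamFetcher instance
        create_method=Power.from_watts,  # assumed float -> QuantityT constructor
    )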
