Skip to content

Commit 7e2ad89

Browse files
committed
Implement the Formula type
This is a wrapper around the FormulaEvaluatingActor, with methods for composing multiple formulas. Signed-off-by: Sahas Subramanian <[email protected]>
1 parent c32c852 commit 7e2ad89

File tree

1 file changed

+383
-0
lines changed

1 file changed

+383
-0
lines changed
Lines changed: 383 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,383 @@
1+
# License: MIT
2+
# Copyright © 2025 Frequenz Energy-as-a-Service GmbH
3+
4+
"""A composable formula represented as an AST."""
5+
6+
from __future__ import annotations
7+
8+
import logging
9+
from collections.abc import Callable
10+
from typing import Generic
11+
12+
from frequenz.channels import Broadcast, Receiver
13+
from typing_extensions import override
14+
15+
from frequenz.sdk.timeseries.formulas._resampled_stream_fetcher import (
16+
ResampledStreamFetcher,
17+
)
18+
19+
from ...actor import BackgroundService
20+
from .. import ReceiverFetcher, Sample
21+
from .._base_types import QuantityT
22+
from . import _ast
23+
from ._formula_evaluator import FormulaEvaluatingActor
24+
from ._functions import Coalesce, Max, Min
25+
26+
_logger = logging.getLogger(__name__)
27+
28+
29+
class Formula(BackgroundService, ReceiverFetcher[Sample[QuantityT]]):
30+
"""A formula represented as an AST."""
31+
32+
def __init__( # pylint: disable=too-many-arguments
33+
self,
34+
*,
35+
name: str,
36+
root: _ast.Node,
37+
create_method: Callable[[float], QuantityT],
38+
streams: list[_ast.TelemetryStream[QuantityT]],
39+
sub_formulas: list[Formula[QuantityT]] | None = None,
40+
metric_fetcher: ResampledStreamFetcher | None = None,
41+
) -> None:
42+
"""Create a `Formula` instance.
43+
44+
Args:
45+
name: The name of the formula.
46+
root: The root node of the formula AST.
47+
create_method: A method to generate the output values with. If the
48+
formula is for generating power values, this would be
49+
`Power.from_watts`, for example.
50+
streams: The telemetry streams that the formula depends on.
51+
sub_formulas: Any sub-formulas that this formula depends on.
52+
metric_fetcher: An optional metric fetcher that needs to be started
53+
before the formula can be evaluated.
54+
"""
55+
BackgroundService.__init__(self)
56+
self._name: str = name
57+
self._root: _ast.Node = root
58+
self._components: list[_ast.TelemetryStream[QuantityT]] = streams
59+
self._create_method: Callable[[float], QuantityT] = create_method
60+
self._sub_formulas: list[Formula[QuantityT]] = sub_formulas or []
61+
62+
self._channel: Broadcast[Sample[QuantityT]] = Broadcast(
63+
name=f"{self}",
64+
resend_latest=True,
65+
)
66+
self._evaluator: FormulaEvaluatingActor[QuantityT] = FormulaEvaluatingActor(
67+
root=self._root,
68+
components=self._components,
69+
create_method=self._create_method,
70+
output_channel=self._channel,
71+
metric_fetcher=metric_fetcher,
72+
)
73+
74+
@override
75+
def __str__(self) -> str:
76+
"""Return a string representation of the formula."""
77+
return f"[{self._name}]({self._root})"
78+
79+
@override
80+
def new_receiver(self, *, limit: int = 50) -> Receiver[Sample[QuantityT]]:
81+
"""Subscribe to the formula evaluator to get evaluated samples."""
82+
if not self._evaluator.is_running:
83+
# raise RuntimeError(
84+
# f"Formula evaluator for '{self._root}' is not running. Please "
85+
# + "call `start()` on the formula before using it.",
86+
# )
87+
# _logger.warning(
88+
# "Formula evaluator for '%s' is not running. Starting it. "
89+
# + "Please call `start()` on the formula before using it."
90+
# self._root,
91+
# )
92+
self.start()
93+
return self._channel.new_receiver(limit=limit)
94+
95+
@override
96+
def start(self) -> None:
97+
"""Start the formula evaluator."""
98+
for sub_formula in self._sub_formulas:
99+
sub_formula.start()
100+
self._evaluator.start()
101+
102+
@override
103+
async def stop(self, msg: str | None = None) -> None:
104+
"""Stop the formula evaluator."""
105+
await BackgroundService.stop(self, msg)
106+
for sub_formula in self._sub_formulas:
107+
await sub_formula.stop(msg)
108+
await self._evaluator.stop(msg)
109+
110+
def __add__(
111+
self, other: FormulaBuilder[QuantityT] | QuantityT | Formula[QuantityT]
112+
) -> FormulaBuilder[QuantityT]:
113+
"""Create an addition operation node."""
114+
return FormulaBuilder(self, self._create_method) + other
115+
116+
def __sub__(
117+
self, other: FormulaBuilder[QuantityT] | QuantityT | Formula[QuantityT]
118+
) -> FormulaBuilder[QuantityT]:
119+
"""Create a subtraction operation node."""
120+
return FormulaBuilder(self, self._create_method) - other
121+
122+
def __mul__(self, other: float) -> FormulaBuilder[QuantityT]:
123+
"""Create a multiplication operation node."""
124+
return FormulaBuilder(self, self._create_method) * other
125+
126+
def __truediv__(self, other: float) -> FormulaBuilder[QuantityT]:
127+
"""Create a division operation node."""
128+
return FormulaBuilder(self, self._create_method) / other
129+
130+
def coalesce(
131+
self,
132+
other: list[FormulaBuilder[QuantityT] | QuantityT | Formula[QuantityT]],
133+
) -> FormulaBuilder[QuantityT]:
134+
"""Create a coalesce operation node."""
135+
return FormulaBuilder(self, self._create_method).coalesce(other)
136+
137+
def min(
138+
self,
139+
other: list[FormulaBuilder[QuantityT] | QuantityT | Formula[QuantityT]],
140+
) -> FormulaBuilder[QuantityT]:
141+
"""Create a min operation node."""
142+
return FormulaBuilder(self, self._create_method).min(other)
143+
144+
def max(
145+
self,
146+
other: list[FormulaBuilder[QuantityT] | QuantityT | Formula[QuantityT]],
147+
) -> FormulaBuilder[QuantityT]:
148+
"""Create a max operation node."""
149+
return FormulaBuilder(self, self._create_method).max(other)
150+
151+
152+
class FormulaBuilder(Generic[QuantityT]):
153+
"""A builder for higher-order formulas represented as ASTs."""
154+
155+
def __init__(
156+
self,
157+
formula: Formula[QuantityT] | _ast.Node,
158+
create_method: Callable[[float], QuantityT],
159+
streams: list[_ast.TelemetryStream[QuantityT]] | None = None,
160+
sub_formulas: list[Formula[QuantityT]] | None = None,
161+
) -> None:
162+
"""Create a `FormulaBuilder` instance.
163+
164+
Args:
165+
formula: The initial formula to build upon.
166+
create_method: A method to generate the output values with. If the
167+
formula is for generating power values, this would be
168+
`Power.from_watts`, for example.
169+
streams: The telemetry streams that the formula depends on.
170+
sub_formulas: Any sub-formulas that this formula depends on.
171+
"""
172+
self._create_method: Callable[[float], QuantityT] = create_method
173+
self._streams: list[_ast.TelemetryStream[QuantityT]] = streams or []
174+
"""Input streams that need to be synchronized before evaluation."""
175+
self._sub_formulas: list[Formula[QuantityT]] = sub_formulas or []
176+
"""Sub-formulas whose lifetimes are managed by this formula."""
177+
178+
if isinstance(formula, Formula):
179+
self.root: _ast.Node = _ast.TelemetryStream(
180+
None,
181+
str(formula),
182+
formula.new_receiver(),
183+
)
184+
self._streams.append(self.root)
185+
self._sub_formulas.append(formula)
186+
else:
187+
self.root = formula
188+
189+
def __add__(
190+
self,
191+
other: FormulaBuilder[QuantityT] | QuantityT | Formula[QuantityT],
192+
) -> FormulaBuilder[QuantityT]:
193+
"""Create an addition operation node."""
194+
if isinstance(other, FormulaBuilder):
195+
right_node = other.root
196+
self._streams.extend(other._streams)
197+
elif isinstance(other, Formula):
198+
right_node = _ast.TelemetryStream(None, str(other), other.new_receiver())
199+
self._streams.append(right_node)
200+
self._sub_formulas.append(other)
201+
else:
202+
right_node = _ast.Constant(None, other.base_value)
203+
204+
new_root = _ast.Add(None, self.root, right_node)
205+
return FormulaBuilder(
206+
new_root,
207+
self._create_method,
208+
self._streams,
209+
self._sub_formulas,
210+
)
211+
212+
def __sub__(
213+
self,
214+
other: FormulaBuilder[QuantityT] | QuantityT | Formula[QuantityT],
215+
) -> FormulaBuilder[QuantityT]:
216+
"""Create a subtraction operation node."""
217+
if isinstance(other, FormulaBuilder):
218+
right_node = other.root
219+
self._streams.extend(other._streams)
220+
elif isinstance(other, Formula):
221+
right_node = _ast.TelemetryStream(None, str(other), other.new_receiver())
222+
self._streams.append(right_node)
223+
self._sub_formulas.append(other)
224+
else:
225+
right_node = _ast.Constant(None, other.base_value)
226+
227+
new_root = _ast.Sub(None, self.root, right_node)
228+
return FormulaBuilder(
229+
new_root,
230+
self._create_method,
231+
self._streams,
232+
self._sub_formulas,
233+
)
234+
235+
def __mul__(self, other: float) -> FormulaBuilder[QuantityT]:
236+
"""Create a multiplication operation node."""
237+
right_node = _ast.Constant(None, other)
238+
new_root = _ast.Mul(None, self.root, right_node)
239+
return FormulaBuilder(
240+
new_root,
241+
self._create_method,
242+
self._streams,
243+
self._sub_formulas,
244+
)
245+
246+
def __truediv__(
247+
self,
248+
other: float,
249+
) -> FormulaBuilder[QuantityT]:
250+
"""Create a division operation node."""
251+
right_node = _ast.Constant(None, other)
252+
new_root = _ast.Div(None, self.root, right_node)
253+
return FormulaBuilder(
254+
new_root,
255+
self._create_method,
256+
self._streams,
257+
self._sub_formulas,
258+
)
259+
260+
def coalesce(
261+
self,
262+
other: list[FormulaBuilder[QuantityT] | QuantityT | Formula[QuantityT]],
263+
) -> FormulaBuilder[QuantityT]:
264+
"""Create a coalesce operation node."""
265+
right_nodes: list[_ast.Node] = []
266+
for item in other:
267+
if isinstance(item, FormulaBuilder):
268+
right_nodes.append(item.root)
269+
self._streams.extend(item._streams) # pylint: disable=protected-access
270+
elif isinstance(item, Formula):
271+
right_node = _ast.TelemetryStream(
272+
None,
273+
str(item),
274+
item.new_receiver(),
275+
)
276+
right_nodes.append(right_node)
277+
self._streams.append(right_node)
278+
self._sub_formulas.append(item)
279+
else:
280+
right_nodes.append(_ast.Constant(None, item.base_value))
281+
282+
new_root = _ast.FunCall(
283+
None,
284+
Coalesce(),
285+
[self.root] + right_nodes,
286+
)
287+
288+
return FormulaBuilder(
289+
new_root,
290+
self._create_method,
291+
self._streams,
292+
self._sub_formulas,
293+
)
294+
295+
def min(
296+
self,
297+
other: list[FormulaBuilder[QuantityT] | QuantityT | Formula[QuantityT]],
298+
) -> FormulaBuilder[QuantityT]:
299+
"""Create a min operation node."""
300+
right_nodes: list[_ast.Node] = []
301+
for item in other:
302+
if isinstance(item, FormulaBuilder):
303+
right_nodes.append(item.root)
304+
self._streams.extend(item._streams) # pylint: disable=protected-access
305+
elif isinstance(item, Formula):
306+
right_node = _ast.TelemetryStream(
307+
None,
308+
str(item),
309+
item.new_receiver(),
310+
)
311+
right_nodes.append(right_node)
312+
self._streams.append(right_node)
313+
self._sub_formulas.append(item)
314+
else:
315+
right_nodes.append(_ast.Constant(None, item.base_value))
316+
317+
new_root = _ast.FunCall(
318+
None,
319+
Min(),
320+
[self.root] + right_nodes,
321+
)
322+
323+
return FormulaBuilder(
324+
new_root,
325+
self._create_method,
326+
self._streams,
327+
self._sub_formulas,
328+
)
329+
330+
def max(
331+
self,
332+
other: list[FormulaBuilder[QuantityT] | QuantityT | Formula[QuantityT]],
333+
) -> FormulaBuilder[QuantityT]:
334+
"""Create a max operation node."""
335+
right_nodes: list[_ast.Node] = []
336+
for item in other:
337+
if isinstance(item, FormulaBuilder):
338+
right_nodes.append(item.root)
339+
self._streams.extend(item._streams) # pylint: disable=protected-access
340+
elif isinstance(item, Formula):
341+
right_node = _ast.TelemetryStream(
342+
None,
343+
str(item),
344+
item.new_receiver(),
345+
)
346+
right_nodes.append(right_node)
347+
self._streams.append(right_node)
348+
self._sub_formulas.append(item)
349+
else:
350+
right_nodes.append(_ast.Constant(None, item.base_value))
351+
352+
new_root = _ast.FunCall(
353+
None,
354+
Max(),
355+
[self.root] + right_nodes,
356+
)
357+
358+
return FormulaBuilder(
359+
new_root,
360+
self._create_method,
361+
self._streams,
362+
self._sub_formulas,
363+
)
364+
365+
def build(
366+
self,
367+
name: str,
368+
) -> Formula[QuantityT]:
369+
"""Build a `Formula` instance.
370+
371+
Args:
372+
name: The name of the formula.
373+
374+
Returns:
375+
A `Formula` instance.
376+
"""
377+
return Formula(
378+
name=name,
379+
root=self.root,
380+
create_method=self._create_method,
381+
streams=self._streams,
382+
sub_formulas=self._sub_formulas,
383+
)

0 commit comments

Comments
 (0)