Skip to content

Commit c32c852

Browse files
committed
Implement a FormulaEvaluatingActor
Waits for new values from the input data streams. When there's one new value from each of them, evaluates the AST and sends the calculated value out. Signed-off-by: Sahas Subramanian <[email protected]>
1 parent 9366cfc commit c32c852

File tree

1 file changed

+129
-0
lines changed

1 file changed

+129
-0
lines changed
Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,129 @@
1+
# License: MIT
2+
# Copyright © 2025 Frequenz Energy-as-a-Service GmbH
3+
4+
"""An evaluator for a formula represented as an AST."""
5+
6+
7+
import asyncio
8+
import logging
9+
from collections.abc import Callable
10+
from datetime import datetime
11+
from typing import Generic
12+
13+
from frequenz.channels import Broadcast, ReceiverStoppedError, Sender
14+
from typing_extensions import override
15+
16+
from ...actor import Actor
17+
from .._base_types import QuantityT, Sample
18+
from . import _ast
19+
from ._resampled_stream_fetcher import ResampledStreamFetcher
20+
21+
_logger = logging.getLogger(__name__)
22+
23+
24+
class FormulaEvaluatingActor(Generic[QuantityT], Actor):
25+
"""An evaluator for a formula represented as an AST."""
26+
27+
def __init__( # pylint: disable=too-many-arguments
28+
self,
29+
*,
30+
root: _ast.Node,
31+
components: list[_ast.TelemetryStream[QuantityT]],
32+
create_method: Callable[[float], QuantityT],
33+
output_channel: Broadcast[Sample[QuantityT]],
34+
metric_fetcher: ResampledStreamFetcher | None = None,
35+
) -> None:
36+
"""Create a `FormulaEvaluatingActor` instance.
37+
38+
Args:
39+
root: The root node of the formula AST.
40+
components: The telemetry streams that the formula depends on.
41+
create_method: A method to generate the output values with. If the
42+
formula is for generating power values, this would be
43+
`Power.from_watts`, for example.
44+
output_channel: The channel to send evaluated samples to.
45+
metric_fetcher: An optional metric fetcher that needs to be started
46+
before the formula can be evaluated.
47+
"""
48+
super().__init__()
49+
50+
self._root: _ast.Node = root
51+
self._components: list[_ast.TelemetryStream[QuantityT]] = components
52+
self._create_method: Callable[[float], QuantityT] = create_method
53+
self._metric_fetcher: ResampledStreamFetcher | None = metric_fetcher
54+
self._output_channel: Broadcast[Sample[QuantityT]] = output_channel
55+
56+
self._output_sender: Sender[Sample[QuantityT]] = output_channel.new_sender()
57+
58+
@override
59+
async def _run(self) -> None:
60+
"""Run the formula evaluator actor."""
61+
if self._metric_fetcher is not None:
62+
await self._metric_fetcher.subscribe()
63+
await synchronize_receivers(self._components)
64+
65+
while True:
66+
try:
67+
timestamp = next(
68+
comp.latest_sample.timestamp
69+
for comp in self._components
70+
if comp.latest_sample is not None
71+
)
72+
73+
res = self._root.evaluate()
74+
next_sample = Sample(
75+
timestamp, None if res is None else self._create_method(res)
76+
)
77+
await self._output_sender.send(next_sample)
78+
except (StopAsyncIteration, StopIteration):
79+
_logger.debug(
80+
"No more input samples available; stopping formula evaluator. (%s)",
81+
self._root,
82+
)
83+
await self._output_channel.close()
84+
return
85+
except Exception as e: # pylint: disable=broad-except
86+
_logger.error(
87+
"Error evaluating formula %s: %s", self._root, e, exc_info=True
88+
)
89+
await self._output_channel.close()
90+
return
91+
92+
fetch_results = await asyncio.gather(
93+
*(comp.fetch_next() for comp in self._components),
94+
return_exceptions=True,
95+
)
96+
if ex := next((e for e in fetch_results if isinstance(e, Exception)), None):
97+
if isinstance(ex, (StopAsyncIteration, ReceiverStoppedError)):
98+
_logger.debug(
99+
"input streams closed; stopping formula evaluator. (%s)",
100+
self._root,
101+
)
102+
await self._output_channel.close()
103+
return
104+
raise ex
105+
106+
107+
async def synchronize_receivers(
108+
components: list[_ast.TelemetryStream[QuantityT]],
109+
) -> None:
110+
"""Synchronize the given telemetry stream receivers."""
111+
_ = await asyncio.gather(
112+
*(comp.fetch_next() for comp in components),
113+
)
114+
latest_ts: datetime | None = None
115+
for comp in components:
116+
if comp.latest_sample is not None and (
117+
latest_ts is None or comp.latest_sample.timestamp > latest_ts
118+
):
119+
latest_ts = comp.latest_sample.timestamp
120+
if latest_ts is None:
121+
_logger.debug("No samples available to synchronize receivers.")
122+
return
123+
for comp in components:
124+
if comp.latest_sample is None:
125+
raise RuntimeError("Can't synchronize receivers.")
126+
ctr = 0
127+
while ctr < 10 and comp.latest_sample.timestamp < latest_ts:
128+
await comp.fetch_next()
129+
ctr += 1

0 commit comments

Comments
 (0)