Skip to content

Commit 04bc40b

Browse files
committed
Remove unused telemetry stream tracking in formula evaluator
The nodes themselves have become responsible to progressing the input streams, so they don't have to be handled by the formula evaluator anymore. Signed-off-by: Sahas Subramanian <[email protected]>
1 parent 3131305 commit 04bc40b

File tree

3 files changed

+4
-26
lines changed

3 files changed

+4
-26
lines changed

src/frequenz/sdk/timeseries/formulas/_formula.py

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,6 @@ def __init__( # pylint: disable=too-many-arguments
4848
name: str,
4949
root: AstNode[QuantityT],
5050
create_method: Callable[[float], QuantityT],
51-
streams: list[_ast.TelemetryStream[QuantityT]],
5251
sub_formulas: list[Formula[QuantityT]] | None = None,
5352
metric_fetcher: ResampledStreamFetcher | None = None,
5453
) -> None:
@@ -60,15 +59,13 @@ def __init__( # pylint: disable=too-many-arguments
6059
create_method: A method to generate the output values with. If the
6160
formula is for generating power values, this would be
6261
`Power.from_watts`, for example.
63-
streams: The telemetry streams that the formula depends on.
6462
sub_formulas: Any sub-formulas that this formula depends on.
6563
metric_fetcher: An optional metric fetcher that needs to be started
6664
before the formula can be evaluated.
6765
"""
6866
BackgroundService.__init__(self)
6967
self._name: str = name
7068
self._root: AstNode[QuantityT] = root
71-
self._components: list[_ast.TelemetryStream[QuantityT]] = streams
7269
self._create_method: Callable[[float], QuantityT] = create_method
7370
self._sub_formulas: list[Formula[QuantityT]] = sub_formulas or []
7471

@@ -78,7 +75,6 @@ def __init__( # pylint: disable=too-many-arguments
7875
)
7976
self._evaluator: FormulaEvaluatingActor[QuantityT] = FormulaEvaluatingActor(
8077
root=self._root,
81-
components=self._components,
8278
output_channel=self._channel,
8379
metric_fetcher=metric_fetcher,
8480
)
@@ -383,6 +379,5 @@ def build(
383379
name=name,
384380
root=self.root,
385381
create_method=self._create_method,
386-
streams=self._streams,
387382
sub_formulas=self._sub_formulas,
388383
)

src/frequenz/sdk/timeseries/formulas/_formula_evaluator.py

Lines changed: 4 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@
1111

1212
from ...actor import Actor
1313
from .._base_types import QuantityT, Sample
14-
from . import _ast
1514
from ._base_ast_node import AstNode
1615
from ._resampled_stream_fetcher import ResampledStreamFetcher
1716

@@ -25,23 +24,20 @@ def __init__( # pylint: disable=too-many-arguments
2524
self,
2625
*,
2726
root: AstNode[QuantityT],
28-
components: list[_ast.TelemetryStream[QuantityT]],
2927
output_channel: Broadcast[Sample[QuantityT]],
3028
metric_fetcher: ResampledStreamFetcher | None = None,
3129
) -> None:
3230
"""Create a `FormulaEvaluatingActor` instance.
3331
3432
Args:
3533
root: The root node of the formula AST.
36-
components: The telemetry streams that the formula depends on.
3734
output_channel: The channel to send evaluated samples to.
3835
metric_fetcher: An optional metric fetcher that needs to be started
3936
before the formula can be evaluated.
4037
"""
4138
super().__init__()
4239

4340
self._root: AstNode[QuantityT] = root
44-
self._components: list[_ast.TelemetryStream[QuantityT]] = components
4541
self._metric_fetcher: ResampledStreamFetcher | None = metric_fetcher
4642
self._output_channel: Broadcast[Sample[QuantityT]] = output_channel
4743

@@ -56,21 +52,11 @@ async def _run(self) -> None:
5652
if isinstance(res, Sample):
5753
next_sample = res
5854
else:
59-
timestamp = next(
60-
(
61-
comp.latest_sample.timestamp
62-
for comp in self._components
63-
if comp.latest_sample is not None
64-
),
65-
None,
55+
_logger.debug(
56+
"No input samples available; stopping formula evaluator. (%s)",
57+
self._root,
6658
)
67-
if timestamp is None:
68-
_logger.debug(
69-
"No input samples available; stopping formula evaluator. (%s)",
70-
self._root,
71-
)
72-
return
73-
next_sample = Sample(timestamp, res)
59+
return
7460
await self._output_sender.send(next_sample)
7561
except (StopAsyncIteration, ReceiverStoppedError):
7662
_logger.debug(

src/frequenz/sdk/timeseries/formulas/_parser.py

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,6 @@ def __init__(
6767
self._name: str = name
6868
self._lexer: Peekable[_token.Token] = Peekable(Lexer(formula))
6969
self._telemetry_fetcher: ResampledStreamFetcher = telemetry_fetcher
70-
self._components: list[_ast.TelemetryStream[QuantityT]] = []
7170
self._create_method: Callable[[float], QuantityT] = create_method
7271

7372
def _parse_term(self) -> AstNode[QuantityT] | None:
@@ -202,7 +201,6 @@ def make_component_stream_fetcher(
202201
),
203202
create_method=self._create_method,
204203
)
205-
self._components.append(comp)
206204
return comp
207205

208206
if isinstance(token, _token.Number):
@@ -227,6 +225,5 @@ def parse(self) -> Formula[QuantityT]:
227225
name=self._name,
228226
root=expr,
229227
create_method=self._create_method,
230-
streams=self._components,
231228
metric_fetcher=self._telemetry_fetcher,
232229
)

0 commit comments

Comments
 (0)