Skip to content

Commit 674ac1a

Browse files
Move formula evaluator to separate file
This patch is done in order to decrease the size of the `_formula_engine.py` file. Signed-off-by: Matthias Wende <[email protected]>
1 parent dcc4b30 commit 674ac1a

File tree

2 files changed

+134
-123
lines changed

2 files changed

+134
-123
lines changed

src/frequenz/sdk/timeseries/_formula_engine/_formula_engine.py

Lines changed: 1 addition & 123 deletions
Original file line numberDiff line numberDiff line change
@@ -9,15 +9,12 @@
99
import logging
1010
from abc import ABC
1111
from collections import deque
12-
from datetime import datetime
13-
from math import isinf, isnan
1412
from typing import (
1513
Callable,
1614
Dict,
1715
Generic,
1816
List,
1917
Optional,
20-
Set,
2118
Tuple,
2219
Type,
2320
TypeVar,
@@ -30,6 +27,7 @@
3027
from ..._internal._asyncio import cancel_and_await
3128
from .. import Sample, Sample3Phase
3229
from .._quantities import Quantity, QuantityT
30+
from ._formula_evaluator import FormulaEvaluator
3331
from ._formula_steps import (
3432
Adder,
3533
Averager,
@@ -56,126 +54,6 @@
5654
}
5755

5856

59-
class FormulaEvaluator(Generic[QuantityT]):
60-
"""A post-fix formula evaluator that operates on `Sample` receivers."""
61-
62-
def __init__(
63-
self,
64-
name: str,
65-
steps: List[FormulaStep],
66-
metric_fetchers: Dict[str, MetricFetcher[QuantityT]],
67-
create_method: Callable[[float], QuantityT],
68-
) -> None:
69-
"""Create a `FormulaEngine` instance.
70-
71-
Args:
72-
name: A name for the formula.
73-
steps: Steps for the engine to execute, in post-fix order.
74-
metric_fetchers: Fetchers for each metric stream the formula depends on.
75-
create_method: A method to generate the output `Sample` value with. If the
76-
formula is for generating power values, this would be
77-
`Power.from_watts`, for example.
78-
"""
79-
self._name = name
80-
self._steps = steps
81-
self._metric_fetchers: Dict[str, MetricFetcher[QuantityT]] = metric_fetchers
82-
self._first_run = True
83-
self._create_method: Callable[[float], QuantityT] = create_method
84-
85-
async def _synchronize_metric_timestamps(
86-
self, metrics: Set[asyncio.Task[Optional[Sample[QuantityT]]]]
87-
) -> datetime:
88-
"""Synchronize the metric streams.
89-
90-
For synchronised streams like data from the `ComponentMetricsResamplingActor`,
91-
this a call to this function is required only once, before the first set of
92-
inputs are fetched.
93-
94-
Args:
95-
metrics: The finished tasks from the first `fetch_next` calls to all the
96-
`MetricFetcher`s.
97-
98-
Returns:
99-
The timestamp of the latest metric value.
100-
101-
Raises:
102-
RuntimeError: when some streams have no value, or when the synchronization
103-
of timestamps fails.
104-
"""
105-
metrics_by_ts: Dict[datetime, list[str]] = {}
106-
for metric in metrics:
107-
result = metric.result()
108-
name = metric.get_name()
109-
if result is None:
110-
raise RuntimeError(f"Stream closed for component: {name}")
111-
metrics_by_ts.setdefault(result.timestamp, []).append(name)
112-
latest_ts = max(metrics_by_ts)
113-
114-
# fetch the metrics with non-latest timestamps again until we have the values
115-
# for the same ts for all metrics.
116-
for metric_ts, names in metrics_by_ts.items():
117-
if metric_ts == latest_ts:
118-
continue
119-
while metric_ts < latest_ts:
120-
for name in names:
121-
fetcher = self._metric_fetchers[name]
122-
next_val = await fetcher.fetch_next()
123-
assert next_val is not None
124-
metric_ts = next_val.timestamp
125-
if metric_ts > latest_ts:
126-
raise RuntimeError(
127-
"Unable to synchronize resampled metric timestamps, "
128-
f"for formula: {self._name}"
129-
)
130-
self._first_run = False
131-
return latest_ts
132-
133-
async def apply(self) -> Sample[QuantityT]:
134-
"""Fetch the latest metrics, apply the formula once and return the result.
135-
136-
Returns:
137-
The result of the formula.
138-
139-
Raises:
140-
RuntimeError: if some samples didn't arrive, or if formula application
141-
failed.
142-
"""
143-
eval_stack: List[float] = []
144-
ready_metrics, pending = await asyncio.wait(
145-
[
146-
asyncio.create_task(fetcher.fetch_next(), name=name)
147-
for name, fetcher in self._metric_fetchers.items()
148-
],
149-
return_when=asyncio.ALL_COMPLETED,
150-
)
151-
152-
if pending or any(res.result() is None for res in iter(ready_metrics)):
153-
raise RuntimeError(
154-
f"Some resampled metrics didn't arrive, for formula: {self._name}"
155-
)
156-
157-
if self._first_run:
158-
metric_ts = await self._synchronize_metric_timestamps(ready_metrics)
159-
else:
160-
sample = next(iter(ready_metrics)).result()
161-
assert sample is not None
162-
metric_ts = sample.timestamp
163-
164-
for step in self._steps:
165-
step.apply(eval_stack)
166-
167-
# if all steps were applied and the formula was correct, there should only be a
168-
# single value in the evaluation stack, and that would be the formula result.
169-
if len(eval_stack) != 1:
170-
raise RuntimeError(f"Formula application failed: {self._name}")
171-
172-
res = eval_stack.pop()
173-
if isnan(res) or isinf(res):
174-
return Sample(metric_ts, None)
175-
176-
return Sample(metric_ts, self._create_method(res))
177-
178-
17957
_CompositionType = Union[
18058
"FormulaEngine",
18159
"HigherOrderFormulaBuilder",
Lines changed: 133 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,133 @@
1+
# License: MIT
2+
# Copyright © 2022 Frequenz Energy-as-a-Service GmbH
3+
4+
"""A post-fix formula evaluator that operates on `Sample` receivers."""
5+
6+
import asyncio
7+
from datetime import datetime
8+
from math import isinf, isnan
9+
from typing import Callable, Dict, Generic, List, Optional, Set
10+
11+
from .. import Sample
12+
from .._quantities import QuantityT
13+
from ._formula_steps import FormulaStep, MetricFetcher
14+
15+
16+
class FormulaEvaluator(Generic[QuantityT]):
17+
"""A post-fix formula evaluator that operates on `Sample` receivers."""
18+
19+
def __init__(
20+
self,
21+
name: str,
22+
steps: List[FormulaStep],
23+
metric_fetchers: Dict[str, MetricFetcher[QuantityT]],
24+
create_method: Callable[[float], QuantityT],
25+
) -> None:
26+
"""Create a `FormulaEngine` instance.
27+
28+
Args:
29+
name: A name for the formula.
30+
steps: Steps for the engine to execute, in post-fix order.
31+
metric_fetchers: Fetchers for each metric stream the formula depends on.
32+
create_method: A method to generate the output `Sample` value with. If the
33+
formula is for generating power values, this would be
34+
`Power.from_watts`, for example.
35+
"""
36+
self._name = name
37+
self._steps = steps
38+
self._metric_fetchers: Dict[str, MetricFetcher[QuantityT]] = metric_fetchers
39+
self._first_run = True
40+
self._create_method: Callable[[float], QuantityT] = create_method
41+
42+
async def _synchronize_metric_timestamps(
43+
self, metrics: Set[asyncio.Task[Optional[Sample[QuantityT]]]]
44+
) -> datetime:
45+
"""Synchronize the metric streams.
46+
47+
For synchronised streams like data from the `ComponentMetricsResamplingActor`,
48+
this a call to this function is required only once, before the first set of
49+
inputs are fetched.
50+
51+
Args:
52+
metrics: The finished tasks from the first `fetch_next` calls to all the
53+
`MetricFetcher`s.
54+
55+
Returns:
56+
The timestamp of the latest metric value.
57+
58+
Raises:
59+
RuntimeError: when some streams have no value, or when the synchronization
60+
of timestamps fails.
61+
"""
62+
metrics_by_ts: Dict[datetime, list[str]] = {}
63+
for metric in metrics:
64+
result = metric.result()
65+
name = metric.get_name()
66+
if result is None:
67+
raise RuntimeError(f"Stream closed for component: {name}")
68+
metrics_by_ts.setdefault(result.timestamp, []).append(name)
69+
latest_ts = max(metrics_by_ts)
70+
71+
# fetch the metrics with non-latest timestamps again until we have the values
72+
# for the same ts for all metrics.
73+
for metric_ts, names in metrics_by_ts.items():
74+
if metric_ts == latest_ts:
75+
continue
76+
while metric_ts < latest_ts:
77+
for name in names:
78+
fetcher = self._metric_fetchers[name]
79+
next_val = await fetcher.fetch_next()
80+
assert next_val is not None
81+
metric_ts = next_val.timestamp
82+
if metric_ts > latest_ts:
83+
raise RuntimeError(
84+
"Unable to synchronize resampled metric timestamps, "
85+
f"for formula: {self._name}"
86+
)
87+
self._first_run = False
88+
return latest_ts
89+
90+
async def apply(self) -> Sample[QuantityT]:
91+
"""Fetch the latest metrics, apply the formula once and return the result.
92+
93+
Returns:
94+
The result of the formula.
95+
96+
Raises:
97+
RuntimeError: if some samples didn't arrive, or if formula application
98+
failed.
99+
"""
100+
eval_stack: List[float] = []
101+
ready_metrics, pending = await asyncio.wait(
102+
[
103+
asyncio.create_task(fetcher.fetch_next(), name=name)
104+
for name, fetcher in self._metric_fetchers.items()
105+
],
106+
return_when=asyncio.ALL_COMPLETED,
107+
)
108+
109+
if pending or any(res.result() is None for res in iter(ready_metrics)):
110+
raise RuntimeError(
111+
f"Some resampled metrics didn't arrive, for formula: {self._name}"
112+
)
113+
114+
if self._first_run:
115+
metric_ts = await self._synchronize_metric_timestamps(ready_metrics)
116+
else:
117+
sample = next(iter(ready_metrics)).result()
118+
assert sample is not None
119+
metric_ts = sample.timestamp
120+
121+
for step in self._steps:
122+
step.apply(eval_stack)
123+
124+
# if all steps were applied and the formula was correct, there should only be a
125+
# single value in the evaluation stack, and that would be the formula result.
126+
if len(eval_stack) != 1:
127+
raise RuntimeError(f"Formula application failed: {self._name}")
128+
129+
res = eval_stack.pop()
130+
if isnan(res) or isinf(res):
131+
return Sample(metric_ts, None)
132+
133+
return Sample(metric_ts, self._create_method(res))

0 commit comments

Comments
 (0)