Skip to content

Commit e0bd82f

Browse files
committed
Add support for calculating averages in FormulaEngine
Signed-off-by: Sahas Subramanian <[email protected]>
1 parent 2162eaf commit e0bd82f

File tree

3 files changed

+84
-1
lines changed

3 files changed

+84
-1
lines changed
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
# License: MIT
2+
# Copyright © 2022 Frequenz Energy-as-a-Service GmbH
3+
4+
"""Formula Engine Exceptions."""
5+
6+
7+
class FormulaEngineError(Exception):
8+
"""An error occured while fetching metrics or applying the formula on them."""

src/frequenz/sdk/timeseries/logical_meter/_formula_engine.py

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,14 @@
77

88
import asyncio
99
from datetime import datetime
10-
from typing import Dict, List, Optional, Set
10+
from typing import Dict, List, Optional, Set, Tuple
1111

1212
from frequenz.channels import Receiver
1313

1414
from .. import Sample
1515
from ._formula_steps import (
1616
Adder,
17+
Averager,
1718
Divider,
1819
FormulaStep,
1920
MetricFetcher,
@@ -218,6 +219,20 @@ def push_metric(
218219
)
219220
self._steps.append(fetcher)
220221

222+
def push_average(self, metrics: List[Tuple[str, Receiver[Sample], bool]]) -> None:
223+
"""Push an average calculator into the engine.
224+
225+
Args:
226+
metrics: list of arguments to pass to each `MetricFetcher`.
227+
"""
228+
fetchers: List[MetricFetcher] = []
229+
for metric in metrics:
230+
fetcher = self._metric_fetchers.setdefault(
231+
metric[0], MetricFetcher(*metric)
232+
)
233+
fetchers.append(fetcher)
234+
self._steps.append(Averager(fetchers))
235+
221236
def build(self) -> FormulaEngine:
222237
"""Finalize and build the formula engine.
223238

src/frequenz/sdk/timeseries/logical_meter/_formula_steps.py

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,15 @@
33

44
"""Steps for building formula engines with."""
55

6+
from __future__ import annotations
7+
68
from abc import ABC, abstractmethod
79
from typing import List, Optional
810

911
from frequenz.channels import Receiver
1012

1113
from .. import Sample
14+
from ._exceptions import FormulaEngineError
1215

1316

1417
class FormulaStep(ABC):
@@ -157,6 +160,54 @@ def apply(self, _: List[Optional[float]]) -> None:
157160
"""No-op."""
158161

159162

163+
class Averager(FormulaStep):
164+
"""A formula step for calculating average."""
165+
166+
def __init__(self, fetchers: List[MetricFetcher]) -> None:
167+
"""Create an `Averager` instance.
168+
169+
Args:
170+
fetchers: MetricFetchers for the metrics that need to be averaged.
171+
"""
172+
self._fetchers = fetchers
173+
174+
def __repr__(self) -> str:
175+
"""Return a string representation of the step.
176+
177+
Returns:
178+
A string representation of the step.
179+
"""
180+
return f"avg({', '.join(repr(f) for f in self._fetchers)})"
181+
182+
def apply(self, eval_stack: List[Optional[float]]) -> None:
183+
"""Calculate average of given metrics, push the average to the eval_stack.
184+
185+
Args:
186+
eval_stack: An evaluation stack, to append the calculated average to.
187+
188+
Raises:
189+
FormulaEngineError: when metric fetchers are unable to fetch values.
190+
"""
191+
value_count = 0
192+
total = 0.0
193+
for fetcher in self._fetchers:
194+
next_val = fetcher.value
195+
if next_val is None:
196+
raise FormulaEngineError(
197+
"Unable to fetch a value from the resampling actor."
198+
)
199+
if next_val.value is None:
200+
continue
201+
value_count += 1
202+
total += next_val.value
203+
if value_count == 0:
204+
avg = 0.0
205+
else:
206+
avg = total / value_count
207+
208+
eval_stack.append(avg)
209+
210+
160211
class MetricFetcher(FormulaStep):
161212
"""A formula step for fetching a value from a metric Receiver."""
162213

@@ -186,6 +237,15 @@ async def fetch_next(self) -> Optional[Sample]:
186237
self._next_value = await self._stream.receive()
187238
return self._next_value
188239

240+
@property
241+
def value(self) -> Optional[Sample]:
242+
"""Get the next value in the stream.
243+
244+
Returns:
245+
Next value in the stream.
246+
"""
247+
return self._next_value
248+
189249
def __repr__(self) -> str:
190250
"""Return a string representation of the step.
191251

0 commit comments

Comments
 (0)