Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions RELEASE_NOTES.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@

- The `min` and `max` functions in the `FormulaEngine` are now public. Note that the same functions have been public in the builder.

- Drop `Averager` from `FormulaEngine`.

## Bug Fixes

<!-- Here goes notable bug fixes that are worth a special mention or explanation -->
18 changes: 0 additions & 18 deletions src/frequenz/sdk/timeseries/_formula_engine/_formula_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
from ._formula_formatter import format_formula
from ._formula_steps import (
Adder,
Averager,
Clipper,
ConstantValue,
Divider,
Expand Down Expand Up @@ -589,23 +588,6 @@ def push_clipper(self, min_value: float | None, max_value: float | None) -> None
"""
self._steps.append(Clipper(min_value, max_value))

def push_average(
self, metrics: list[tuple[str, Receiver[Sample[QuantityT]], bool]]
) -> None:
"""Push an average calculator into the engine.

Args:
metrics: list of arguments to pass to each `MetricFetcher`.
"""
fetchers: list[MetricFetcher[QuantityT]] = []
for metric in metrics:
fetcher = self._metric_fetchers.setdefault(
metric[0],
MetricFetcher(metric[0], metric[1], nones_are_zeros=metric[2]),
)
fetchers.append(fetcher)
self._steps.append(Averager(fetchers))

@property
def name(self) -> str:
"""Return the name of the formula being built.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@

from ._formula_steps import (
Adder,
Averager,
Clipper,
ConstantValue,
Divider,
Expand Down Expand Up @@ -200,12 +199,6 @@ def format(self, postfix_expr: list[FormulaStep]) -> str:
self._format_binary(Operator.MULTIPLICATION)
case Divider():
self._format_binary(Operator.DIVISION)
case Averager():
value = (
# pylint: disable=protected-access
f"avg({', '.join(self.format([f]) for f in step.fetchers)})"
)
self._stack.append(StackItem(value, OperatorPrecedence.PRIMARY, 1))
case Clipper():
the_value = self._stack.pop()
min_value = step.min_value if step.min_value is not None else "-inf"
Expand Down
60 changes: 1 addition & 59 deletions src/frequenz/sdk/timeseries/_formula_engine/_formula_steps.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,12 @@

import math
from abc import ABC, abstractmethod
from typing import Generic, Sequence
from typing import Generic

from frequenz.channels import Receiver

from .. import Sample
from .._quantities import QuantityT
from ._exceptions import FormulaEngineError


class FormulaStep(ABC):
Expand Down Expand Up @@ -196,63 +195,6 @@ def apply(self, _: list[float]) -> None:
"""No-op."""


class Averager(Generic[QuantityT], FormulaStep):
"""A formula step for calculating average."""

def __init__(self, fetchers: list[MetricFetcher[QuantityT]]) -> None:
"""Create an `Averager` instance.

Args:
fetchers: MetricFetchers for the metrics that need to be averaged.
"""
self._fetchers: list[MetricFetcher[QuantityT]] = fetchers

@property
def fetchers(self) -> Sequence[MetricFetcher[QuantityT]]:
"""Return the metric fetchers.

Returns:
The metric fetchers.
"""
return self._fetchers

def __repr__(self) -> str:
"""Return a string representation of the step.

Returns:
A string representation of the step.
"""
return f"avg({', '.join(repr(f) for f in self._fetchers)})"

def apply(self, eval_stack: list[float]) -> None:
"""Calculate average of given metrics, push the average to the eval_stack.

Args:
eval_stack: An evaluation stack, to append the calculated average to.

Raises:
FormulaEngineError: when metric fetchers are unable to fetch values.
"""
value_count = 0
total = 0.0
for fetcher in self._fetchers:
next_val = fetcher.value
if next_val is None:
raise FormulaEngineError(
"Unable to fetch a value from the resampling actor."
)
if next_val.value is None:
continue
value_count += 1
total += next_val.value.base_value
if value_count == 0:
avg = 0.0
else:
avg = total / value_count

eval_stack.append(avg)


class ConstantValue(FormulaStep):
"""A formula step for inserting a constant value."""

Expand Down
67 changes: 0 additions & 67 deletions tests/timeseries/test_formula_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -643,73 +643,6 @@ async def test_nones_are_not_zeros(self) -> None:
)


class TestFormulaAverager:
"""Tests for the formula step for calculating average."""

async def run_test(
self,
components: list[str],
io_pairs: list[tuple[list[float | None], float | None]],
) -> None:
"""Run a formula test."""
channels: dict[str, Broadcast[Sample[Quantity]]] = {}
streams: list[tuple[str, Receiver[Sample[Quantity]], bool]] = []
builder = FormulaBuilder("test_averager", create_method=Quantity)
for comp_id in components:
if comp_id not in channels:
channels[comp_id] = Broadcast(comp_id)
streams.append((f"{comp_id}", channels[comp_id].new_receiver(), False))

builder.push_average(streams)
engine = builder.build()
results_rx = engine.new_receiver()
now = datetime.now()
tests_passed = 0
for io_pair in io_pairs:
io_input, io_output = io_pair
await asyncio.gather(
*[
chan.new_sender().send(
Sample(now, None if not value else Quantity(value))
)
for chan, value in zip(channels.values(), io_input)
]
)
next_val = await results_rx.receive()
if io_output is None:
assert next_val.value is None
else:
assert next_val.value and next_val.value.base_value == io_output
tests_passed += 1
await engine._stop() # pylint: disable=protected-access
assert tests_passed == len(io_pairs)

async def test_simple(self) -> None:
"""Test simple formulas."""
await self.run_test(
["#2", "#4", "#5"],
[
([10.0, 12.0, 14.0], 12.0),
([15.0, 17.0, 19.0], 17.0),
([11.1, 11.1, 11.1], 11.1),
],
)

async def test_nones_are_skipped(self) -> None:
"""Test that `None`s are skipped for computing the average."""
await self.run_test(
["#2", "#4", "#5"],
[
([11.0, 13.0, 15.0], 13.0),
([None, 13.0, 19.0], 16.0),
([12.2, None, 22.2], 17.2),
([16.5, 19.5, None], 18.0),
([None, 13.0, None], 13.0),
([None, None, None], 0.0),
],
)


class TestConstantValue:
"""Tests for the constant value step."""

Expand Down
8 changes: 1 addition & 7 deletions tests/timeseries/test_formula_formatter.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,14 @@
from frequenz.sdk.timeseries._formula_engine._formula_engine import FormulaBuilder
from frequenz.sdk.timeseries._formula_engine._formula_formatter import format_formula
from frequenz.sdk.timeseries._formula_engine._formula_steps import (
Averager,
Clipper,
ConstantValue,
FormulaStep,
Maximizer,
Minimizer,
)
from frequenz.sdk.timeseries._formula_engine._tokenizer import Tokenizer, TokenType
from frequenz.sdk.timeseries._quantities import Percentage, Quantity
from frequenz.sdk.timeseries._quantities import Quantity
from tests.timeseries.mock_microgrid import MockMicrogrid


Expand Down Expand Up @@ -111,11 +110,6 @@ def test_functions(self) -> None:
# flake8: enable
# fmt: on

def test_function_avg(self) -> None:
"""Test that the avg function is formatted correctly."""
# This tests the special case of the avg function with no arguments.
assert format_formula([Averager[Percentage]([])]) == "avg()"

async def test_higher_order_formula(self, mocker: MockerFixture) -> None:
"""Test that the formula is formatted correctly for a higher-order formula."""
mockgrid = MockMicrogrid(grid_meter=False)
Expand Down