Skip to content

Commit a2c5a4b

Browse files
Handle NaN values in FormulaEngine (#99)
While you are at this, it should probably be a good idea to also treat *NaN*'s specially (probably the same as `None`). You can check using `math.isnan()`. (same for all steps) **OR**, maybe replace `None` with *NaN* and it should have the same effect of propagating `None` manually as you are doing here, but it will be done automatically. Then you only need to convert the final *NaN* in the stack to `None` and you are set. This option is easier to write (as you don't have to touch every step) but it might be harder read (to figure out what's going on). _Originally posted by @leandro-lucarella-frequenz in #95 (comment)
2 parents a1ce9b3 + e7ed408 commit a2c5a4b

File tree

3 files changed

+82
-19
lines changed

3 files changed

+82
-19
lines changed

src/frequenz/sdk/timeseries/logical_meter/_formula_engine.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import weakref
1111
from collections import deque
1212
from datetime import datetime
13+
from math import isinf, isnan
1314
from typing import Dict, List, Optional, Set, Tuple
1415
from uuid import UUID, uuid4
1516

@@ -154,7 +155,11 @@ async def _apply(self) -> Sample:
154155
if len(eval_stack) != 1:
155156
raise RuntimeError(f"Formula application failed: {self._name}")
156157

157-
return Sample(metric_ts, eval_stack[0])
158+
res = eval_stack.pop()
159+
if isnan(res) or isinf(res):
160+
res = None
161+
162+
return Sample(metric_ts, res)
158163

159164
async def _run(self) -> None:
160165
sender = self._channel.new_sender()

src/frequenz/sdk/timeseries/logical_meter/_formula_steps.py

Lines changed: 12 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
from __future__ import annotations
77

88
from abc import ABC, abstractmethod
9+
from math import isinf, isnan
910
from typing import List, Optional
1011

1112
from frequenz.channels import Receiver
@@ -57,10 +58,7 @@ def apply(self, eval_stack: List[Optional[float]]) -> None:
5758
"""
5859
val2 = eval_stack.pop()
5960
val1 = eval_stack.pop()
60-
if val1 is None or val2 is None:
61-
res = None
62-
else:
63-
res = val1 + val2
61+
res = val1 + val2
6462
eval_stack.append(res)
6563

6664

@@ -83,10 +81,7 @@ def apply(self, eval_stack: List[Optional[float]]) -> None:
8381
"""
8482
val2 = eval_stack.pop()
8583
val1 = eval_stack.pop()
86-
if val1 is None or val2 is None:
87-
res = None
88-
else:
89-
res = val1 - val2
84+
res = val1 - val2
9085
eval_stack.append(res)
9186

9287

@@ -109,10 +104,7 @@ def apply(self, eval_stack: List[Optional[float]]) -> None:
109104
"""
110105
val2 = eval_stack.pop()
111106
val1 = eval_stack.pop()
112-
if val1 is None or val2 is None:
113-
res = None
114-
else:
115-
res = val1 * val2
107+
res = val1 * val2
116108
eval_stack.append(res)
117109

118110

@@ -135,10 +127,7 @@ def apply(self, eval_stack: List[Optional[float]]) -> None:
135127
"""
136128
val2 = eval_stack.pop()
137129
val1 = eval_stack.pop()
138-
if val1 is None or val2 is None:
139-
res = None
140-
else:
141-
res = val1 / val2
130+
res = val1 / val2
142131
eval_stack.append(res)
143132

144133

@@ -265,7 +254,12 @@ def apply(self, eval_stack: List[Optional[float]]) -> None:
265254
"""
266255
if self._next_value is None:
267256
raise RuntimeError("No next value available to append.")
268-
if self._next_value.value is None and self._nones_are_zeros:
269-
eval_stack.append(0.0)
257+
258+
next_value = self._next_value.value
259+
if next_value is None or isnan(next_value) or isinf(next_value):
260+
if self._nones_are_zeros:
261+
eval_stack.append(0.0)
262+
else:
263+
eval_stack.append(float("NaN"))
270264
else:
271265
eval_stack.append(self._next_value.value)

tests/timeseries/test_formula_engine.py

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -541,3 +541,67 @@ async def test_nones_are_not_zeros(self) -> None:
541541
],
542542
False,
543543
)
544+
545+
546+
class TestFormulaAverager:
547+
"""Tests for the formula step for calculating average."""
548+
549+
async def run_test(
550+
self,
551+
components: List[str],
552+
io_pairs: List[Tuple[List[Optional[float]], Optional[float]]],
553+
) -> None:
554+
"""Run a formula test."""
555+
channels: Dict[str, Broadcast[Sample]] = {}
556+
streams: List[Tuple[str, Receiver[Sample], bool]] = []
557+
builder = FormulaBuilder("test_averager")
558+
for comp_id in components:
559+
if comp_id not in channels:
560+
channels[comp_id] = Broadcast(comp_id)
561+
streams.append((f"{comp_id}", channels[comp_id].new_receiver(), False))
562+
563+
builder.push_average(streams)
564+
engine = builder.build()
565+
566+
now = datetime.now()
567+
tests_passed = 0
568+
for io_pair in io_pairs:
569+
io_input, io_output = io_pair
570+
assert all(
571+
await asyncio.gather(
572+
*[
573+
chan.new_sender().send(Sample(now, value))
574+
for chan, value in zip(channels.values(), io_input)
575+
]
576+
)
577+
)
578+
next_val = await engine._apply() # pylint: disable=protected-access
579+
assert (next_val).value == io_output
580+
tests_passed += 1
581+
await engine._stop() # pylint: disable=protected-access
582+
assert tests_passed == len(io_pairs)
583+
584+
async def test_simple(self) -> None:
585+
"""Test simple formulas."""
586+
await self.run_test(
587+
["#2", "#4", "#5"],
588+
[
589+
([10.0, 12.0, 14.0], 12.0),
590+
([15.0, 17.0, 19.0], 17.0),
591+
([11.1, 11.1, 11.1], 11.1),
592+
],
593+
)
594+
595+
async def test_nones_are_skipped(self) -> None:
596+
"""Test that `None`s are skipped for computing the average."""
597+
await self.run_test(
598+
["#2", "#4", "#5"],
599+
[
600+
([11.0, 13.0, 15.0], 13.0),
601+
([None, 13.0, 19.0], 16.0),
602+
([12.2, None, 22.2], 17.2),
603+
([16.5, 19.5, None], 18.0),
604+
([None, 13.0, None], 13.0),
605+
([None, None, None], 0.0),
606+
],
607+
)

0 commit comments

Comments
 (0)