Skip to content

Commit e058855

Browse files
Support handling None values in LogicalMeter/FormulaEngine (#95)
The Resampling actor sends `None` values out when there is no data from a component. Currently the logical meter raises an exception if it sees a `None` value from any of its streams, which kills the LogicalMeter. This is incorrect. `None` values should instead be treated either as `None` or `0` based on context. To this end, a new parameter `nones_are_zeros` is introduced in the LogicalMeter and FormulaEngine. If this field is False, then when there are None values in any of the input streams, the calculated values will be None. When it is True, the None values will be treated as 0.0 when the formula is evaluated.
2 parents b3afe70 + 1af5c63 commit e058855

File tree

5 files changed

+132
-28
lines changed

5 files changed

+132
-28
lines changed

src/frequenz/sdk/timeseries/logical_meter/_formula_builder.py

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -58,16 +58,24 @@ async def _get_resampled_receiver(self, component_id: int) -> Receiver[Sample]:
5858
await self._resampler_subscription_sender.send(request)
5959
return self._channel_registry.new_receiver(request.get_channel_name())
6060

61-
async def push_component_metric(self, component_id: int) -> None:
61+
async def push_component_metric(
62+
self, component_id: int, nones_are_zeros: bool
63+
) -> None:
6264
"""Push a resampled component metric stream to the formula engine.
6365
6466
Args:
6567
component_id: The component id for which to push a metric fetcher.
68+
nones_are_zeros: Whether to treat None values from the stream as 0s. If
69+
False, the returned value will be a None.
6670
"""
6771
receiver = await self._get_resampled_receiver(component_id)
68-
self._formula.push_metric(f"#{component_id}", receiver)
72+
self._formula.push_metric(f"#{component_id}", receiver, nones_are_zeros)
6973

70-
async def from_string(self, formula: str) -> FormulaEngine:
74+
async def from_string(
75+
self,
76+
formula: str,
77+
nones_are_zeros: bool,
78+
) -> FormulaEngine:
7179
"""Construct a `FormulaEngine` from the given formula string.
7280
7381
Formulas can have Component IDs that are preceeded by a pound symbol("#"), and
@@ -78,6 +86,8 @@ async def from_string(self, formula: str) -> FormulaEngine:
7886
7987
Args:
8088
formula: A string formula.
89+
nones_are_zeros: Whether to treat None values from the stream as 0s. If
90+
False, the returned value will be a None.
8191
8292
Returns:
8393
A FormulaEngine instance corresponding to the given formula.
@@ -89,7 +99,7 @@ async def from_string(self, formula: str) -> FormulaEngine:
8999

90100
for token in tokenizer:
91101
if token.type == TokenType.COMPONENT_METRIC:
92-
await self.push_component_metric(int(token.value))
102+
await self.push_component_metric(int(token.value), nones_are_zeros)
93103
elif token.type == TokenType.OPER:
94104
self._formula.push_oper(token.value)
95105
else:

src/frequenz/sdk/timeseries/logical_meter/_formula_engine.py

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -95,15 +95,22 @@ def push_oper(self, oper: str) -> None:
9595
elif oper == "(":
9696
self._build_stack.append(OpenParen())
9797

98-
def push_metric(self, name: str, data_stream: Receiver[Sample]) -> None:
98+
def push_metric(
99+
self,
100+
name: str,
101+
data_stream: Receiver[Sample],
102+
nones_are_zeros: bool,
103+
) -> None:
99104
"""Push a metric receiver into the engine.
100105
101106
Args:
102107
name: A name for the metric.
103108
data_stream: A receiver to fetch this metric from.
109+
nones_are_zeros: Whether to treat None values from the stream as 0s. If
110+
False, the returned value will be a None.
104111
"""
105112
fetcher = self._metric_fetchers.setdefault(
106-
name, MetricFetcher(name, data_stream)
113+
name, MetricFetcher(name, data_stream, nones_are_zeros)
107114
)
108115
self._steps.append(fetcher)
109116

@@ -171,7 +178,7 @@ async def apply(self) -> Sample:
171178
RuntimeError: if some samples didn't arrive, or if formula application
172179
failed.
173180
"""
174-
eval_stack: List[float] = []
181+
eval_stack: List[Optional[float]] = []
175182
ready_metrics, pending = await asyncio.wait(
176183
[
177184
asyncio.create_task(fetcher.fetch_next(), name=name)

src/frequenz/sdk/timeseries/logical_meter/_formula_steps.py

Lines changed: 37 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ def __repr__(self) -> str:
2727
"""
2828

2929
@abstractmethod
30-
def apply(self, eval_stack: List[float]) -> None:
30+
def apply(self, eval_stack: List[Optional[float]]) -> None:
3131
"""Apply a formula operation on the eval_stack.
3232
3333
Args:
@@ -46,15 +46,19 @@ def __repr__(self) -> str:
4646
"""
4747
return "+"
4848

49-
def apply(self, eval_stack: List[float]) -> None:
49+
def apply(self, eval_stack: List[Optional[float]]) -> None:
5050
"""Extract two values from the stack, add them, push the result back in.
5151
5252
Args:
5353
eval_stack: An evaluation stack, to apply the formula step on.
5454
"""
5555
val2 = eval_stack.pop()
5656
val1 = eval_stack.pop()
57-
eval_stack.append(val1 + val2)
57+
if val1 is None or val2 is None:
58+
res = None
59+
else:
60+
res = val1 + val2
61+
eval_stack.append(res)
5862

5963

6064
class Subtractor(FormulaStep):
@@ -68,15 +72,19 @@ def __repr__(self) -> str:
6872
"""
6973
return "-"
7074

71-
def apply(self, eval_stack: List[float]) -> None:
75+
def apply(self, eval_stack: List[Optional[float]]) -> None:
7276
"""Extract two values from the stack, subtract them, push the result back in.
7377
7478
Args:
7579
eval_stack: An evaluation stack, to apply the formula step on.
7680
"""
7781
val2 = eval_stack.pop()
7882
val1 = eval_stack.pop()
79-
eval_stack.append(val1 - val2)
83+
if val1 is None or val2 is None:
84+
res = None
85+
else:
86+
res = val1 - val2
87+
eval_stack.append(res)
8088

8189

8290
class Multiplier(FormulaStep):
@@ -90,15 +98,19 @@ def __repr__(self) -> str:
9098
"""
9199
return "*"
92100

93-
def apply(self, eval_stack: List[float]) -> None:
101+
def apply(self, eval_stack: List[Optional[float]]) -> None:
94102
"""Extract two values from the stack, multiply them, push the result back in.
95103
96104
Args:
97105
eval_stack: An evaluation stack, to apply the formula step on.
98106
"""
99107
val2 = eval_stack.pop()
100108
val1 = eval_stack.pop()
101-
eval_stack.append(val1 * val2)
109+
if val1 is None or val2 is None:
110+
res = None
111+
else:
112+
res = val1 * val2
113+
eval_stack.append(res)
102114

103115

104116
class Divider(FormulaStep):
@@ -112,15 +124,19 @@ def __repr__(self) -> str:
112124
"""
113125
return "/"
114126

115-
def apply(self, eval_stack: List[float]) -> None:
127+
def apply(self, eval_stack: List[Optional[float]]) -> None:
116128
"""Extract two values from the stack, divide them, push the result back in.
117129
118130
Args:
119131
eval_stack: An evaluation stack, to apply the formula step on.
120132
"""
121133
val2 = eval_stack.pop()
122134
val1 = eval_stack.pop()
123-
eval_stack.append(val1 / val2)
135+
if val1 is None or val2 is None:
136+
res = None
137+
else:
138+
res = val1 / val2
139+
eval_stack.append(res)
124140

125141

126142
class OpenParen(FormulaStep):
@@ -137,23 +153,27 @@ def __repr__(self) -> str:
137153
"""
138154
return "("
139155

140-
def apply(self, _: List[float]) -> None:
156+
def apply(self, _: List[Optional[float]]) -> None:
141157
"""No-op."""
142158

143159

144160
class MetricFetcher(FormulaStep):
145161
"""A formula step for fetching a value from a metric Receiver."""
146162

147-
def __init__(self, name: str, stream: Receiver[Sample]) -> None:
163+
def __init__(
164+
self, name: str, stream: Receiver[Sample], nones_are_zeros: bool
165+
) -> None:
148166
"""Create a `MetricFetcher` instance.
149167
150168
Args:
151169
name: The name of the metric.
152170
stream: A channel receiver from which to fetch samples.
171+
nones_are_zeros: Whether to treat None values from the stream as 0s.
153172
"""
154173
self._name = name
155174
self._stream = stream
156175
self._next_value: Optional[Sample] = None
176+
self._nones_are_zeros = nones_are_zeros
157177

158178
async def fetch_next(self) -> Optional[Sample]:
159179
"""Fetch the next value from the stream.
@@ -174,7 +194,7 @@ def __repr__(self) -> str:
174194
"""
175195
return self._name
176196

177-
def apply(self, eval_stack: List[float]) -> None:
197+
def apply(self, eval_stack: List[Optional[float]]) -> None:
178198
"""Push the latest value from the stream into the evaluation stack.
179199
180200
Args:
@@ -183,6 +203,9 @@ def apply(self, eval_stack: List[float]) -> None:
183203
Raises:
184204
RuntimeError: No next value available to append.
185205
"""
186-
if self._next_value is None or self._next_value.value is None:
206+
if self._next_value is None:
187207
raise RuntimeError("No next value available to append.")
188-
eval_stack.append(self._next_value.value)
208+
if self._next_value.value is None and self._nones_are_zeros:
209+
eval_stack.append(0.0)
210+
else:
211+
eval_stack.append(self._next_value.value)

src/frequenz/sdk/timeseries/logical_meter/_logical_meter.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -52,15 +52,15 @@ def __init__(
5252
self._tasks: List[asyncio.Task[None]] = []
5353

5454
async def _engine_from_formula_string(
55-
self, formula: str, metric_id: ComponentMetricId
55+
self, formula: str, metric_id: ComponentMetricId, nones_are_zeros: bool
5656
) -> FormulaEngine:
5757
builder = FormulaBuilder(
5858
self._namespace,
5959
self._channel_registry,
6060
self._resampler_subscription_sender,
6161
metric_id,
6262
)
63-
return await builder.from_string(formula)
63+
return await builder.from_string(formula, nones_are_zeros)
6464

6565
async def _run_formula(
6666
self, formula: FormulaEngine, sender: Sender[Sample]
@@ -78,13 +78,16 @@ async def start_formula(
7878
self,
7979
formula: str,
8080
component_metric_id: ComponentMetricId,
81+
nones_are_zeros: bool = False,
8182
) -> Receiver[Sample]:
8283
"""Start execution of the given formula name.
8384
8485
Args:
8586
formula: formula to execute.
8687
component_metric_id: The metric ID to use when fetching receivers from the
8788
resampling actor.
89+
nones_are_zeros: Whether to treat None values from the stream as 0s. If
90+
False, the returned value will be a None.
8891
8992
Returns:
9093
A Receiver that streams values with the formulas applied.
@@ -94,8 +97,7 @@ async def start_formula(
9497
return self._output_channels[channel_key].new_receiver()
9598

9699
formula_engine = await self._engine_from_formula_string(
97-
formula,
98-
component_metric_id,
100+
formula, component_metric_id, nones_are_zeros
99101
)
100102
out_chan = Broadcast[Sample](channel_key)
101103
self._output_channels[channel_key] = out_chan

tests/timeseries/test_formula_engine.py

Lines changed: 65 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
"""Tests for the FormulaEngine and the Tokenizer."""
55

66
from datetime import datetime
7-
from typing import Dict, List, Tuple
7+
from typing import Dict, List, Optional, Tuple
88

99
from frequenz.channels import Broadcast
1010

@@ -56,7 +56,11 @@ def setup(self) -> None:
5656
# pylint: enable=attribute-defined-outside-init
5757

5858
async def run_test(
59-
self, formula: str, postfix: str, io_pairs: List[Tuple[List[float], float]]
59+
self,
60+
formula: str,
61+
postfix: str,
62+
io_pairs: List[Tuple[List[Optional[float]], Optional[float]]],
63+
nones_are_zeros: bool = False,
6064
) -> None:
6165
channels: Dict[str, Broadcast[Sample]] = {}
6266
engine = FormulaEngine()
@@ -65,7 +69,9 @@ async def run_test(
6569
if token.value not in channels:
6670
channels[token.value] = Broadcast(token.value)
6771
engine.push_metric(
68-
f"#{token.value}", channels[token.value].new_receiver()
72+
f"#{token.value}",
73+
channels[token.value].new_receiver(),
74+
nones_are_zeros,
6975
)
7076
elif token.type == TokenType.OPER:
7177
engine.push_oper(token.value)
@@ -170,3 +176,59 @@ async def test_compound(self) -> None:
170176
([15.0, 17.0, 20.0, 5.0], 28.0),
171177
],
172178
)
179+
180+
async def test_nones_are_zeros(self) -> None:
181+
"""Test that `None`s are treated as zeros when configured."""
182+
await self.run_test(
183+
"#2 - #4 + #5",
184+
"[#2, #4, -, #5, +]",
185+
[
186+
([10.0, 12.0, 15.0], 13.0),
187+
([None, 12.0, 15.0], 3.0),
188+
([10.0, None, 15.0], 25.0),
189+
([15.0, 17.0, 20.0], 18.0),
190+
([15.0, None, None], 15.0),
191+
],
192+
True,
193+
)
194+
195+
await self.run_test(
196+
"#2 + #4 - (#5 * #6)",
197+
"[#2, #4, #5, #6, *, -, +]",
198+
[
199+
([10.0, 12.0, 15.0, 2.0], -8.0),
200+
([10.0, 12.0, 15.0, None], 22.0),
201+
([10.0, None, 15.0, 2.0], -20.0),
202+
([15.0, 17.0, 20.0, 5.0], -68.0),
203+
([15.0, 17.0, None, 5.0], 32.0),
204+
],
205+
True,
206+
)
207+
208+
async def test_nones_are_not_zeros(self) -> None:
209+
"""Test that calculated values are `None` on input `None`s."""
210+
await self.run_test(
211+
"#2 - #4 + #5",
212+
"[#2, #4, -, #5, +]",
213+
[
214+
([10.0, 12.0, 15.0], 13.0),
215+
([None, 12.0, 15.0], None),
216+
([10.0, None, 15.0], None),
217+
([15.0, 17.0, 20.0], 18.0),
218+
([15.0, None, None], None),
219+
],
220+
False,
221+
)
222+
223+
await self.run_test(
224+
"#2 + #4 - (#5 * #6)",
225+
"[#2, #4, #5, #6, *, -, +]",
226+
[
227+
([10.0, 12.0, 15.0, 2.0], -8.0),
228+
([10.0, 12.0, 15.0, None], None),
229+
([10.0, None, 15.0, 2.0], None),
230+
([15.0, 17.0, 20.0, 5.0], -68.0),
231+
([15.0, 17.0, None, 5.0], None),
232+
],
233+
False,
234+
)

0 commit comments

Comments
 (0)