Skip to content

Commit 9b02348

Browse files
committed
Apply suggestions from review comments
Signed-off-by: Sahas Subramanian <[email protected]>
1 parent 1cf3d4b commit 9b02348

File tree

7 files changed

+28
-28
lines changed

7 files changed

+28
-28
lines changed

src/frequenz/sdk/timeseries/formulas/_ast.py

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -310,9 +310,7 @@ async def evaluate(self) -> Sample[QuantityT] | QuantityT | None:
310310
value=left.value * right.value.base_value,
311311
)
312312
case Quantity(), Quantity():
313-
return left.__class__._new( # pylint: disable=protected-access
314-
left.base_value * right.base_value
315-
)
313+
return left * right.base_value
316314
case (Sample(), Quantity()):
317315
return (
318316
left
@@ -378,12 +376,12 @@ async def evaluate(self) -> Sample[QuantityT] | QuantityT | None:
378376
match left, right:
379377
case Sample(), Sample():
380378
if left.value is None:
381-
return None
379+
return left
382380
if right.value is None:
383-
return None
381+
return right
384382
if is_close_to_zero(right.value.base_value):
385383
_logger.warning("Division by zero encountered in formula.")
386-
return None
384+
return Sample(left.timestamp, None)
387385
return Sample(left.timestamp, left.value / right.value.base_value)
388386
case Quantity(), Quantity():
389387
if is_close_to_zero(right.base_value):

src/frequenz/sdk/timeseries/formulas/_base_ast_node.py

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717

1818
_logger = logging.getLogger(__name__)
1919

20+
_MAX_SYNC_RETRIES = 10
21+
2022

2123
@dataclass(kw_only=True)
2224
class AstNode(abc.ABC, Generic[QuantityT]):
@@ -103,7 +105,7 @@ async def evaluate(
103105

104106
return await self._synchronize_to_timestamp(values, nodes, target_timestamp)
105107

106-
return [await node.evaluate() for node in nodes]
108+
return await asyncio.gather(*(node.evaluate() for node in nodes))
107109

108110
async def _synchronize_to_timestamp(
109111
self,
@@ -114,17 +116,18 @@ async def _synchronize_to_timestamp(
114116
for i, value in enumerate(values):
115117
if isinstance(value, Sample):
116118
ctr = 0
117-
while ctr < 10 and value.timestamp < target_timestamp:
119+
while ctr < _MAX_SYNC_RETRIES and value.timestamp < target_timestamp:
118120
value = await nodes[i].evaluate()
119121
if not isinstance(value, Sample):
120122
raise RuntimeError(
121123
"Subsequent AST node evaluation did not return a Sample"
122124
)
123125
values[i] = value
124126
ctr += 1
125-
if ctr >= 10 and value.timestamp < target_timestamp:
127+
if ctr >= _MAX_SYNC_RETRIES and value.timestamp < target_timestamp:
126128
raise RuntimeError(
127-
"Could not synchronize AST node evaluations after 10 tries"
129+
"Could not synchronize AST node evaluations after "
130+
+ f"{_MAX_SYNC_RETRIES} tries"
128131
)
129132
if value.timestamp > target_timestamp:
130133
self._latest_values[id(nodes[i])] = value

src/frequenz/sdk/timeseries/formulas/_formula.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,9 +32,9 @@ def metric_fetcher(
3232
) -> Callable[[], Coroutine[None, None, Receiver[Sample[QuantityT]]]]:
3333
"""Fetch a receiver for the formula's output samples."""
3434

35-
async def fetcher(formula: Formula[QuantityT]) -> Receiver[Sample[QuantityT]]:
36-
formula.start()
37-
return formula.new_receiver()
35+
async def fetcher(f: Formula[QuantityT]) -> Receiver[Sample[QuantityT]]:
36+
f.start()
37+
return f.new_receiver()
3838

3939
return lambda: fetcher(formula)
4040

src/frequenz/sdk/timeseries/formulas/_formula_3_phase_evaluator.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -97,9 +97,7 @@ async def _run(self) -> None:
9797
):
9898
# This should never happen because the components are Formula
9999
# instances
100-
raise RuntimeError(
101-
"Expected all phase samples to be of type Sample3Phase"
102-
)
100+
raise RuntimeError("Expected all phase samples to be of type Sample")
103101

104102
sample_3phase = Sample3Phase(
105103
timestamp=phase_1_sample.timestamp,

src/frequenz/sdk/timeseries/formulas/_functions.py

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,7 @@ def from_string(
100100
raise ValueError(f"Unknown function name: {name}")
101101

102102

103+
@dataclass
103104
class Coalesce(Function[QuantityT]):
104105
"""A function that returns the first non-None argument."""
105106

@@ -117,7 +118,7 @@ async def __call__(self) -> Sample[QuantityT] | QuantityT | None:
117118
ts: datetime | None = None
118119

119120
if self.num_subscribed == 0:
120-
await self._subscribe_next()
121+
await self._subscribe_next_param()
121122

122123
args = await self._synchronizer.evaluate(
123124
self.params[: self.num_subscribed], sync_to_first_node=True
@@ -128,13 +129,13 @@ async def __call__(self) -> Sample[QuantityT] | QuantityT | None:
128129
if value is not None:
129130
# Found a non-None value, unsubscribe from subsequent params
130131
if ctr < self.num_subscribed:
131-
await self._unsubscribe_after(ctr)
132+
await self._unsubscribe_all_params_after(ctr)
132133
return arg
133134
ts = timestamp
134135
case Quantity():
135136
# Found a non-None value, unsubscribe from subsequent params
136137
if ctr < self.num_subscribed:
137-
await self._unsubscribe_after(ctr)
138+
await self._unsubscribe_all_params_after(ctr)
138139
if ts is not None:
139140
return Sample(timestamp=ts, value=arg)
140141
return arg
@@ -143,7 +144,7 @@ async def __call__(self) -> Sample[QuantityT] | QuantityT | None:
143144
# Don't have a non-None value yet, subscribe to the next parameter for
144145
# next time and return None for now, unless the next value is a constant.
145146
next_value: Sample[QuantityT] | QuantityT | None = None
146-
await self._subscribe_next()
147+
await self._subscribe_next_param()
147148

148149
if isinstance(self.params[self.num_subscribed - 1], Constant):
149150
next_value = await self.params[self.num_subscribed - 1].evaluate()
@@ -158,9 +159,9 @@ async def __call__(self) -> Sample[QuantityT] | QuantityT | None:
158159
async def subscribe(self) -> None:
159160
"""Subscribe to the first parameter if not already subscribed."""
160161
if self.num_subscribed == 0:
161-
await self._subscribe_next()
162+
await self._subscribe_next_param()
162163

163-
async def _subscribe_next(self) -> None:
164+
async def _subscribe_next_param(self) -> None:
164165
"""Subscribe to the next parameter."""
165166
if self.num_subscribed < len(self.params):
166167
_logger.debug(
@@ -171,7 +172,7 @@ async def _subscribe_next(self) -> None:
171172
await self.params[self.num_subscribed].subscribe()
172173
self.num_subscribed += 1
173174

174-
async def _unsubscribe_after(self, index: int) -> None:
175+
async def _unsubscribe_all_params_after(self, index: int) -> None:
175176
"""Unsubscribe from parameters after the given index."""
176177
for param in self.params[index:]:
177178
_logger.debug(
@@ -182,6 +183,7 @@ async def _unsubscribe_after(self, index: int) -> None:
182183
self.num_subscribed = index
183184

184185

186+
@dataclass
185187
class Max(Function[QuantityT]):
186188
"""A function that returns the maximum of the arguments."""
187189

@@ -216,6 +218,7 @@ async def __call__(self) -> Sample[QuantityT] | QuantityT | None:
216218
return max_value
217219

218220

221+
@dataclass
219222
class Min(Function[QuantityT]):
220223
"""A function that returns the minimum of the arguments."""
221224

src/frequenz/sdk/timeseries/logical_meter/_logical_meter.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -94,12 +94,12 @@ def start_formula(
9494
) -> Formula[Quantity]:
9595
"""Start execution of the given formula.
9696
97-
Formulas can have Component IDs that are preceeded by a pound symbol("#"),
98-
constant values and these operators: +, -, *, /, (, ).
97+
Formulas can have Component IDs that are preceded by a pound symbol(`#`),
98+
constant values and these operators: `+`, `-`, `*`, `/`, `(`, `)`.
9999
100100
These functions are also supported: `COALESCE`, `MAX`, `MIN`.
101101
102-
For example, the input string: "#20 + #5" is a formula for adding metrics from
102+
For example, the input string: `#20 + #5` is a formula for adding metrics from
103103
two components with ids 20 and 5.
104104
105105
A more detailed description of the formula syntax with examples can be found

tests/timeseries/_formulas/test_formulas.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -367,8 +367,6 @@ async def run_test( # pylint: disable=too-many-locals
367367

368368
def stream_recv(comp_id: int) -> Receiver[Sample[Quantity]]:
369369
comp_id = int(comp_id)
370-
if comp_id not in channels:
371-
channels[comp_id] = Broadcast(name=f"chan-#{comp_id}")
372370
return channels[comp_id].new_receiver()
373371

374372
telem_fetcher = MagicMock(spec=ResampledStreamFetcher)

0 commit comments

Comments
 (0)