Skip to content

Commit f888367

Browse files
committed
Make AST evaluate methods async
Also update `Function.__call__()` to async. This would make it possible for `coalesce` to start async streams as necessary during evaluation. Signed-off-by: Sahas Subramanian <[email protected]>
1 parent 97510c7 commit f888367

File tree

4 files changed

+25
-25
lines changed

4 files changed

+25
-25
lines changed

src/frequenz/sdk/timeseries/formulas/_ast.py

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ def latest_sample(self) -> Sample[QuantityT] | None:
3535
return self._latest_sample
3636

3737
@override
38-
def evaluate(self) -> Sample[QuantityT] | None:
38+
async def evaluate(self) -> Sample[QuantityT] | None:
3939
"""Return the base value of the latest sample for this component."""
4040
if self._latest_sample is None:
4141
raise ValueError("Next value has not been fetched yet.")
@@ -71,9 +71,9 @@ class FunCall(AstNode[QuantityT]):
7171
function: Function[QuantityT]
7272

7373
@override
74-
def evaluate(self) -> Sample[QuantityT] | QuantityT | None:
74+
async def evaluate(self) -> Sample[QuantityT] | QuantityT | None:
7575
"""Evaluate the function call with its arguments."""
76-
return self.function()
76+
return await self.function()
7777

7878
@override
7979
def format(self, wrap: bool = False) -> str:
@@ -88,7 +88,7 @@ class Constant(AstNode[QuantityT]):
8888
value: QuantityT
8989

9090
@override
91-
def evaluate(self) -> QuantityT | None:
91+
async def evaluate(self) -> QuantityT | None:
9292
"""Return the constant value."""
9393
return self.value
9494

@@ -106,10 +106,10 @@ class Add(AstNode[QuantityT]):
106106
right: AstNode[QuantityT]
107107

108108
@override
109-
def evaluate(self) -> Sample[QuantityT] | QuantityT | None:
109+
async def evaluate(self) -> Sample[QuantityT] | QuantityT | None:
110110
"""Evaluate the addition of the left and right nodes."""
111-
left = self.left.evaluate()
112-
right = self.right.evaluate()
111+
left = await self.left.evaluate()
112+
right = await self.right.evaluate()
113113
match left, right:
114114
case Sample(), Sample():
115115
if left.value is None:
@@ -161,10 +161,10 @@ class Sub(AstNode[QuantityT]):
161161
right: AstNode[QuantityT]
162162

163163
@override
164-
def evaluate(self) -> Sample[QuantityT] | QuantityT | None:
164+
async def evaluate(self) -> Sample[QuantityT] | QuantityT | None:
165165
"""Evaluate the subtraction of the right node from the left node."""
166-
left = self.left.evaluate()
167-
right = self.right.evaluate()
166+
left = await self.left.evaluate()
167+
right = await self.right.evaluate()
168168
print("Sub.evaluate:", left, right)
169169
match left, right:
170170
case Sample(), Sample():
@@ -217,10 +217,10 @@ class Mul(AstNode[QuantityT]):
217217
right: AstNode[QuantityT]
218218

219219
@override
220-
def evaluate(self) -> Sample[QuantityT] | QuantityT | None:
220+
async def evaluate(self) -> Sample[QuantityT] | QuantityT | None:
221221
"""Evaluate the multiplication of the left and right nodes."""
222-
left = self.left.evaluate()
223-
right = self.right.evaluate()
222+
left = await self.left.evaluate()
223+
right = await self.right.evaluate()
224224
match left, right:
225225
case Sample(), Sample():
226226
if left.value is None:
@@ -271,10 +271,10 @@ class Div(AstNode[QuantityT]):
271271
right: AstNode[QuantityT]
272272

273273
@override
274-
def evaluate(self) -> QuantityT | None:
274+
async def evaluate(self) -> QuantityT | None:
275275
"""Evaluate the division of the left node by the right node."""
276-
left = self.left.evaluate()
277-
right = self.right.evaluate()
276+
left = await self.left.evaluate()
277+
right = await self.right.evaluate()
278278
match left, right:
279279
case Sample(), Sample():
280280
if left.value is None:

src/frequenz/sdk/timeseries/formulas/_base_ast_node.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ class AstNode(abc.ABC, Generic[QuantityT]):
2020
span: tuple[int, int] | None = None
2121

2222
@abc.abstractmethod
23-
def evaluate(self) -> Sample[QuantityT] | QuantityT | None:
23+
async def evaluate(self) -> Sample[QuantityT] | QuantityT | None:
2424
"""Evaluate the expression and return its numerical value."""
2525

2626
@abc.abstractmethod

src/frequenz/sdk/timeseries/formulas/_formula_evaluator.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ async def _run(self) -> None:
6565
if comp.latest_sample is not None
6666
)
6767

68-
res = self._root.evaluate()
68+
res = await self._root.evaluate()
6969
next_sample = res if isinstance(res, Sample) else Sample(timestamp, res)
7070
await self._output_sender.send(next_sample)
7171
except (StopAsyncIteration, StopIteration):

src/frequenz/sdk/timeseries/formulas/_functions.py

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ def name(self) -> str:
2929
"""Return the name of the function."""
3030

3131
@abc.abstractmethod
32-
def __call__(self) -> Sample[QuantityT] | QuantityT | None:
32+
async def __call__(self) -> Sample[QuantityT] | QuantityT | None:
3333
"""Call the function with the given arguments."""
3434

3535
def format(self) -> str:
@@ -63,11 +63,11 @@ def name(self) -> str:
6363
return "COALESCE"
6464

6565
@override
66-
def __call__(self) -> Sample[QuantityT] | QuantityT | None:
66+
async def __call__(self) -> Sample[QuantityT] | QuantityT | None:
6767
"""Return the first non-None argument."""
6868
ts: datetime | None = None
6969
for param in self.params:
70-
arg = param.evaluate()
70+
arg = await param.evaluate()
7171
match arg:
7272
case Sample(value=value, timestamp=timestamp):
7373
if value is not None:
@@ -94,12 +94,12 @@ def name(self) -> str:
9494
return "MAX"
9595

9696
@override
97-
def __call__(self) -> Sample[QuantityT] | QuantityT | None:
97+
async def __call__(self) -> Sample[QuantityT] | QuantityT | None:
9898
"""Return the maximum of the arguments."""
9999
max_value: QuantityT | None = None
100100
ts: datetime | None = None
101101
for param in self.params:
102-
arg = param.evaluate()
102+
arg = await param.evaluate()
103103
match arg:
104104
case Sample(value=value, timestamp=timestamp):
105105
ts = timestamp
@@ -128,12 +128,12 @@ def name(self) -> str:
128128
return "MIN"
129129

130130
@override
131-
def __call__(self) -> Sample[QuantityT] | QuantityT | None:
131+
async def __call__(self) -> Sample[QuantityT] | QuantityT | None:
132132
"""Return the minimum of the arguments."""
133133
min_value: QuantityT | None = None
134134
ts: datetime | None = None
135135
for param in self.params:
136-
arg = param.evaluate()
136+
arg = await param.evaluate()
137137
match arg:
138138
case Sample(value=value, timestamp=timestamp):
139139
ts = timestamp

0 commit comments

Comments
 (0)