Skip to content

Commit 187cf0e

Browse files
committed
Add subscribe method to AST nodes for stream initialization
With this subscription can be delegated to individual nodes, allowing coalesce to subscribe to only the interesting stuff. Signed-off-by: Sahas Subramanian <[email protected]>
1 parent a37a75a commit 187cf0e

File tree

3 files changed

+56
-3
lines changed

3 files changed

+56
-3
lines changed

src/frequenz/sdk/timeseries/formulas/_ast.py

Lines changed: 46 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -61,10 +61,10 @@ def format(self, wrap: bool = False) -> str:
6161
"""Return a string representation of the telemetry stream node."""
6262
return f"{self.source}"
6363

64-
async def fetch_next(self) -> None:
64+
async def _fetch_next(self) -> None:
6565
"""Fetch the next value for this component and store it internally."""
6666
if self._stream is None:
67-
await self._fetch_stream()
67+
await self.subscribe()
6868
assert self._stream is not None
6969

7070
latest_sample = await anext(self._stream)
@@ -79,10 +79,12 @@ async def fetch_next(self) -> None:
7979
else:
8080
self._latest_sample = latest_sample
8181

82-
async def _fetch_stream(self) -> None:
82+
@override
83+
async def subscribe(self) -> None:
8384
"""Subscribe to the telemetry stream for this component."""
8485
if self._stream is not None:
8586
return
87+
_logger.debug("Subscribing to telemetry stream for %s", self.source)
8688
if self.metric_fetcher is None:
8789
raise RuntimeError("Metric fetcher is not set for TelemetryStream node.")
8890
self._stream = await self.metric_fetcher()
@@ -109,6 +111,11 @@ def format(self, wrap: bool = False) -> str:
109111
"""Return a string representation of the function call node."""
110112
return self.function.format()
111113

114+
@override
115+
async def subscribe(self) -> None:
116+
"""Subscribe to any data streams needed by the function."""
117+
await self.function.subscribe()
118+
112119

113120
@dataclass(kw_only=True)
114121
class Constant(AstNode[QuantityT]):
@@ -126,6 +133,10 @@ def format(self, wrap: bool = False) -> str:
126133
"""Return a string representation of the constant node."""
127134
return str(self.value.base_value)
128135

136+
@override
137+
async def subscribe(self) -> None:
138+
"""Subscribe to any data streams needed by the function."""
139+
129140

130141
@dataclass(kw_only=True)
131142
class Add(AstNode[QuantityT]):
@@ -181,6 +192,14 @@ def format(self, wrap: bool = False) -> str:
181192
expr = f"({expr})"
182193
return expr
183194

195+
@override
196+
async def subscribe(self) -> None:
197+
"""Subscribe to any data streams needed by the function."""
198+
_ = await asyncio.gather(
199+
self.left.subscribe(),
200+
self.right.subscribe(),
201+
)
202+
184203

185204
@dataclass(kw_only=True)
186205
class Sub(AstNode[QuantityT]):
@@ -237,6 +256,14 @@ def format(self, wrap: bool = False) -> str:
237256
expr = f"({expr})"
238257
return expr
239258

259+
@override
260+
async def subscribe(self) -> None:
261+
"""Subscribe to any data streams needed by the function."""
262+
_ = await asyncio.gather(
263+
self.left.subscribe(),
264+
self.right.subscribe(),
265+
)
266+
240267

241268
@dataclass(kw_only=True)
242269
class Mul(AstNode[QuantityT]):
@@ -291,6 +318,14 @@ def format(self, wrap: bool = False) -> str:
291318
"""Return a string representation of the multiplication node."""
292319
return f"{self.left.format(True)} * {self.right.format(True)}"
293320

321+
@override
322+
async def subscribe(self) -> None:
323+
"""Subscribe to any data streams needed by the function."""
324+
_ = await asyncio.gather(
325+
self.left.subscribe(),
326+
self.right.subscribe(),
327+
)
328+
294329

295330
@dataclass(kw_only=True)
296331
class Div(AstNode[QuantityT]):
@@ -343,3 +378,11 @@ async def evaluate(self) -> Sample[QuantityT] | QuantityT | None:
343378
def format(self, wrap: bool = False) -> str:
344379
"""Return a string representation of the division node."""
345380
return f"{self.left.format(True)} / {self.right.format(True)}"
381+
382+
@override
383+
async def subscribe(self) -> None:
384+
"""Subscribe to any data streams needed by the function."""
385+
_ = await asyncio.gather(
386+
self.left.subscribe(),
387+
self.right.subscribe(),
388+
)

src/frequenz/sdk/timeseries/formulas/_base_ast_node.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,3 +31,7 @@ def format(self, wrap: bool = False) -> str:
3131
def __str__(self) -> str:
3232
"""Return the string representation of the node."""
3333
return self.format()
34+
35+
@abc.abstractmethod
36+
async def subscribe(self) -> None:
37+
"""Subscribe to any data streams needed by this node."""

src/frequenz/sdk/timeseries/formulas/_functions.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,12 @@ def format(self) -> str:
3737
params_str = ", ".join(str(param) for param in self.params)
3838
return f"{self.name}({params_str})"
3939

40+
async def subscribe(self) -> None:
41+
"""Subscribe to any data streams needed by the function."""
42+
_ = await asyncio.gather(
43+
*(param.subscribe() for param in self.params),
44+
)
45+
4046
@classmethod
4147
def from_string(
4248
cls, name: str, params: list[AstNode[QuantityT]]

0 commit comments

Comments
 (0)