Skip to content

Commit 2075ad3

Browse files
committed
Allow TelemetryStream AST nodes to subscribe to data on demand
Earlier we were subscribing to all streams, even if they were going to be discarded in a coalesce. This commit makes further progress towards implementing lazy coalesce. Signed-off-by: Sahas Subramanian <[email protected]>
1 parent f888367 commit 2075ad3

File tree

12 files changed

+118
-48
lines changed

12 files changed

+118
-48
lines changed

src/frequenz/sdk/timeseries/formulas/_ast.py

Lines changed: 32 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,10 @@
66
from __future__ import annotations
77

88
import logging
9-
from collections.abc import AsyncIterator, Callable
9+
from collections.abc import Callable, Coroutine
1010
from dataclasses import dataclass
1111

12+
from frequenz.channels import Receiver
1213
from frequenz.quantities import Quantity
1314
from typing_extensions import TypeIs, override
1415

@@ -25,10 +26,24 @@ class TelemetryStream(AstNode[QuantityT]):
2526
"""A AST node that retrieves values from a component's telemetry stream."""
2627

2728
source: str
28-
stream: AsyncIterator[Sample[QuantityT] | Sample[Quantity]]
29+
metric_fetcher: (
30+
Callable[
31+
[], Coroutine[None, None, Receiver[Sample[QuantityT] | Sample[Quantity]]]
32+
]
33+
| None
34+
) = None
2935
create_method: Callable[[float], QuantityT]
36+
_stream: Receiver[Sample[QuantityT] | Sample[Quantity]] | None = None
3037
_latest_sample: Sample[QuantityT] | None = None
3138

39+
def __post_init__(self) -> None:
40+
"""Validate at least one of stream or metric_fetcher is set."""
41+
if self._stream is None and self.metric_fetcher is None:
42+
raise ValueError(
43+
"Either stream or metric_fetcher must be provided for "
44+
+ "TelemetryStream node."
45+
)
46+
3247
@property
3348
def latest_sample(self) -> Sample[QuantityT] | None:
3449
"""Return the latest fetched sample for this component."""
@@ -48,7 +63,13 @@ def format(self, wrap: bool = False) -> str:
4863

4964
async def fetch_next(self) -> None:
5065
"""Fetch the next value for this component and store it internally."""
51-
latest_sample = await anext(self.stream)
66+
if self._stream is None:
67+
await self._fetch_stream()
68+
assert self._stream is not None
69+
70+
latest_sample = await anext(self._stream)
71+
# pylint: disable-next=fixme
72+
# TODO: convert to QuantityT if needed only at the end in the evaluator.
5273
if self._is_quantity_sample(latest_sample):
5374
assert latest_sample.value is not None
5475
self._latest_sample = Sample(
@@ -58,6 +79,14 @@ async def fetch_next(self) -> None:
5879
else:
5980
self._latest_sample = latest_sample
6081

82+
async def _fetch_stream(self) -> None:
83+
"""Subscribe to the telemetry stream for this component."""
84+
if self._stream is not None:
85+
return
86+
if self.metric_fetcher is None:
87+
raise RuntimeError("Metric fetcher is not set for TelemetryStream node.")
88+
self._stream = await self.metric_fetcher()
89+
6190
def _is_quantity_sample(
6291
self, sample: Sample[QuantityT] | Sample[Quantity]
6392
) -> TypeIs[Sample[Quantity]]:

src/frequenz/sdk/timeseries/formulas/_formula.py

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
from __future__ import annotations
77

88
import logging
9-
from collections.abc import Callable
9+
from collections.abc import Callable, Coroutine
1010
from typing import Generic
1111

1212
from frequenz.channels import Broadcast, Receiver
@@ -27,6 +27,18 @@
2727
_logger = logging.getLogger(__name__)
2828

2929

30+
def metric_fetcher(
31+
formula: Formula[QuantityT],
32+
) -> Callable[[], Coroutine[None, None, Receiver[Sample[QuantityT]]]]:
33+
"""Fetch a receiver for the formula's output samples."""
34+
35+
async def fetcher(formula: Formula[QuantityT]) -> Receiver[Sample[QuantityT]]:
36+
formula.start()
37+
return formula.new_receiver()
38+
39+
return lambda: fetcher(formula)
40+
41+
3042
class Formula(BackgroundService, ReceiverFetcher[Sample[QuantityT]]):
3143
"""A formula represented as an AST."""
3244

@@ -178,7 +190,7 @@ def __init__(
178190
if isinstance(formula, Formula):
179191
self.root: AstNode[QuantityT] = _ast.TelemetryStream(
180192
source=str(formula),
181-
stream=formula.new_receiver(),
193+
metric_fetcher=metric_fetcher(formula),
182194
create_method=create_method,
183195
)
184196
self._streams.append(self.root)
@@ -197,7 +209,7 @@ def __add__(
197209
elif isinstance(other, Formula):
198210
right_node = _ast.TelemetryStream(
199211
source=str(other),
200-
stream=other.new_receiver(),
212+
metric_fetcher=metric_fetcher(other),
201213
create_method=self._create_method,
202214
)
203215
self._streams.append(right_node)
@@ -224,7 +236,7 @@ def __sub__(
224236
elif isinstance(other, Formula):
225237
right_node = _ast.TelemetryStream(
226238
source=str(other),
227-
stream=other.new_receiver(),
239+
metric_fetcher=metric_fetcher(other),
228240
create_method=self._create_method,
229241
)
230242
self._streams.append(right_node)
@@ -278,7 +290,7 @@ def coalesce(
278290
elif isinstance(item, Formula):
279291
right_node = _ast.TelemetryStream(
280292
source=str(item),
281-
stream=item.new_receiver(),
293+
metric_fetcher=metric_fetcher(item),
282294
create_method=self._create_method,
283295
)
284296
right_nodes.append(right_node)
@@ -311,7 +323,7 @@ def min(
311323
elif isinstance(item, Formula):
312324
right_node = _ast.TelemetryStream(
313325
source=str(item),
314-
stream=item.new_receiver(),
326+
metric_fetcher=metric_fetcher(item),
315327
create_method=self._create_method,
316328
)
317329
right_nodes.append(right_node)
@@ -344,7 +356,7 @@ def max(
344356
elif isinstance(item, Formula):
345357
right_node = _ast.TelemetryStream(
346358
source=str(item),
347-
stream=item.new_receiver(),
359+
metric_fetcher=metric_fetcher(item),
348360
create_method=self._create_method,
349361
)
350362
right_nodes.append(right_node)

src/frequenz/sdk/timeseries/formulas/_formula_3_phase_evaluator.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
from ...actor import Actor
1414
from .._base_types import QuantityT, Sample3Phase
1515
from . import _ast
16-
from ._formula import Formula
16+
from ._formula import Formula, metric_fetcher
1717
from ._formula_evaluator import synchronize_receivers
1818

1919
_logger = logging.getLogger(__name__)
@@ -45,17 +45,17 @@ def __init__(
4545
self._components: list[_ast.TelemetryStream[QuantityT]] = [
4646
_ast.TelemetryStream(
4747
source="phase_1",
48-
stream=phase_1.new_receiver(),
48+
metric_fetcher=metric_fetcher(phase_1),
4949
create_method=phase_1._create_method, # pylint: disable=protected-access
5050
),
5151
_ast.TelemetryStream(
5252
source="phase_2",
53-
stream=phase_2.new_receiver(),
53+
metric_fetcher=metric_fetcher(phase_2),
5454
create_method=phase_2._create_method, # pylint: disable=protected-access
5555
),
5656
_ast.TelemetryStream(
5757
source="phase_3",
58-
stream=phase_3.new_receiver(),
58+
metric_fetcher=metric_fetcher(phase_3),
5959
create_method=phase_3._create_method, # pylint: disable=protected-access
6060
),
6161
]

src/frequenz/sdk/timeseries/formulas/_formula_evaluator.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,8 +53,6 @@ def __init__( # pylint: disable=too-many-arguments
5353
@override
5454
async def _run(self) -> None:
5555
"""Run the formula evaluator actor."""
56-
if self._metric_fetcher is not None:
57-
await self._metric_fetcher.subscribe()
5856
await synchronize_receivers(self._components)
5957

6058
while True:

src/frequenz/sdk/timeseries/formulas/_functions.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ async def __call__(self) -> Sample[QuantityT] | QuantityT | None:
6969
for param in self.params:
7070
arg = await param.evaluate()
7171
match arg:
72-
case Sample(value=value, timestamp=timestamp):
72+
case Sample(timestamp, value):
7373
if value is not None:
7474
return arg
7575
ts = timestamp

src/frequenz/sdk/timeseries/formulas/_parser.py

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,15 @@
55

66
from __future__ import annotations
77

8-
from collections.abc import Callable
8+
import logging
9+
from collections.abc import Callable, Coroutine
910
from typing import Generic
1011

12+
from frequenz.channels import Receiver
1113
from frequenz.client.common.microgrid.components import ComponentId
14+
from frequenz.quantities import Quantity
1215

16+
from frequenz.sdk.timeseries import Sample
1317
from frequenz.sdk.timeseries._base_types import QuantityT
1418

1519
from . import _ast, _token
@@ -20,6 +24,8 @@
2024
from ._peekable import Peekable
2125
from ._resampled_stream_fetcher import ResampledStreamFetcher
2226

27+
_logger = logging.getLogger(__name__)
28+
2329

2430
def parse(
2531
*,
@@ -181,12 +187,19 @@ def _parse_primary(self) -> AstNode[QuantityT] | None:
181187
if token is None:
182188
return None
183189

190+
def make_component_stream_fetcher(
191+
f: ResampledStreamFetcher, cid: ComponentId
192+
) -> Callable[[], Coroutine[None, None, Receiver[Sample[Quantity]]]]:
193+
return lambda: f.fetch_stream(cid)
194+
184195
if isinstance(token, _token.Component):
185196
_ = next(self._lexer) # consume token
186197
comp = _ast.TelemetryStream(
187198
span=token.span,
188199
source=f"#{token.id}",
189-
stream=self._telemetry_fetcher.fetch_stream(ComponentId(int(token.id))),
200+
metric_fetcher=make_component_stream_fetcher(
201+
self._telemetry_fetcher, ComponentId(int(token.id))
202+
),
190203
create_method=self._create_method,
191204
)
192205
self._components.append(comp)

src/frequenz/sdk/timeseries/formulas/_resampled_stream_fetcher.py

Lines changed: 5 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,11 @@
77
from frequenz.client.common.microgrid.components import ComponentId
88
from frequenz.quantities import Quantity
99

10+
from frequenz.sdk.timeseries import Sample
11+
1012
from ..._internal._channels import ChannelRegistry
1113
from ...microgrid._data_sourcing import ComponentMetricRequest, Metric
1214
from ...microgrid._old_component_data import TransitionalMetric
13-
from .. import Sample
1415

1516

1617
class ResampledStreamFetcher:
@@ -41,9 +42,7 @@ def __init__(
4142
)
4243
self._metric: Metric | TransitionalMetric = metric
4344

44-
self._pending_requests: list[ComponentMetricRequest] = []
45-
46-
def fetch_stream(
45+
async def fetch_stream(
4746
self,
4847
component_id: ComponentId,
4948
) -> Receiver[Sample[Quantity]]:
@@ -61,13 +60,8 @@ def fetch_stream(
6160
self._metric,
6261
None,
6362
)
64-
self._pending_requests.append(request)
63+
await self._resampler_subscription_sender.send(request)
64+
6565
return self._channel_registry.get_or_create(
6666
Sample[Quantity], request.get_channel_name()
6767
).new_receiver()
68-
69-
async def subscribe(self) -> None:
70-
"""Subscribe to all resampled component metric streams."""
71-
for request in self._pending_requests:
72-
await self._resampler_subscription_sender.send(request)
73-
self._pending_requests.clear()

tests/microgrid/test_grid.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ async def test_grid_power_1(mocker: MockerFixture) -> None:
6565

6666
grid_power_recv = grid.power.new_receiver()
6767

68-
grid_meter_recv = get_resampled_stream(
68+
grid_meter_recv = await get_resampled_stream(
6969
grid._formula_pool._namespace, # pylint: disable=protected-access
7070
mockgrid.meter_ids[0],
7171
Metric.AC_ACTIVE_POWER,
@@ -109,7 +109,7 @@ async def test_grid_power_2(mocker: MockerFixture) -> None:
109109
grid_power_recv = grid.power.new_receiver()
110110

111111
component_receivers = [
112-
get_resampled_stream(
112+
await get_resampled_stream(
113113
grid._formula_pool._namespace, # pylint: disable=protected-access
114114
component_id,
115115
Metric.AC_ACTIVE_POWER,
@@ -160,7 +160,7 @@ async def test_grid_reactive_power_1(mocker: MockerFixture) -> None:
160160

161161
grid_power_recv = grid.reactive_power.new_receiver()
162162

163-
grid_meter_recv = get_resampled_stream(
163+
grid_meter_recv = await get_resampled_stream(
164164
grid._formula_pool._namespace, # pylint: disable=protected-access
165165
mockgrid.meter_ids[0],
166166
Metric.AC_REACTIVE_POWER,
@@ -204,7 +204,7 @@ async def test_grid_reactive_power_2(mocker: MockerFixture) -> None:
204204
grid_power_recv = grid.reactive_power.new_receiver()
205205

206206
component_receivers = [
207-
get_resampled_stream(
207+
await get_resampled_stream(
208208
grid._formula_pool._namespace, # pylint: disable=protected-access
209209
component_id,
210210
Metric.AC_REACTIVE_POWER,

tests/timeseries/_formulas/test_formula_composition.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ async def test_formula_composition( # pylint: disable=too-many-locals
4545
grid = microgrid.grid()
4646
stack.push_async_callback(grid.stop)
4747

48-
grid_meter_recv = get_resampled_stream(
48+
grid_meter_recv = await get_resampled_stream(
4949
grid._formula_pool._namespace, # pylint: disable=protected-access
5050
mockgrid.meter_ids[0],
5151
Metric.AC_ACTIVE_POWER,

0 commit comments

Comments
 (0)