Skip to content

Commit 9366cfc

Browse files
committed
Implement a ResampledStreamFetcher
It sends requests to the resampler and fetches resampled telemetry streams. Signed-off-by: Sahas Subramanian <[email protected]>
1 parent 25dfc0b commit 9366cfc

File tree

1 file changed

+74
-0
lines changed

1 file changed

+74
-0
lines changed
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
# License: MIT
2+
# Copyright © 2025 Frequenz Energy-as-a-Service GmbH
3+
4+
"""Fetches telemetry streams for components."""
5+
6+
from frequenz.channels import Receiver, Sender
7+
from frequenz.client.common.microgrid.components import ComponentId
8+
from frequenz.quantities import Quantity
9+
10+
from frequenz.sdk.timeseries import Sample
11+
12+
from ..._internal._channels import ChannelRegistry
13+
from ...microgrid._data_sourcing import ComponentMetricRequest, Metric
14+
from ...microgrid._old_component_data import TransitionalMetric
15+
16+
17+
class ResampledStreamFetcher:
18+
"""Fetches telemetry streams for components."""
19+
20+
def __init__(
21+
self,
22+
namespace: str,
23+
channel_registry: ChannelRegistry,
24+
resampler_subscription_sender: Sender[ComponentMetricRequest],
25+
metric: Metric | TransitionalMetric,
26+
):
27+
"""Initialize this instance.
28+
29+
Args:
30+
namespace: The unique namespace to allow reuse of streams in the data
31+
pipeline.
32+
channel_registry: The channel registry instance shared with the resampling
33+
and the data sourcing actors.
34+
resampler_subscription_sender: A sender to send metric requests to the
35+
resampling actor.
36+
metric: The metric to fetch for all components in this formula.
37+
"""
38+
self._namespace: str = namespace
39+
self._channel_registry: ChannelRegistry = channel_registry
40+
self._resampler_subscription_sender: Sender[ComponentMetricRequest] = (
41+
resampler_subscription_sender
42+
)
43+
self._metric: Metric | TransitionalMetric = metric
44+
45+
self._pending_requests: list[ComponentMetricRequest] = []
46+
47+
def fetch_stream(
48+
self,
49+
component_id: ComponentId,
50+
) -> Receiver[Sample[Quantity]]:
51+
"""Get a receiver with the resampled data for the given component id.
52+
53+
Args:
54+
component_id: The component id for which to get a resampled data stream.
55+
56+
Returns:
57+
A receiver to stream resampled data for the given component id.
58+
"""
59+
request = ComponentMetricRequest(
60+
self._namespace,
61+
component_id,
62+
self._metric,
63+
None,
64+
)
65+
self._pending_requests.append(request)
66+
return self._channel_registry.get_or_create(
67+
Sample[Quantity], request.get_channel_name()
68+
).new_receiver()
69+
70+
async def subscribe(self) -> None:
71+
"""Subscribe to all resampled component metric streams."""
72+
for request in self._pending_requests:
73+
await self._resampler_subscription_sender.send(request)
74+
self._pending_requests.clear()

0 commit comments

Comments
 (0)