99from collections .abc import Iterable
1010from dataclasses import replace
1111from datetime import datetime , timedelta
12- from typing import Any , assert_never
12+ from typing import assert_never
1313
14+ from frequenz .api .common .v1 .metrics import metric_sample_pb2
1415from frequenz .api .common .v1 .microgrid .components import components_pb2
1516from frequenz .api .microgrid .v1 import microgrid_pb2 , microgrid_pb2_grpc
17+ from frequenz .channels import Receiver
1618from frequenz .client .base import channel , client , conversion , retry , streaming
1719from google .protobuf .empty_pb2 import Empty
1820
2628from .component ._component_proto import component_from_proto
2729from .component ._connection import ComponentConnection
2830from .component ._connection_proto import component_connection_from_proto
31+ from .component ._data_samples import ComponentDataSamples
32+ from .component ._data_samples_proto import component_data_sample_from_proto
33+ from .metrics ._metric import Metric
2934
3035DEFAULT_GRPC_CALL_TIMEOUT = 60.0
3136"""The default timeout for gRPC calls made by this client (in seconds)."""
@@ -76,7 +81,12 @@ def __init__(
7681 connect = connect ,
7782 channel_defaults = channel_defaults ,
7883 )
79- self ._broadcasters : dict [int , streaming .GrpcStreamBroadcaster [Any , Any ]] = {}
84+ self ._component_data_samples_broadcasters : dict [
85+ str ,
86+ streaming .GrpcStreamBroadcaster [
87+ microgrid_pb2 .ReceiveComponentDataStreamResponse , ComponentDataSamples
88+ ],
89+ ] = {}
8090 self ._retry_strategy = retry_strategy
8191
8292 @property
@@ -372,7 +382,7 @@ async def set_component_power_reactive( # noqa: DOC502 (raises ApiClientError i
372382
373383 response = await client .call_stub_method (
374384 self ,
375- lambda : self ._async_stub .SetComponentPowerReactive (
385+ lambda : self .stub .SetComponentPowerReactive (
376386 microgrid_pb2 .SetComponentPowerReactiveRequest (
377387 component_id = _get_component_id (component ),
378388 power = power ,
@@ -388,6 +398,68 @@ async def set_component_power_reactive( # noqa: DOC502 (raises ApiClientError i
388398
389399 return None
390400
401+ # noqa: DOC502 (Raises ApiClientError indirectly)
402+ async def receive_component_data_samples_stream (
403+ self ,
404+ component : ComponentId | Component ,
405+ metrics : Iterable [Metric | int ],
406+ * ,
407+ buffer_size : int = 50 ,
408+ ) -> Receiver [ComponentDataSamples ]:
409+ """Stream data samples from a component.
410+
411+ At least one metric must be specified. If no metric is specified, then the
412+ stream will raise an error.
413+
414+ Warning:
415+ Components may not support all metrics. If a component does not support
416+ a given metric, then the returned data stream will not contain that metric.
417+
418+ There is no way to tell if a metric is not being received because the
419+ component does not support it or because there is a transient issue when
420+ retrieving the metric from the component.
421+
422+ The supported metrics by a component can even change with time, for example,
423+ if a component is updated with new firmware.
424+
425+ Args:
426+ component: The component to stream data from.
427+ metrics: List of metrics to return. Only the specified metrics will be
428+ returned.
429+ buffer_size: The maximum number of messages to buffer in the returned
430+ receiver. After this limit is reached, the oldest messages will be
431+ dropped.
432+
433+ Returns:
434+ The data stream from the component.
435+ """
436+ component_id = _get_component_id (component )
437+ metrics_set = frozenset ([_get_metric_value (m ) for m in metrics ])
438+ key = f"{ component_id } -{ hash (metrics_set )} "
439+ broadcaster = self ._component_data_samples_broadcasters .get (key )
440+ if broadcaster is None :
441+ client_id = hex (id (self ))[2 :]
442+ stream_name = f"microgrid-client-{ client_id } -component-data-{ key } "
443+ create_filter = (
444+ microgrid_pb2 .ReceiveComponentDataStreamRequest .ComponentDataStreamFilter
445+ )
446+ broadcaster = streaming .GrpcStreamBroadcaster (
447+ stream_name ,
448+ lambda : aiter (
449+ self .stub .ReceiveComponentDataStream (
450+ microgrid_pb2 .ReceiveComponentDataStreamRequest (
451+ component_id = _get_component_id (component ),
452+ filter = create_filter (metrics = metrics_set ),
453+ ),
454+ timeout = int (DEFAULT_GRPC_CALL_TIMEOUT ),
455+ )
456+ ),
457+ lambda msg : component_data_sample_from_proto (msg .data ),
458+ retry_strategy = self ._retry_strategy ,
459+ )
460+ self ._component_data_samples_broadcasters [key ] = broadcaster
461+ return broadcaster .new_receiver (maxsize = buffer_size )
462+
391463
392464def _get_component_id (component : ComponentId | Component ) -> int :
393465 """Get the component ID from a component or component ID."""
@@ -400,6 +472,17 @@ def _get_component_id(component: ComponentId | Component) -> int:
400472 assert_never (unexpected )
401473
402474
475+ def _get_metric_value (metric : Metric | int ) -> metric_sample_pb2 .Metric .ValueType :
476+ """Get the metric ID from a metric or metric ID."""
477+ match metric :
478+ case Metric ():
479+ return metric_sample_pb2 .Metric .ValueType (metric .value )
480+ case int ():
481+ return metric_sample_pb2 .Metric .ValueType (metric )
482+ case unexpected :
483+ assert_never (unexpected )
484+
485+
403486def _get_category_value (
404487 category : ComponentCategory | int ,
405488) -> components_pb2 .ComponentCategory .ValueType :
0 commit comments