66from __future__ import annotations
77
88import asyncio
9+ import itertools
910import logging
1011from collections .abc import Callable , Iterable , Set
1112from dataclasses import replace
12- from typing import Any , TypeVar
13+ from functools import partial
14+ from typing import Any , NotRequired , TypedDict , TypeVar , assert_never
1315
1416from frequenz .api .common import components_pb2 , metrics_pb2
15- from frequenz .api .microgrid import microgrid_pb2 , microgrid_pb2_grpc
17+ from frequenz .api .microgrid import microgrid_pb2 , microgrid_pb2_grpc , sensor_pb2
1618from frequenz .channels import Receiver
1719from frequenz .client .base import channel , client , retry , streaming
1820from google .protobuf .empty_pb2 import Empty
3537from ._connection import Connection
3638from ._constants import RECEIVER_MAX_SIZE
3739from ._exception import ApiClientError , ClientNotConnected
38- from ._id import ComponentId , MicrogridId
40+ from ._id import ComponentId , MicrogridId , SensorId
3941from ._metadata import Location , Metadata
40- from ._sensor_proto import sensor_from_proto
41- from .sensor import Sensor
42+ from ._sensor_proto import sensor_data_samples_from_proto , sensor_from_proto
43+ from .sensor import Sensor , SensorDataSamples , SensorMetric
4244
4345DEFAULT_GRPC_CALL_TIMEOUT = 60.0
4446"""The default timeout for gRPC calls made by this client (in seconds)."""
@@ -98,6 +100,12 @@ def __init__(
98100 self ._broadcasters : dict [
99101 ComponentId , streaming .GrpcStreamBroadcaster [Any , Any ]
100102 ] = {}
103+ self ._sensor_data_broadcasters : dict [
104+ str ,
105+ streaming .GrpcStreamBroadcaster [
106+ microgrid_pb2 .ComponentData , SensorDataSamples
107+ ],
108+ ] = {}
101109 self ._retry_strategy = retry_strategy
102110
103111 @property
@@ -119,15 +127,22 @@ async def __aexit__(
119127 exc_tb : Any | None ,
120128 ) -> bool | None :
121129 """Close the gRPC channel and stop all broadcasters."""
122- exceptions = [
130+ exceptions = list (
123131 exc
124132 for exc in await asyncio .gather (
125- * (broadcaster .stop () for broadcaster in self ._broadcasters .values ()),
133+ * (
134+ broadcaster .stop ()
135+ for broadcaster in itertools .chain (
136+ self ._broadcasters .values (),
137+ self ._sensor_data_broadcasters .values (),
138+ )
139+ ),
126140 return_exceptions = True ,
127141 )
128142 if isinstance (exc , BaseException )
129- ]
143+ )
130144 self ._broadcasters .clear ()
145+ self ._sensor_data_broadcasters .clear ()
131146
132147 result = None
133148 try :
@@ -568,3 +583,91 @@ async def set_bounds( # noqa: DOC503 (raises ApiClientError indirectly)
568583 ),
569584 method_name = "AddInclusionBounds" ,
570585 )
586+
587+ # noqa: DOC502 (Raises ApiClientError indirectly)
588+ def stream_sensor_data (
589+ self ,
590+ sensor : SensorId | Sensor ,
591+ metrics : Iterable [SensorMetric | int ] | None = None ,
592+ * ,
593+ buffer_size : int = 50 ,
594+ ) -> Receiver [SensorDataSamples ]:
595+ """Stream data samples from a sensor.
596+
597+ Warning:
598+ Sensors may not support all metrics. If a sensor does not support
599+ a given metric, then the returned data stream will not contain that metric.
600+
601+ There is no way to tell if a metric is not being received because the
602+ sensor does not support it or because there is a transient issue when
603+ retrieving the metric from the sensor.
604+
605+ The supported metrics by a sensor can even change with time, for example,
606+ if a sensor is updated with new firmware.
607+
608+ Args:
609+ sensor: The sensor to stream data from.
610+ metrics: If not `None`, only the specified metrics will be retrieved.
611+ Otherwise all available metrics will be retrieved.
612+ buffer_size: The maximum number of messages to buffer in the returned
613+ receiver. After this limit is reached, the oldest messages will be
614+ dropped.
615+
616+ Returns:
617+ A receiver to retrieve data from the sensor.
618+ """
619+ sensor_id = _get_sensor_id (sensor )
620+ key = str (sensor_id )
621+
622+ class _ExtraArgs (TypedDict ):
623+ metrics : NotRequired [frozenset [sensor_pb2 .SensorMetric .ValueType ]]
624+
625+ extra_args : _ExtraArgs = {}
626+ if metrics is not None :
627+ extra_args ["metrics" ] = frozenset (
628+ [_get_sensor_metric_value (m ) for m in metrics ]
629+ )
630+ # We use the frozenset because iterables are not hashable
631+ key += f"{ hash (extra_args ['metrics' ])} "
632+
633+ broadcaster = self ._sensor_data_broadcasters .get (key )
634+ if broadcaster is None :
635+ client_id = hex (id (self ))[2 :]
636+ stream_name = f"microgrid-client-{ client_id } -sensor-data-{ key } "
637+ broadcaster = streaming .GrpcStreamBroadcaster (
638+ stream_name ,
639+ lambda : aiter (
640+ self .stub .StreamComponentData (
641+ microgrid_pb2 .ComponentIdParam (id = sensor_id ),
642+ timeout = DEFAULT_GRPC_CALL_TIMEOUT ,
643+ )
644+ ),
645+ partial (sensor_data_samples_from_proto , ** extra_args ),
646+ retry_strategy = self ._retry_strategy ,
647+ )
648+ self ._sensor_data_broadcasters [key ] = broadcaster
649+ return broadcaster .new_receiver (maxsize = buffer_size )
650+
651+
652+ def _get_sensor_id (sensor : SensorId | Sensor ) -> int :
653+ """Get the sensor ID from a sensor or sensor ID."""
654+ match sensor :
655+ case SensorId ():
656+ return int (sensor )
657+ case Sensor ():
658+ return int (sensor .id )
659+ case unexpected :
660+ assert_never (unexpected )
661+
662+
663+ def _get_sensor_metric_value (
664+ metric : SensorMetric | int ,
665+ ) -> sensor_pb2 .SensorMetric .ValueType :
666+ """Get the sensor metric ID from a sensor metric or sensor metric ID."""
667+ match metric :
668+ case SensorMetric ():
669+ return sensor_pb2 .SensorMetric .ValueType (metric .value )
670+ case int ():
671+ return sensor_pb2 .SensorMetric .ValueType (metric )
672+ case unexpected :
673+ assert_never (unexpected )
0 commit comments