66from __future__ import annotations
77
88import asyncio
9+ import itertools
910import logging
1011from collections .abc import Callable , Iterable , Set
1112from dataclasses import replace
12- from typing import Any , TypeVar
13+ from functools import partial
14+ from typing import Any , NotRequired , TypedDict , TypeVar , assert_never
1315
1416from frequenz .api .common import components_pb2 , metrics_pb2
15- from frequenz .api .microgrid import microgrid_pb2 , microgrid_pb2_grpc
17+ from frequenz .api .microgrid import microgrid_pb2 , microgrid_pb2_grpc , sensor_pb2
1618from frequenz .channels import Receiver
1719from frequenz .client .base import channel , client , retry , streaming
1820from google .protobuf .empty_pb2 import Empty
3537from ._connection import Connection
3638from ._constants import RECEIVER_MAX_SIZE
3739from ._exception import ApiClientError , ClientNotConnected
38- from ._id import ComponentId , MicrogridId
3940from ._metadata import Location , Metadata
41+ from ._sensor_proto import sensor_data_samples_from_proto , sensor_from_proto
42+ from .id import ComponentId , MicrogridId , SensorId
43+ from .sensor import Sensor , SensorDataSamples , SensorMetric
4044
4145DEFAULT_GRPC_CALL_TIMEOUT = 60.0
4246"""The default timeout for gRPC calls made by this client (in seconds)."""
@@ -96,6 +100,12 @@ def __init__(
96100 self ._broadcasters : dict [
97101 ComponentId , streaming .GrpcStreamBroadcaster [Any , Any ]
98102 ] = {}
103+ self ._sensor_data_broadcasters : dict [
104+ str ,
105+ streaming .GrpcStreamBroadcaster [
106+ microgrid_pb2 .ComponentData , SensorDataSamples
107+ ],
108+ ] = {}
99109 self ._retry_strategy = retry_strategy
100110
101111 @property
@@ -117,15 +127,22 @@ async def __aexit__(
117127 exc_tb : Any | None ,
118128 ) -> bool | None :
119129 """Close the gRPC channel and stop all broadcasters."""
120- exceptions = [
130+ exceptions = list (
121131 exc
122132 for exc in await asyncio .gather (
123- * (broadcaster .stop () for broadcaster in self ._broadcasters .values ()),
133+ * (
134+ broadcaster .stop ()
135+ for broadcaster in itertools .chain (
136+ self ._broadcasters .values (),
137+ self ._sensor_data_broadcasters .values (),
138+ )
139+ ),
124140 return_exceptions = True ,
125141 )
126142 if isinstance (exc , BaseException )
127- ]
143+ )
128144 self ._broadcasters .clear ()
145+ self ._sensor_data_broadcasters .clear ()
129146
130147 result = None
131148 try :
@@ -177,6 +194,33 @@ async def components( # noqa: DOC502 (raises ApiClientError indirectly)
177194
178195 return result
179196
197+ async def list_sensors ( # noqa: DOC502 (raises ApiClientError indirectly)
198+ self ,
199+ ) -> Iterable [Sensor ]:
200+ """Fetch all the sensors present in the microgrid.
201+
202+ Returns:
203+ Iterator whose elements are all the sensors in the microgrid.
204+
205+ Raises:
206+ ApiClientError: If the are any errors communicating with the Microgrid API,
207+ most likely a subclass of
208+ [GrpcError][frequenz.client.microgrid.GrpcError].
209+ """
210+ component_list = await client .call_stub_method (
211+ self ,
212+ lambda : self .stub .ListComponents (
213+ microgrid_pb2 .ComponentFilter (
214+ categories = [
215+ components_pb2 .ComponentCategory .COMPONENT_CATEGORY_SENSOR
216+ ]
217+ ),
218+ timeout = int (DEFAULT_GRPC_CALL_TIMEOUT ),
219+ ),
220+ method_name = "ListComponents" ,
221+ )
222+ return map (sensor_from_proto , component_list .components )
223+
180224 async def metadata (self ) -> Metadata :
181225 """Fetch the microgrid metadata.
182226
@@ -539,3 +583,91 @@ async def set_bounds( # noqa: DOC503 (raises ApiClientError indirectly)
539583 ),
540584 method_name = "AddInclusionBounds" ,
541585 )
586+
587+ # noqa: DOC502 (Raises ApiClientError indirectly)
588+ def stream_sensor_data (
589+ self ,
590+ sensor : SensorId | Sensor ,
591+ metrics : Iterable [SensorMetric | int ] | None = None ,
592+ * ,
593+ buffer_size : int = 50 ,
594+ ) -> Receiver [SensorDataSamples ]:
595+ """Stream data samples from a sensor.
596+
597+ Warning:
598+ Sensors may not support all metrics. If a sensor does not support
599+ a given metric, then the returned data stream will not contain that metric.
600+
601+ There is no way to tell if a metric is not being received because the
602+ sensor does not support it or because there is a transient issue when
603+ retrieving the metric from the sensor.
604+
605+ The supported metrics by a sensor can even change with time, for example,
606+ if a sensor is updated with new firmware.
607+
608+ Args:
609+ sensor: The sensor to stream data from.
610+ metrics: If not `None`, only the specified metrics will be retrieved.
611+ Otherwise all available metrics will be retrieved.
612+ buffer_size: The maximum number of messages to buffer in the returned
613+ receiver. After this limit is reached, the oldest messages will be
614+ dropped.
615+
616+ Returns:
617+ A receiver to retrieve data from the sensor.
618+ """
619+ sensor_id = _get_sensor_id (sensor )
620+ key = str (sensor_id )
621+
622+ class _ExtraArgs (TypedDict ):
623+ metrics : NotRequired [frozenset [sensor_pb2 .SensorMetric .ValueType ]]
624+
625+ extra_args : _ExtraArgs = {}
626+ if metrics is not None :
627+ extra_args ["metrics" ] = frozenset (
628+ [_get_sensor_metric_value (m ) for m in metrics ]
629+ )
630+ # We use the frozenset because iterables are not hashable
631+ key += f"{ hash (extra_args ['metrics' ])} "
632+
633+ broadcaster = self ._sensor_data_broadcasters .get (key )
634+ if broadcaster is None :
635+ client_id = hex (id (self ))[2 :]
636+ stream_name = f"microgrid-client-{ client_id } -sensor-data-{ key } "
637+ broadcaster = streaming .GrpcStreamBroadcaster (
638+ stream_name ,
639+ lambda : aiter (
640+ self .stub .StreamComponentData (
641+ microgrid_pb2 .ComponentIdParam (id = sensor_id ),
642+ timeout = DEFAULT_GRPC_CALL_TIMEOUT ,
643+ )
644+ ),
645+ partial (sensor_data_samples_from_proto , ** extra_args ),
646+ retry_strategy = self ._retry_strategy ,
647+ )
648+ self ._sensor_data_broadcasters [key ] = broadcaster
649+ return broadcaster .new_receiver (maxsize = buffer_size )
650+
651+
652+ def _get_sensor_id (sensor : SensorId | Sensor ) -> int :
653+ """Get the sensor ID from a sensor or sensor ID."""
654+ match sensor :
655+ case SensorId ():
656+ return int (sensor )
657+ case Sensor ():
658+ return int (sensor .id )
659+ case unexpected :
660+ assert_never (unexpected )
661+
662+
663+ def _get_sensor_metric_value (
664+ metric : SensorMetric | int ,
665+ ) -> sensor_pb2 .SensorMetric .ValueType :
666+ """Get the sensor metric ID from a sensor metric or sensor metric ID."""
667+ match metric :
668+ case SensorMetric ():
669+ return sensor_pb2 .SensorMetric .ValueType (metric .value )
670+ case int ():
671+ return sensor_pb2 .SensorMetric .ValueType (metric )
672+ case unexpected :
673+ assert_never (unexpected )
0 commit comments