44"""Client for requests to the Reporting API."""
55
66from collections import namedtuple
7- from collections .abc import AsyncIterator , Iterator
7+ from collections .abc import AsyncIterator , Iterable , Iterator
88from dataclasses import dataclass
99from datetime import datetime
1010from typing import cast
1515from frequenz .api .common .v1 .microgrid .microgrid_pb2 import (
1616 MicrogridComponentIDs as PBMicrogridComponentIDs ,
1717)
18+ from frequenz .api .reporting .v1 .reporting_pb2 import IncludeOptions as PBIncludeOptions
1819from frequenz .api .reporting .v1 .reporting_pb2 import (
1920 ReceiveMicrogridComponentsDataStreamRequest as PBReceiveMicrogridComponentsDataStreamRequest ,
2021)
@@ -56,7 +57,10 @@ def is_empty(self) -> bool:
5657 """
5758 if not self ._data_pb .components :
5859 return True
59- if not self ._data_pb .components [0 ].metric_samples :
60+ if (
61+ not self ._data_pb .components [0 ].metric_samples
62+ and not self ._data_pb .components [0 ].states
63+ ):
6064 return True
6165 return False
6266
@@ -94,6 +98,26 @@ def __iter__(self) -> Iterator[MetricSample]:
9498 metric = met ,
9599 value = value ,
96100 )
101+ for state in cdata .states :
102+ ts = state .sampled_at .ToDatetime ()
103+ for name , category in {
104+ "state" : state .states ,
105+ "warning" : state .warnings ,
106+ "error" : state .errors ,
107+ }.items ():
108+ # Skip if the category is not present
109+ if not isinstance (category , Iterable ):
110+ continue
111+ # Each category can have multiple states
112+ # that are provided as individual samples
113+ for s in category :
114+ yield MetricSample (
115+ timestamp = ts ,
116+ microgrid_id = mid ,
117+ component_id = cid ,
118+ metric = name ,
119+ value = s ,
120+ )
97121
98122
99123class ReportingApiClient (BaseApiClient [ReportingStub , grpcaio .Channel ]):
@@ -120,6 +144,7 @@ async def list_single_component_data(
120144 start_dt : datetime ,
121145 end_dt : datetime ,
122146 resolution : int | None ,
147+ include_states : bool = False ,
123148 ) -> AsyncIterator [MetricSample ]:
124149 """Iterate over the data for a single metric.
125150
@@ -130,6 +155,7 @@ async def list_single_component_data(
130155 start_dt: The start date and time.
131156 end_dt: The end date and time.
132157 resolution: The resampling resolution for the data, represented in seconds.
158+ include_states: Whether to include the state data.
133159
134160 Yields:
135161 A named tuple with the following fields:
@@ -142,6 +168,7 @@ async def list_single_component_data(
142168 start_dt = start_dt ,
143169 end_dt = end_dt ,
144170 resolution = resolution ,
171+ include_states = include_states ,
145172 ):
146173 for entry in batch :
147174 yield entry
@@ -155,6 +182,7 @@ async def list_microgrid_components_data(
155182 start_dt : datetime ,
156183 end_dt : datetime ,
157184 resolution : int | None ,
185+ include_states : bool = False ,
158186 ) -> AsyncIterator [MetricSample ]:
159187 """Iterate over the data for multiple microgrids and components.
160188
@@ -165,6 +193,7 @@ async def list_microgrid_components_data(
165193 start_dt: The start date and time.
166194 end_dt: The end date and time.
167195 resolution: The resampling resolution for the data, represented in seconds.
196+ include_states: Whether to include the state data.
168197
169198 Yields:
170199 A named tuple with the following fields:
@@ -180,11 +209,13 @@ async def list_microgrid_components_data(
180209 start_dt = start_dt ,
181210 end_dt = end_dt ,
182211 resolution = resolution ,
212+ include_states = include_states ,
183213 ):
184214 for entry in batch :
185215 yield entry
186216
187217 # pylint: disable=too-many-arguments
218+ # pylint: disable=too-many-locals
188219 async def _list_microgrid_components_data_batch (
189220 self ,
190221 * ,
@@ -193,6 +224,7 @@ async def _list_microgrid_components_data_batch(
193224 start_dt : datetime ,
194225 end_dt : datetime ,
195226 resolution : int | None ,
227+ include_states : bool = False ,
196228 ) -> AsyncIterator [ComponentsDataBatch ]:
197229 """Iterate over the component data batches in the stream.
198230
@@ -205,6 +237,7 @@ async def _list_microgrid_components_data_batch(
205237 start_dt: The start date and time.
206238 end_dt: The end date and time.
207239 resolution: The resampling resolution for the data, represented in seconds.
240+ include_states: Whether to include the state data.
208241
209242 Yields:
210243 A ComponentsDataBatch object of microgrid components data.
@@ -224,17 +257,27 @@ def dt2ts(dt: datetime) -> PBTimestamp:
224257 end = dt2ts (end_dt ),
225258 )
226259
227- list_filter = PBReceiveMicrogridComponentsDataStreamRequest .StreamFilter (
260+ incl_states = (
261+ PBIncludeOptions .FilterOption .FILTER_OPTION_INCLUDE
262+ if include_states
263+ else PBIncludeOptions .FilterOption .FILTER_OPTION_EXCLUDE
264+ )
265+ include_options = PBIncludeOptions (
266+ states = incl_states ,
267+ )
268+
269+ stream_filter = PBReceiveMicrogridComponentsDataStreamRequest .StreamFilter (
228270 time_filter = time_filter ,
229271 resampling_options = PBResamplingOptions (resolution = resolution ),
272+ include_options = include_options ,
230273 )
231274
232275 metrics_pb = [metric .to_proto () for metric in metrics ]
233276
234277 request = PBReceiveMicrogridComponentsDataStreamRequest (
235278 microgrid_components = microgrid_components_pb ,
236279 metrics = metrics_pb ,
237- filter = list_filter ,
280+ filter = stream_filter ,
238281 )
239282
240283 try :
0 commit comments