44"""Client for requests to the Reporting API."""
55
66from collections import namedtuple
7+ from collections .abc import AsyncIterator , Iterator
78from dataclasses import dataclass
89from datetime import datetime
9- from typing import Any , AsyncIterator , Awaitable , Iterator , Type , cast
10+ from typing import Any , cast
1011
1112import grpc .aio as grpcaio
1213
1314# pylint: disable=no-name-in-module
14- from frequenz .api .common .v1 .metrics .metric_sample_pb2 import Metric as PBMetric
1515from frequenz .api .common .v1 .microgrid .microgrid_pb2 import (
1616 MicrogridComponentIDs as PBMicrogridComponentIDs ,
1717)
18- from frequenz .api .common .v1 .pagination .pagination_params_pb2 import (
19- PaginationParams as PBPaginationParams ,
20- )
2118from frequenz .api .reporting .v1 .reporting_pb2 import (
22- ListMicrogridComponentsDataRequest as PBListMicrogridComponentsDataRequest ,
19+ ReceiveMicrogridComponentsDataStreamRequest as PBReceiveMicrogridComponentsDataStreamRequest ,
2320)
2421from frequenz .api .reporting .v1 .reporting_pb2 import (
25- ListMicrogridComponentsDataResponse as PBListMicrogridComponentsDataResponse ,
22+ ReceiveMicrogridComponentsDataStreamResponse as PBReceiveMicrogridComponentsDataStreamResponse ,
2623)
2724from frequenz .api .reporting .v1 .reporting_pb2 import (
2825 ResamplingOptions as PBResamplingOptions ,
4643
4744
4845@dataclass (frozen = True )
49- class ComponentsDataPage :
50- """A page of microgrid components data returned by the Reporting service."""
46+ class ComponentsDataBatch :
47+ """A batch of components data for a single microgrid returned by the Reporting service."""
5148
52- _data_pb : PBListMicrogridComponentsDataResponse
49+ _data_pb : PBReceiveMicrogridComponentsDataStreamResponse
5350 """The underlying protobuf message."""
5451
5552 def is_empty (self ) -> bool :
56- """Check if the page contains valid data.
53+ """Check if the batch contains valid data.
5754
5855 Returns:
59- True if the page contains no valid data.
56+ True if the batch contains no valid data.
6057 """
61- if not self ._data_pb .microgrids :
62- return True
63- if not self ._data_pb .microgrids [0 ].components :
58+ if not self ._data_pb .components :
6459 return True
65- if not self ._data_pb .microgrids [ 0 ]. components [0 ].metric_samples :
60+ if not self ._data_pb .components [0 ].metric_samples :
6661 return True
6762 return False
6863
6964 def __iter__ (self ) -> Iterator [MetricSample ]:
70- """Get generator that iterates over all values in the page .
65+ """Get generator that iterates over all values in the batch .
7166
7267 Note: So far only `SimpleMetricSample` in the `MetricSampleVariant`
7368 message is supported.
@@ -82,34 +77,24 @@ def __iter__(self) -> Iterator[MetricSample]:
8277 * value: The metric value.
8378 """
8479 data = self ._data_pb
85- for mdata in data .microgrids :
86- mid = mdata .microgrid_id
87- for cdata in mdata .components :
88- cid = cdata .component_id
89- for msample in cdata .metric_samples :
90- ts = msample .sampled_at .ToDatetime ()
91- met = Metric .from_proto (msample .metric ).name
92- value = (
93- msample .value .simple_metric .value
94- if msample .value .simple_metric
95- else None
96- )
97- yield MetricSample (
98- timestamp = ts ,
99- microgrid_id = mid ,
100- component_id = cid ,
101- metric = met ,
102- value = value ,
103- )
104-
105- @property
106- def next_page_token (self ) -> str | None :
107- """Get the token for the next page of data.
108-
109- Returns:
110- The token for the next page of data.
111- """
112- return self ._data_pb .pagination_info .next_page_token
80+ mid = data .microgrid_id
81+ for cdata in data .components :
82+ cid = cdata .component_id
83+ for msample in cdata .metric_samples :
84+ ts = msample .sampled_at .ToDatetime ()
85+ met = Metric .from_proto (msample .metric ).name
86+ value = (
87+ msample .value .simple_metric .value
88+ if msample .value .simple_metric
89+ else None
90+ )
91+ yield MetricSample (
92+ timestamp = ts ,
93+ microgrid_id = mid ,
94+ component_id = cid ,
95+ metric = met ,
96+ value = value ,
97+ )
11398
11499
115100class ReportingApiClient :
@@ -136,7 +121,6 @@ async def list_single_component_data(
136121 start_dt : datetime ,
137122 end_dt : datetime ,
138123 resolution : int | None ,
139- page_size : int = 1000 ,
140124 ) -> AsyncIterator [MetricSample ]:
141125 """Iterate over the data for a single metric.
142126
@@ -147,22 +131,20 @@ async def list_single_component_data(
147131 start_dt: The start date and time.
148132 end_dt: The end date and time.
149133 resolution: The resampling resolution for the data, represented in seconds.
150- page_size: The page size.
151134
152135 Yields:
153136 A named tuple with the following fields:
154137 * timestamp: The timestamp of the metric sample.
155138 * value: The metric value.
156139 """
157- async for page in self ._list_microgrid_components_data_pages (
140+ async for batch in self ._list_microgrid_components_data_batch (
158141 microgrid_components = [(microgrid_id , [component_id ])],
159142 metrics = [metrics ] if isinstance (metrics , Metric ) else metrics ,
160143 start_dt = start_dt ,
161144 end_dt = end_dt ,
162145 resolution = resolution ,
163- page_size = page_size ,
164146 ):
165- for entry in page :
147+ for entry in batch :
166148 yield entry
167149
168150 # pylint: disable=too-many-arguments
@@ -174,7 +156,6 @@ async def list_microgrid_components_data(
174156 start_dt : datetime ,
175157 end_dt : datetime ,
176158 resolution : int | None ,
177- page_size : int = 1000 ,
178159 ) -> AsyncIterator [MetricSample ]:
179160 """Iterate over the data for multiple microgrids and components.
180161
@@ -185,7 +166,6 @@ async def list_microgrid_components_data(
185166 start_dt: The start date and time.
186167 end_dt: The end date and time.
187168 resolution: The resampling resolution for the data, represented in seconds.
188- page_size: The page size.
189169
190170 Yields:
191171 A named tuple with the following fields:
@@ -195,31 +175,29 @@ async def list_microgrid_components_data(
195175 * timestamp: The timestamp of the metric sample.
196176 * value: The metric value.
197177 """
198- async for page in self ._list_microgrid_components_data_pages (
178+ async for batch in self ._list_microgrid_components_data_batch (
199179 microgrid_components = microgrid_components ,
200180 metrics = [metrics ] if isinstance (metrics , Metric ) else metrics ,
201181 start_dt = start_dt ,
202182 end_dt = end_dt ,
203183 resolution = resolution ,
204- page_size = page_size ,
205184 ):
206- for entry in page :
185+ for entry in batch :
207186 yield entry
208187
209188 # pylint: disable=too-many-arguments
210- async def _list_microgrid_components_data_pages (
189+ async def _list_microgrid_components_data_batch (
211190 self ,
212191 * ,
213192 microgrid_components : list [tuple [int , list [int ]]],
214193 metrics : list [Metric ],
215194 start_dt : datetime ,
216195 end_dt : datetime ,
217196 resolution : int | None ,
218- page_size : int ,
219- ) -> AsyncIterator [ComponentsDataPage ]:
220- """Iterate over the pages of microgrid components data.
197+ ) -> AsyncIterator [ComponentsDataBatch ]:
198+ """Iterate over the component data batches in the stream.
221199
222- Note: This does not yet support resampling or aggregating the data. It
200+ Note: This does not yet support aggregating the data. It
223201 also does not yet support fetching bound and state data.
224202
225203 Args:
@@ -228,10 +206,9 @@ async def _list_microgrid_components_data_pages(
228206 start_dt: The start date and time.
229207 end_dt: The end date and time.
230208 resolution: The resampling resolution for the data, represented in seconds.
231- page_size: The page size.
232209
233210 Yields:
234- A ComponentsDataPage object of microgrid components data.
211+ A ComponentsDataBatch object of microgrid components data.
235212 """
236213 microgrid_components_pb = [
237214 PBMicrogridComponentIDs (microgrid_id = mid , component_ids = cids )
@@ -248,71 +225,37 @@ def dt2ts(dt: datetime) -> PBTimestamp:
248225 end = dt2ts (end_dt ),
249226 )
250227
251- list_filter = PBListMicrogridComponentsDataRequest . ListFilter (
228+ list_filter = PBReceiveMicrogridComponentsDataStreamRequest . StreamFilter (
252229 time_filter = time_filter ,
253230 resampling_options = PBResamplingOptions (resolution = resolution ),
254231 )
255232
256233 metrics_pb = [metric .to_proto () for metric in metrics ]
257234
258- page_token = None
259-
260- while True :
261- pagination_params = PBPaginationParams (
262- page_size = page_size , page_token = page_token
263- )
264-
265- response = await self ._fetch_page (
266- microgrid_components = microgrid_components_pb ,
267- metrics = metrics_pb ,
268- list_filter = list_filter ,
269- pagination_params = pagination_params ,
270- )
271- if not response or response .is_empty ():
272- break
273-
274- yield response
275-
276- page_token = response .next_page_token
277- if not page_token :
278- break
279-
280- async def _fetch_page (
281- self ,
282- * ,
283- microgrid_components : list [PBMicrogridComponentIDs ],
284- metrics : list [PBMetric .ValueType ],
285- list_filter : PBListMicrogridComponentsDataRequest .ListFilter ,
286- pagination_params : PBPaginationParams ,
287- ) -> ComponentsDataPage | None :
288- """Fetch a single page of microgrid components data.
289-
290- Args:
291- microgrid_components: A list of microgrid components.
292- metrics: A list of metrics.
293- list_filter: A list filter.
294- pagination_params: A pagination params.
235+ request = PBReceiveMicrogridComponentsDataStreamRequest (
236+ microgrid_components = microgrid_components_pb ,
237+ metrics = metrics_pb ,
238+ filter = list_filter ,
239+ )
295240
296- Returns:
297- A ComponentsDataPage object of microgrid components data.
298- """
299241 try :
300- request = PBListMicrogridComponentsDataRequest (
301- microgrid_components = microgrid_components ,
302- metrics = metrics ,
303- filter = list_filter ,
304- pagination_params = pagination_params ,
305- )
306- response = await cast (
307- Awaitable [PBListMicrogridComponentsDataResponse ],
308- self ._stub .ListMicrogridComponentsData (
242+ stream = cast (
243+ AsyncIterator [PBReceiveMicrogridComponentsDataStreamResponse ],
244+ self ._stub .ReceiveMicrogridComponentsDataStream (
309245 request , metadata = self ._metadata
310246 ),
311247 )
248+ # grpc.aio is missing types and mypy thinks this is not
249+ # async iterable, but it is.
250+ async for response in stream :
251+ if not response :
252+ break
253+
254+ yield ComponentsDataBatch (response )
255+
312256 except grpcaio .AioRpcError as e :
313257 print (f"RPC failed: { e } " )
314- return None
315- return ComponentsDataPage (response )
258+ return
316259
317260 async def close (self ) -> None :
318261 """Close the client and cancel any pending requests immediately."""
@@ -324,7 +267,7 @@ async def __aenter__(self) -> "ReportingApiClient":
324267
325268 async def __aexit__ (
326269 self ,
327- _exc_type : Type [BaseException ] | None ,
270+ _exc_type : type [BaseException ] | None ,
328271 _exc_val : BaseException | None ,
329272 _exc_tb : Any | None ,
330273 ) -> bool | None :
0 commit comments