Skip to content

Commit d22243a

Browse files
authored
Use streaming endpoint in list_microgrid_components_data (#67)
The streaming endpoint supports requests for historical data. This replaces the list endpoint with the streaming endpoint. The names in the client are for now unchanged to not break user code. They might be revised when live streaming will also be supported by the service. The list endpoint will most likely be deprecated and removed at some point.
2 parents 6c8753e + 223cb13 commit d22243a

File tree

4 files changed

+70
-130
lines changed

4 files changed

+70
-130
lines changed

RELEASE_NOTES.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@
66

77
## Upgrading
88

9+
* The client now uses the streaming endpoint for historical data requests. The page size parameter is no longer required.
10+
911
## New Features
1012

1113
## Bug Fixes

src/frequenz/client/reporting/__main__.py

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,6 @@ def main() -> None:
6666
args.start,
6767
args.end,
6868
args.resolution,
69-
page_size=args.psize,
7069
service_address=args.url,
7170
key=args.key,
7271
fmt=args.format,
@@ -82,7 +81,6 @@ async def run(
8281
start_dt: datetime,
8382
end_dt: datetime,
8483
resolution: int,
85-
page_size: int,
8684
service_address: str,
8785
key: str,
8886
fmt: str,
@@ -96,7 +94,6 @@ async def run(
9694
start_dt: start datetime
9795
end_dt: end datetime
9896
resolution: resampling resolution in sec
99-
page_size: page size
10097
service_address: service address
10198
key: API key
10299
fmt: output format
@@ -123,7 +120,6 @@ def data_iter() -> AsyncIterator[MetricSample]:
123120
start_dt=start_dt,
124121
end_dt=end_dt,
125122
resolution=resolution,
126-
page_size=page_size,
127123
)
128124

129125
if fmt == "iter":

src/frequenz/client/reporting/_client.py

Lines changed: 58 additions & 115 deletions
Original file line numberDiff line numberDiff line change
@@ -4,25 +4,22 @@
44
"""Client for requests to the Reporting API."""
55

66
from collections import namedtuple
7+
from collections.abc import AsyncIterator, Iterator
78
from dataclasses import dataclass
89
from datetime import datetime
9-
from typing import Any, AsyncIterator, Awaitable, Iterator, Type, cast
10+
from typing import Any, cast
1011

1112
import grpc.aio as grpcaio
1213

1314
# pylint: disable=no-name-in-module
14-
from frequenz.api.common.v1.metrics.metric_sample_pb2 import Metric as PBMetric
1515
from frequenz.api.common.v1.microgrid.microgrid_pb2 import (
1616
MicrogridComponentIDs as PBMicrogridComponentIDs,
1717
)
18-
from frequenz.api.common.v1.pagination.pagination_params_pb2 import (
19-
PaginationParams as PBPaginationParams,
20-
)
2118
from frequenz.api.reporting.v1.reporting_pb2 import (
22-
ListMicrogridComponentsDataRequest as PBListMicrogridComponentsDataRequest,
19+
ReceiveMicrogridComponentsDataStreamRequest as PBReceiveMicrogridComponentsDataStreamRequest,
2320
)
2421
from frequenz.api.reporting.v1.reporting_pb2 import (
25-
ListMicrogridComponentsDataResponse as PBListMicrogridComponentsDataResponse,
22+
ReceiveMicrogridComponentsDataStreamResponse as PBReceiveMicrogridComponentsDataStreamResponse,
2623
)
2724
from frequenz.api.reporting.v1.reporting_pb2 import (
2825
ResamplingOptions as PBResamplingOptions,
@@ -46,28 +43,26 @@
4643

4744

4845
@dataclass(frozen=True)
49-
class ComponentsDataPage:
50-
"""A page of microgrid components data returned by the Reporting service."""
46+
class ComponentsDataBatch:
47+
"""A batch of components data for a single microgrid returned by the Reporting service."""
5148

52-
_data_pb: PBListMicrogridComponentsDataResponse
49+
_data_pb: PBReceiveMicrogridComponentsDataStreamResponse
5350
"""The underlying protobuf message."""
5451

5552
def is_empty(self) -> bool:
56-
"""Check if the page contains valid data.
53+
"""Check if the batch contains valid data.
5754
5855
Returns:
59-
True if the page contains no valid data.
56+
True if the batch contains no valid data.
6057
"""
61-
if not self._data_pb.microgrids:
62-
return True
63-
if not self._data_pb.microgrids[0].components:
58+
if not self._data_pb.components:
6459
return True
65-
if not self._data_pb.microgrids[0].components[0].metric_samples:
60+
if not self._data_pb.components[0].metric_samples:
6661
return True
6762
return False
6863

6964
def __iter__(self) -> Iterator[MetricSample]:
70-
"""Get generator that iterates over all values in the page.
65+
"""Get generator that iterates over all values in the batch.
7166
7267
Note: So far only `SimpleMetricSample` in the `MetricSampleVariant`
7368
message is supported.
@@ -82,34 +77,24 @@ def __iter__(self) -> Iterator[MetricSample]:
8277
* value: The metric value.
8378
"""
8479
data = self._data_pb
85-
for mdata in data.microgrids:
86-
mid = mdata.microgrid_id
87-
for cdata in mdata.components:
88-
cid = cdata.component_id
89-
for msample in cdata.metric_samples:
90-
ts = msample.sampled_at.ToDatetime()
91-
met = Metric.from_proto(msample.metric).name
92-
value = (
93-
msample.value.simple_metric.value
94-
if msample.value.simple_metric
95-
else None
96-
)
97-
yield MetricSample(
98-
timestamp=ts,
99-
microgrid_id=mid,
100-
component_id=cid,
101-
metric=met,
102-
value=value,
103-
)
104-
105-
@property
106-
def next_page_token(self) -> str | None:
107-
"""Get the token for the next page of data.
108-
109-
Returns:
110-
The token for the next page of data.
111-
"""
112-
return self._data_pb.pagination_info.next_page_token
80+
mid = data.microgrid_id
81+
for cdata in data.components:
82+
cid = cdata.component_id
83+
for msample in cdata.metric_samples:
84+
ts = msample.sampled_at.ToDatetime()
85+
met = Metric.from_proto(msample.metric).name
86+
value = (
87+
msample.value.simple_metric.value
88+
if msample.value.simple_metric
89+
else None
90+
)
91+
yield MetricSample(
92+
timestamp=ts,
93+
microgrid_id=mid,
94+
component_id=cid,
95+
metric=met,
96+
value=value,
97+
)
11398

11499

115100
class ReportingApiClient:
@@ -136,7 +121,6 @@ async def list_single_component_data(
136121
start_dt: datetime,
137122
end_dt: datetime,
138123
resolution: int | None,
139-
page_size: int = 1000,
140124
) -> AsyncIterator[MetricSample]:
141125
"""Iterate over the data for a single metric.
142126
@@ -147,22 +131,20 @@ async def list_single_component_data(
147131
start_dt: The start date and time.
148132
end_dt: The end date and time.
149133
resolution: The resampling resolution for the data, represented in seconds.
150-
page_size: The page size.
151134
152135
Yields:
153136
A named tuple with the following fields:
154137
* timestamp: The timestamp of the metric sample.
155138
* value: The metric value.
156139
"""
157-
async for page in self._list_microgrid_components_data_pages(
140+
async for batch in self._list_microgrid_components_data_batch(
158141
microgrid_components=[(microgrid_id, [component_id])],
159142
metrics=[metrics] if isinstance(metrics, Metric) else metrics,
160143
start_dt=start_dt,
161144
end_dt=end_dt,
162145
resolution=resolution,
163-
page_size=page_size,
164146
):
165-
for entry in page:
147+
for entry in batch:
166148
yield entry
167149

168150
# pylint: disable=too-many-arguments
@@ -174,7 +156,6 @@ async def list_microgrid_components_data(
174156
start_dt: datetime,
175157
end_dt: datetime,
176158
resolution: int | None,
177-
page_size: int = 1000,
178159
) -> AsyncIterator[MetricSample]:
179160
"""Iterate over the data for multiple microgrids and components.
180161
@@ -185,7 +166,6 @@ async def list_microgrid_components_data(
185166
start_dt: The start date and time.
186167
end_dt: The end date and time.
187168
resolution: The resampling resolution for the data, represented in seconds.
188-
page_size: The page size.
189169
190170
Yields:
191171
A named tuple with the following fields:
@@ -195,31 +175,29 @@ async def list_microgrid_components_data(
195175
* timestamp: The timestamp of the metric sample.
196176
* value: The metric value.
197177
"""
198-
async for page in self._list_microgrid_components_data_pages(
178+
async for batch in self._list_microgrid_components_data_batch(
199179
microgrid_components=microgrid_components,
200180
metrics=[metrics] if isinstance(metrics, Metric) else metrics,
201181
start_dt=start_dt,
202182
end_dt=end_dt,
203183
resolution=resolution,
204-
page_size=page_size,
205184
):
206-
for entry in page:
185+
for entry in batch:
207186
yield entry
208187

209188
# pylint: disable=too-many-arguments
210-
async def _list_microgrid_components_data_pages(
189+
async def _list_microgrid_components_data_batch(
211190
self,
212191
*,
213192
microgrid_components: list[tuple[int, list[int]]],
214193
metrics: list[Metric],
215194
start_dt: datetime,
216195
end_dt: datetime,
217196
resolution: int | None,
218-
page_size: int,
219-
) -> AsyncIterator[ComponentsDataPage]:
220-
"""Iterate over the pages of microgrid components data.
197+
) -> AsyncIterator[ComponentsDataBatch]:
198+
"""Iterate over the component data batches in the stream.
221199
222-
Note: This does not yet support resampling or aggregating the data. It
200+
Note: This does not yet support aggregating the data. It
223201
also does not yet support fetching bound and state data.
224202
225203
Args:
@@ -228,10 +206,9 @@ async def _list_microgrid_components_data_pages(
228206
start_dt: The start date and time.
229207
end_dt: The end date and time.
230208
resolution: The resampling resolution for the data, represented in seconds.
231-
page_size: The page size.
232209
233210
Yields:
234-
A ComponentsDataPage object of microgrid components data.
211+
A ComponentsDataBatch object of microgrid components data.
235212
"""
236213
microgrid_components_pb = [
237214
PBMicrogridComponentIDs(microgrid_id=mid, component_ids=cids)
@@ -248,71 +225,37 @@ def dt2ts(dt: datetime) -> PBTimestamp:
248225
end=dt2ts(end_dt),
249226
)
250227

251-
list_filter = PBListMicrogridComponentsDataRequest.ListFilter(
228+
list_filter = PBReceiveMicrogridComponentsDataStreamRequest.StreamFilter(
252229
time_filter=time_filter,
253230
resampling_options=PBResamplingOptions(resolution=resolution),
254231
)
255232

256233
metrics_pb = [metric.to_proto() for metric in metrics]
257234

258-
page_token = None
259-
260-
while True:
261-
pagination_params = PBPaginationParams(
262-
page_size=page_size, page_token=page_token
263-
)
264-
265-
response = await self._fetch_page(
266-
microgrid_components=microgrid_components_pb,
267-
metrics=metrics_pb,
268-
list_filter=list_filter,
269-
pagination_params=pagination_params,
270-
)
271-
if not response or response.is_empty():
272-
break
273-
274-
yield response
275-
276-
page_token = response.next_page_token
277-
if not page_token:
278-
break
279-
280-
async def _fetch_page(
281-
self,
282-
*,
283-
microgrid_components: list[PBMicrogridComponentIDs],
284-
metrics: list[PBMetric.ValueType],
285-
list_filter: PBListMicrogridComponentsDataRequest.ListFilter,
286-
pagination_params: PBPaginationParams,
287-
) -> ComponentsDataPage | None:
288-
"""Fetch a single page of microgrid components data.
289-
290-
Args:
291-
microgrid_components: A list of microgrid components.
292-
metrics: A list of metrics.
293-
list_filter: A list filter.
294-
pagination_params: A pagination params.
235+
request = PBReceiveMicrogridComponentsDataStreamRequest(
236+
microgrid_components=microgrid_components_pb,
237+
metrics=metrics_pb,
238+
filter=list_filter,
239+
)
295240

296-
Returns:
297-
A ComponentsDataPage object of microgrid components data.
298-
"""
299241
try:
300-
request = PBListMicrogridComponentsDataRequest(
301-
microgrid_components=microgrid_components,
302-
metrics=metrics,
303-
filter=list_filter,
304-
pagination_params=pagination_params,
305-
)
306-
response = await cast(
307-
Awaitable[PBListMicrogridComponentsDataResponse],
308-
self._stub.ListMicrogridComponentsData(
242+
stream = cast(
243+
AsyncIterator[PBReceiveMicrogridComponentsDataStreamResponse],
244+
self._stub.ReceiveMicrogridComponentsDataStream(
309245
request, metadata=self._metadata
310246
),
311247
)
248+
# grpc.aio is missing types and mypy thinks this is not
249+
# async iterable, but it is.
250+
async for response in stream:
251+
if not response:
252+
break
253+
254+
yield ComponentsDataBatch(response)
255+
312256
except grpcaio.AioRpcError as e:
313257
print(f"RPC failed: {e}")
314-
return None
315-
return ComponentsDataPage(response)
258+
return
316259

317260
async def close(self) -> None:
318261
"""Close the client and cancel any pending requests immediately."""
@@ -324,7 +267,7 @@ async def __aenter__(self) -> "ReportingApiClient":
324267

325268
async def __aexit__(
326269
self,
327-
_exc_type: Type[BaseException] | None,
270+
_exc_type: type[BaseException] | None,
328271
_exc_val: BaseException | None,
329272
_exc_tb: Any | None,
330273
) -> bool | None:

tests/test_client_reporting.py

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
import pytest
99

1010
from frequenz.client.reporting import ReportingApiClient
11-
from frequenz.client.reporting._client import ComponentsDataPage
11+
from frequenz.client.reporting._client import ComponentsDataBatch
1212

1313

1414
@pytest.fixture
@@ -25,19 +25,18 @@ async def test_client_initialization(mock_channel: MagicMock) -> None:
2525
mock_channel.assert_called_once_with("localhost:50051")
2626

2727

28-
def test_components_data_page_is_empty_true() -> None:
28+
def test_components_data_batch_is_empty_true() -> None:
2929
"""Test that the is_empty method returns True when the page is empty."""
3030
data_pb = MagicMock()
31-
data_pb.microgrids = []
32-
page = ComponentsDataPage(_data_pb=data_pb)
33-
assert page.is_empty() is True
31+
data_pb.components = []
32+
batch = ComponentsDataBatch(_data_pb=data_pb)
33+
assert batch.is_empty() is True
3434

3535

36-
def test_components_data_page_is_empty_false() -> None:
36+
def test_components_data_batch_is_empty_false() -> None:
3737
"""Test that the is_empty method returns False when the page is not empty."""
3838
data_pb = MagicMock()
39-
data_pb.microgrids = [MagicMock()]
40-
data_pb.microgrids[0].components = [MagicMock()]
41-
data_pb.microgrids[0].components[0].metric_samples = [MagicMock()]
42-
page = ComponentsDataPage(_data_pb=data_pb)
43-
assert page.is_empty() is False
39+
data_pb.components = [MagicMock()]
40+
data_pb.components[0].metric_samples = [MagicMock()]
41+
batch = ComponentsDataBatch(_data_pb=data_pb)
42+
assert batch.is_empty() is False

0 commit comments

Comments
 (0)