Commit 14ef9ff

Introduce simple reporting client
This is the initial release of the Reporting API client, limited to retrieving historical data for a single metric of a single component. It handles pagination and wraps the raw protobuf response in a data class that keeps the underlying message while offering transformation methods, currently limited to generators that yield the data as named tuples. Known limitations: only a single metric per request is supported (extensibility is planned), states and bounds integration is still under development, and service-side features such as resampling, streaming, and formula aggregations are not yet supported.

Signed-off-by: cwasicki <[email protected]>
1 parent d6460ac commit 14ef9ff
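For context, here is a minimal usage sketch of the new client (not part of the commit). The import path frequenz.client.reporting, the service address, the microgrid and component IDs, and the chosen metric are placeholders/assumptions, not values taken from the diff.

# Minimal usage sketch; assumes ReportingClient is re-exported by the package
# (see the __init__.py change below) and that the Metric enum has an
# AC_ACTIVE_POWER member. Address and IDs are placeholders.
import asyncio
from datetime import datetime, timedelta, timezone

from frequenz.client.common.metric import Metric
from frequenz.client.reporting import ReportingClient  # assumed import path


async def main() -> None:
    # The client is an async context manager; the gRPC channel is closed on exit.
    async with ReportingClient("localhost:50051") as client:
        end = datetime.now(timezone.utc)
        start = end - timedelta(hours=1)
        # iterate_single_metric() yields Sample(timestamp, value) named tuples,
        # transparently following the service-side pagination.
        async for sample in client.iterate_single_metric(
            microgrid_id=1,
            component_id=42,
            metric=Metric.AC_ACTIVE_POWER,
            start_dt=start,
            end_dt=end,
            page_size=1000,
        ):
            print(sample.timestamp, sample.value)


asyncio.run(main())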

2 files changed: +293, −17 lines

Lines changed: 4 additions & 17 deletions
@@ -1,25 +1,12 @@
 # License: MIT
 # Copyright © 2024 Frequenz Energy-as-a-Service GmbH
 
-"""Reporting API client for Python.
+"""Client to connect to the Reporting API.
 
-TODO(cookiecutter): Add a more descriptive module description.
+This package provides a low-level interface for interacting with the reporting API.
 """
 
 
-# TODO(cookiecutter): Remove this function
-def delete_me(*, blow_up: bool = False) -> bool:
-    """Do stuff for demonstration purposes.
+from ._client import ReportingClient
 
-    Args:
-        blow_up: If True, raise an exception.
-
-    Returns:
-        True if no exception was raised.
-
-    Raises:
-        RuntimeError: if blow_up is True.
-    """
-    if blow_up:
-        raise RuntimeError("This function should be removed!")
-    return True
+__all__ = ["ReportingClient"]
Lines changed: 289 additions & 0 deletions
@@ -0,0 +1,289 @@
+# License: MIT
+# Copyright © 2024 Frequenz Energy-as-a-Service GmbH
+
+"""Client for requests to the Reporting API."""
+
+from collections import namedtuple
+from dataclasses import dataclass
+from datetime import datetime
+from typing import Any, AsyncIterator, Awaitable, Generator, Type, cast
+
+import grpc.aio as grpcaio
+
+# pylint: disable=no-name-in-module
+from frequenz.api.common.v1.metrics.metric_sample_pb2 import Metric as PBMetric
+from frequenz.api.common.v1.microgrid.microgrid_pb2 import (
+    MicrogridComponentIDs as PBMicrogridComponentIDs,
+)
+from frequenz.api.common.v1.pagination.pagination_params_pb2 import (
+    PaginationParams as PBPaginationParams,
+)
+from frequenz.api.reporting.v1.reporting_pb2 import (
+    ListMicrogridComponentsDataRequest as PBListMicrogridComponentsDataRequest,
+)
+from frequenz.api.reporting.v1.reporting_pb2 import (
+    ListMicrogridComponentsDataResponse as PBListMicrogridComponentsDataResponse,
+)
+from frequenz.api.reporting.v1.reporting_pb2 import TimeFilter as PBTimeFilter
+from frequenz.api.reporting.v1.reporting_pb2_grpc import ReportingStub
+from frequenz.client.common.metric import Metric
+from google.protobuf.timestamp_pb2 import Timestamp as PBTimestamp
+
+# pylint: enable=no-name-in-module
+
+Sample = namedtuple("Sample", ["timestamp", "value"])
+"""Type for a sample of a time series."""
+
+MetricSample = namedtuple(
+    "MetricSample", ["timestamp", "microgrid_id", "component_id", "metric", "value"]
+)
+"""Type for a sample of a time series incl. metric type, microgrid and component ID."""
+
+
+@dataclass(frozen=True)
+class ComponentsDataPage:
+    """A page of microgrid components data returned by the Reporting service."""
+
+    _data_pb: PBListMicrogridComponentsDataResponse
+    """The underlying protobuf message."""
+
+    def is_empty(self) -> bool:
+        """Check if the page contains valid data.
+
+        Returns:
+            True if the page contains no valid data.
+        """
+        if not self._data_pb.microgrids:
+            return True
+        if not self._data_pb.microgrids[0].components:
+            return True
+        if not self._data_pb.microgrids[0].components[0].metric_samples:
+            return True
+        return False
+
+    def iterate_metric_samples(self) -> Generator[MetricSample, None, None]:
+        """Get a generator that iterates over all values in the page.
+
+        Note: So far only `SimpleMetricSample` in the `MetricSampleVariant`
+        message is supported.
+
+
+        Yields:
+            A named tuple with the following fields:
+            * timestamp: The timestamp of the metric sample.
+            * microgrid_id: The microgrid ID.
+            * component_id: The component ID.
+            * metric: The metric name.
+            * value: The metric value.
+        """
+        data = self._data_pb
+        for mdata in data.microgrids:
+            mid = mdata.microgrid_id
+            for cdata in mdata.components:
+                cid = cdata.component_id
+                for msample in cdata.metric_samples:
+                    ts = msample.sampled_at.ToDatetime()
+                    met = Metric.from_proto(msample.metric).name
+                    value = (
+                        msample.sample.simple_metric.value
+                        if msample.sample.simple_metric
+                        else None
+                    )
+                    yield MetricSample(
+                        timestamp=ts,
+                        microgrid_id=mid,
+                        component_id=cid,
+                        metric=met,
+                        value=value,
+                    )
+
+    @property
+    def next_page_token(self) -> Any:
+        """Get the token for the next page of data.
+
+        Returns:
+            The token for the next page of data.
+        """
+        return self._data_pb.pagination_info.next_page_token
+
+
+class ReportingClient:
+    """A client for the Reporting service."""
+
+    def __init__(self, service_address: str):
+        """Create a new Reporting client.
+
+        Args:
+            service_address: The address of the Reporting service.
+        """
+        self._grpc_channel = grpcaio.insecure_channel(service_address)
+        self._stub = ReportingStub(self._grpc_channel)
+
+    # pylint: disable=too-many-arguments
+    async def iterate_single_metric(
+        self,
+        *,
+        microgrid_id: int,
+        component_id: int,
+        metric: Metric,
+        start_dt: datetime,
+        end_dt: datetime,
+        page_size: int = 1000,
+    ) -> AsyncIterator[Sample]:
+        """Iterate over the data for a single metric.
+
+        Args:
+            microgrid_id: The microgrid ID.
+            component_id: The component ID.
+            metric: The metric name.
+            start_dt: The start date and time.
+            end_dt: The end date and time.
+            page_size: The page size.
+
+        Yields:
+            A named tuple with the following fields:
+            * timestamp: The timestamp of the metric sample.
+            * value: The metric value.
+        """
+        async for page in self._iterate_components_data_pages(
+            microgrid_components=[(microgrid_id, [component_id])],
+            metrics=[metric],
+            start_dt=start_dt,
+            end_dt=end_dt,
+            page_size=page_size,
+        ):
+            for entry in page.iterate_metric_samples():
+                yield Sample(timestamp=entry.timestamp, value=entry.value)
+
+    # pylint: disable=too-many-arguments
+    async def _iterate_components_data_pages(
+        self,
+        *,
+        microgrid_components: list[tuple[int, list[int]]],
+        metrics: list[Metric],
+        start_dt: datetime,
+        end_dt: datetime,
+        page_size: int = 1000,
+    ) -> AsyncIterator[ComponentsDataPage]:
+        """Iterate over the pages of microgrid components data.
+
+        Note: This does not yet support resampling or aggregating the data. It
+        also does not yet support fetching bound and state data.
+
+        Args:
+            microgrid_components: A list of tuples of microgrid IDs and component IDs.
+            metrics: A list of metrics.
+            start_dt: The start date and time.
+            end_dt: The end date and time.
+            page_size: The page size.
+
+        Yields:
+            A ComponentsDataPage object of microgrid components data.
+        """
+        microgrid_components_pb = [
+            PBMicrogridComponentIDs(microgrid_id=mid, component_ids=cids)
+            for mid, cids in microgrid_components
+        ]
+
+        def dt2ts(dt: datetime) -> PBTimestamp:
+            ts = PBTimestamp()
+            ts.FromDatetime(dt)
+            return ts
+
+        time_filter = PBTimeFilter(
+            start=dt2ts(start_dt),
+            end=dt2ts(end_dt),
+        )
+
+        list_filter = PBListMicrogridComponentsDataRequest.ListFilter(
+            time_filter=time_filter,
+        )
+
+        metrics_pb = [metric.to_proto() for metric in metrics]
+
+        page_token = None
+
+        while True:
+            pagination_params = PBPaginationParams(
+                page_size=page_size, page_token=page_token
+            )
+
+            response = await self._fetch_page(
+                microgrid_components=microgrid_components_pb,
+                metrics=metrics_pb,
+                list_filter=list_filter,
+                pagination_params=pagination_params,
+            )
+            if not response or response.is_empty():
+                break
+
+            yield response
+
+            page_token = response.next_page_token
+            if not page_token:
+                break
+
+    async def _fetch_page(
+        self,
+        *,
+        microgrid_components: list[PBMicrogridComponentIDs],
+        metrics: list[PBMetric.ValueType],
+        list_filter: PBListMicrogridComponentsDataRequest.ListFilter,
+        pagination_params: PBPaginationParams,
+    ) -> ComponentsDataPage | None:
+        """Fetch a single page of microgrid components data.
+
+        Args:
+            microgrid_components: A list of microgrid components.
+            metrics: A list of metrics.
+            list_filter: A list filter.
+            pagination_params: The pagination parameters.
+
+        Returns:
+            A ComponentsDataPage object of microgrid components data.
+        """
+        try:
+            request = PBListMicrogridComponentsDataRequest(
+                microgrid_components=microgrid_components,
+                metrics=metrics,
+                filter=list_filter,
+                pagination_params=pagination_params,
+            )
+            response = await cast(
+                Awaitable[PBListMicrogridComponentsDataResponse],
+                self._stub.ListMicrogridComponentsData(request),
+            )
+        except grpcaio.AioRpcError as e:
+            print(f"RPC failed: {e}")
+            return None
+        return ComponentsDataPage(response)
+
+    async def close(self) -> None:
+        """Close the client and cancel any pending requests immediately."""
+        await self._grpc_channel.close(grace=None)
+
+    async def __aenter__(self) -> "ReportingClient":
+        """Enter the async context."""
+        return self
+
+    async def __aexit__(
+        self,
+        _exc_type: Type[BaseException] | None,
+        _exc_val: BaseException | None,
+        _exc_tb: Any | None,
+    ) -> bool | None:
+        """
+        Exit the asynchronous context manager.
+
+        Note that exceptions are not handled here, but are allowed to propagate.
+
+        Args:
+            _exc_type: Type of exception raised in the async context.
+            _exc_val: Exception instance raised.
+            _exc_tb: Traceback object at the point where the exception occurred.
+
+        Returns:
+            None, allowing any exceptions to propagate.
+        """
+        await self.close()
+        return None
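
For illustration only (also not part of the commit): a sketch of how the ComponentsDataPage wrapper described in the commit message behaves. Here `response` stands in for a ListMicrogridComponentsDataResponse already fetched from the service; in the client it is produced internally by _fetch_page().

# Sketch: the wrapper keeps the raw protobuf response and exposes its samples
# as MetricSample named tuples; `response` is a placeholder object.
page = ComponentsDataPage(response)

if not page.is_empty():
    for sample in page.iterate_metric_samples():
        # Each MetricSample carries timestamp, microgrid_id, component_id,
        # metric name and value.
        print(sample.microgrid_id, sample.component_id, sample.metric,
              sample.timestamp, sample.value)

# The token of the wrapped response drives the pagination loop in
# _iterate_components_data_pages().
next_token = page.next_page_token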

0 commit comments
