|
7 | 7 | from collections.abc import AsyncIterator, Iterator |
8 | 8 | from dataclasses import dataclass |
9 | 9 | from datetime import datetime |
10 | | -from typing import Any, cast |
| 10 | +from typing import cast |
11 | 11 |
|
12 | | -import grpc |
13 | 12 | import grpc.aio as grpcaio |
14 | 13 |
|
15 | 14 | # pylint: disable=no-name-in-module |
|
27 | 26 | ) |
28 | 27 | from frequenz.api.reporting.v1.reporting_pb2 import TimeFilter as PBTimeFilter |
29 | 28 | from frequenz.api.reporting.v1.reporting_pb2_grpc import ReportingStub |
| 29 | +from frequenz.client.base.client import BaseApiClient |
30 | 30 | from frequenz.client.common.metric import Metric |
31 | 31 | from google.protobuf.timestamp_pb2 import Timestamp as PBTimestamp |
32 | 32 |
|
33 | | -# pylint: enable=no-name-in-module |
34 | | - |
35 | 33 | MetricSample = namedtuple( |
36 | 34 | "MetricSample", ["timestamp", "microgrid_id", "component_id", "metric", "value"] |
37 | 35 | ) |
@@ -98,20 +96,18 @@ def __iter__(self) -> Iterator[MetricSample]: |
98 | 96 | ) |
99 | 97 |
|
100 | 98 |
|
101 | | -class ReportingApiClient: |
| 99 | +class ReportingApiClient(BaseApiClient[ReportingStub, grpcaio.Channel]): |
102 | 100 | """A client for the Reporting service.""" |
103 | 101 |
|
104 | | - def __init__(self, service_address: str, key: str | None = None) -> None: |
| 102 | + def __init__(self, server_url: str, key: str | None = None) -> None: |
105 | 103 | """Create a new Reporting client. |
106 | 104 |
|
107 | 105 | Args: |
108 | | - service_address: The address of the Reporting service. |
| 106 | + server_url: The URL of the Reporting service. |
109 | 107 | key: The API key for the authorization. |
110 | 108 | """ |
111 | | - self._grpc_channel = grpcaio.secure_channel( |
112 | | - service_address, grpc.ssl_channel_credentials() |
113 | | - ) |
114 | | - self._stub = ReportingStub(self._grpc_channel) |
| 109 | + super().__init__(server_url, ReportingStub, grpcaio.Channel) |
| 110 | + |
115 | 111 | self._metadata = (("key", key),) if key else () |
116 | 112 |
|
117 | 113 | # pylint: disable=too-many-arguments |
@@ -244,48 +240,15 @@ def dt2ts(dt: datetime) -> PBTimestamp: |
244 | 240 | try: |
245 | 241 | stream = cast( |
246 | 242 | AsyncIterator[PBReceiveMicrogridComponentsDataStreamResponse], |
247 | | - self._stub.ReceiveMicrogridComponentsDataStream( |
| 243 | + self.stub.ReceiveMicrogridComponentsDataStream( |
248 | 244 | request, metadata=self._metadata |
249 | 245 | ), |
250 | 246 | ) |
251 | | - # grpc.aio is missing types and mypy thinks this is not |
252 | | - # async iterable, but it is. |
253 | 247 | async for response in stream: |
254 | 248 | if not response: |
255 | 249 | break |
256 | | - |
257 | 250 | yield ComponentsDataBatch(response) |
258 | 251 |
|
259 | 252 | except grpcaio.AioRpcError as e: |
260 | 253 | print(f"RPC failed: {e}") |
261 | 254 | return |
262 | | - |
263 | | - async def close(self) -> None: |
264 | | - """Close the client and cancel any pending requests immediately.""" |
265 | | - await self._grpc_channel.close(grace=None) |
266 | | - |
267 | | - async def __aenter__(self) -> "ReportingApiClient": |
268 | | - """Enter the async context.""" |
269 | | - return self |
270 | | - |
271 | | - async def __aexit__( |
272 | | - self, |
273 | | - _exc_type: type[BaseException] | None, |
274 | | - _exc_val: BaseException | None, |
275 | | - _exc_tb: Any | None, |
276 | | - ) -> bool | None: |
277 | | - """ |
278 | | - Exit the asynchronous context manager. |
279 | | -
|
280 | | - Note that exceptions are not handled here, but are allowed to propagate. |
281 | | -
|
282 | | - Args: |
283 | | - _exc_type: Type of exception raised in the async context. |
284 | | - _exc_val: Exception instance raised. |
285 | | - _exc_tb: Traceback object at the point where the exception occurred. |
286 | | -
|
287 | | - Returns: |
288 | | - None, allowing any exceptions to propagate. |
289 | | - """ |
290 | | - await self.close() |
291 | | - return None |
0 commit comments