55
66import asyncio
77import logging
8- from collections .abc import AsyncIterator , Awaitable , Callable , Iterable
9- from typing import Any , TypeVar , cast
10-
11- import grpc .aio
12-
13- # pylint: disable=no-name-in-module
14- from frequenz .api .common .components_pb2 import ComponentCategory as PbComponentCategory
15- from frequenz .api .common .metrics_pb2 import Bounds as PbBounds
16- from frequenz .api .microgrid .microgrid_pb2 import ComponentData as PbComponentData
17- from frequenz .api .microgrid .microgrid_pb2 import ComponentFilter as PbComponentFilter
18- from frequenz .api .microgrid .microgrid_pb2 import ComponentIdParam as PbComponentIdParam
19- from frequenz .api .microgrid .microgrid_pb2 import ComponentList as PbComponentList
20- from frequenz .api .microgrid .microgrid_pb2 import ConnectionFilter as PbConnectionFilter
21- from frequenz .api .microgrid .microgrid_pb2 import ConnectionList as PbConnectionList
22- from frequenz .api .microgrid .microgrid_pb2 import (
23- MicrogridMetadata as PbMicrogridMetadata ,
24- )
25- from frequenz .api .microgrid .microgrid_pb2 import SetBoundsParam as PbSetBoundsParam
26- from frequenz .api .microgrid .microgrid_pb2 import (
27- SetPowerActiveParam as PbSetPowerActiveParam ,
28- )
29- from frequenz .api .microgrid .microgrid_pb2_grpc import MicrogridStub
8+ from collections .abc import Callable , Iterable , Set
9+ from typing import Any , TypeVar
3010
31- # pylint: enable=no-name-in-module
11+ import grpclib
12+ import grpclib .client
13+ from betterproto .lib .google import protobuf as pb_google
3214from frequenz .channels import Receiver
3315from frequenz .client .base import retry , streaming
34- from google .protobuf .empty_pb2 import Empty # pylint: disable=no-name-in-module
35- from google .protobuf .timestamp_pb2 import Timestamp # pylint: disable=no-name-in-module
16+ from frequenz .microgrid .betterproto .frequenz .api import microgrid as pb_microgrid
17+ from frequenz .microgrid .betterproto .frequenz .api .common import (
18+ components as pb_components ,
19+ )
20+ from frequenz .microgrid .betterproto .frequenz .api .common import metrics as pb_metrics
3621
3722from ._component import (
3823 Component ,
@@ -67,7 +52,7 @@ class ApiClient:
6752
6853 def __init__ (
6954 self ,
70- grpc_channel : grpc . aio .Channel ,
55+ grpc_channel : grpclib . client .Channel ,
7156 target : str ,
7257 retry_strategy : retry .Strategy | None = None ,
7358 ) -> None :
@@ -84,7 +69,7 @@ def __init__(
8469 self .target = target
8570 """The location (as "host:port") of the microgrid API gRPC server."""
8671
87- self .api = MicrogridStub (grpc_channel )
72+ self .api = pb_microgrid . MicrogridStub (grpc_channel )
8873 """The gRPC stub for the microgrid API."""
8974
9075 self ._broadcasters : dict [int , streaming .GrpcStreamBroadcaster [Any , Any ]] = {}
@@ -101,22 +86,19 @@ async def components(self) -> Iterable[Component]:
10186 when the api call exceeded the timeout.
10287 """
10388 try :
104- # grpc.aio is missing types and mypy thinks this is not awaitable,
105- # but it is
106- component_list = await cast (
107- Awaitable [PbComponentList ],
108- self .api .ListComponents (
109- PbComponentFilter (),
110- timeout = int (DEFAULT_GRPC_CALL_TIMEOUT ),
111- ),
89+ component_list = await self .api .list_components (
90+ pb_microgrid .ComponentFilter (),
91+ timeout = int (DEFAULT_GRPC_CALL_TIMEOUT ),
11292 )
11393
114- except grpc . aio . AioRpcError as err :
94+ except grpclib . GRPCError as err :
11595 raise ClientError (
116- f"Failed to list components. Microgrid API: { self .target } . Err: { err . details () } "
96+ f"Failed to list components. Microgrid API: { self .target } . Err: { err } "
11797 ) from err
98+
11899 components_only = filter (
119- lambda c : c .category is not PbComponentCategory .COMPONENT_CATEGORY_SENSOR ,
100+ lambda c : c .category
101+ is not pb_components .ComponentCategory .COMPONENT_CATEGORY_SENSOR ,
120102 component_list .components ,
121103 )
122104 result : Iterable [Component ] = map (
@@ -140,16 +122,13 @@ async def metadata(self) -> Metadata:
140122 Returns:
141123 the microgrid metadata.
142124 """
143- microgrid_metadata : PbMicrogridMetadata | None = None
125+ microgrid_metadata : pb_microgrid . MicrogridMetadata | None = None
144126 try :
145- microgrid_metadata = await cast (
146- Awaitable [PbMicrogridMetadata ],
147- self .api .GetMicrogridMetadata (
148- Empty (),
149- timeout = int (DEFAULT_GRPC_CALL_TIMEOUT ),
150- ),
127+ microgrid_metadata = await self .api .get_microgrid_metadata (
128+ pb_google .Empty (),
129+ timeout = int (DEFAULT_GRPC_CALL_TIMEOUT ),
151130 )
152- except grpc . aio . AioRpcError :
131+ except grpclib . GRPCError :
153132 _logger .exception ("The microgrid metadata is not available." )
154133
155134 if not microgrid_metadata :
@@ -166,8 +145,8 @@ async def metadata(self) -> Metadata:
166145
167146 async def connections (
168147 self ,
169- starts : set [int ] | None = None ,
170- ends : set [int ] | None = None ,
148+ starts : Set [int ] = frozenset () ,
149+ ends : Set [int ] = frozenset () ,
171150 ) -> Iterable [Connection ]:
172151 """Fetch the connections between components in the microgrid.
173152
@@ -184,23 +163,20 @@ async def connections(
184163 ClientError: If the connection to the Microgrid API cannot be established or
185164 when the api call exceeded the timeout.
186165 """
187- connection_filter = PbConnectionFilter (starts = starts , ends = ends )
166+ connection_filter = pb_microgrid .ConnectionFilter (
167+ starts = list (starts ), ends = list (ends )
168+ )
188169 try :
189170 valid_components , all_connections = await asyncio .gather (
190171 self .components (),
191- # grpc.aio is missing types and mypy thinks this is not
192- # awaitable, but it is
193- cast (
194- Awaitable [PbConnectionList ],
195- self .api .ListConnections (
196- connection_filter ,
197- timeout = int (DEFAULT_GRPC_CALL_TIMEOUT ),
198- ),
172+ self .api .list_connections (
173+ connection_filter ,
174+ timeout = int (DEFAULT_GRPC_CALL_TIMEOUT ),
199175 ),
200176 )
201- except grpc . aio . AioRpcError as err :
177+ except grpclib . GRPCError as err :
202178 raise ClientError (
203- f"Failed to list connections. Microgrid API: { self .target } . Err: { err . details () } "
179+ f"Failed to list connections. Microgrid API: { self .target } . Err: { err } "
204180 ) from err
205181 # Filter out the components filtered in `components` method.
206182 # id=0 is an exception indicating grid component.
@@ -223,7 +199,7 @@ async def _new_component_data_receiver(
223199 * ,
224200 component_id : int ,
225201 expected_category : ComponentCategory ,
226- transform : Callable [[PbComponentData ], _ComponentDataT ],
202+ transform : Callable [[pb_microgrid . ComponentData ], _ComponentDataT ],
227203 maxsize : int ,
228204 ) -> Receiver [_ComponentDataT ]:
229205 """Return a new broadcaster receiver for a given `component_id`.
@@ -250,13 +226,8 @@ async def _new_component_data_receiver(
250226 if broadcaster is None :
251227 broadcaster = streaming .GrpcStreamBroadcaster (
252228 f"raw-component-data-{ component_id } " ,
253- # We need to cast here because grpc says StreamComponentData is
254- # a grpc.CallIterator[PbComponentData] which is not an AsyncIterator,
255- # but it is a grpc.aio.UnaryStreamCall[..., PbComponentData], which it
256- # is.
257- lambda : cast (
258- AsyncIterator [PbComponentData ],
259- self .api .StreamComponentData (PbComponentIdParam (id = component_id )),
229+ lambda : self .api .stream_component_data (
230+ pb_microgrid .ComponentIdParam (id = component_id )
260231 ),
261232 transform ,
262233 retry_strategy = self ._retry_strategy ,
@@ -409,16 +380,15 @@ async def set_power(self, component_id: int, power_w: float) -> None:
409380 when the api call exceeded the timeout.
410381 """
411382 try :
412- await cast (
413- Awaitable [Empty ],
414- self .api .SetPowerActive (
415- PbSetPowerActiveParam (component_id = component_id , power = power_w ),
416- timeout = int (DEFAULT_GRPC_CALL_TIMEOUT ),
383+ await self .api .set_power_active (
384+ pb_microgrid .SetPowerActiveParam (
385+ component_id = component_id , power = power_w
417386 ),
387+ timeout = int (DEFAULT_GRPC_CALL_TIMEOUT ),
418388 )
419- except grpc . aio . AioRpcError as err :
389+ except grpclib . GRPCError as err :
420390 raise ClientError (
421- f"Failed to set power. Microgrid API: { self .target } . Err: { err . details () } "
391+ f"Failed to set power. Microgrid API: { self .target } . Err: { err } "
422392 ) from err
423393
424394 async def set_bounds (
@@ -427,7 +397,7 @@ async def set_bounds(
427397 lower : float ,
428398 upper : float ,
429399 ) -> None :
430- """Send `PbSetBoundsParam `s received from a channel to the Microgrid service.
400+ """Send `SetBoundsParam `s received from a channel to the Microgrid service.
431401
432402 Args:
433403 component_id: ID of the component to set bounds for.
@@ -446,28 +416,27 @@ async def set_bounds(
446416 if lower > 0 :
447417 raise ValueError (f"Lower bound { lower } must be less than or equal to 0." )
448418
449- target_metric = PbSetBoundsParam .TargetMetric .TARGET_METRIC_POWER_ACTIVE
419+ target_metric = (
420+ pb_microgrid .SetBoundsParamTargetMetric .TARGET_METRIC_POWER_ACTIVE
421+ )
450422 try :
451- await cast (
452- Awaitable [Timestamp ],
453- self .api .AddInclusionBounds (
454- PbSetBoundsParam (
455- component_id = component_id ,
456- target_metric = target_metric ,
457- bounds = PbBounds (lower = lower , upper = upper ),
458- ),
459- timeout = int (DEFAULT_GRPC_CALL_TIMEOUT ),
423+ await self .api .add_inclusion_bounds (
424+ pb_microgrid .SetBoundsParam (
425+ component_id = component_id ,
426+ target_metric = target_metric ,
427+ bounds = pb_metrics .Bounds (lower = lower , upper = upper ),
460428 ),
429+ timeout = int (DEFAULT_GRPC_CALL_TIMEOUT ),
461430 )
462- except grpc . aio . AioRpcError as err :
431+ except grpclib . GRPCError as err :
463432 _logger .error (
464433 "set_bounds write failed: %s, for message: %s, api: %s. Err: %s" ,
465434 err ,
466435 next ,
467436 api_details ,
468- err . details () ,
437+ err ,
469438 )
470439 raise ClientError (
471440 f"Failed to set inclusion bounds. Microgrid API: { self .target } . "
472- f"Err: { err . details () } "
441+ f"Err: { err } "
473442 ) from err
0 commit comments