33
44"""Client for requests to the Microgrid API."""
55
6+ from __future__ import annotations
7+
68import asyncio
79import logging
810from collections .abc import Callable , Iterable , Set
3133)
3234from ._connection import Connection
3335from ._constants import RECEIVER_MAX_SIZE
34- from ._exception import ApiClientError
36+ from ._exception import ApiClientError , ClientNotConnected
3537from ._metadata import Location , Metadata
3638
3739DEFAULT_GRPC_CALL_TIMEOUT = 60.0
@@ -89,10 +91,20 @@ def __init__(
8991 connect = connect ,
9092 channel_defaults = channel_defaults ,
9193 )
92- self ._async_stub : microgrid_pb2_grpc .MicrogridAsyncStub = self .stub # type: ignore
9394 self ._broadcasters : dict [int , streaming .GrpcStreamBroadcaster [Any , Any ]] = {}
9495 self ._retry_strategy = retry_strategy
9596
97+ @property
98+ def stub (self ) -> microgrid_pb2_grpc .MicrogridAsyncStub :
99+ """The gRPC stub for the API."""
100+ if self .channel is None or self ._stub is None :
101+ raise ClientNotConnected (server_url = self .server_url , operation = "stub" )
102+ # This type: ignore is needed because we need to cast the sync stub to
103+ # the async stub, but we can't use cast because the async stub doesn't
104+ # actually exists to the eyes of the interpreter, it only exists for the
105+ # type-checker, so it can only be used for type hints.
106+ return self ._stub # type: ignore
107+
96108 async def components ( # noqa: DOC502 (raises ApiClientError indirectly)
97109 self ,
98110 ) -> Iterable [Component ]:
@@ -108,7 +120,7 @@ async def components( # noqa: DOC502 (raises ApiClientError indirectly)
108120 """
109121 component_list = await client .call_stub_method (
110122 self ,
111- lambda : self ._async_stub .ListComponents (
123+ lambda : self .stub .ListComponents (
112124 microgrid_pb2 .ComponentFilter (),
113125 timeout = int (DEFAULT_GRPC_CALL_TIMEOUT ),
114126 ),
@@ -145,7 +157,7 @@ async def metadata(self) -> Metadata:
145157 try :
146158 microgrid_metadata = await client .call_stub_method (
147159 self ,
148- lambda : self ._async_stub .GetMicrogridMetadata (
160+ lambda : self .stub .GetMicrogridMetadata (
149161 Empty (),
150162 timeout = int (DEFAULT_GRPC_CALL_TIMEOUT ),
151163 ),
@@ -192,7 +204,7 @@ async def connections( # noqa: DOC502 (raises ApiClientError indirectly)
192204 self .components (),
193205 client .call_stub_method (
194206 self ,
195- lambda : self ._async_stub .ListConnections (
207+ lambda : self .stub .ListConnections (
196208 connection_filter ,
197209 timeout = int (DEFAULT_GRPC_CALL_TIMEOUT ),
198210 ),
@@ -249,12 +261,15 @@ async def _new_component_data_receiver(
249261 broadcaster = streaming .GrpcStreamBroadcaster (
250262 f"raw-component-data-{ component_id } " ,
251263 lambda : aiter (
252- self ._async_stub .StreamComponentData (
264+ self .stub .StreamComponentData (
253265 microgrid_pb2 .ComponentIdParam (id = component_id )
254266 )
255267 ),
256268 transform ,
257269 retry_strategy = self ._retry_strategy ,
270+ # We don't expect any data stream to end, so if it is exhausted for any
271+ # reason we want to keep retrying
272+ retry_on_exhausted_stream = True ,
258273 )
259274 self ._broadcasters [component_id ] = broadcaster
260275 return broadcaster .new_receiver (maxsize = maxsize )
@@ -408,7 +423,7 @@ async def set_power( # noqa: DOC502 (raises ApiClientError indirectly)
408423 """
409424 await client .call_stub_method (
410425 self ,
411- lambda : self ._async_stub .SetPowerActive (
426+ lambda : self .stub .SetPowerActive (
412427 microgrid_pb2 .SetPowerActiveParam (
413428 component_id = component_id , power = power_w
414429 ),
@@ -447,7 +462,7 @@ async def set_bounds( # noqa: DOC503 (raises ApiClientError indirectly)
447462 )
448463 await client .call_stub_method (
449464 self ,
450- lambda : self ._async_stub .AddInclusionBounds (
465+ lambda : self .stub .AddInclusionBounds (
451466 microgrid_pb2 .SetBoundsParam (
452467 component_id = component_id ,
453468 target_metric = target_metric ,
0 commit comments