6
6
import abc
7
7
import inspect
8
8
from collections .abc import Awaitable , Callable
9
- from typing import Any , Generic , Self , TypeVar , overload
9
+ from typing import Any , Self , TypeVar , overload
10
10
11
11
from grpc .aio import AioRpcError , Channel
12
12
13
13
from .channel import ChannelOptions , parse_grpc_uri
14
14
from .exception import ApiClientError , ClientNotConnected
15
15
16
- StubT = TypeVar ("StubT" )
17
- """The type of the gRPC stub."""
18
16
19
-
20
- class BaseApiClient (abc .ABC , Generic [StubT ]):
17
+ class BaseApiClient (abc .ABC ):
21
18
"""A base class for API clients.
22
19
23
20
This class provides a common interface for API clients that communicate with a API
@@ -32,12 +29,31 @@ class BaseApiClient(abc.ABC, Generic[StubT]):
32
29
a class that helps sending messages from a gRPC stream to
33
30
a [Broadcast][frequenz.channels.Broadcast] channel.
34
31
32
+ Note:
33
+ Because grpcio doesn't provide proper type hints, a hack is needed to have
34
+ propepr async type hints for the stubs generated by protoc. When using
35
+ `mypy-protobuf`, a `XxxAsyncStub` class is generated for each `XxxStub` class
36
+ but in the `.pyi` file, so the type can be used to specify type hints, but
37
+ **not** in any other context, as the class doesn't really exist for the Python
38
+ interpreter. This include generics, and because of this, this class can't be
39
+ even parametrized using the async class, so the instantiation of the stub can't
40
+ be done in the base class.
41
+
42
+ Because of this, subclasses need to create the stubs by themselves, using the
43
+ real stub class and casting it to the `XxxAsyncStub` class, so `mypy` can use
44
+ the async version of the stubs.
45
+
46
+ It is recommended to define a `stub` property that returns the async stub, so
47
+ this hack is completely hidden from clients, even if they need to access the
48
+ stub for more advanced uses.
49
+
35
50
Example:
36
51
This example illustrates how to create a simple API client that connects to a
37
52
gRPC server and calls a method on a stub.
38
53
39
54
```python
40
55
from collections.abc import AsyncIterable
56
+ from typing import cast
41
57
from frequenz.client.base.client import BaseApiClient, call_stub_method
42
58
from frequenz.client.base.streaming import GrpcStreamBroadcaster
43
59
from frequenz.channels import Receiver
@@ -57,25 +73,42 @@ async def example_method(
57
73
) -> ExampleResponse:
58
74
...
59
75
60
- def example_stream(self) -> AsyncIterable[ExampleResponse]:
76
+ def example_stream(self, _: ExampleRequest) -> AsyncIterable[ExampleResponse]:
77
+ ...
78
+
79
+ class ExampleAsyncStub:
80
+ async def example_method(
81
+ self,
82
+ request: ExampleRequest # pylint: disable=unused-argument
83
+ ) -> ExampleResponse:
84
+ ...
85
+
86
+ def example_stream(self, _: ExampleRequest) -> AsyncIterable[ExampleResponse]:
61
87
...
62
88
# End of generated classes
63
89
64
90
class ExampleResponseWrapper:
65
- def __init__(self, response: ExampleResponse):
91
+ def __init__(self, response: ExampleResponse) -> None :
66
92
self.transformed_value = f"{response.float_value:.2f}"
67
93
68
- class MyApiClient(BaseApiClient[ExampleStub]):
69
- def __init__(self, server_url: str, *, connect: bool = True):
70
- super().__init__(
71
- server_url, ExampleStub, connect=connect
94
+ class MyApiClient(BaseApiClient):
95
+ def __init__(self, server_url: str, *, connect: bool = True) -> None:
96
+ super().__init__(server_url, connect=connect)
97
+ self._stub = cast(
98
+ ExampleAsyncStub, ExampleStub(self.channel)
72
99
)
73
100
self._broadcaster = GrpcStreamBroadcaster(
74
101
"stream",
75
102
lambda: self.stub.example_stream(ExampleRequest()),
76
103
ExampleResponseWrapper,
77
104
)
78
105
106
+ @property
107
+ def stub(self) -> ExampleAsyncStub:
108
+ if self._channel is None:
109
+ raise ClientNotConnected(server_url=self.server_url, operation="stub")
110
+ return self._stub
111
+
79
112
async def example_method(
80
113
self, int_value: int, str_value: str
81
114
) -> ExampleResponseWrapper:
@@ -114,7 +147,6 @@ async def main():
114
147
def __init__ (
115
148
self ,
116
149
server_url : str ,
117
- create_stub : Callable [[Channel ], StubT ],
118
150
* ,
119
151
connect : bool = True ,
120
152
channel_defaults : ChannelOptions = ChannelOptions (),
@@ -123,7 +155,6 @@ def __init__(
123
155
124
156
Args:
125
157
server_url: The URL of the server to connect to.
126
- create_stub: A function that creates a stub from a channel.
127
158
connect: Whether to connect to the server as soon as a client instance is
128
159
created. If `False`, the client will not connect to the server until
129
160
[connect()][frequenz.client.base.client.BaseApiClient.connect] is
@@ -132,10 +163,8 @@ def __init__(
132
163
the server URL.
133
164
"""
134
165
self ._server_url : str = server_url
135
- self ._create_stub : Callable [[Channel ], StubT ] = create_stub
136
166
self ._channel_defaults : ChannelOptions = channel_defaults
137
167
self ._channel : Channel | None = None
138
- self ._stub : StubT | None = None
139
168
if connect :
140
169
self .connect (server_url )
141
170
@@ -165,22 +194,6 @@ def channel_defaults(self) -> ChannelOptions:
165
194
"""The default options for the gRPC channel."""
166
195
return self ._channel_defaults
167
196
168
- @property
169
- def stub (self ) -> StubT :
170
- """The underlying gRPC stub.
171
-
172
- Warning:
173
- This stub is provided as a last resort for advanced users. It is not
174
- recommended to use this property directly unless you know what you are
175
- doing and you don't care about being tied to a specific gRPC library.
176
-
177
- Raises:
178
- ClientNotConnected: If the client is not connected to the server.
179
- """
180
- if self ._stub is None :
181
- raise ClientNotConnected (server_url = self .server_url , operation = "stub" )
182
- return self ._stub
183
-
184
197
@property
185
198
def is_connected (self ) -> bool :
186
199
"""Whether the client is connected to the server."""
@@ -202,7 +215,6 @@ def connect(self, server_url: str | None = None) -> None:
202
215
elif self .is_connected :
203
216
return
204
217
self ._channel = parse_grpc_uri (self ._server_url , self ._channel_defaults )
205
- self ._stub = self ._create_stub (self ._channel )
206
218
207
219
async def disconnect (self ) -> None :
208
220
"""Disconnect from the server.
@@ -227,7 +239,6 @@ async def __aexit__(
227
239
return None
228
240
result = await self ._channel .__aexit__ (_exc_type , _exc_val , _exc_tb )
229
241
self ._channel = None
230
- self ._stub = None
231
242
return result
232
243
233
244
@@ -240,7 +251,7 @@ async def __aexit__(
240
251
241
252
@overload
242
253
async def call_stub_method (
243
- client : BaseApiClient [ StubT ] ,
254
+ client : BaseApiClient ,
244
255
stub_method : Callable [[], Awaitable [StubOutT ]],
245
256
* ,
246
257
method_name : str | None = None ,
@@ -250,7 +261,7 @@ async def call_stub_method(
250
261
251
262
@overload
252
263
async def call_stub_method (
253
- client : BaseApiClient [ StubT ] ,
264
+ client : BaseApiClient ,
254
265
stub_method : Callable [[], Awaitable [StubOutT ]],
255
266
* ,
256
267
method_name : str | None = None ,
@@ -261,7 +272,7 @@ async def call_stub_method(
261
272
# We need the `noqa: DOC503` because `pydoclint` can't figure out that
262
273
# `ApiClientError.from_grpc_error()` returns a `GrpcError` instance.
263
274
async def call_stub_method ( # noqa: DOC503
264
- client : BaseApiClient [ StubT ] ,
275
+ client : BaseApiClient ,
265
276
stub_method : Callable [[], Awaitable [StubOutT ]],
266
277
* ,
267
278
method_name : str | None = None ,
0 commit comments