6
6
import abc
7
7
import inspect
8
8
from collections .abc import Awaitable , Callable
9
- from typing import Any , Generic , Self , TypeVar , overload
9
+ from typing import Any , Self , TypeVar , overload
10
10
11
11
from grpc .aio import AioRpcError , Channel
12
12
13
13
from .channel import ChannelOptions , parse_grpc_uri
14
14
from .exception import ApiClientError , ClientNotConnected
15
15
16
- StubT = TypeVar ("StubT" )
17
- """The type of the gRPC stub."""
18
16
19
-
20
- class BaseApiClient (abc .ABC , Generic [StubT ]):
17
+ class BaseApiClient (abc .ABC ):
21
18
"""A base class for API clients.
22
19
23
20
This class provides a common interface for API clients that communicate with a API
@@ -32,12 +29,31 @@ class BaseApiClient(abc.ABC, Generic[StubT]):
32
29
a class that helps sending messages from a gRPC stream to
33
30
a [Broadcast][frequenz.channels.Broadcast] channel.
34
31
32
+ Note:
33
+ Because grpcio doesn't provide proper type hints, a hack is needed to have
34
+ propepr async type hints for the stubs generated by protoc. When using
35
+ `mypy-protobuf`, a `XxxAsyncStub` class is generated for each `XxxStub` class
36
+ but in the `.pyi` file, so the type can be used to specify type hints, but
37
+ **not** in any other context, as the class doesn't really exist for the Python
38
+ interpreter. This include generics, and because of this, this class can't be
39
+ even parametrized using the async class, so the instantiation of the stub can't
40
+ be done in the base class.
41
+
42
+ Because of this, subclasses need to create the stubs by themselves, using the
43
+ real stub class and casting it to the `XxxAsyncStub` class, so `mypy` can use
44
+ the async version of the stubs.
45
+
46
+ It is recommended to define a `stub` property that returns the async stub, so
47
+ this hack is completely hidden from clients, even if they need to access the
48
+ stub for more advanced uses.
49
+
35
50
Example:
36
51
This example illustrates how to create a simple API client that connects to a
37
52
gRPC server and calls a method on a stub.
38
53
39
54
```python
40
55
from collections.abc import AsyncIterable
56
+ from typing import cast
41
57
from frequenz.client.base.client import BaseApiClient, call_stub_method
42
58
from frequenz.client.base.streaming import GrpcStreamBroadcaster
43
59
from frequenz.channels import Receiver
@@ -57,25 +73,51 @@ async def example_method(
57
73
) -> ExampleResponse:
58
74
...
59
75
60
- def example_stream(self) -> AsyncIterable[ExampleResponse]:
76
+ def example_stream(self, _: ExampleRequest) -> AsyncIterable[ExampleResponse]:
77
+ ...
78
+
79
+ class ExampleAsyncStub:
80
+ async def example_method(
81
+ self,
82
+ request: ExampleRequest # pylint: disable=unused-argument
83
+ ) -> ExampleResponse:
84
+ ...
85
+
86
+ def example_stream(self, _: ExampleRequest) -> AsyncIterable[ExampleResponse]:
61
87
...
62
88
# End of generated classes
63
89
64
90
class ExampleResponseWrapper:
65
- def __init__(self, response: ExampleResponse):
91
+ def __init__(self, response: ExampleResponse) -> None :
66
92
self.transformed_value = f"{response.float_value:.2f}"
67
93
68
- class MyApiClient(BaseApiClient[ExampleStub]):
69
- def __init__(self, server_url: str, *, connect: bool = True):
70
- super().__init__(
71
- server_url, ExampleStub, connect=connect
94
+ # Change defaults as needed
95
+ DEFAULT_CHANNEL_OPTIONS = ChannelOptions()
96
+
97
+ class MyApiClient(BaseApiClient):
98
+ def __init__(
99
+ self,
100
+ server_url: str,
101
+ *,
102
+ connect: bool = True,
103
+ channel_defaults: ChannelOptions = DEFAULT_CHANNEL_OPTIONS,
104
+ ) -> None:
105
+ super().__init__(server_url, connect=connect, channel_defaults=channel_defaults)
106
+ self._stub = cast(
107
+ ExampleAsyncStub, ExampleStub(self.channel)
72
108
)
73
109
self._broadcaster = GrpcStreamBroadcaster(
74
110
"stream",
75
111
lambda: self.stub.example_stream(ExampleRequest()),
76
112
ExampleResponseWrapper,
77
113
)
78
114
115
+ @property
116
+ def stub(self) -> ExampleAsyncStub:
117
+ if self._channel is None:
118
+ raise ClientNotConnected(server_url=self.server_url, operation="stub")
119
+ return self._stub
120
+
79
121
async def example_method(
80
122
self, int_value: int, str_value: str
81
123
) -> ExampleResponseWrapper:
@@ -114,7 +156,6 @@ async def main():
114
156
def __init__ (
115
157
self ,
116
158
server_url : str ,
117
- create_stub : Callable [[Channel ], StubT ],
118
159
* ,
119
160
connect : bool = True ,
120
161
channel_defaults : ChannelOptions = ChannelOptions (),
@@ -123,7 +164,6 @@ def __init__(
123
164
124
165
Args:
125
166
server_url: The URL of the server to connect to.
126
- create_stub: A function that creates a stub from a channel.
127
167
connect: Whether to connect to the server as soon as a client instance is
128
168
created. If `False`, the client will not connect to the server until
129
169
[connect()][frequenz.client.base.client.BaseApiClient.connect] is
@@ -132,10 +172,8 @@ def __init__(
132
172
the server URL.
133
173
"""
134
174
self ._server_url : str = server_url
135
- self ._create_stub : Callable [[Channel ], StubT ] = create_stub
136
175
self ._channel_defaults : ChannelOptions = channel_defaults
137
176
self ._channel : Channel | None = None
138
- self ._stub : StubT | None = None
139
177
if connect :
140
178
self .connect (server_url )
141
179
@@ -165,22 +203,6 @@ def channel_defaults(self) -> ChannelOptions:
165
203
"""The default options for the gRPC channel."""
166
204
return self ._channel_defaults
167
205
168
- @property
169
- def stub (self ) -> StubT :
170
- """The underlying gRPC stub.
171
-
172
- Warning:
173
- This stub is provided as a last resort for advanced users. It is not
174
- recommended to use this property directly unless you know what you are
175
- doing and you don't care about being tied to a specific gRPC library.
176
-
177
- Raises:
178
- ClientNotConnected: If the client is not connected to the server.
179
- """
180
- if self ._stub is None :
181
- raise ClientNotConnected (server_url = self .server_url , operation = "stub" )
182
- return self ._stub
183
-
184
206
@property
185
207
def is_connected (self ) -> bool :
186
208
"""Whether the client is connected to the server."""
@@ -202,7 +224,6 @@ def connect(self, server_url: str | None = None) -> None:
202
224
elif self .is_connected :
203
225
return
204
226
self ._channel = parse_grpc_uri (self ._server_url , self ._channel_defaults )
205
- self ._stub = self ._create_stub (self ._channel )
206
227
207
228
async def disconnect (self ) -> None :
208
229
"""Disconnect from the server.
@@ -227,7 +248,6 @@ async def __aexit__(
227
248
return None
228
249
result = await self ._channel .__aexit__ (_exc_type , _exc_val , _exc_tb )
229
250
self ._channel = None
230
- self ._stub = None
231
251
return result
232
252
233
253
@@ -240,7 +260,7 @@ async def __aexit__(
240
260
241
261
@overload
242
262
async def call_stub_method (
243
- client : BaseApiClient [ StubT ] ,
263
+ client : BaseApiClient ,
244
264
stub_method : Callable [[], Awaitable [StubOutT ]],
245
265
* ,
246
266
method_name : str | None = None ,
@@ -250,7 +270,7 @@ async def call_stub_method(
250
270
251
271
@overload
252
272
async def call_stub_method (
253
- client : BaseApiClient [ StubT ] ,
273
+ client : BaseApiClient ,
254
274
stub_method : Callable [[], Awaitable [StubOutT ]],
255
275
* ,
256
276
method_name : str | None = None ,
@@ -261,7 +281,7 @@ async def call_stub_method(
261
281
# We need the `noqa: DOC503` because `pydoclint` can't figure out that
262
282
# `ApiClientError.from_grpc_error()` returns a `GrpcError` instance.
263
283
async def call_stub_method ( # noqa: DOC503
264
- client : BaseApiClient [ StubT ] ,
284
+ client : BaseApiClient ,
265
285
stub_method : Callable [[], Awaitable [StubOutT ]],
266
286
* ,
267
287
method_name : str | None = None ,
0 commit comments