Skip to content

Commit 4c9d78d

Browse files
committed
Add timeouts to all api calls
Signed-off-by: Mathias L. Baumann <[email protected]>
1 parent fa059ed commit 4c9d78d

File tree

3 files changed

+43
-8
lines changed

3 files changed

+43
-8
lines changed

RELEASE_NOTES.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,9 @@
1111
## New Features
1212

1313
* `dispatch-cli` supports now the parameter `--type` and `--running` to filter the list of running services by type and status, respectively.
14+
* Every call now has a default timeout of 60 seconds, streams terminate after five minutes. This can be influenced by the two new parameters for`DispatchApiClient.__init__()`:
15+
* `default_timeout: timedelta` (default: 60 seconds)
16+
* `stream_timeout: timedelta` (default: 5 minutes)
1417

1518
## Bug Fixes
1619

src/frequenz/client/dispatch/_client.py

Lines changed: 26 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -53,19 +53,24 @@
5353
class DispatchApiClient(BaseApiClient[dispatch_pb2_grpc.MicrogridDispatchServiceStub]):
5454
"""Dispatch API client."""
5555

56+
# pylint: disable-next=too-many-arguments
5657
def __init__(
5758
self,
5859
*,
5960
server_url: str,
6061
key: str,
6162
connect: bool = True,
63+
call_timeout: timedelta = timedelta(seconds=60),
64+
stream_timeout: timedelta = timedelta(minutes=5),
6265
) -> None:
6366
"""Initialize the client.
6467
6568
Args:
6669
server_url: The URL of the server to connect to.
6770
key: API key to use for authentication.
6871
connect: Whether to connect to the service immediately.
72+
call_timeout: Timeout for gRPC calls, default is 60 seconds.
73+
stream_timeout: Timeout for gRPC streams, default is 5 minutes.
6974
"""
7075
super().__init__(
7176
server_url,
@@ -82,6 +87,9 @@ def __init__(
8287
] = {}
8388
"""A dictionary of streamers, keyed by microgrid_id."""
8489

90+
self._call_timeout = call_timeout.total_seconds()
91+
self._stream_timeout = stream_timeout.total_seconds()
92+
8593
@property
8694
def stub(self) -> dispatch_pb2_grpc.MicrogridDispatchServiceAsyncStub:
8795
"""The stub for the service."""
@@ -177,7 +185,9 @@ def to_interval(
177185
while True:
178186
response = await cast(
179187
Awaitable[ListMicrogridDispatchesResponse],
180-
self.stub.ListMicrogridDispatches(request, metadata=self._metadata),
188+
self.stub.ListMicrogridDispatches(
189+
request, metadata=self._metadata, timeout=self._call_timeout
190+
),
181191
)
182192

183193
yield (Dispatch.from_protobuf(dispatch) for dispatch in response.dispatches)
@@ -234,7 +244,9 @@ def _get_stream(
234244
stream_method=lambda: cast(
235245
AsyncIterator[StreamMicrogridDispatchesResponse],
236246
self.stub.StreamMicrogridDispatches(
237-
request, metadata=self._metadata
247+
request,
248+
metadata=self._metadata,
249+
timeout=self._stream_timeout,
238250
),
239251
),
240252
transform=DispatchEvent.from_protobuf,
@@ -303,7 +315,9 @@ async def create( # pylint: disable=too-many-positional-arguments
303315
response = await cast(
304316
Awaitable[CreateMicrogridDispatchResponse],
305317
self.stub.CreateMicrogridDispatch(
306-
request.to_protobuf(), metadata=self._metadata
318+
request.to_protobuf(),
319+
metadata=self._metadata,
320+
timeout=self._call_timeout,
307321
),
308322
)
309323

@@ -394,7 +408,9 @@ async def update(
394408

395409
response = await cast(
396410
Awaitable[UpdateMicrogridDispatchResponse],
397-
self.stub.UpdateMicrogridDispatch(msg, metadata=self._metadata),
411+
self.stub.UpdateMicrogridDispatch(
412+
msg, metadata=self._metadata, timeout=self._call_timeout
413+
),
398414
)
399415

400416
return Dispatch.from_protobuf(response.dispatch)
@@ -414,7 +430,9 @@ async def get(self, *, microgrid_id: int, dispatch_id: int) -> Dispatch:
414430
)
415431
response = await cast(
416432
Awaitable[GetMicrogridDispatchResponse],
417-
self.stub.GetMicrogridDispatch(request, metadata=self._metadata),
433+
self.stub.GetMicrogridDispatch(
434+
request, metadata=self._metadata, timeout=self._call_timeout
435+
),
418436
)
419437
return Dispatch.from_protobuf(response.dispatch)
420438

@@ -430,5 +448,7 @@ async def delete(self, *, microgrid_id: int, dispatch_id: int) -> None:
430448
)
431449
await cast(
432450
Awaitable[None],
433-
self.stub.DeleteMicrogridDispatch(request, metadata=self._metadata),
451+
self.stub.DeleteMicrogridDispatch(
452+
request, metadata=self._metadata, timeout=self._call_timeout
453+
),
434454
)

src/frequenz/client/dispatch/test/_service.py

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -112,13 +112,17 @@ def _check_access(self, metadata: grpc.aio.Metadata) -> None:
112112

113113
# pylint: disable=invalid-name
114114
async def ListMicrogridDispatches(
115-
self, request: ListMicrogridDispatchesRequest, metadata: grpc.aio.Metadata
115+
self,
116+
request: ListMicrogridDispatchesRequest,
117+
metadata: grpc.aio.Metadata,
118+
timeout: int = 5, # pylint: disable=unused-argument
116119
) -> ListMicrogridDispatchesResponse:
117120
"""List microgrid dispatches.
118121
119122
Args:
120123
request: The request.
121124
metadata: The metadata.
125+
timeout: timeout for the request, ignored in this mock.
122126
123127
Returns:
124128
The dispatch list.
@@ -141,13 +145,17 @@ async def ListMicrogridDispatches(
141145
)
142146

143147
async def StreamMicrogridDispatches(
144-
self, request: StreamMicrogridDispatchesRequest, metadata: grpc.aio.Metadata
148+
self,
149+
request: StreamMicrogridDispatchesRequest,
150+
metadata: grpc.aio.Metadata,
151+
timeout: int = 5, # pylint: disable=unused-argument
145152
) -> AsyncIterator[StreamMicrogridDispatchesResponse]:
146153
"""Stream microgrid dispatches changes.
147154
148155
Args:
149156
request: The request.
150157
metadata: The metadata.
158+
timeout: timeout for the request, ignored in this mock.
151159
152160
Returns:
153161
An async generator for dispatch changes.
@@ -212,6 +220,7 @@ async def CreateMicrogridDispatch(
212220
self,
213221
request: PBDispatchCreateRequest,
214222
metadata: grpc.aio.Metadata,
223+
timeout: int = 5, # pylint: disable=unused-argument
215224
) -> CreateMicrogridDispatchResponse:
216225
"""Create a new dispatch."""
217226
self._check_access(metadata)
@@ -240,6 +249,7 @@ async def UpdateMicrogridDispatch(
240249
self,
241250
request: UpdateMicrogridDispatchRequest,
242251
metadata: grpc.aio.Metadata,
252+
timeout: int = 5, # pylint: disable=unused-argument
243253
) -> UpdateMicrogridDispatchResponse:
244254
"""Update a dispatch."""
245255
self._check_access(metadata)
@@ -327,6 +337,7 @@ async def GetMicrogridDispatch(
327337
self,
328338
request: GetMicrogridDispatchRequest,
329339
metadata: grpc.aio.Metadata,
340+
timeout: int = 5, # pylint: disable=unused-argument
330341
) -> GetMicrogridDispatchResponse:
331342
"""Get a single dispatch."""
332343
self._check_access(metadata)
@@ -349,6 +360,7 @@ async def DeleteMicrogridDispatch(
349360
self,
350361
request: DeleteMicrogridDispatchRequest,
351362
metadata: grpc.aio.Metadata,
363+
timeout: int = 5, # pylint: disable=unused-argument
352364
) -> Empty:
353365
"""Delete a given dispatch."""
354366
self._check_access(metadata)

0 commit comments

Comments
 (0)