Skip to content

Commit 134c96d

Browse files
Add a timeout in the gRPC function calls
Signed-off-by: camille-bouvy-frequenz <[email protected]>
1 parent 779b3a5 commit 134c96d

File tree

1 file changed

+44
-11
lines changed

1 file changed

+44
-11
lines changed

src/frequenz/client/electricity_trading/_client.py

Lines changed: 44 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,11 @@
55

66
from __future__ import annotations
77

8+
import asyncio
89
import logging
910
from datetime import datetime, timezone
1011
from decimal import Decimal, InvalidOperation
11-
from typing import Awaitable, cast
12+
from typing import Any, Awaitable, Callable, cast
1213

1314
import grpc
1415

@@ -91,6 +92,31 @@ def validate_decimal_places(value: Decimal, decimal_places: int, name: str) -> N
9192
) from exc
9293

9394

95+
async def grpc_call_with_timeout(
96+
call: Callable[..., Awaitable[Any]], *args: Any, timeout: float = 300, **kwargs: Any
97+
) -> Any:
98+
"""
99+
Call a gRPC function with a timeout (in seconds).
100+
101+
Args:
102+
call: The gRPC method to be called.
103+
*args: Positional arguments for the gRPC call.
104+
timeout: Timeout duration in seconds. Defaults to 300.
105+
**kwargs: Keyword arguments for the gRPC call.
106+
107+
Returns:
108+
The result of the gRPC call.
109+
110+
Raises:
111+
asyncio.TimeoutError: If the call exceeds the timeout.
112+
"""
113+
try:
114+
return await asyncio.wait_for(call(*args, **kwargs), timeout=timeout)
115+
except asyncio.TimeoutError:
116+
_logger.error("Timeout while calling %s", call)
117+
raise
118+
119+
94120
class Client(BaseApiClient[ElectricityTradingServiceStub]):
95121
"""Electricity trading client."""
96122

@@ -503,10 +529,10 @@ async def create_gridpool_order(
503529
try:
504530
response = await cast(
505531
Awaitable[electricity_trading_pb2.CreateGridpoolOrderResponse],
506-
self.stub.CreateGridpoolOrder(
532+
grpc_call_with_timeout(
533+
self.stub.CreateGridpoolOrder,
507534
electricity_trading_pb2.CreateGridpoolOrderRequest(
508-
gridpool_id=gridpool_id,
509-
order=order.to_pb(),
535+
gridpool_id=gridpool_id, order=order.to_pb()
510536
),
511537
metadata=self._metadata,
512538
),
@@ -615,7 +641,8 @@ async def update_gridpool_order(
615641
try:
616642
response = await cast(
617643
Awaitable[electricity_trading_pb2.UpdateGridpoolOrderResponse],
618-
self.stub.UpdateGridpoolOrder(
644+
grpc_call_with_timeout(
645+
self.stub.UpdateGridpoolOrder,
619646
electricity_trading_pb2.UpdateGridpoolOrderRequest(
620647
gridpool_id=gridpool_id,
621648
order_id=order_id,
@@ -650,7 +677,8 @@ async def cancel_gridpool_order(
650677
try:
651678
response = await cast(
652679
Awaitable[electricity_trading_pb2.CancelGridpoolOrderResponse],
653-
self.stub.CancelGridpoolOrder(
680+
grpc_call_with_timeout(
681+
self.stub.CancelGridpoolOrder,
654682
electricity_trading_pb2.CancelGridpoolOrderRequest(
655683
gridpool_id=gridpool_id, order_id=order_id
656684
),
@@ -678,7 +706,8 @@ async def cancel_all_gridpool_orders(self, gridpool_id: int) -> int:
678706
try:
679707
response = await cast(
680708
Awaitable[electricity_trading_pb2.CancelAllGridpoolOrdersResponse],
681-
self.stub.CancelAllGridpoolOrders(
709+
grpc_call_with_timeout(
710+
self.stub.CancelAllGridpoolOrders,
682711
electricity_trading_pb2.CancelAllGridpoolOrdersRequest(
683712
gridpool_id=gridpool_id
684713
),
@@ -710,7 +739,8 @@ async def get_gridpool_order(self, gridpool_id: int, order_id: int) -> OrderDeta
710739
try:
711740
response = await cast(
712741
Awaitable[electricity_trading_pb2.GetGridpoolOrderResponse],
713-
self.stub.GetGridpoolOrder(
742+
grpc_call_with_timeout(
743+
self.stub.GetGridpoolOrder,
714744
electricity_trading_pb2.GetGridpoolOrderRequest(
715745
gridpool_id=gridpool_id, order_id=order_id
716746
),
@@ -770,7 +800,8 @@ async def list_gridpool_orders(
770800
try:
771801
response = await cast(
772802
Awaitable[electricity_trading_pb2.ListGridpoolOrdersResponse],
773-
self.stub.ListGridpoolOrders(
803+
grpc_call_with_timeout(
804+
self.stub.ListGridpoolOrders,
774805
electricity_trading_pb2.ListGridpoolOrdersRequest(
775806
gridpool_id=gridpool_id,
776807
filter=gridpool_order_filer.to_pb(),
@@ -842,7 +873,8 @@ async def list_gridpool_trades(
842873
try:
843874
response = await cast(
844875
Awaitable[electricity_trading_pb2.ListGridpoolTradesResponse],
845-
self.stub.ListGridpoolTrades(
876+
grpc_call_with_timeout(
877+
self.stub.ListGridpoolTrades,
846878
electricity_trading_pb2.ListGridpoolTradesRequest(
847879
gridpool_id=gridpool_id,
848880
filter=gridpool_trade_filter.to_pb(),
@@ -899,7 +931,8 @@ async def list_public_trades(
899931
try:
900932
response = await cast(
901933
Awaitable[electricity_trading_pb2.ListPublicTradesResponse],
902-
self.stub.ListPublicTrades(
934+
grpc_call_with_timeout(
935+
self.stub.ListPublicTrades,
903936
electricity_trading_pb2.ListPublicTradesRequest(
904937
filter=public_trade_filter.to_pb(),
905938
pagination_params=pagination_params.to_proto(),

0 commit comments

Comments
 (0)