Skip to content

Commit b8a5008

Browse files
Add a timeout in the gRPC function calls
Signed-off-by: camille-bouvy-frequenz <[email protected]>
1 parent 5da75cf commit b8a5008

File tree

1 file changed

+44
-11
lines changed

1 file changed

+44
-11
lines changed

src/frequenz/client/electricity_trading/_client.py

Lines changed: 44 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,11 @@
55

66
from __future__ import annotations
77

8+
import asyncio
89
import logging
910
from datetime import datetime, timezone
1011
from decimal import Decimal, InvalidOperation
11-
from typing import Awaitable, cast
12+
from typing import Any, Awaitable, Callable, cast
1213

1314
import grpc
1415

@@ -87,6 +88,31 @@ def validate_decimal_places(value: Decimal, decimal_places: int, name: str) -> N
8788
) from exc
8889

8990

91+
async def grpc_call_with_timeout(
92+
call: Callable[..., Awaitable[Any]], *args: Any, timeout: float = 300, **kwargs: Any
93+
) -> Any:
94+
"""
95+
Call a gRPC function with a timeout (in seconds).
96+
97+
Args:
98+
call: The gRPC method to be called.
99+
*args: Positional arguments for the gRPC call.
100+
timeout: Timeout duration in seconds. Defaults to 300.
101+
**kwargs: Keyword arguments for the gRPC call.
102+
103+
Returns:
104+
The result of the gRPC call.
105+
106+
Raises:
107+
asyncio.TimeoutError: If the call exceeds the timeout.
108+
"""
109+
try:
110+
return await asyncio.wait_for(call(*args, **kwargs), timeout=timeout)
111+
except asyncio.TimeoutError:
112+
_logger.error("Timeout while calling %s", call)
113+
raise
114+
115+
90116
class Client(BaseApiClient[ElectricityTradingServiceStub]):
91117
"""Electricity trading client."""
92118

@@ -493,10 +519,10 @@ async def create_gridpool_order(
493519
try:
494520
response = await cast(
495521
Awaitable[electricity_trading_pb2.CreateGridpoolOrderResponse],
496-
self.stub.CreateGridpoolOrder(
522+
grpc_call_with_timeout(
523+
self.stub.CreateGridpoolOrder,
497524
electricity_trading_pb2.CreateGridpoolOrderRequest(
498-
gridpool_id=gridpool_id,
499-
order=order.to_pb(),
525+
gridpool_id=gridpool_id, order=order.to_pb()
500526
),
501527
metadata=self._metadata,
502528
),
@@ -605,7 +631,8 @@ async def update_gridpool_order(
605631
try:
606632
response = await cast(
607633
Awaitable[electricity_trading_pb2.UpdateGridpoolOrderResponse],
608-
self.stub.UpdateGridpoolOrder(
634+
grpc_call_with_timeout(
635+
self.stub.UpdateGridpoolOrder,
609636
electricity_trading_pb2.UpdateGridpoolOrderRequest(
610637
gridpool_id=gridpool_id,
611638
order_id=order_id,
@@ -640,7 +667,8 @@ async def cancel_gridpool_order(
640667
try:
641668
response = await cast(
642669
Awaitable[electricity_trading_pb2.CancelGridpoolOrderResponse],
643-
self.stub.CancelGridpoolOrder(
670+
grpc_call_with_timeout(
671+
self.stub.CancelGridpoolOrder,
644672
electricity_trading_pb2.CancelGridpoolOrderRequest(
645673
gridpool_id=gridpool_id, order_id=order_id
646674
),
@@ -668,7 +696,8 @@ async def cancel_all_gridpool_orders(self, gridpool_id: int) -> int:
668696
try:
669697
response = await cast(
670698
Awaitable[electricity_trading_pb2.CancelAllGridpoolOrdersResponse],
671-
self.stub.CancelAllGridpoolOrders(
699+
grpc_call_with_timeout(
700+
self.stub.CancelAllGridpoolOrders,
672701
electricity_trading_pb2.CancelAllGridpoolOrdersRequest(
673702
gridpool_id=gridpool_id
674703
),
@@ -700,7 +729,8 @@ async def get_gridpool_order(self, gridpool_id: int, order_id: int) -> OrderDeta
700729
try:
701730
response = await cast(
702731
Awaitable[electricity_trading_pb2.GetGridpoolOrderResponse],
703-
self.stub.GetGridpoolOrder(
732+
grpc_call_with_timeout(
733+
self.stub.GetGridpoolOrder,
704734
electricity_trading_pb2.GetGridpoolOrderRequest(
705735
gridpool_id=gridpool_id, order_id=order_id
706736
),
@@ -760,7 +790,8 @@ async def list_gridpool_orders(
760790
try:
761791
response = await cast(
762792
Awaitable[electricity_trading_pb2.ListGridpoolOrdersResponse],
763-
self.stub.ListGridpoolOrders(
793+
grpc_call_with_timeout(
794+
self.stub.ListGridpoolOrders,
764795
electricity_trading_pb2.ListGridpoolOrdersRequest(
765796
gridpool_id=gridpool_id,
766797
filter=gridpool_order_filer.to_pb(),
@@ -832,7 +863,8 @@ async def list_gridpool_trades(
832863
try:
833864
response = await cast(
834865
Awaitable[electricity_trading_pb2.ListGridpoolTradesResponse],
835-
self.stub.ListGridpoolTrades(
866+
grpc_call_with_timeout(
867+
self.stub.ListGridpoolTrades,
836868
electricity_trading_pb2.ListGridpoolTradesRequest(
837869
gridpool_id=gridpool_id,
838870
filter=gridpool_trade_filter.to_pb(),
@@ -889,7 +921,8 @@ async def list_public_trades(
889921
try:
890922
response = await cast(
891923
Awaitable[electricity_trading_pb2.ListPublicTradesResponse],
892-
self.stub.ListPublicTrades(
924+
grpc_call_with_timeout(
925+
self.stub.ListPublicTrades,
893926
electricity_trading_pb2.ListPublicTradesRequest(
894927
filter=public_trade_filter.to_pb(),
895928
pagination_params=pagination_params.to_proto(),

0 commit comments

Comments
 (0)