Skip to content

Commit d09e069

Browse files
Write a backoff module handling retries with increasing timeouts
Signed-off-by: camille-bouvy-frequenz <[email protected]>
1 parent 5b95dd9 commit d09e069

File tree

2 files changed

+170
-13
lines changed

2 files changed

+170
-13
lines changed
Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,123 @@
1+
# License: MIT
2+
# Copyright © 2024 Frequenz Energy-as-a-Service GmbH
3+
4+
"""Module to define a backoff strategy to handle retries with increasing timeouts."""
5+
6+
import asyncio
7+
import logging
8+
from datetime import datetime, timezone
9+
from typing import Any, Awaitable, Callable
10+
11+
_logger = logging.getLogger(__name__)
12+
13+
14+
class Backoff:
    """Backoff manager to handle retries with increasing timeouts.

    Every call made through `execute_with_backoff()` is bounded by the current
    timeout. When a call times out, the timeout is multiplied by the growth
    factor (capped at the maximum) and the call is retried. Once no timeout
    error has occurred for `reset_interval` seconds, the timeout is set back to
    its initial value.

    The timeout state is shared by all calls made through the same instance.
    """

    def __init__(  # pylint: disable=too-many-arguments, too-many-positional-arguments
        self,
        initial_timeout: float,
        max_timeout: float,
        timeout_growth_factor: float,
        max_retries: int,
        reset_interval: float,
    ) -> None:
        """
        Initialize the backoff manager.

        Args:
            initial_timeout: The initial timeout duration in seconds.
            max_timeout: The maximum timeout duration in seconds.
            timeout_growth_factor: Factor by which to increase the timeout on each retry.
            max_retries: The maximum number of retry attempts.
            reset_interval: Time in seconds without timeout errors before the
                timeout is reset to its initial value. Use 0 to never reset.

        Raises:
            ValueError: If the timeout growth factor is less than or equal to 1,
                if the reset interval is negative, if the initial timeout is not
                positive, if the maximum timeout is smaller than the initial
                timeout, or if the maximum number of retries is negative.
        """
        if timeout_growth_factor <= 1.0:
            raise ValueError("timeout_growth_factor must be greater than 1.0")
        if reset_interval < 0:
            raise ValueError("reset_interval must be non-negative.")
        if initial_timeout <= 0:
            raise ValueError("initial_timeout must be greater than 0.")
        if max_timeout < initial_timeout:
            raise ValueError("max_timeout must not be smaller than initial_timeout.")
        if max_retries < 0:
            # A negative value would make the retry loop run zero times and
            # silently return None without ever invoking the call.
            raise ValueError("max_retries must be non-negative.")
        self._initial_timeout = initial_timeout
        self._timeout = initial_timeout
        self._max_timeout = max_timeout
        self._timeout_growth_factor = timeout_growth_factor
        self._max_retries = max_retries
        self._reset_interval = reset_interval
        now = datetime.now(timezone.utc)
        # Kept for informational purposes; the reset decision is based on the
        # time of the last timeout error, not on the last success.
        self._last_success_time = now
        self._last_timeout_time = now

    def _increase_timeout(self) -> None:
        """Increase the timeout using the growth factor, but not beyond the maximum."""
        if self._timeout < self._max_timeout:
            self._timeout = min(
                self._max_timeout, self._timeout * self._timeout_growth_factor
            )
            # %g keeps fractional timeouts visible (%d would log 0.5 as "0").
            _logger.info("Increased timeout to %gs.", self._timeout)
        else:
            _logger.warning(
                "Timeout is already at the maximum (%gs). No further increments.",
                self._max_timeout,
            )

    def _reset_timeout_if_needed(self) -> None:
        """Reset the timeout to the initial value after a quiet period.

        The timeout is reset when at least `reset_interval` seconds have passed
        since the last timeout error. Nothing happens when the reset interval
        is 0 or when the timeout is already at its initial value.
        """
        if self._reset_interval == 0:
            # Skip reset logic if reset_interval is 0
            return
        if self._timeout == self._initial_timeout:
            # Nothing to reset; also avoids logging a reset on every call.
            return

        now = datetime.now(timezone.utc)
        # Measure from the last timeout error: measuring from the last success
        # would never reset while calls keep succeeding frequently.
        if (now - self._last_timeout_time).total_seconds() >= self._reset_interval:
            self._timeout = self._initial_timeout
            _logger.info(
                "Timeout reset to initial value %gs after %gs without timeout errors.",
                self._initial_timeout,
                self._reset_interval,
            )

    async def execute_with_backoff(
        self,
        call: Callable[..., Awaitable[Any]],
        *args: Any,
        **kwargs: Any,
    ) -> Any:
        """
        Execute a function with backoff logic for retries on timeout.

        The call is attempted up to `max_retries + 1` times. Each attempt is
        bounded by the current timeout; after a timed-out attempt the timeout
        is increased before the next attempt. Exceptions other than
        `asyncio.TimeoutError` are propagated immediately without retrying.

        Args:
            call: The asynchronous function to call.
            *args: Positional arguments for the function.
            **kwargs: Keyword arguments for the function.

        Returns:
            The result of the function call.

        Raises:
            asyncio.TimeoutError: If all retries are exhausted.
        """
        for attempt in range(self._max_retries + 1):
            try:
                self._reset_timeout_if_needed()
                result = await asyncio.wait_for(
                    call(*args, **kwargs), timeout=self._timeout
                )
                # Update last success time on successful call
                self._last_success_time = datetime.now(timezone.utc)
                return result
            except asyncio.TimeoutError:
                # Remember when the last timeout happened so that the reset
                # interval is measured from it.
                self._last_timeout_time = datetime.now(timezone.utc)
                call_name = getattr(call, "__name__", str(call))
                if attempt == self._max_retries:
                    _logger.error(
                        "Timeout after %d retries (timeout=%gs): %s",
                        self._max_retries,
                        self._timeout,
                        call_name,
                    )
                    raise
                _logger.warning(
                    "Timeout on %s attempt %d/%d...",
                    call_name,
                    attempt + 1,
                    self._max_retries + 1,
                )
                self._increase_timeout()

src/frequenz/client/electricity_trading/_client.py

Lines changed: 47 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@
33

44
"""Module to define the client class."""
55

6+
# pylint: disable=too-many-lines
7+
68
from __future__ import annotations
79

810
import logging
@@ -27,6 +29,7 @@
2729
from frequenz.client.common.pagination import Params
2830
from google.protobuf import field_mask_pb2, struct_pb2
2931

32+
from ._backoff import Backoff
3033
from ._types import (
3134
DeliveryArea,
3235
DeliveryPeriod,
@@ -91,7 +94,9 @@ def validate_decimal_places(value: Decimal, decimal_places: int, name: str) -> N
9194
) from exc
9295

9396

94-
class Client(BaseApiClient[ElectricityTradingServiceStub]):
97+
class Client(
98+
BaseApiClient[ElectricityTradingServiceStub]
99+
): # pylint:disable=too-many-instance-attributes
95100
"""Electricity trading client."""
96101

97102
_instances: dict[tuple[str, str | None], "Client"] = {}
@@ -120,15 +125,28 @@ def __new__(
120125

121126
return cls._instances[key]
122127

123-
def __init__(
124-
self, server_url: str, connect: bool = True, auth_key: str | None = None
128+
def __init__( # pylint: disable=too-many-arguments, too-many-positional-arguments
129+
self,
130+
server_url: str,
131+
connect: bool = True,
132+
auth_key: str | None = None,
133+
initial_timeout: float = 10.0,
134+
max_timeout: float = 300.0,
135+
timeout_growth_factor: float = 2.0,
136+
max_timeout_retries: int = 5,
137+
reset_interval: int = 600,
125138
) -> None:
126139
"""Initialize the client.
127140
128141
Args:
129142
server_url: The URL of the Electricity Trading service.
130143
connect: Whether to connect to the server immediately.
131144
auth_key: The API key for the authorization.
145+
initial_timeout: Initial timeout duration for gRPC calls (in seconds).
146+
max_timeout: Maximum timeout duration for gRPC calls (in seconds).
147+
timeout_growth_factor: Factor by which to increase the timeout on each retry.
148+
max_timeout_retries: Maximum number of retry attempts when a timeout is reached.
149+
reset_interval: Time (in seconds) before resetting timeout if everything behaves fine.
132150
"""
133151
if not hasattr(
134152
self, "_initialized"
@@ -163,6 +181,15 @@ def __init__(
163181

164182
self._metadata = (("key", auth_key),) if auth_key else ()
165183

184+
# Backoff strategy configuration to handle gRPC connection failures
185+
self._backoff = Backoff(
186+
initial_timeout=initial_timeout,
187+
max_timeout=max_timeout,
188+
timeout_growth_factor=timeout_growth_factor,
189+
max_retries=max_timeout_retries,
190+
reset_interval=reset_interval,
191+
)
192+
166193
@property
167194
def stub(self) -> electricity_trading_pb2_grpc.ElectricityTradingServiceAsyncStub:
168195
"""
@@ -503,10 +530,10 @@ async def create_gridpool_order(
503530
try:
504531
response = await cast(
505532
Awaitable[electricity_trading_pb2.CreateGridpoolOrderResponse],
506-
self.stub.CreateGridpoolOrder(
533+
self._backoff.execute_with_backoff(
534+
self.stub.CreateGridpoolOrder,
507535
electricity_trading_pb2.CreateGridpoolOrderRequest(
508-
gridpool_id=gridpool_id,
509-
order=order.to_pb(),
536+
gridpool_id=gridpool_id, order=order.to_pb()
510537
),
511538
metadata=self._metadata,
512539
),
@@ -615,7 +642,8 @@ async def update_gridpool_order(
615642
try:
616643
response = await cast(
617644
Awaitable[electricity_trading_pb2.UpdateGridpoolOrderResponse],
618-
self.stub.UpdateGridpoolOrder(
645+
self._backoff.execute_with_backoff(
646+
self.stub.UpdateGridpoolOrder,
619647
electricity_trading_pb2.UpdateGridpoolOrderRequest(
620648
gridpool_id=gridpool_id,
621649
order_id=order_id,
@@ -650,7 +678,8 @@ async def cancel_gridpool_order(
650678
try:
651679
response = await cast(
652680
Awaitable[electricity_trading_pb2.CancelGridpoolOrderResponse],
653-
self.stub.CancelGridpoolOrder(
681+
self._backoff.execute_with_backoff(
682+
self.stub.CancelGridpoolOrder,
654683
electricity_trading_pb2.CancelGridpoolOrderRequest(
655684
gridpool_id=gridpool_id, order_id=order_id
656685
),
@@ -678,7 +707,8 @@ async def cancel_all_gridpool_orders(self, gridpool_id: int) -> int:
678707
try:
679708
response = await cast(
680709
Awaitable[electricity_trading_pb2.CancelAllGridpoolOrdersResponse],
681-
self.stub.CancelAllGridpoolOrders(
710+
self._backoff.execute_with_backoff(
711+
self.stub.CancelAllGridpoolOrders,
682712
electricity_trading_pb2.CancelAllGridpoolOrdersRequest(
683713
gridpool_id=gridpool_id
684714
),
@@ -710,7 +740,8 @@ async def get_gridpool_order(self, gridpool_id: int, order_id: int) -> OrderDeta
710740
try:
711741
response = await cast(
712742
Awaitable[electricity_trading_pb2.GetGridpoolOrderResponse],
713-
self.stub.GetGridpoolOrder(
743+
self._backoff.execute_with_backoff(
744+
self.stub.GetGridpoolOrder,
714745
electricity_trading_pb2.GetGridpoolOrderRequest(
715746
gridpool_id=gridpool_id, order_id=order_id
716747
),
@@ -770,7 +801,8 @@ async def list_gridpool_orders(
770801
try:
771802
response = await cast(
772803
Awaitable[electricity_trading_pb2.ListGridpoolOrdersResponse],
773-
self.stub.ListGridpoolOrders(
804+
self._backoff.execute_with_backoff(
805+
self.stub.ListGridpoolOrders,
774806
electricity_trading_pb2.ListGridpoolOrdersRequest(
775807
gridpool_id=gridpool_id,
776808
filter=gridpool_order_filer.to_pb(),
@@ -842,7 +874,8 @@ async def list_gridpool_trades(
842874
try:
843875
response = await cast(
844876
Awaitable[electricity_trading_pb2.ListGridpoolTradesResponse],
845-
self.stub.ListGridpoolTrades(
877+
self._backoff.execute_with_backoff(
878+
self.stub.ListGridpoolTrades,
846879
electricity_trading_pb2.ListGridpoolTradesRequest(
847880
gridpool_id=gridpool_id,
848881
filter=gridpool_trade_filter.to_pb(),
@@ -899,7 +932,8 @@ async def list_public_trades(
899932
try:
900933
response = await cast(
901934
Awaitable[electricity_trading_pb2.ListPublicTradesResponse],
902-
self.stub.ListPublicTrades(
935+
self._backoff.execute_with_backoff(
936+
self.stub.ListPublicTrades,
903937
electricity_trading_pb2.ListPublicTradesRequest(
904938
filter=public_trade_filter.to_pb(),
905939
pagination_params=pagination_params.to_proto(),

0 commit comments

Comments
 (0)