Skip to content

Commit 57a5d01

Browse files
yuguo.dtpevolc-sdk-team
authored andcommitted
feat: add retryer
1 parent 413e207 commit 57a5d01

File tree

12 files changed

+459
-10
lines changed

12 files changed

+459
-10
lines changed

volcenginesdkcore/api_client.py

Lines changed: 43 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,9 @@
33
from __future__ import absolute_import
44

55
import datetime
6+
import logging
67
from multiprocessing.pool import ThreadPool
8+
from time import sleep
79

810
# python 2 and python 3 compatibility library
911
import six
@@ -15,7 +17,9 @@
1517
from volcenginesdkcore.interceptor import DeserializedResponseInterceptor
1618
from volcenginesdkcore.interceptor import InterceptorChain, InterceptorContext
1719
from volcenginesdkcore.interceptor import Request, Response
20+
from volcenginesdkcore.retryer.retryer import DEFAULT_RETRYER
1821

22+
logger = logging.getLogger(__name__)
1923

2024
class ApiClient(object):
2125
"""Generic API client for Swagger client library builds.
@@ -122,14 +126,35 @@ def __call_api(
122126

123127
interceptor_context = self.interceptor_chain.execute_request(interceptor_context)
124128

125-
# perform request and return response
126-
response_data = self.request(
127-
method, url=interceptor_context.request.url, query_params=interceptor_context.request.query_params,
128-
headers=interceptor_context.request.header_params,
129-
post_params=interceptor_context.request.post_params, body=interceptor_context.request.body,
130-
_preload_content=interceptor_context.request.preload_content,
131-
_request_timeout=interceptor_context.request.request_timeout)
132-
self.last_response = response_data
129+
retry_count = 0
130+
response_data = None
131+
retry_err = None
132+
133+
auto_retry = interceptor_context.request.auto_retry
134+
retryer = interceptor_context.request.retryer
135+
num_max_retries = retryer.num_max_retries
136+
137+
while True:
138+
# perform request and return response
139+
try:
140+
response_data = self.request(
141+
method, url=interceptor_context.request.url, query_params=interceptor_context.request.query_params,
142+
headers=interceptor_context.request.header_params,
143+
post_params=interceptor_context.request.post_params, body=interceptor_context.request.body,
144+
_preload_content=interceptor_context.request.preload_content,
145+
_request_timeout=interceptor_context.request.request_timeout)
146+
self.last_response = response_data
147+
except Exception as e:
148+
logger.warning("request error: {}".format(e))
149+
retry_err = e
150+
if retry_count >= num_max_retries:
151+
raise e
152+
153+
if not auto_retry or not self.request_retry(response_data, retry_count, retry_err, retryer):
154+
if retry_err is not None:
155+
raise retry_err
156+
break
157+
retry_count += 1
133158

134159
interceptor_context.response = Response(response_data)
135160
interceptor_context = self.interceptor_chain.execute_response(interceptor_context)
@@ -141,6 +166,16 @@ def __call_api(
141166
return (response.result, response.http_response.status,
142167
response.http_response.getheaders())
143168

169+
def request_retry(self, response_data, retry_count, retry_err, retryer):
170+
if retryer.should_retry(response_data, retry_count, retry_err):
171+
delay = retryer.get_backoff_delay(retry_count)
172+
sleep(delay / 1000)
173+
if self.configuration.debug:
174+
logger.debug("retry backoff strategy:%s, retry condition: %s, max retry count:%d, current retry count: %d, retry delay(ms):%f",
175+
type(retryer.backoff_strategy).__name__,type(retryer.retry_condition).__name__, retryer.num_max_retries, retry_count + 1, delay)
176+
return True
177+
return False
178+
144179
def call_api(self, resource_path, method,
145180
path_params=None, query_params=None, header_params=None,
146181
body=None, post_params=None, files=None,

volcenginesdkcore/configuration.py

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
from six.moves import http_client as httplib
1313

1414
from volcenginesdkcore.endpoint import DefaultEndpointProvider
15+
from volcenginesdkcore.retryer.retryer import DEFAULT_RETRYER
1516

1617

1718
class TypeWithDefault(type):
@@ -134,6 +135,12 @@ def __init__(self):
134135

135136
self.endpoint_provider = DefaultEndpointProvider()
136137

138+
self.auto_retry = True
139+
self.__retryer = DEFAULT_RETRYER
140+
self.__retry_error_codes = None
141+
self.__min_retry_delay_ms = None
142+
self.__max_retry_delay_ms = None
143+
137144
@property
138145
def logger_file(self):
139146
"""The logger file.
@@ -259,3 +266,68 @@ def to_debug_report(self):
259266
"Version of the API: 0.1.0\n" \
260267
"SDK Package Version: 3.0.15".\
261268
format(env=sys.platform, pyversion=sys.version)
269+
270+
@property
271+
def num_max_retries(self):
272+
return self.__retryer.num_max_retries
273+
274+
@num_max_retries.setter
275+
def num_max_retries(self, num_max_retries):
276+
if num_max_retries is None:
277+
raise ValueError("num_max_retries cannot be None")
278+
if num_max_retries < 0:
279+
raise ValueError("num_max_retries must be greater than or equal to 0")
280+
self.__retryer.num_max_retries = num_max_retries
281+
282+
@property
283+
def backoff_strategy(self):
284+
return self.__retryer.backoff_strategy
285+
286+
@backoff_strategy.setter
287+
def backoff_strategy(self, value):
288+
self.__retryer.backoff_strategy = value
289+
if self.min_retry_delay_ms is not None:
290+
self.__retryer.backoff_strategy.min_retry_delay_ms = self.min_retry_delay_ms
291+
if self.max_retry_delay_ms is not None:
292+
self.__retryer.backoff_strategy.max_retry_delay_ms = self.max_retry_delay_ms
293+
294+
@property
295+
def retry_condition(self):
296+
return self.__retryer.retry_condition
297+
298+
@retry_condition.setter
299+
def retry_condition(self, value):
300+
self.__retryer.retry_condition = value
301+
if self.retry_error_codes is not None:
302+
self.__retryer.retry_condition.retry_error_codes = self.retry_error_codes
303+
304+
@property
305+
def retry_error_codes(self):
306+
return self.__retry_error_codes
307+
308+
@retry_error_codes.setter
309+
def retry_error_codes(self, value):
310+
self.__retry_error_codes = value
311+
self.__retryer.retry_condition.retry_error_codes = value
312+
313+
@property
314+
def min_retry_delay_ms(self):
315+
return self.__min_retry_delay_ms
316+
317+
@min_retry_delay_ms.setter
318+
def min_retry_delay_ms(self, value):
319+
self.__min_retry_delay_ms = value
320+
self.__retryer.backoff_strategy.min_retry_delay_ms = value
321+
322+
@property
323+
def max_retry_delay_ms(self):
324+
return self.__max_retry_delay_ms
325+
326+
@max_retry_delay_ms.setter
327+
def max_retry_delay_ms(self, value):
328+
self.__max_retry_delay_ms = value
329+
self.__retryer.backoff_strategy.max_retry_delay_ms = value
330+
331+
@property
332+
def retryer(self):
333+
return self.__retryer

volcenginesdkcore/interceptor/interceptors/build_request_interceptor.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
import datetime
22
import mimetypes
33
import os
4-
from urllib.parse import quote
54

65
import six
6+
from six.moves.urllib.parse import quote
77

88
from .interceptor import RequestInterceptor
99

volcenginesdkcore/interceptor/interceptors/request.py

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,11 @@
1+
from volcenginesdkcore.retryer.retryer import DefaultRetryCondition, ExponentialWithRandomJitterBackoffStrategy
2+
3+
14
class RuntimeOption(object):
25
def __init__(self, client_side_validation, ak=None, sk=None, session_token=None, region=None, scheme=None,
3-
endpoint_provider=None, connect_timeout=None, read_timeout=None):
6+
endpoint_provider=None, connect_timeout=None, read_timeout=None, auto_retry=None, num_max_retries=None,
7+
backoff_strategy=None, retry_condition=None,
8+
retry_error_codes=None, min_retry_delay_ms=None, max_retry_delay_ms=None):
49
self.ak = ak
510
self.sk = sk
611
self.session_token = session_token
@@ -10,6 +15,13 @@ def __init__(self, client_side_validation, ak=None, sk=None, session_token=None,
1015
self.connect_timeout = connect_timeout
1116
self.read_timeout = read_timeout
1217
self.client_side_validation = client_side_validation
18+
self.auto_retry = auto_retry
19+
self.num_max_retries = num_max_retries
20+
self.backoff_strategy = backoff_strategy
21+
self.retry_condition = retry_condition
22+
self.retry_error_codes = retry_error_codes
23+
self.min_retry_delay_ms = min_retry_delay_ms
24+
self.max_retry_delay_ms = max_retry_delay_ms
1325

1426

1527
class Request(object):
@@ -49,6 +61,10 @@ def __init__(
4961
self.custom_bootstrap_region = configuration.custom_bootstrap_region
5062
self.use_dual_stack = configuration.use_dual_stack
5163

64+
# retryer setting, default use global configration if value not set
65+
self.auto_retry = configuration.auto_retry
66+
self.retryer = configuration.retryer
67+
5268
self.runtime_options = None
5369
if hasattr(body, '_configuration') and isinstance(body._configuration, RuntimeOption):
5470
self.runtime_options = body._configuration

volcenginesdkcore/interceptor/interceptors/runtime_options_interceptor.py

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,4 +32,25 @@ def intercept(self, context):
3232
context.request.endpoint_provider = opt.endpoint_provider
3333
context.request.host = None
3434

35+
# retryer arguments set
36+
RuntimeOptionsInterceptor.__update_retryer(context, opt)
3537
return context
38+
39+
@staticmethod
40+
def __update_retryer(context, opt):
41+
min_retry_delay_ms = opt.min_retry_delay_ms \
42+
if opt.min_retry_delay_ms is not None else context.request.retryer.backoff_strategy.min_retry_delay_ms
43+
max_retry_delay_ms = opt.max_retry_delay_ms \
44+
if opt.max_retry_delay_ms is not None else context.request.retryer.backoff_strategy.max_retry_delay_ms
45+
retry_error_codes = opt.retry_error_codes \
46+
if opt.retry_error_codes is not None else context.request.retryer.retry_condition.retry_error_codes
47+
context.request.auto_retry = opt.auto_retry if opt.auto_retry is not None else context.request.auto_retry
48+
context.request.retryer.num_max_retries = opt.num_max_retries \
49+
if opt.num_max_retries is not None else context.request.num_max_retries
50+
context.request.retryer.backoff_strategy = opt.backoff_strategy \
51+
if opt.backoff_strategy is not None else context.request.retryer.backoff_strategy
52+
context.request.retryer.backoff_strategy.min_retry_delay_ms = min_retry_delay_ms
53+
context.request.retryer.backoff_strategy.max_retry_delay_ms = max_retry_delay_ms
54+
context.request.retryer.retry_condition = opt.retry_condition \
55+
if opt.retry_condition is not None else context.request.retryer.retry_condition
56+
context.request.retryer.retry_condition.retry_error_codes = retry_error_codes

volcenginesdkcore/rest.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
# python 2 and python 3 compatibility library
1313
import six
1414
from six.moves.urllib.parse import urlencode
15+
from urllib3 import Retry
1516

1617
try:
1718
import urllib3
@@ -88,6 +89,7 @@ def __init__(self, configuration, pools_size=4, maxsize=None):
8889
key_file=configuration.key_file,
8990
proxy_url=configuration.proxy,
9091
timeout=timeout,
92+
retries=Retry(total=False),
9193
**addition_pool_args
9294
)
9395
else:
@@ -99,6 +101,7 @@ def __init__(self, configuration, pools_size=4, maxsize=None):
99101
cert_file=configuration.cert_file,
100102
key_file=configuration.key_file,
101103
timeout=timeout,
104+
retries=Retry(total=False),
102105
**addition_pool_args
103106
)
104107

volcenginesdkcore/retryer/__init__.py

Whitespace-only changes.
Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
# coding: utf-8
2+
3+
import random
4+
from abc import abstractmethod
5+
6+
from volcenginesdkcore.utils import six_utils
7+
8+
class BackoffStrategy(six_utils.get_abstract_meta_class()):
9+
"""
10+
Abstract base class for all backoff strategies.
11+
Defines the interface for computing retry delays.
12+
"""
13+
14+
def __init__(self,
15+
min_retry_delay_ms=30,
16+
max_retry_delay_ms=300 * 1000
17+
):
18+
# type:(float, float) -> None
19+
"""
20+
Initializes the BackoffStrategy with a min_retry_delay_ms and max_retry_delay_ms values.
21+
Args:
22+
:param min_retry_delay_ms: The minimum retry delay in milliseconds.
23+
:param max_retry_delay_ms: The maximum retry delay in milliseconds.
24+
"""
25+
self.min_retry_delay_ms = min_retry_delay_ms
26+
self.max_retry_delay_ms = max_retry_delay_ms
27+
28+
29+
@abstractmethod
30+
def compute_delay(self, retry_count):
31+
# type: (int) -> float
32+
"""
33+
Args:
34+
:param retry_count: The number of retries.
35+
Returns:
36+
float: The computed delay in milliseconds.
37+
"""
38+
pass
39+
40+
41+
class NoBackoffStrategy(BackoffStrategy):
42+
"""
43+
A backoff strategy that implements no delay between retries.
44+
"""
45+
46+
def compute_delay(self, retry_count):
47+
# type: (int) -> float
48+
"""
49+
Args:
50+
:param retry_count: The number of retries.
51+
Returns:
52+
float: The computed delay in milliseconds.
53+
"""
54+
return 0.0
55+
56+
57+
class ExponentialBackoffStrategy(BackoffStrategy):
58+
"""
59+
A backoff strategy that increases the delay exponentially with each retry.
60+
The delay is calculated as: min_retry_delay_ms * (2 ** retry_count),
61+
capped by max_retry_delay_ms.
62+
"""
63+
64+
def compute_delay(self, retry_count):
65+
# type: (int) -> float
66+
"""
67+
Args:
68+
:param retry_count: The number of retries.
69+
Returns:
70+
float: The computed delay in milliseconds.
71+
"""
72+
min_retry_delay_ms = self.min_retry_delay_ms
73+
max_retry_delay_ms = self.max_retry_delay_ms
74+
delay = min(min_retry_delay_ms * (2 ** retry_count), max_retry_delay_ms)
75+
return delay
76+
77+
class ExponentialWithRandomJitterBackoffStrategy(ExponentialBackoffStrategy):
78+
"""
79+
An exponential backoff strategy with random jitter.
80+
The delay is calculated as: base + random_uniform(0, base),
81+
capped by max_retry_delay_ms.
82+
This effectively means the delay is a random value between base and 2*base,
83+
then capped by max_retry_delay_ms.
84+
"""
85+
86+
def compute_delay(self, retry_count):
87+
# type: (int) -> float
88+
"""
89+
Args:
90+
:param retry_count: The number of retries.
91+
Returns:
92+
float: The computed delay in milliseconds.
93+
"""
94+
base = super(ExponentialWithRandomJitterBackoffStrategy, self).compute_delay(retry_count)
95+
jitter = base + random.uniform(0, base)
96+
max_retry_delay_ms = self.max_retry_delay_ms
97+
return min(max_retry_delay_ms, jitter)

0 commit comments

Comments
 (0)