Skip to content

Commit d2ed5e4

Browse files
committed
refactor(retries): added new retry policy to prevent retry amplification
1 parent f4735c9 commit d2ed5e4

File tree

10 files changed

+200
-14
lines changed

10 files changed

+200
-14
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,3 +7,4 @@ venv*
77
.tox
88
build/
99
dist
10+
.python-version

README.md

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,24 @@ sdk = SDK(iam_token="t1.9eu...", endpoint="api.yandexcloud.kz")
156156
set_up_yc_api_endpoint(kz_region_endpoint)
157157
```
158158

159+
### Retries
160+
To retry failed SDK requests, pass a `RetryPolicy` instance as the `retry_policy` argument when constructing the SDK:
161+
162+
```python
163+
import grpc
164+
from yandexcloud import SDK, RetryPolicy
165+
166+
sdk = SDK(retry_policy=RetryPolicy(max_attempts=2, status_codes=(grpc.StatusCode.UNAVAILABLE,)))
167+
```
168+
169+
It is strongly recommended to use the default `RetryPolicy` settings to avoid retry amplification:
170+
```python
171+
import grpc
172+
from yandexcloud import SDK, RetryPolicy
173+
174+
sdk = SDK(retry_policy=RetryPolicy())
175+
```
176+
159177
## Contributing
160178
### Dependencies
161179
We use [uv](https://docs.astral.sh/uv) to manage dependencies and run commands in Makefile.

pyproject.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ dependencies = [
2727
"requests>=2.32.3,<3",
2828
"six>=1.17.0,<2",
2929
"grpcio-tools>=1.68.1",
30+
"deprecated>=1.2.18",
3031
]
3132

3233
[project.readme]
@@ -57,6 +58,7 @@ type = [
5758
"grpc-stubs>=1.53.0.5",
5859
"types-requests>=2.32.0.20241016",
5960
"types-six>=1.17.0.20241205",
61+
"types-deprecated>=1.2.15.20241117",
6062
]
6163
style = [
6264
"flake8>=7.1.1",

tests/test_retry_policy.py

Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
from concurrent import futures
2+
from unittest.mock import Mock, patch
3+
4+
import grpc
5+
import pytest
6+
7+
from yandex.cloud.vpc.v1.network_service_pb2 import GetNetworkRequest
8+
from yandex.cloud.vpc.v1.network_service_pb2_grpc import (
9+
NetworkServiceStub,
10+
add_NetworkServiceServicer_to_server,
11+
)
12+
from yandexcloud import SDK, RetryPolicy
13+
from yandexcloud._channels import Channels
14+
15+
INSECURE_SERVICE_PORT = "5058"
16+
SERVICE_ADDR = "localhost"
17+
18+
19+
def side_effect_internal(_, context):
20+
context.set_code(grpc.StatusCode.INTERNAL)
21+
context.set_details("internal")
22+
23+
24+
def side_effect_unavailable(_, context):
25+
context.set_code(grpc.StatusCode.UNAVAILABLE)
26+
context.set_details("unavailable")
27+
28+
29+
class VPCServiceMock:
30+
def __init__(self, fn):
31+
self.Get = Mock(side_effect=fn)
32+
self.Create = Mock()
33+
self.Update = Mock()
34+
self.Delete = Mock()
35+
self.ListSubnets = Mock()
36+
self.ListSecurityGroups = Mock()
37+
self.ListRouteTables = Mock()
38+
self.ListOperations = Mock()
39+
self.Move = Mock()
40+
self.List = Mock()
41+
42+
43+
@pytest.fixture
44+
def mock_channel():
45+
with patch.multiple(
46+
Channels,
47+
_get_creds=lambda self, endpoint: grpc.local_channel_credentials(),
48+
_get_endpoints=lambda self: {
49+
"vpc": SERVICE_ADDR + ":" + INSECURE_SERVICE_PORT,
50+
"iam": SERVICE_ADDR + ":" + INSECURE_SERVICE_PORT,
51+
},
52+
) as channel_patch:
53+
yield channel_patch
54+
55+
56+
def grpc_server(side_effect):
57+
service = VPCServiceMock(side_effect)
58+
server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
59+
server.add_insecure_port("localhost:" + INSECURE_SERVICE_PORT)
60+
add_NetworkServiceServicer_to_server(service, server)
61+
server.start()
62+
return server, service
63+
64+
65+
def test_default_retries(mock_channel):
66+
_, service = grpc_server(side_effect_unavailable)
67+
68+
sdk = SDK(
69+
retry_policy=RetryPolicy(),
70+
endpoint=f"localhost:{INSECURE_SERVICE_PORT}",
71+
endpoints={
72+
"vpc": SERVICE_ADDR + ":" + INSECURE_SERVICE_PORT,
73+
"iam": SERVICE_ADDR + ":" + INSECURE_SERVICE_PORT,
74+
},
75+
)
76+
network_client = sdk.client(NetworkServiceStub, insecure=True)
77+
try:
78+
request = GetNetworkRequest(network_id="asdf")
79+
network_client.Get(request)
80+
except grpc.RpcError:
81+
assert service.Get.call_count == 4
82+
83+
84+
def test_custom_retries(mock_channel):
85+
_, service = grpc_server(side_effect_internal)
86+
87+
sdk = SDK(
88+
retry_policy=RetryPolicy(status_codes=(grpc.StatusCode.INTERNAL,), max_attempts=4),
89+
endpoint=f"localhost:{INSECURE_SERVICE_PORT}",
90+
endpoints={
91+
"vpc": SERVICE_ADDR + ":" + INSECURE_SERVICE_PORT,
92+
"iam": SERVICE_ADDR + ":" + INSECURE_SERVICE_PORT,
93+
},
94+
)
95+
network_client = sdk.client(NetworkServiceStub, insecure=True)
96+
try:
97+
request = GetNetworkRequest(network_id="asdf")
98+
network_client.Get(request)
99+
except grpc.RpcError:
100+
assert service.Get.call_count == 4
101+
102+
103+
def test_no_retries(mock_channel):
104+
_, service = grpc_server(side_effect_internal)
105+
106+
sdk = SDK(
107+
endpoint=f"localhost:{INSECURE_SERVICE_PORT}",
108+
endpoints={
109+
"vpc": SERVICE_ADDR + ":" + INSECURE_SERVICE_PORT,
110+
"iam": SERVICE_ADDR + ":" + INSECURE_SERVICE_PORT,
111+
},
112+
)
113+
network_client = sdk.client(NetworkServiceStub, insecure=True)
114+
try:
115+
request = GetNetworkRequest(network_id="asdf")
116+
network_client.Get(request)
117+
except grpc.RpcError:
118+
assert service.Get.call_count == 1

uv.lock

Lines changed: 15 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

yandexcloud/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
default_backoff,
99
)
1010
from yandexcloud._retry_interceptor import RetryInterceptor
11+
from yandexcloud._retry_policy import RetryPolicy
1112
from yandexcloud._sdk import SDK
1213

1314
__version__ = "0.333.0"

yandexcloud/_channels.py

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ def __init__(
3030
root_certificates: Optional[bytes] = None,
3131
private_key: Optional[bytes] = None,
3232
certificate_chain: Optional[bytes] = None,
33+
service_config: Optional[str] = None,
3334
**_: str,
3435
) -> None:
3536
self._channel_creds = grpc.ssl_channel_credentials(
@@ -48,10 +49,14 @@ def __init__(
4849
self._client_user_agent = client_user_agent
4950
self._config_endpoints = endpoints if endpoints is not None else {}
5051
self._endpoints: Optional[Dict[str, str]] = None
52+
# flake8: noqa
5153
self.channel_options = tuple(
52-
("grpc.primary_user_agent", user_agent)
53-
for user_agent in [self._client_user_agent, SDK_USER_AGENT]
54-
if user_agent is not None
54+
[
55+
("grpc.primary_user_agent", user_agent)
56+
for user_agent in [self._client_user_agent, SDK_USER_AGENT]
57+
if user_agent is not None
58+
]
59+
+ ([("grpc.service_config", service_config)] if service_config is not None else [])
5560
)
5661

5762
def channel(self, service: str, endpoint: Optional[str] = None, insecure: bool = False) -> grpc.Channel:

yandexcloud/_retry_interceptor.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
from typing import Callable, Iterable, Optional
55

66
import grpc
7+
from deprecated import deprecated
78

89

910
class _ClientCallDetails(
@@ -19,6 +20,7 @@ class _RetryCall(Exception):
1920
pass
2021

2122

23+
@deprecated(version="0.334.0", reason="Instead of this class use retry_policy field when building the SDK")
2224
class RetryInterceptor(grpc.UnaryUnaryClientInterceptor):
2325
"""RetryInterceptor implements grpc retries.
2426
It supports retries quantity, list of retriable codes, backoff function,

yandexcloud/_retry_policy.py

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
import json
2+
from typing import Tuple
3+
4+
import grpc
5+
6+
7+
class RetryPolicy:
8+
def __init__(
9+
self,
10+
max_attempts: int = 4,
11+
status_codes: Tuple["grpc.StatusCode"] = (grpc.StatusCode.UNAVAILABLE,),
12+
):
13+
self.__policy = {
14+
"methodConfig": [
15+
{
16+
"name": [{}],
17+
"retryPolicy": {
18+
"maxAttempts": max_attempts,
19+
"initialBackoff": "0.1s",
20+
"maxBackoff": "1s",
21+
"backoffMultiplier": 2,
22+
"retryableStatusCodes": list(map(lambda status: status.name, list(status_codes))),
23+
},
24+
}
25+
],
26+
"retryThrottling": {"maxTokens": 100, "tokenRatio": 0.1},
27+
"waitForReady": True,
28+
}
29+
30+
def to_json(self) -> str:
31+
return json.dumps(self.__policy)

yandexcloud/_sdk.py

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,7 @@
33

44
import grpc
55

6-
from yandexcloud import _channels, _helpers, _operation_waiter
7-
from yandexcloud._backoff import backoff_exponential_with_jitter
8-
from yandexcloud._retry_interceptor import RetryInterceptor
6+
from yandexcloud import _channels, _helpers, _operation_waiter, _retry_policy
97
from yandexcloud._wrappers import Wrappers
108

119
if TYPE_CHECKING:
@@ -41,12 +39,13 @@ def __init__(
4139
root_certificates: Optional[bytes] = None,
4240
private_key: Optional[bytes] = None,
4341
certificate_chain: Optional[bytes] = None,
42+
retry_policy: Optional[_retry_policy.RetryPolicy] = None,
4443
**kwargs: str,
4544
):
4645
"""
4746
API entry-point object.
4847
49-
:param interceptor: GRPC interceptor to be used instead of default RetryInterceptor
48+
:param interceptor: GRPC interceptor to be used
5049
:param user_agent: String to prepend User-Agent metadata header for all GRPC requests made via SDK object
5150
:param endpoints: Dict with services endpoints overrides. Example: {'vpc': 'new.vpc.endpoint:443'}
5251
@@ -61,14 +60,9 @@ def __init__(
6160
root_certificates,
6261
private_key,
6362
certificate_chain,
63+
retry_policy.to_json() if retry_policy is not None else None,
6464
**kwargs,
6565
)
66-
if interceptor is None:
67-
interceptor = RetryInterceptor(
68-
max_retry_count=5,
69-
per_call_timeout=30,
70-
back_off_func=backoff_exponential_with_jitter(1, 30),
71-
)
7266
self._default_interceptor = interceptor
7367
self.helpers = _helpers.Helpers(self)
7468
self.wrappers = Wrappers(self)

0 commit comments

Comments
 (0)