Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,4 @@ venv*
.tox
build/
dist
.python-version
18 changes: 18 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,24 @@ sdk = SDK(iam_token="t1.9eu...", endpoint="api.yandexcloud.kz")
set_up_yc_api_endpoint(kz_region_endpoint)
```

### Retries
If you want to retry SDK requests, build SDK with `retry_policy` field using `RetryPolicy` class:

```python
import grpc
from yandexcloud import SDK, RetryPolicy

sdk = SDK(retry_policy=RetryPolicy(max_attempts=2, status_codes=(grpc.StatusCode.UNAVAILABLE,)))
```

It's **strongly recommended** to use default settings of RetryPolicy to avoid retry amplification:
```python
import grpc
from yandexcloud import SDK, RetryPolicy

sdk = SDK(retry_policy=RetryPolicy())
```

## Contributing
### Dependencies
We use [uv](https://docs.astral.sh/uv) to manage dependencies and run commands in Makefile.
Expand Down
2 changes: 2 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ dependencies = [
"requests>=2.32.3,<3",
"six>=1.17.0,<2",
"grpcio-tools>=1.68.1",
"deprecated>=1.2.18",
]

[project.readme]
Expand Down Expand Up @@ -57,6 +58,7 @@ type = [
"grpc-stubs>=1.53.0.5",
"types-requests>=2.32.0.20241016",
"types-six>=1.17.0.20241205",
"types-deprecated>=1.2.15.20241117",
]
style = [
"flake8>=7.1.1",
Expand Down
123 changes: 123 additions & 0 deletions tests/test_retry_policy.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
from concurrent import futures
from unittest.mock import Mock, patch

import grpc
import pytest

from yandex.cloud.vpc.v1.network_pb2 import Network
from yandex.cloud.vpc.v1.network_service_pb2 import GetNetworkRequest
from yandex.cloud.vpc.v1.network_service_pb2_grpc import (
NetworkServiceStub,
add_NetworkServiceServicer_to_server,
)
from yandexcloud import SDK, RetryPolicy
from yandexcloud._channels import Channels

INSECURE_SERVICE_PORT = "50051"
SERVICE_ADDR = "localhost"


def side_effect_internal(_, context):
context.set_code(grpc.StatusCode.INTERNAL)


def side_effect_unavailable(_, context):
context.set_code(grpc.StatusCode.UNAVAILABLE)


class VPCServiceMock:
def __init__(self, fn):
self.Get = Mock(return_value=Network(id="12342314"))
self.Create = Mock()
self.Update = Mock()
self.Delete = Mock()
self.ListSubnets = Mock()
self.ListSecurityGroups = Mock()
self.ListRouteTables = Mock()
self.ListOperations = Mock()
self.Move = Mock()
self.List = Mock()


@pytest.fixture
def mock_channel():
with patch.multiple(
Channels,
_get_creds=lambda self, endpoint: grpc.local_channel_credentials(),
_get_endpoints=lambda self: {
"vpc": SERVICE_ADDR + ":" + INSECURE_SERVICE_PORT,
"iam": SERVICE_ADDR + ":" + INSECURE_SERVICE_PORT,
},
) as channel_patch:
yield channel_patch


def grpc_server(side_effect):
service = VPCServiceMock(side_effect)
server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
server.add_insecure_port("localhost:" + INSECURE_SERVICE_PORT)
add_NetworkServiceServicer_to_server(service, server)
server.start()
return server, service


def test_default_retries(mock_channel):
server, service = grpc_server(side_effect_unavailable)

sdk = SDK(
retry_policy=RetryPolicy(),
endpoint=f"localhost:{INSECURE_SERVICE_PORT}",
endpoints={
"vpc": SERVICE_ADDR + ":" + INSECURE_SERVICE_PORT,
"iam": SERVICE_ADDR + ":" + INSECURE_SERVICE_PORT,
},
)
network_client = sdk.client(NetworkServiceStub, insecure=True)
try:
request = GetNetworkRequest(network_id="asdf")
network_client.Get(request)
except grpc.RpcError:
assert service.Get.call_count == 4

server.stop(0)


def test_custom_retries(mock_channel):
server, service = grpc_server(side_effect_internal)

sdk = SDK(
retry_policy=RetryPolicy(status_codes=(grpc.StatusCode.INTERNAL,), max_attempts=4),
endpoint=f"localhost:{INSECURE_SERVICE_PORT}",
endpoints={
"vpc": SERVICE_ADDR + ":" + INSECURE_SERVICE_PORT,
"iam": SERVICE_ADDR + ":" + INSECURE_SERVICE_PORT,
},
)
network_client = sdk.client(NetworkServiceStub, insecure=True)
try:
request = GetNetworkRequest(network_id="asdf")
network_client.Get(request)
except grpc.RpcError:
assert service.Get.call_count == 4

server.stop(0)


def test_no_retries(mock_channel):
server, service = grpc_server(side_effect_internal)

sdk = SDK(
endpoint=f"localhost:{INSECURE_SERVICE_PORT}",
endpoints={
"vpc": SERVICE_ADDR + ":" + INSECURE_SERVICE_PORT,
"iam": SERVICE_ADDR + ":" + INSECURE_SERVICE_PORT,
},
)
network_client = sdk.client(NetworkServiceStub, insecure=True)
try:
request = GetNetworkRequest(network_id="asdf")
network_client.Get(request)
except grpc.RpcError:
assert service.Get.call_count == 1

server.stop(0)
16 changes: 15 additions & 1 deletion uv.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions yandexcloud/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
default_backoff,
)
from yandexcloud._retry_interceptor import RetryInterceptor
from yandexcloud._retry_policy import RetryPolicy
from yandexcloud._sdk import SDK

__version__ = "0.333.0"
11 changes: 8 additions & 3 deletions yandexcloud/_channels.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ def __init__(
root_certificates: Optional[bytes] = None,
private_key: Optional[bytes] = None,
certificate_chain: Optional[bytes] = None,
service_config: Optional[str] = None,
**_: str,
) -> None:
self._channel_creds = grpc.ssl_channel_credentials(
Expand All @@ -48,10 +49,14 @@ def __init__(
self._client_user_agent = client_user_agent
self._config_endpoints = endpoints if endpoints is not None else {}
self._endpoints: Optional[Dict[str, str]] = None
# flake8: noqa
self.channel_options = tuple(
("grpc.primary_user_agent", user_agent)
for user_agent in [self._client_user_agent, SDK_USER_AGENT]
if user_agent is not None
[
("grpc.primary_user_agent", user_agent)
for user_agent in [self._client_user_agent, SDK_USER_AGENT]
if user_agent is not None
]
+ ([("grpc.service_config", service_config)] if service_config is not None else [])
)

def channel(self, service: str, endpoint: Optional[str] = None, insecure: bool = False) -> grpc.Channel:
Expand Down
2 changes: 2 additions & 0 deletions yandexcloud/_retry_interceptor.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from typing import Callable, Iterable, Optional

import grpc
from deprecated import deprecated


class _ClientCallDetails(
Expand All @@ -19,6 +20,7 @@ class _RetryCall(Exception):
pass


@deprecated(version="0.334.0", reason="Instead of this class use retry_policy field when building the SDK")
class RetryInterceptor(grpc.UnaryUnaryClientInterceptor):
"""RetryInterceptor implements grpc retries.
It supports retries quantity, list of retriable codes, backoff function,
Expand Down
31 changes: 31 additions & 0 deletions yandexcloud/_retry_policy.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
import json
from typing import Tuple

import grpc


class RetryPolicy:
def __init__(
self,
max_attempts: int = 4,
status_codes: Tuple["grpc.StatusCode"] = (grpc.StatusCode.UNAVAILABLE,),
):
self.__policy = {
"methodConfig": [
{
"name": [{}],
"retryPolicy": {
"maxAttempts": max_attempts,
"initialBackoff": "0.1s",
"maxBackoff": "20s",
"backoffMultiplier": 2,
"retryableStatusCodes": [status.name for status in status_codes],
},
}
],
"retryThrottling": {"maxTokens": 100, "tokenRatio": 0.1},
"waitForReady": True,
}

def to_json(self) -> str:
return json.dumps(self.__policy)
15 changes: 5 additions & 10 deletions yandexcloud/_sdk.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,7 @@

import grpc

from yandexcloud import _channels, _helpers, _operation_waiter
from yandexcloud._backoff import backoff_exponential_with_jitter
from yandexcloud._retry_interceptor import RetryInterceptor
from yandexcloud import _channels, _helpers, _operation_waiter, _retry_policy
from yandexcloud._wrappers import Wrappers

if TYPE_CHECKING:
Expand Down Expand Up @@ -41,14 +39,16 @@ def __init__(
root_certificates: Optional[bytes] = None,
private_key: Optional[bytes] = None,
certificate_chain: Optional[bytes] = None,
retry_policy: Optional[_retry_policy.RetryPolicy] = None,
**kwargs: str,
):
"""
API entry-point object.

:param interceptor: GRPC interceptor to be used instead of default RetryInterceptor
:param interceptor: GRPC interceptor to be used
:param user_agent: String to prepend User-Agent metadata header for all GRPC requests made via SDK object
:param endpoints: Dict with services endpoints overrides. Example: {'vpc': 'new.vpc.endpoint:443'}
:param retry_policy: Retry policy configuration object to retry all failed GRPC requests

"""
self._channels = _channels.Channels(
Expand All @@ -61,14 +61,9 @@ def __init__(
root_certificates,
private_key,
certificate_chain,
retry_policy.to_json() if retry_policy is not None else None,
**kwargs,
)
if interceptor is None:
interceptor = RetryInterceptor(
max_retry_count=5,
per_call_timeout=30,
back_off_func=backoff_exponential_with_jitter(1, 30),
)
self._default_interceptor = interceptor
self.helpers = _helpers.Helpers(self)
self.wrappers = Wrappers(self)
Expand Down
Loading