diff --git a/.gitignore b/.gitignore index 15144a7e..2b55a928 100644 --- a/.gitignore +++ b/.gitignore @@ -7,3 +7,4 @@ venv* .tox build/ dist +.python-version diff --git a/README.md b/README.md index 7f58cd2b..d8bd8605 100644 --- a/README.md +++ b/README.md @@ -156,6 +156,24 @@ sdk = SDK(iam_token="t1.9eu...", endpoint="api.yandexcloud.kz") set_up_yc_api_endpoint(kz_region_endpoint) ``` +### Retries +If you want to retry SDK requests, build SDK with `retry_policy` field using `RetryPolicy` class: + +```python +import grpc +from yandexcloud import SDK, RetryPolicy + +sdk = SDK(retry_policy=RetryPolicy(max_attempts=2, status_codes=(grpc.StatusCode.UNAVAILABLE,))) +``` + +It's **strongly recommended** to use default settings of RetryPolicy to avoid retry amplification: +```python +import grpc +from yandexcloud import SDK, RetryPolicy + +sdk = SDK(retry_policy=RetryPolicy()) +``` + ## Contributing ### Dependencies We use [uv](https://docs.astral.sh/uv) to manage dependencies and run commands in Makefile. diff --git a/pyproject.toml b/pyproject.toml index 58b347f1..e2ef32c1 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -27,6 +27,7 @@ dependencies = [ "requests>=2.32.3,<3", "six>=1.17.0,<2", "grpcio-tools>=1.68.1", + "deprecated>=1.2.18", ] [project.readme] @@ -57,6 +58,7 @@ type = [ "grpc-stubs>=1.53.0.5", "types-requests>=2.32.0.20241016", "types-six>=1.17.0.20241205", + "types-deprecated>=1.2.15.20241117", ] style = [ "flake8>=7.1.1", diff --git a/tests/test_retry_policy.py b/tests/test_retry_policy.py new file mode 100644 index 00000000..49fefa0c --- /dev/null +++ b/tests/test_retry_policy.py @@ -0,0 +1,123 @@ +from concurrent import futures +from unittest.mock import Mock, patch + +import grpc +import pytest + +from yandex.cloud.vpc.v1.network_pb2 import Network +from yandex.cloud.vpc.v1.network_service_pb2 import GetNetworkRequest +from yandex.cloud.vpc.v1.network_service_pb2_grpc import ( + NetworkServiceStub, + add_NetworkServiceServicer_to_server, +) +from yandexcloud import SDK, RetryPolicy +from yandexcloud._channels import Channels + +INSECURE_SERVICE_PORT = "50051" +SERVICE_ADDR = "localhost" + + +def side_effect_internal(_, context): + context.set_code(grpc.StatusCode.INTERNAL) + + +def side_effect_unavailable(_, context): + context.set_code(grpc.StatusCode.UNAVAILABLE) + + +class VPCServiceMock: + def __init__(self, fn): + self.Get = Mock(return_value=Network(id="12342314")) + self.Create = Mock() + self.Update = Mock() + self.Delete = Mock() + self.ListSubnets = Mock() + self.ListSecurityGroups = Mock() + self.ListRouteTables = Mock() + self.ListOperations = Mock() + self.Move = Mock() + self.List = Mock() + + +@pytest.fixture +def mock_channel(): + with patch.multiple( + Channels, + _get_creds=lambda self, endpoint: grpc.local_channel_credentials(), + _get_endpoints=lambda self: { + "vpc": SERVICE_ADDR + ":" + INSECURE_SERVICE_PORT, + "iam": SERVICE_ADDR + ":" + INSECURE_SERVICE_PORT, + }, + ) as channel_patch: + yield channel_patch + + +def grpc_server(side_effect): + service = VPCServiceMock(side_effect) + server = grpc.server(futures.ThreadPoolExecutor(max_workers=10)) + server.add_insecure_port("localhost:" + INSECURE_SERVICE_PORT) + add_NetworkServiceServicer_to_server(service, server) + server.start() + return server, service + + +def test_default_retries(mock_channel): + server, service = grpc_server(side_effect_unavailable) + + sdk = SDK( + retry_policy=RetryPolicy(), + endpoint=f"localhost:{INSECURE_SERVICE_PORT}", + endpoints={ + "vpc": SERVICE_ADDR + ":" + INSECURE_SERVICE_PORT, + "iam": SERVICE_ADDR + ":" + INSECURE_SERVICE_PORT, + }, + ) + network_client = sdk.client(NetworkServiceStub, insecure=True) + try: + request = GetNetworkRequest(network_id="asdf") + network_client.Get(request) + except grpc.RpcError: + assert service.Get.call_count == 4 + + server.stop(0) + + +def test_custom_retries(mock_channel): + server, service = grpc_server(side_effect_internal) + + sdk = SDK( + retry_policy=RetryPolicy(status_codes=(grpc.StatusCode.INTERNAL,), max_attempts=4), + endpoint=f"localhost:{INSECURE_SERVICE_PORT}", + endpoints={ + "vpc": SERVICE_ADDR + ":" + INSECURE_SERVICE_PORT, + "iam": SERVICE_ADDR + ":" + INSECURE_SERVICE_PORT, + }, + ) + network_client = sdk.client(NetworkServiceStub, insecure=True) + try: + request = GetNetworkRequest(network_id="asdf") + network_client.Get(request) + except grpc.RpcError: + assert service.Get.call_count == 4 + + server.stop(0) + + +def test_no_retries(mock_channel): + server, service = grpc_server(side_effect_internal) + + sdk = SDK( + endpoint=f"localhost:{INSECURE_SERVICE_PORT}", + endpoints={ + "vpc": SERVICE_ADDR + ":" + INSECURE_SERVICE_PORT, + "iam": SERVICE_ADDR + ":" + INSECURE_SERVICE_PORT, + }, + ) + network_client = sdk.client(NetworkServiceStub, insecure=True) + try: + request = GetNetworkRequest(network_id="asdf") + network_client.Get(request) + except grpc.RpcError: + assert service.Get.call_count == 1 + + server.stop(0) diff --git a/uv.lock b/uv.lock index 714b8bd7..b45a51eb 100644 --- a/uv.lock +++ b/uv.lock @@ -1,4 +1,5 @@ version = 1 +revision = 1 requires-python = ">=3.9" resolution-markers = [ "python_full_version < '3.11'", @@ -249,7 +250,7 @@ name = "click" version = "8.1.8" source = { registry = "https://pypi.org/simple" } dependencies = [ - { name = "colorama", marker = "platform_system == 'Windows'" }, + { name = "colorama", marker = "sys_platform == 'win32'" }, ] sdist = { url = "https://files.pythonhosted.org/packages/b9/2e/0090cbf739cee7d23781ad4b89a9894a41538e4fcf4c31dcdd705b78eb8b/click-8.1.8.tar.gz", hash = "sha256:ed53c9d8990d83c2a27deae68e4ee337473f6330c040a31d4225c9574d16096a", size = 226593 } wheels = [ @@ -1316,6 +1317,15 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/13/62/db91eff00205846878e0184d4ba5b24e35cf5540e16b77ea8e5644206dff/tox_gh-1.5.0-py3-none-any.whl", hash = "sha256:fd7e8c826f4576a02af4737fd4b738817660b63898c161d6ee8f658c885f7fa1", size = 6844 }, ] +[[package]] +name = "types-deprecated" +version = "1.2.15.20241117" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/a8/76/d3735190891b12533115e73ac835cfdd1f28378b6b39fd50dfe2fbd63143/types-Deprecated-1.2.15.20241117.tar.gz", hash = "sha256:924002c8b7fddec51ba4949788a702411a2e3636cd9b2a33abd8ee119701d77e", size = 3377 } +wheels = [ + { url = "https://files.pythonhosted.org/packages/27/ed/9091bd7a90d3e2e08ee8c0bdbf0c826d3d9e3730ddd9b15cb64f4ae51b9b/types_Deprecated-1.2.15.20241117-py3-none-any.whl", hash = "sha256:a0cc5e39f769fc54089fd8e005416b55d74aa03f6964d2ed1a0b0b2e28751884", size = 3779 }, +] + [[package]] name = "types-protobuf" version = "5.29.1.20250208" @@ -1459,6 +1469,7 @@ version = "0.333.0" source = { editable = "." } dependencies = [ { name = "cryptography" }, + { name = "deprecated" }, { name = "googleapis-common-protos" }, { name = "grpcio" }, { name = "grpcio-tools" }, @@ -1495,6 +1506,7 @@ test = [ type = [ { name = "grpc-stubs" }, { name = "mypy" }, + { name = "types-deprecated" }, { name = "types-requests" }, { name = "types-six" }, ] @@ -1502,6 +1514,7 @@ type = [ [package.metadata] requires-dist = [ { name = "cryptography", specifier = ">=44.0.0" }, + { name = "deprecated", specifier = ">=1.2.18" }, { name = "googleapis-common-protos", specifier = ">=1.66.0,<2" }, { name = "grpcio", specifier = ">=1.68.1,<2" }, { name = "grpcio-tools", specifier = ">=1.68.1" }, @@ -1536,6 +1549,7 @@ test = [{ name = "pytest", specifier = ">=8.3.4" }] type = [ { name = "grpc-stubs", specifier = ">=1.53.0.5" }, { name = "mypy", specifier = ">=1.13.0" }, + { name = "types-deprecated", specifier = ">=1.2.15.20241117" }, { name = "types-requests", specifier = ">=2.32.0.20241016" }, { name = "types-six", specifier = ">=1.17.0.20241205" }, ] diff --git a/yandexcloud/__init__.py b/yandexcloud/__init__.py index 902e81d9..dc9925df 100644 --- a/yandexcloud/__init__.py +++ b/yandexcloud/__init__.py @@ -8,6 +8,7 @@ default_backoff, ) from yandexcloud._retry_interceptor import RetryInterceptor +from yandexcloud._retry_policy import RetryPolicy from yandexcloud._sdk import SDK __version__ = "0.333.0" diff --git a/yandexcloud/_channels.py b/yandexcloud/_channels.py index 73aa3f90..d23f96a4 100644 --- a/yandexcloud/_channels.py +++ b/yandexcloud/_channels.py @@ -30,6 +30,7 @@ def __init__( root_certificates: Optional[bytes] = None, private_key: Optional[bytes] = None, certificate_chain: Optional[bytes] = None, + service_config: Optional[str] = None, **_: str, ) -> None: self._channel_creds = grpc.ssl_channel_credentials( @@ -48,10 +49,14 @@ def __init__( self._client_user_agent = client_user_agent self._config_endpoints = endpoints if endpoints is not None else {} self._endpoints: Optional[Dict[str, str]] = None + # flake8: noqa self.channel_options = tuple( - ("grpc.primary_user_agent", user_agent) - for user_agent in [self._client_user_agent, SDK_USER_AGENT] - if user_agent is not None + [ + ("grpc.primary_user_agent", user_agent) + for user_agent in [self._client_user_agent, SDK_USER_AGENT] + if user_agent is not None + ] + + ([("grpc.service_config", service_config)] if service_config is not None else []) ) def channel(self, service: str, endpoint: Optional[str] = None, insecure: bool = False) -> grpc.Channel: diff --git a/yandexcloud/_retry_interceptor.py b/yandexcloud/_retry_interceptor.py index d3261f01..27abdd20 100644 --- a/yandexcloud/_retry_interceptor.py +++ b/yandexcloud/_retry_interceptor.py @@ -4,6 +4,7 @@ from typing import Callable, Iterable, Optional import grpc +from deprecated import deprecated class _ClientCallDetails( @@ -19,6 +20,7 @@ class _RetryCall(Exception): pass +@deprecated(version="0.334.0", reason="Instead of this class use retry_policy field when building the SDK") class RetryInterceptor(grpc.UnaryUnaryClientInterceptor): """RetryInterceptor implements grpc retries. It supports retries quantity, list of retriable codes, backoff function, diff --git a/yandexcloud/_retry_policy.py b/yandexcloud/_retry_policy.py new file mode 100644 index 00000000..b04b67ca --- /dev/null +++ b/yandexcloud/_retry_policy.py @@ -0,0 +1,31 @@ +import json +from typing import Tuple + +import grpc + + +class RetryPolicy: + def __init__( + self, + max_attempts: int = 4, + status_codes: Tuple["grpc.StatusCode"] = (grpc.StatusCode.UNAVAILABLE,), + ): + self.__policy = { + "methodConfig": [ + { + "name": [{}], + "retryPolicy": { + "maxAttempts": max_attempts, + "initialBackoff": "0.1s", + "maxBackoff": "20s", + "backoffMultiplier": 2, + "retryableStatusCodes": [status.name for status in status_codes], + }, + } + ], + "retryThrottling": {"maxTokens": 100, "tokenRatio": 0.1}, + "waitForReady": True, + } + + def to_json(self) -> str: + return json.dumps(self.__policy) diff --git a/yandexcloud/_sdk.py b/yandexcloud/_sdk.py index 58825c50..ce6b36c1 100644 --- a/yandexcloud/_sdk.py +++ b/yandexcloud/_sdk.py @@ -3,9 +3,7 @@ import grpc -from yandexcloud import _channels, _helpers, _operation_waiter -from yandexcloud._backoff import backoff_exponential_with_jitter -from yandexcloud._retry_interceptor import RetryInterceptor +from yandexcloud import _channels, _helpers, _operation_waiter, _retry_policy from yandexcloud._wrappers import Wrappers if TYPE_CHECKING: @@ -41,14 +39,16 @@ def __init__( root_certificates: Optional[bytes] = None, private_key: Optional[bytes] = None, certificate_chain: Optional[bytes] = None, + retry_policy: Optional[_retry_policy.RetryPolicy] = None, **kwargs: str, ): """ API entry-point object. - :param interceptor: GRPC interceptor to be used instead of default RetryInterceptor + :param interceptor: GRPC interceptor to be used :param user_agent: String to prepend User-Agent metadata header for all GRPC requests made via SDK object :param endpoints: Dict with services endpoints overrides. Example: {'vpc': 'new.vpc.endpoint:443'} + :param retry_policy: Retry policy configuration object to retry all failed GRPC requests """ self._channels = _channels.Channels( @@ -61,14 +61,9 @@ def __init__( root_certificates, private_key, certificate_chain, + retry_policy.to_json() if retry_policy is not None else None, **kwargs, ) - if interceptor is None: - interceptor = RetryInterceptor( - max_retry_count=5, - per_call_timeout=30, - back_off_func=backoff_exponential_with_jitter(1, 30), - ) self._default_interceptor = interceptor self.helpers = _helpers.Helpers(self) self.wrappers = Wrappers(self)