Skip to content

Commit c6526f8

Browse files
ShaneHarveyblink1073
authored and committed
PYTHON-5504 Prototype exponential backoff in with_transaction (#2492)
PYTHON-5505 Prototype system overload retry loop for all operations (#2497): All commands that fail with the "Retryable" error label will be retried up to 3 times. When the error includes the "SystemOverloaded" error label, we apply exponential backoff with jitter before attempting a retry.
PYTHON-5506 Prototype adaptive token bucket retry (#2501): Add an adaptive token-bucket-based retry policy. Successfully completed commands deposit 0.1 token. Failed retry attempts consume 1 token. A retry is only permitted if there is an available token. The token bucket starts full with the maximum of 1000 tokens.
PYTHON-5505 Use proper RetryableError and SystemOverloadedError labels.
1 parent ed3a974 commit c6526f8

File tree

12 files changed

+730
-38
lines changed

12 files changed

+730
-38
lines changed

pymongo/asynchronous/client_session.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,9 @@
135135

136136
from __future__ import annotations
137137

138+
import asyncio
138139
import collections
140+
import random
139141
import time
140142
import uuid
141143
from collections.abc import Mapping as _Mapping
@@ -470,6 +472,8 @@ def _max_time_expired_error(exc: PyMongoError) -> bool:
470472
# This limit is non-configurable and was chosen to be twice the 60 second
471473
# default value of MongoDB's `transactionLifetimeLimitSeconds` parameter.
472474
_WITH_TRANSACTION_RETRY_TIME_LIMIT = 120
475+
_BACKOFF_MAX = 1
476+
_BACKOFF_INITIAL = 0.050 # 50ms initial backoff
473477

474478

475479
def _within_time_limit(start_time: float) -> bool:
@@ -703,7 +707,13 @@ async def callback(session, custom_arg, custom_kwarg=None):
703707
https://github.com/mongodb/specifications/blob/master/source/transactions-convenient-api/transactions-convenient-api.md#handling-errors-inside-the-callback
704708
"""
705709
start_time = time.monotonic()
710+
retry = 0
706711
while True:
712+
if retry: # Implement exponential backoff on retry.
713+
jitter = random.random() # noqa: S311
714+
backoff = jitter * min(_BACKOFF_INITIAL * (2**retry), _BACKOFF_MAX)
715+
await asyncio.sleep(backoff)
716+
retry += 1
707717
await self.start_transaction(
708718
read_concern, write_concern, read_preference, max_commit_time_ms
709719
)

pymongo/asynchronous/collection.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@
5858
AsyncCursor,
5959
AsyncRawBatchCursor,
6060
)
61+
from pymongo.asynchronous.helpers import _retry_overload
6162
from pymongo.collation import validate_collation_or_none
6263
from pymongo.common import _ecoc_coll_name, _esc_coll_name
6364
from pymongo.errors import (
@@ -252,6 +253,7 @@ def __init__(
252253
unicode_decode_error_handler="replace", document_class=dict
253254
)
254255
self._timeout = database.client.options.timeout
256+
self._retry_policy = database.client._retry_policy
255257

256258
if create or kwargs:
257259
if _IS_SYNC:
@@ -2227,6 +2229,7 @@ async def create_indexes(
22272229
return await self._create_indexes(indexes, session, **kwargs)
22282230

22292231
@_csot.apply
2232+
@_retry_overload
22302233
async def _create_indexes(
22312234
self, indexes: Sequence[IndexModel], session: Optional[AsyncClientSession], **kwargs: Any
22322235
) -> list[str]:
@@ -2422,7 +2425,6 @@ async def drop_indexes(
24222425
kwargs["comment"] = comment
24232426
await self._drop_index("*", session=session, **kwargs)
24242427

2425-
@_csot.apply
24262428
async def drop_index(
24272429
self,
24282430
index_or_name: _IndexKeyHint,
@@ -2472,6 +2474,7 @@ async def drop_index(
24722474
await self._drop_index(index_or_name, session, comment, **kwargs)
24732475

24742476
@_csot.apply
2477+
@_retry_overload
24752478
async def _drop_index(
24762479
self,
24772480
index_or_name: _IndexKeyHint,
@@ -3072,6 +3075,7 @@ async def aggregate_raw_batches(
30723075
)
30733076

30743077
@_csot.apply
3078+
@_retry_overload
30753079
async def rename(
30763080
self,
30773081
new_name: str,

pymongo/asynchronous/database.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
from pymongo.asynchronous.change_stream import AsyncDatabaseChangeStream
3939
from pymongo.asynchronous.collection import AsyncCollection
4040
from pymongo.asynchronous.command_cursor import AsyncCommandCursor
41+
from pymongo.asynchronous.helpers import _retry_overload
4142
from pymongo.common import _ecoc_coll_name, _esc_coll_name
4243
from pymongo.database_shared import _check_name, _CodecDocumentType
4344
from pymongo.errors import CollectionInvalid, InvalidOperation
@@ -135,6 +136,7 @@ def __init__(
135136
self._name = name
136137
self._client: AsyncMongoClient[_DocumentType] = client
137138
self._timeout = client.options.timeout
139+
self._retry_policy = client._retry_policy
138140

139141
@property
140142
def client(self) -> AsyncMongoClient[_DocumentType]:
@@ -477,6 +479,7 @@ async def watch(
477479
return change_stream
478480

479481
@_csot.apply
482+
@_retry_overload
480483
async def create_collection(
481484
self,
482485
name: str,
@@ -819,6 +822,7 @@ async def command(
819822
...
820823

821824
@_csot.apply
825+
@_retry_overload
822826
async def command(
823827
self,
824828
command: Union[str, MutableMapping[str, Any]],
@@ -950,6 +954,7 @@ async def command(
950954
)
951955

952956
@_csot.apply
957+
@_retry_overload
953958
async def cursor_command(
954959
self,
955960
command: Union[str, MutableMapping[str, Any]],
@@ -1265,6 +1270,7 @@ async def _drop_helper(
12651270
)
12661271

12671272
@_csot.apply
1273+
@_retry_overload
12681274
async def drop_collection(
12691275
self,
12701276
name_or_collection: Union[str, AsyncCollection[_DocumentTypeArg]],

pymongo/asynchronous/helpers.py

Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,18 +16,26 @@
1616
from __future__ import annotations
1717

1818
import asyncio
19+
import builtins
20+
import functools
21+
import random
1922
import socket
23+
import time
24+
import time as time # noqa: PLC0414 # needed in sync version
2025
from typing import (
2126
Any,
2227
Callable,
2328
TypeVar,
2429
cast,
2530
)
2631

32+
from pymongo import _csot
2733
from pymongo.errors import (
2834
OperationFailure,
35+
PyMongoError,
2936
)
3037
from pymongo.helpers_shared import _REAUTHENTICATION_REQUIRED_CODE
38+
from pymongo.lock import _async_create_lock
3139

3240
_IS_SYNC = False
3341

@@ -36,6 +44,7 @@
3644

3745

3846
def _handle_reauth(func: F) -> F:
47+
@functools.wraps(func)
3948
async def inner(*args: Any, **kwargs: Any) -> Any:
4049
no_reauth = kwargs.pop("no_reauth", False)
4150
from pymongo.asynchronous.pool import AsyncConnection
@@ -68,6 +77,123 @@ async def inner(*args: Any, **kwargs: Any) -> Any:
6877
return cast(F, inner)
6978

7079

80+
_MAX_RETRIES = 3
81+
_BACKOFF_INITIAL = 0.05
82+
_BACKOFF_MAX = 10
83+
# DRIVERS-3240 will determine these defaults.
84+
DEFAULT_RETRY_TOKEN_CAPACITY = 1000.0
85+
DEFAULT_RETRY_TOKEN_RETURN = 0.1
86+
87+
88+
def _backoff(
    attempt: int, initial_delay: float = _BACKOFF_INITIAL, max_delay: float = _BACKOFF_MAX
) -> float:
    """Return a jittered exponential backoff delay for ``attempt``.

    The un-jittered delay is ``initial_delay * 2**attempt`` capped at
    ``max_delay``; the result is that value scaled by a random factor
    drawn from ``random.random()``.

    :param attempt: Exponent applied to the base of 2.
    :param initial_delay: Delay used when ``attempt`` is 0, before jitter.
    :param max_delay: Upper bound on the delay, before jitter.
    :return: The randomized delay.
    """
    # Draw the random factor before computing the cap, as the delay is
    # the cap scaled down by this fraction.
    fraction = random.random()  # noqa: S311
    uncapped = initial_delay * (2**attempt)
    ceiling = min(uncapped, max_delay)
    return fraction * ceiling
93+
94+
95+
class _TokenBucket:
    """Token bucket used to rate limit retries.

    The bucket stores a fractional token count that never exceeds
    ``capacity``. :meth:`consume` removes one whole token when at least one
    is available and :meth:`deposit` adds tokens back. Both updates are made
    while holding ``self.lock``.
    """

    def __init__(
        self,
        capacity: float = DEFAULT_RETRY_TOKEN_CAPACITY,
        return_rate: float = DEFAULT_RETRY_TOKEN_RETURN,
    ):
        """Create a bucket that starts out holding ``capacity`` tokens.

        :param capacity: Maximum number of tokens the bucket can hold.
        :param return_rate: Amount added by every call to :meth:`deposit`.
        """
        self.capacity = capacity
        self.return_rate = return_rate
        # DRIVERS-3240 will determine how full the bucket should start.
        self.tokens = capacity
        self.lock = _async_create_lock()

    async def consume(self) -> bool:
        """Take one token out of the bucket.

        :return: ``True`` if a token was removed, ``False`` if fewer than one
            token was available (the count is left unchanged in that case).
        """
        async with self.lock:
            if self.tokens < 1:
                return False
            self.tokens -= 1
            return True

    async def deposit(self, retry: bool = False) -> None:
        """Add tokens to the bucket, never exceeding ``capacity``.

        :param retry: When true, one extra whole token is added on top of
            ``return_rate``.
        """
        if retry:
            refund = 1
        else:
            refund = 0
        async with self.lock:
            replenished = self.tokens + refund + self.return_rate
            self.tokens = min(self.capacity, replenished)
122+
123+
124+
class _RetryPolicy:
    """Decide whether a failed operation may be retried and for how long to wait.

    Delays use exponential backoff with jitter. Retries are additionally
    budgeted by a token bucket to prevent overwhelming the server during a
    prolonged outage or high load.
    """

    def __init__(
        self,
        token_bucket: _TokenBucket,
        attempts: int = _MAX_RETRIES,
        backoff_initial: float = _BACKOFF_INITIAL,
        backoff_max: float = _BACKOFF_MAX,
    ):
        """Store the retry budget and backoff parameters.

        :param token_bucket: Bucket consulted before each retry and credited
            after each success.
        :param attempts: Maximum number of retry attempts.
        :param backoff_initial: Base delay passed to ``_backoff``.
        :param backoff_max: Delay cap passed to ``_backoff``.
        """
        self.backoff_max = backoff_max
        self.backoff_initial = backoff_initial
        self.attempts = attempts
        self.token_bucket = token_bucket

    async def record_success(self, retry: bool) -> None:
        """Record a successful operation by depositing into the token bucket.

        :param retry: Whether the success came after at least one retry.
        """
        await self.token_bucket.deposit(retry)

    def backoff(self, attempt: int) -> float:
        """Return the backoff duration for the given attempt.

        :param attempt: 1-based retry attempt number; values below 1 are
            treated like the first attempt.
        """
        # The first retry uses exponent 0 so it starts at ``backoff_initial``.
        exponent = max(0, attempt - 1)
        return _backoff(exponent, self.backoff_initial, self.backoff_max)

    async def should_retry(self, attempt: int, delay: float) -> bool:
        """Return whether there is budget left to retry.

        :param attempt: 1-based number of the retry being considered.
        :param delay: Backoff the caller intends to wait before retrying.
        :return: ``False`` when ``attempt`` exceeds ``self.attempts``, when
            waiting ``delay`` would pass the ``_csot`` deadline, or when the
            token bucket has no token available; ``True`` otherwise.
        """
        if attempt > self.attempts:
            return False

        # If the delay would exceed the deadline, bail early before consuming a token.
        if _csot.get_timeout() and time.monotonic() + delay > _csot.get_deadline():
            return False

        # Check token bucket last since we only want to consume a token if we actually retry.
        has_token = await self.token_bucket.consume()
        if has_token:
            return True
        # DRIVERS-3246 Improve diagnostics when this case happens.
        # We could add info to the exception and log.
        return False
167+
168+
169+
def _retry_overload(func: F) -> F:
    """Decorate an async method so retryable failures are retried.

    The decorated method's ``self`` must expose a ``_retry_policy``
    attribute. A ``PyMongoError`` carrying the "RetryableError" label is
    retried as long as the policy's ``should_retry`` allows it; when the
    error also carries the "SystemOverloadedError" label, the policy's
    backoff delay is slept before the next attempt. Any other error, or a
    retryable one that the policy refuses, is re-raised unchanged.
    """

    @functools.wraps(func)
    async def inner(self: Any, *args: Any, **kwargs: Any) -> Any:
        policy = self._retry_policy
        failures = 0
        while True:
            try:
                result = await func(self, *args, **kwargs)
                # Credit the retry budget; note this runs inside the try, so
                # a PyMongoError raised here is handled like one from func.
                await policy.record_success(retry=failures > 0)
                return result
            except PyMongoError as exc:
                if not exc.has_error_label("RetryableError"):
                    raise
                failures += 1
                # Only overload errors back off; other retryable errors
                # are retried immediately.
                overloaded = exc.has_error_label("SystemOverloadedError")
                pause = policy.backoff(failures) if overloaded else 0
                if not await policy.should_retry(failures, pause):
                    raise

                # Implement exponential backoff on retry.
                if pause:
                    await asyncio.sleep(pause)

    return cast(F, inner)
195+
196+
71197
async def _getaddrinfo(
72198
host: Any, port: Any, **kwargs: Any
73199
) -> list[

0 commit comments

Comments
 (0)