Skip to content

Commit 75eee91

Browse files
authored
PYTHON-5505 Prototype system overload retry loop for all operations (#2497)
All commands that fail with the "Retryable" error label will be retried up to 3 times. When the error includes the "SystemOverloaded" error label, we apply exponential backoff with jitter before attempting a retry.
1 parent cf7a1aa commit 75eee91

File tree

11 files changed

+491
-34
lines changed

11 files changed

+491
-34
lines changed

pymongo/asynchronous/collection.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@
5858
AsyncCursor,
5959
AsyncRawBatchCursor,
6060
)
61+
from pymongo.asynchronous.helpers import _retry_overload
6162
from pymongo.collation import validate_collation_or_none
6263
from pymongo.common import _ecoc_coll_name, _esc_coll_name
6364
from pymongo.errors import (
@@ -2227,6 +2228,7 @@ async def create_indexes(
22272228
return await self._create_indexes(indexes, session, **kwargs)
22282229

22292230
@_csot.apply
2231+
@_retry_overload
22302232
async def _create_indexes(
22312233
self, indexes: Sequence[IndexModel], session: Optional[AsyncClientSession], **kwargs: Any
22322234
) -> list[str]:
@@ -2422,7 +2424,6 @@ async def drop_indexes(
24222424
kwargs["comment"] = comment
24232425
await self._drop_index("*", session=session, **kwargs)
24242426

2425-
@_csot.apply
24262427
async def drop_index(
24272428
self,
24282429
index_or_name: _IndexKeyHint,
@@ -2472,6 +2473,7 @@ async def drop_index(
24722473
await self._drop_index(index_or_name, session, comment, **kwargs)
24732474

24742475
@_csot.apply
2476+
@_retry_overload
24752477
async def _drop_index(
24762478
self,
24772479
index_or_name: _IndexKeyHint,
@@ -3079,6 +3081,7 @@ async def aggregate_raw_batches(
30793081
)
30803082

30813083
@_csot.apply
3084+
@_retry_overload
30823085
async def rename(
30833086
self,
30843087
new_name: str,

pymongo/asynchronous/database.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
from pymongo.asynchronous.change_stream import AsyncDatabaseChangeStream
3939
from pymongo.asynchronous.collection import AsyncCollection
4040
from pymongo.asynchronous.command_cursor import AsyncCommandCursor
41+
from pymongo.asynchronous.helpers import _retry_overload
4142
from pymongo.common import _ecoc_coll_name, _esc_coll_name
4243
from pymongo.database_shared import _check_name, _CodecDocumentType
4344
from pymongo.errors import CollectionInvalid, InvalidOperation
@@ -477,6 +478,7 @@ async def watch(
477478
return change_stream
478479

479480
@_csot.apply
481+
@_retry_overload
480482
async def create_collection(
481483
self,
482484
name: str,
@@ -816,6 +818,7 @@ async def command(
816818
...
817819

818820
@_csot.apply
821+
@_retry_overload
819822
async def command(
820823
self,
821824
command: Union[str, MutableMapping[str, Any]],
@@ -947,6 +950,7 @@ async def command(
947950
)
948951

949952
@_csot.apply
953+
@_retry_overload
950954
async def cursor_command(
951955
self,
952956
command: Union[str, MutableMapping[str, Any]],
@@ -1264,6 +1268,7 @@ async def _drop_helper(
12641268
)
12651269

12661270
@_csot.apply
1271+
@_retry_overload
12671272
async def drop_collection(
12681273
self,
12691274
name_or_collection: Union[str, AsyncCollection[_DocumentTypeArg]],

pymongo/asynchronous/helpers.py

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,11 @@
1717

1818
import asyncio
1919
import builtins
20+
import functools
21+
import random
2022
import socket
2123
import sys
24+
import time
2225
from typing import (
2326
Any,
2427
Callable,
@@ -28,6 +31,7 @@
2831

2932
from pymongo.errors import (
3033
OperationFailure,
34+
PyMongoError,
3135
)
3236
from pymongo.helpers_shared import _REAUTHENTICATION_REQUIRED_CODE
3337

@@ -38,6 +42,7 @@
3842

3943

4044
def _handle_reauth(func: F) -> F:
45+
@functools.wraps(func)
4146
async def inner(*args: Any, **kwargs: Any) -> Any:
4247
no_reauth = kwargs.pop("no_reauth", False)
4348
from pymongo.asynchronous.pool import AsyncConnection
@@ -70,6 +75,42 @@ async def inner(*args: Any, **kwargs: Any) -> Any:
7075
return cast(F, inner)
7176

7277

78+
_MAX_RETRIES = 3  # Upper bound on retry attempts for errors labelled "Retryable".
79+
_BACKOFF_INITIAL = 0.05  # Default base delay, in seconds, used by _backoff().
80+
_BACKOFF_MAX = 10  # Default cap, in seconds, on the pre-jitter delay in _backoff().
81+
_TIME = time  # Alias of the time module; NOTE(review): unused in the visible code, presumably a test hook; confirm.
82+
83+
84+
async def _backoff(
    attempt: int, initial_delay: float = _BACKOFF_INITIAL, max_delay: float = _BACKOFF_MAX
) -> None:
    """Sleep for a randomized, exponentially growing delay before a retry.

    The delay ceiling is ``initial_delay * 2**attempt``, capped at
    ``max_delay``. The actual sleep is a random fraction (``random.random()``,
    i.e. in ``[0.0, 1.0)``) of that ceiling.

    :param attempt: Exponent applied to the base delay.
    :param initial_delay: Base delay in seconds.
    :param max_delay: Largest allowed ceiling in seconds, before jitter.
    """
    ceiling = min(initial_delay * (2**attempt), max_delay)
    # Randomize the sleep so concurrent callers do not retry in lockstep.
    delay = random.random() * ceiling  # noqa: S311
    await asyncio.sleep(delay)
90+
91+
92+
def _retry_overload(func: F) -> F:
    """Wrap an async callable so errors labelled "Retryable" are retried.

    The wrapped coroutine is awaited again whenever it raises a
    ``PyMongoError`` carrying the "Retryable" error label, for at most
    ``_MAX_RETRIES`` retries. If the error also carries the
    "SystemOverloaded" label, ``_backoff`` is awaited before the next try.
    Any other ``PyMongoError``, or one raised after the retries are
    exhausted, propagates unchanged.
    """

    @functools.wraps(func)
    async def inner(*args: Any, **kwargs: Any) -> Any:
        retries_used = 0
        while True:
            try:
                return await func(*args, **kwargs)
            except PyMongoError as err:
                retries_used += 1
                # Give up on non-retryable errors or once the budget is spent.
                if not err.has_error_label("Retryable") or retries_used > _MAX_RETRIES:
                    raise
                # Only overloaded servers warrant a pause before retrying.
                if err.has_error_label("SystemOverloaded"):
                    await _backoff(retries_used)

    return cast(F, inner)
112+
113+
73114
async def _getaddrinfo(
74115
host: Any, port: Any, **kwargs: Any
75116
) -> list[

pymongo/asynchronous/mongo_client.py

Lines changed: 40 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@
6767
from pymongo.asynchronous.client_bulk import _AsyncClientBulk
6868
from pymongo.asynchronous.client_session import _EmptyServerSession
6969
from pymongo.asynchronous.command_cursor import AsyncCommandCursor
70+
from pymongo.asynchronous.helpers import _MAX_RETRIES, _backoff, _retry_overload
7071
from pymongo.asynchronous.settings import TopologySettings
7172
from pymongo.asynchronous.topology import Topology, _ErrorContext
7273
from pymongo.client_options import ClientOptions
@@ -2398,6 +2399,7 @@ async def list_database_names(
23982399
return [doc["name"] async for doc in res]
23992400

24002401
@_csot.apply
2402+
@_retry_overload
24012403
async def drop_database(
24022404
self,
24032405
name_or_database: Union[str, database.AsyncDatabase[_DocumentTypeArg]],
@@ -2735,6 +2737,7 @@ def __init__(
27352737
):
27362738
self._last_error: Optional[Exception] = None
27372739
self._retrying = False
2740+
self._always_retryable = False
27382741
self._multiple_retries = _csot.get_timeout() is not None
27392742
self._client = mongo_client
27402743

@@ -2783,14 +2786,22 @@ async def run(self) -> T:
27832786
# most likely be a waste of time.
27842787
raise
27852788
except PyMongoError as exc:
2789+
always_retryable = False
2790+
overloaded = False
2791+
exc_to_check = exc
27862792
# Execute specialized catch on read
27872793
if self._is_read:
27882794
if isinstance(exc, (ConnectionFailure, OperationFailure)):
27892795
# ConnectionFailures do not supply a code property
27902796
exc_code = getattr(exc, "code", None)
2791-
if self._is_not_eligible_for_retry() or (
2792-
isinstance(exc, OperationFailure)
2793-
and exc_code not in helpers_shared._RETRYABLE_ERROR_CODES
2797+
always_retryable = exc.has_error_label("Retryable")
2798+
overloaded = exc.has_error_label("SystemOverloaded")
2799+
if not always_retryable and (
2800+
self._is_not_eligible_for_retry()
2801+
or (
2802+
isinstance(exc, OperationFailure)
2803+
and exc_code not in helpers_shared._RETRYABLE_ERROR_CODES
2804+
)
27942805
):
27952806
raise
27962807
self._retrying = True
@@ -2801,19 +2812,22 @@ async def run(self) -> T:
28012812

28022813
# Specialized catch on write operation
28032814
if not self._is_read:
2804-
if not self._retryable:
2815+
if isinstance(exc, ClientBulkWriteException) and isinstance(
2816+
exc.error, PyMongoError
2817+
):
2818+
exc_to_check = exc.error
2819+
retryable_write_label = exc_to_check.has_error_label("RetryableWriteError")
2820+
always_retryable = exc_to_check.has_error_label("Retryable")
2821+
overloaded = exc_to_check.has_error_label("SystemOverloaded")
2822+
if not self._retryable and not always_retryable:
28052823
raise
2806-
if isinstance(exc, ClientBulkWriteException) and exc.error:
2807-
retryable_write_error_exc = isinstance(
2808-
exc.error, PyMongoError
2809-
) and exc.error.has_error_label("RetryableWriteError")
2810-
else:
2811-
retryable_write_error_exc = exc.has_error_label("RetryableWriteError")
2812-
if retryable_write_error_exc:
2824+
if retryable_write_label or always_retryable:
28132825
assert self._session
28142826
await self._session._unpin()
2815-
if not retryable_write_error_exc or self._is_not_eligible_for_retry():
2816-
if exc.has_error_label("NoWritesPerformed") and self._last_error:
2827+
if not always_retryable and (
2828+
not retryable_write_label or self._is_not_eligible_for_retry()
2829+
):
2830+
if exc_to_check.has_error_label("NoWritesPerformed") and self._last_error:
28172831
raise self._last_error from exc
28182832
else:
28192833
raise
@@ -2822,14 +2836,24 @@ async def run(self) -> T:
28222836
self._bulk.retrying = True
28232837
else:
28242838
self._retrying = True
2825-
if not exc.has_error_label("NoWritesPerformed"):
2839+
if not exc_to_check.has_error_label("NoWritesPerformed"):
28262840
self._last_error = exc
28272841
if self._last_error is None:
28282842
self._last_error = exc
28292843

28302844
if self._client.topology_description.topology_type == TOPOLOGY_TYPE.Sharded:
28312845
self._deprioritized_servers.append(self._server)
28322846

2847+
self._always_retryable = always_retryable
2848+
if always_retryable:
2849+
if self._attempt_number > _MAX_RETRIES:
2850+
if exc_to_check.has_error_label("NoWritesPerformed") and self._last_error:
2851+
raise self._last_error from exc
2852+
else:
2853+
raise
2854+
if overloaded:
2855+
await _backoff(self._attempt_number)
2856+
28332857
def _is_not_eligible_for_retry(self) -> bool:
28342858
"""Checks if the exchange is not eligible for retry"""
28352859
return not self._retryable or (self._is_retrying() and not self._multiple_retries)
@@ -2891,7 +2915,7 @@ async def _write(self) -> T:
28912915
and conn.supports_sessions
28922916
)
28932917
is_mongos = conn.is_mongos
2894-
if not sessions_supported:
2918+
if not self._always_retryable and not sessions_supported:
28952919
# A retry is not possible because this server does
28962920
# not support sessions raise the last error.
28972921
self._check_last_error()
@@ -2923,7 +2947,7 @@ async def _read(self) -> T:
29232947
conn,
29242948
read_pref,
29252949
):
2926-
if self._retrying and not self._retryable:
2950+
if self._retrying and not self._retryable and not self._always_retryable:
29272951
self._check_last_error()
29282952
if self._retrying:
29292953
_debug_log(

pymongo/synchronous/collection.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,7 @@
8989
Cursor,
9090
RawBatchCursor,
9191
)
92+
from pymongo.synchronous.helpers import _retry_overload
9293
from pymongo.typings import _CollationIn, _DocumentType, _DocumentTypeArg, _Pipeline
9394
from pymongo.write_concern import DEFAULT_WRITE_CONCERN, WriteConcern, validate_boolean
9495

@@ -2224,6 +2225,7 @@ def create_indexes(
22242225
return self._create_indexes(indexes, session, **kwargs)
22252226

22262227
@_csot.apply
2228+
@_retry_overload
22272229
def _create_indexes(
22282230
self, indexes: Sequence[IndexModel], session: Optional[ClientSession], **kwargs: Any
22292231
) -> list[str]:
@@ -2419,7 +2421,6 @@ def drop_indexes(
24192421
kwargs["comment"] = comment
24202422
self._drop_index("*", session=session, **kwargs)
24212423

2422-
@_csot.apply
24232424
def drop_index(
24242425
self,
24252426
index_or_name: _IndexKeyHint,
@@ -2469,6 +2470,7 @@ def drop_index(
24692470
self._drop_index(index_or_name, session, comment, **kwargs)
24702471

24712472
@_csot.apply
2473+
@_retry_overload
24722474
def _drop_index(
24732475
self,
24742476
index_or_name: _IndexKeyHint,
@@ -3072,6 +3074,7 @@ def aggregate_raw_batches(
30723074
)
30733075

30743076
@_csot.apply
3077+
@_retry_overload
30753078
def rename(
30763079
self,
30773080
new_name: str,

pymongo/synchronous/database.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
from pymongo.synchronous.change_stream import DatabaseChangeStream
4444
from pymongo.synchronous.collection import Collection
4545
from pymongo.synchronous.command_cursor import CommandCursor
46+
from pymongo.synchronous.helpers import _retry_overload
4647
from pymongo.typings import _CollationIn, _DocumentType, _DocumentTypeArg, _Pipeline
4748

4849
if TYPE_CHECKING:
@@ -477,6 +478,7 @@ def watch(
477478
return change_stream
478479

479480
@_csot.apply
481+
@_retry_overload
480482
def create_collection(
481483
self,
482484
name: str,
@@ -816,6 +818,7 @@ def command(
816818
...
817819

818820
@_csot.apply
821+
@_retry_overload
819822
def command(
820823
self,
821824
command: Union[str, MutableMapping[str, Any]],
@@ -945,6 +948,7 @@ def command(
945948
)
946949

947950
@_csot.apply
951+
@_retry_overload
948952
def cursor_command(
949953
self,
950954
command: Union[str, MutableMapping[str, Any]],
@@ -1257,6 +1261,7 @@ def _drop_helper(
12571261
)
12581262

12591263
@_csot.apply
1264+
@_retry_overload
12601265
def drop_collection(
12611266
self,
12621267
name_or_collection: Union[str, Collection[_DocumentTypeArg]],

0 commit comments

Comments
 (0)