Skip to content

Commit 4728868

Browse files
committed
PYTHON-5505 Prototype system overload retry loop for all operations
1 parent cf7a1aa commit 4728868

File tree

11 files changed

+353
-12
lines changed

11 files changed

+353
-12
lines changed

pymongo/asynchronous/collection.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@
5858
AsyncCursor,
5959
AsyncRawBatchCursor,
6060
)
61+
from pymongo.asynchronous.helpers import _retry_overload
6162
from pymongo.collation import validate_collation_or_none
6263
from pymongo.common import _ecoc_coll_name, _esc_coll_name
6364
from pymongo.errors import (
@@ -2227,6 +2228,7 @@ async def create_indexes(
22272228
return await self._create_indexes(indexes, session, **kwargs)
22282229

22292230
@_csot.apply
2231+
@_retry_overload
22302232
async def _create_indexes(
22312233
self, indexes: Sequence[IndexModel], session: Optional[AsyncClientSession], **kwargs: Any
22322234
) -> list[str]:
@@ -2422,7 +2424,6 @@ async def drop_indexes(
24222424
kwargs["comment"] = comment
24232425
await self._drop_index("*", session=session, **kwargs)
24242426

2425-
@_csot.apply
24262427
async def drop_index(
24272428
self,
24282429
index_or_name: _IndexKeyHint,
@@ -2472,6 +2473,7 @@ async def drop_index(
24722473
await self._drop_index(index_or_name, session, comment, **kwargs)
24732474

24742475
@_csot.apply
2476+
@_retry_overload
24752477
async def _drop_index(
24762478
self,
24772479
index_or_name: _IndexKeyHint,
@@ -3079,6 +3081,7 @@ async def aggregate_raw_batches(
30793081
)
30803082

30813083
@_csot.apply
3084+
@_retry_overload
30823085
async def rename(
30833086
self,
30843087
new_name: str,

pymongo/asynchronous/database.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
from pymongo.asynchronous.change_stream import AsyncDatabaseChangeStream
3939
from pymongo.asynchronous.collection import AsyncCollection
4040
from pymongo.asynchronous.command_cursor import AsyncCommandCursor
41+
from pymongo.asynchronous.helpers import _retry_overload
4142
from pymongo.common import _ecoc_coll_name, _esc_coll_name
4243
from pymongo.database_shared import _check_name, _CodecDocumentType
4344
from pymongo.errors import CollectionInvalid, InvalidOperation
@@ -477,6 +478,7 @@ async def watch(
477478
return change_stream
478479

479480
@_csot.apply
481+
@_retry_overload
480482
async def create_collection(
481483
self,
482484
name: str,
@@ -816,6 +818,7 @@ async def command(
816818
...
817819

818820
@_csot.apply
821+
@_retry_overload
819822
async def command(
820823
self,
821824
command: Union[str, MutableMapping[str, Any]],
@@ -947,6 +950,7 @@ async def command(
947950
)
948951

949952
@_csot.apply
953+
@_retry_overload
950954
async def cursor_command(
951955
self,
952956
command: Union[str, MutableMapping[str, Any]],
@@ -1264,6 +1268,7 @@ async def _drop_helper(
12641268
)
12651269

12661270
@_csot.apply
1271+
@_retry_overload
12671272
async def drop_collection(
12681273
self,
12691274
name_or_collection: Union[str, AsyncCollection[_DocumentTypeArg]],

pymongo/asynchronous/helpers.py

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,10 @@
1717

1818
import asyncio
1919
import builtins
20+
import random
2021
import socket
2122
import sys
23+
import time
2224
from typing import (
2325
Any,
2426
Callable,
@@ -28,6 +30,7 @@
2830

2931
from pymongo.errors import (
3032
OperationFailure,
33+
PyMongoError,
3134
)
3235
from pymongo.helpers_shared import _REAUTHENTICATION_REQUIRED_CODE
3336

@@ -70,6 +73,41 @@ async def inner(*args: Any, **kwargs: Any) -> Any:
7073
return cast(F, inner)
7174

7275

76+
_MAX_RETRIES = 3
77+
_BACKOFF_INITIAL = 0.05
78+
_BACKOFF_MAX = 10
79+
_TIME = time
80+
81+
82+
async def _backoff(
83+
attempt: int, initial_delay: float = _BACKOFF_INITIAL, max_delay: float = _BACKOFF_MAX
84+
) -> None:
85+
jitter = random.random() # noqa: S311
86+
backoff = jitter * min(initial_delay * (2**attempt), max_delay)
87+
await asyncio.sleep(backoff)
88+
89+
90+
def _retry_overload(func: F) -> F:
91+
async def inner(*args: Any, **kwargs: Any) -> Any:
92+
no_retry = kwargs.pop("no_retry", False)
93+
attempt = 0
94+
while True:
95+
try:
96+
return await func(*args, **kwargs)
97+
except PyMongoError as exc:
98+
if no_retry or not exc.has_error_label("Retryable"):
99+
raise
100+
attempt += 1
101+
if attempt > _MAX_RETRIES:
102+
raise
103+
104+
# Implement exponential backoff on retry.
105+
await _backoff(attempt)
106+
continue
107+
108+
return cast(F, inner)
109+
110+
73111
async def _getaddrinfo(
74112
host: Any, port: Any, **kwargs: Any
75113
) -> list[

pymongo/asynchronous/mongo_client.py

Lines changed: 26 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@
6767
from pymongo.asynchronous.client_bulk import _AsyncClientBulk
6868
from pymongo.asynchronous.client_session import _EmptyServerSession
6969
from pymongo.asynchronous.command_cursor import AsyncCommandCursor
70+
from pymongo.asynchronous.helpers import _MAX_RETRIES, _backoff, _retry_overload
7071
from pymongo.asynchronous.settings import TopologySettings
7172
from pymongo.asynchronous.topology import Topology, _ErrorContext
7273
from pymongo.client_options import ClientOptions
@@ -2398,6 +2399,7 @@ async def list_database_names(
23982399
return [doc["name"] async for doc in res]
23992400

24002401
@_csot.apply
2402+
@_retry_overload
24012403
async def drop_database(
24022404
self,
24032405
name_or_database: Union[str, database.AsyncDatabase[_DocumentTypeArg]],
@@ -2783,14 +2785,19 @@ async def run(self) -> T:
27832785
# most likely be a waste of time.
27842786
raise
27852787
except PyMongoError as exc:
2788+
overload = False
27862789
# Execute specialized catch on read
27872790
if self._is_read:
27882791
if isinstance(exc, (ConnectionFailure, OperationFailure)):
27892792
# ConnectionFailures do not supply a code property
27902793
exc_code = getattr(exc, "code", None)
2791-
if self._is_not_eligible_for_retry() or (
2792-
isinstance(exc, OperationFailure)
2793-
and exc_code not in helpers_shared._RETRYABLE_ERROR_CODES
2794+
overload = exc.has_error_label("Retryable")
2795+
if not overload and (
2796+
self._is_not_eligible_for_retry()
2797+
or (
2798+
isinstance(exc, OperationFailure)
2799+
and exc_code not in helpers_shared._RETRYABLE_ERROR_CODES
2800+
)
27942801
):
27952802
raise
27962803
self._retrying = True
@@ -2807,12 +2814,18 @@ async def run(self) -> T:
28072814
retryable_write_error_exc = isinstance(
28082815
exc.error, PyMongoError
28092816
) and exc.error.has_error_label("RetryableWriteError")
2817+
overload = isinstance(
2818+
exc.error, PyMongoError
2819+
) and exc.error.has_error_label("Retryable")
28102820
else:
28112821
retryable_write_error_exc = exc.has_error_label("RetryableWriteError")
2812-
if retryable_write_error_exc:
2822+
overload = exc.has_error_label("Retryable")
2823+
if retryable_write_error_exc or overload:
28132824
assert self._session
28142825
await self._session._unpin()
2815-
if not retryable_write_error_exc or self._is_not_eligible_for_retry():
2826+
if not overload and (
2827+
not retryable_write_error_exc or not self._is_not_eligible_for_retry()
2828+
):
28162829
if exc.has_error_label("NoWritesPerformed") and self._last_error:
28172830
raise self._last_error from exc
28182831
else:
@@ -2830,6 +2843,14 @@ async def run(self) -> T:
28302843
if self._client.topology_description.topology_type == TOPOLOGY_TYPE.Sharded:
28312844
self._deprioritized_servers.append(self._server)
28322845

2846+
if overload:
2847+
if self._attempt_number > _MAX_RETRIES:
2848+
if exc.has_error_label("NoWritesPerformed") and self._last_error:
2849+
raise self._last_error from exc
2850+
else:
2851+
raise
2852+
await _backoff(self._attempt_number)
2853+
28332854
def _is_not_eligible_for_retry(self) -> bool:
28342855
"""Checks if the exchange is not eligible for retry"""
28352856
return not self._retryable or (self._is_retrying() and not self._multiple_retries)

pymongo/synchronous/collection.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,7 @@
8989
Cursor,
9090
RawBatchCursor,
9191
)
92+
from pymongo.synchronous.helpers import _retry_overload
9293
from pymongo.typings import _CollationIn, _DocumentType, _DocumentTypeArg, _Pipeline
9394
from pymongo.write_concern import DEFAULT_WRITE_CONCERN, WriteConcern, validate_boolean
9495

@@ -2224,6 +2225,7 @@ def create_indexes(
22242225
return self._create_indexes(indexes, session, **kwargs)
22252226

22262227
@_csot.apply
2228+
@_retry_overload
22272229
def _create_indexes(
22282230
self, indexes: Sequence[IndexModel], session: Optional[ClientSession], **kwargs: Any
22292231
) -> list[str]:
@@ -2419,7 +2421,6 @@ def drop_indexes(
24192421
kwargs["comment"] = comment
24202422
self._drop_index("*", session=session, **kwargs)
24212423

2422-
@_csot.apply
24232424
def drop_index(
24242425
self,
24252426
index_or_name: _IndexKeyHint,
@@ -2469,6 +2470,7 @@ def drop_index(
24692470
self._drop_index(index_or_name, session, comment, **kwargs)
24702471

24712472
@_csot.apply
2473+
@_retry_overload
24722474
def _drop_index(
24732475
self,
24742476
index_or_name: _IndexKeyHint,
@@ -3072,6 +3074,7 @@ def aggregate_raw_batches(
30723074
)
30733075

30743076
@_csot.apply
3077+
@_retry_overload
30753078
def rename(
30763079
self,
30773080
new_name: str,

pymongo/synchronous/database.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
from pymongo.synchronous.change_stream import DatabaseChangeStream
4444
from pymongo.synchronous.collection import Collection
4545
from pymongo.synchronous.command_cursor import CommandCursor
46+
from pymongo.synchronous.helpers import _retry_overload
4647
from pymongo.typings import _CollationIn, _DocumentType, _DocumentTypeArg, _Pipeline
4748

4849
if TYPE_CHECKING:
@@ -477,6 +478,7 @@ def watch(
477478
return change_stream
478479

479480
@_csot.apply
481+
@_retry_overload
480482
def create_collection(
481483
self,
482484
name: str,
@@ -816,6 +818,7 @@ def command(
816818
...
817819

818820
@_csot.apply
821+
@_retry_overload
819822
def command(
820823
self,
821824
command: Union[str, MutableMapping[str, Any]],
@@ -945,6 +948,7 @@ def command(
945948
)
946949

947950
@_csot.apply
951+
@_retry_overload
948952
def cursor_command(
949953
self,
950954
command: Union[str, MutableMapping[str, Any]],
@@ -1257,6 +1261,7 @@ def _drop_helper(
12571261
)
12581262

12591263
@_csot.apply
1264+
@_retry_overload
12601265
def drop_collection(
12611266
self,
12621267
name_or_collection: Union[str, Collection[_DocumentTypeArg]],

pymongo/synchronous/helpers.py

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,10 @@
1717

1818
import asyncio
1919
import builtins
20+
import random
2021
import socket
2122
import sys
23+
import time
2224
from typing import (
2325
Any,
2426
Callable,
@@ -28,6 +30,7 @@
2830

2931
from pymongo.errors import (
3032
OperationFailure,
33+
PyMongoError,
3134
)
3235
from pymongo.helpers_shared import _REAUTHENTICATION_REQUIRED_CODE
3336

@@ -70,6 +73,41 @@ def inner(*args: Any, **kwargs: Any) -> Any:
7073
return cast(F, inner)
7174

7275

76+
_MAX_RETRIES = 3
77+
_BACKOFF_INITIAL = 0.05
78+
_BACKOFF_MAX = 10
79+
_TIME = time
80+
81+
82+
def _backoff(
83+
attempt: int, initial_delay: float = _BACKOFF_INITIAL, max_delay: float = _BACKOFF_MAX
84+
) -> None:
85+
jitter = random.random() # noqa: S311
86+
backoff = jitter * min(initial_delay * (2**attempt), max_delay)
87+
time.sleep(backoff)
88+
89+
90+
def _retry_overload(func: F) -> F:
91+
def inner(*args: Any, **kwargs: Any) -> Any:
92+
no_retry = kwargs.pop("no_retry", False)
93+
attempt = 0
94+
while True:
95+
try:
96+
return func(*args, **kwargs)
97+
except PyMongoError as exc:
98+
if no_retry or not exc.has_error_label("Retryable"):
99+
raise
100+
attempt += 1
101+
if attempt > _MAX_RETRIES:
102+
raise
103+
104+
# Implement exponential backoff on retry.
105+
_backoff(attempt)
106+
continue
107+
108+
return cast(F, inner)
109+
110+
73111
def _getaddrinfo(
74112
host: Any, port: Any, **kwargs: Any
75113
) -> list[

0 commit comments

Comments
 (0)