-
Notifications
You must be signed in to change notification settings - Fork 1.1k
PYTHON-5505 Prototype system overload retry loop for all operations #2497
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 10 commits
4728868
a83dea8
c2ff971
b6dd34d
eb113f0
b436647
f3be7f7
65fbd7e
194863f
989fdfe
1f40e11
6114788
d2e3354
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -67,6 +67,7 @@ | |
from pymongo.asynchronous.client_bulk import _AsyncClientBulk | ||
from pymongo.asynchronous.client_session import _EmptyServerSession | ||
from pymongo.asynchronous.command_cursor import AsyncCommandCursor | ||
from pymongo.asynchronous.helpers import _MAX_RETRIES, _backoff, _retry_overload | ||
from pymongo.asynchronous.settings import TopologySettings | ||
from pymongo.asynchronous.topology import Topology, _ErrorContext | ||
from pymongo.client_options import ClientOptions | ||
|
@@ -2398,6 +2399,7 @@ async def list_database_names( | |
return [doc["name"] async for doc in res] | ||
|
||
@_csot.apply | ||
@_retry_overload | ||
async def drop_database( | ||
self, | ||
name_or_database: Union[str, database.AsyncDatabase[_DocumentTypeArg]], | ||
|
@@ -2735,6 +2737,7 @@ def __init__( | |
): | ||
self._last_error: Optional[Exception] = None | ||
self._retrying = False | ||
self._always_retryable = False | ||
self._multiple_retries = _csot.get_timeout() is not None | ||
self._client = mongo_client | ||
|
||
|
@@ -2783,14 +2786,21 @@ async def run(self) -> T: | |
# most likely be a waste of time. | ||
raise | ||
except PyMongoError as exc: | ||
always_retryable = False | ||
overloaded = False | ||
# Execute specialized catch on read | ||
if self._is_read: | ||
if isinstance(exc, (ConnectionFailure, OperationFailure)): | ||
# ConnectionFailures do not supply a code property | ||
exc_code = getattr(exc, "code", None) | ||
if self._is_not_eligible_for_retry() or ( | ||
isinstance(exc, OperationFailure) | ||
and exc_code not in helpers_shared._RETRYABLE_ERROR_CODES | ||
always_retryable = exc.has_error_label("Retryable") | ||
overloaded = exc.has_error_label("SystemOverloaded") | ||
if not always_retryable and ( | ||
self._is_not_eligible_for_retry() | ||
or ( | ||
isinstance(exc, OperationFailure) | ||
and exc_code not in helpers_shared._RETRYABLE_ERROR_CODES | ||
) | ||
): | ||
raise | ||
self._retrying = True | ||
|
@@ -2801,18 +2811,24 @@ async def run(self) -> T: | |
|
||
# Specialized catch on write operation | ||
if not self._is_read: | ||
if not self._retryable: | ||
raise | ||
retryable_write_label = False | ||
if isinstance(exc, ClientBulkWriteException) and exc.error: | ||
retryable_write_error_exc = isinstance( | ||
exc.error, PyMongoError | ||
) and exc.error.has_error_label("RetryableWriteError") | ||
if isinstance(exc.error, PyMongoError): | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Do we need this? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Yeah, this can be cleaned up. I think it's a problem with ClientBulkWriteException violating the Liskov substitution principle. Perhaps ClientBulkWriteException should override has_error_label() to behave correctly. I suspect the NoWritesPerformed check below is buggy because of this issue too. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Refactored to remove the duplicate label checks. I decided not to do PYTHON-5511 yet to reduce the scope of this PR. |
||
retryable_write_label = exc.error.has_error_label("RetryableWriteError") | ||
always_retryable = exc.error.has_error_label("Retryable") | ||
overloaded = exc.error.has_error_label("SystemOverloaded") | ||
else: | ||
retryable_write_error_exc = exc.has_error_label("RetryableWriteError") | ||
if retryable_write_error_exc: | ||
retryable_write_label = exc.has_error_label("RetryableWriteError") | ||
always_retryable = exc.has_error_label("Retryable") | ||
overloaded = exc.has_error_label("SystemOverloaded") | ||
if not self._retryable and not always_retryable: | ||
raise | ||
if retryable_write_label or always_retryable: | ||
assert self._session | ||
await self._session._unpin() | ||
if not retryable_write_error_exc or self._is_not_eligible_for_retry(): | ||
if not always_retryable and ( | ||
not retryable_write_label or not self._is_not_eligible_for_retry() | ||
): | ||
if exc.has_error_label("NoWritesPerformed") and self._last_error: | ||
raise self._last_error from exc | ||
else: | ||
|
@@ -2830,6 +2846,16 @@ async def run(self) -> T: | |
if self._client.topology_description.topology_type == TOPOLOGY_TYPE.Sharded: | ||
self._deprioritized_servers.append(self._server) | ||
|
||
self._always_retryable = always_retryable | ||
if always_retryable: | ||
if self._attempt_number > _MAX_RETRIES: | ||
if exc.has_error_label("NoWritesPerformed") and self._last_error: | ||
raise self._last_error from exc | ||
else: | ||
raise | ||
if overloaded: | ||
await _backoff(self._attempt_number) | ||
|
||
def _is_not_eligible_for_retry(self) -> bool: | ||
"""Checks if the exchange is not eligible for retry""" | ||
return not self._retryable or (self._is_retrying() and not self._multiple_retries) | ||
|
@@ -2923,7 +2949,7 @@ async def _read(self) -> T: | |
conn, | ||
read_pref, | ||
): | ||
if self._retrying and not self._retryable: | ||
if self._retrying and not self._retryable and not self._always_retryable: | ||
self._check_last_error() | ||
if self._retrying: | ||
_debug_log( | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
`always_retryable` represents what exactly? That an error is retryable under all conditions?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes it means it's safe to retry even if the operation is otherwise non-retryable, like an update_many. There's probably a better name for it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
`overloaded` still takes precedence over `always_retryable`, correct?