Skip to content

Commit 9fc7ed1

Browse files
committed
PYTHON-2143 Use an allow-list to determine resumable change stream errors
1 parent 1f4123e commit 9fc7ed1

File tree

6 files changed

+170
-98
lines changed

6 files changed

+170
-98
lines changed

pymongo/change_stream.py

Lines changed: 26 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -25,18 +25,33 @@
2525
from pymongo.collation import validate_collation_or_none
2626
from pymongo.command_cursor import CommandCursor
2727
from pymongo.errors import (ConnectionFailure,
28+
CursorNotFound,
2829
InvalidOperation,
2930
OperationFailure,
3031
PyMongoError)
3132

3233

3334
# The change streams spec considers the following server errors from the
3435
# getMore command resumable. All other getMore errors are non-resumable.
35-
_NON_RESUMABLE_GETMORE_ERRORS = frozenset([
36-
11601, # Interrupted
37-
136, # CappedPositionLost
38-
237, # CursorKilled
39-
None, # No error code was returned.
36+
_RESUMABLE_GETMORE_ERRORS = frozenset([
37+
6, # HostUnreachable
38+
7, # HostNotFound
39+
89, # NetworkTimeout
40+
91, # ShutdownInProgress
41+
189, # PrimarySteppedDown
42+
262, # ExceededTimeLimit
43+
9001, # SocketException
44+
10107, # NotMaster
45+
11600, # InterruptedAtShutdown
46+
11602, # InterruptedDueToReplStateChange
47+
13435, # NotMasterNoSlaveOk
48+
13436, # NotMasterOrSecondary
49+
63, # StaleShardVersion
50+
150, # StaleEpoch
51+
13388, # StaleConfig
52+
234, # RetryChangeStream
53+
133, # FailedToSatisfyReadPreference
54+
216, # ElectionInProgress
4055
])
4156

4257

@@ -283,12 +298,15 @@ def try_next(self):
283298
# one resume attempt.
284299
try:
285300
change = self._cursor._try_next(True)
286-
except ConnectionFailure:
301+
except (ConnectionFailure, CursorNotFound):
287302
self._resume()
288303
change = self._cursor._try_next(False)
289304
except OperationFailure as exc:
290-
if (exc.code in _NON_RESUMABLE_GETMORE_ERRORS or
291-
exc.has_error_label("NonResumableChangeStreamError")):
305+
is_resumable = ((exc.max_wire_version >= 9 and
306+
exc.has_error_label("ResumableChangeStreamError")) or
307+
(exc.max_wire_version < 9 and
308+
exc.code in _RESUMABLE_GETMORE_ERRORS))
309+
if not is_resumable:
292310
raise
293311
self._resume()
294312
change = self._cursor._try_next(False)

pymongo/errors.py

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -140,18 +140,21 @@ class ConfigurationError(PyMongoError):
140140
class OperationFailure(PyMongoError):
141141
"""Raised when a database operation fails.
142142
143+
.. versionadded:: 3.11
144+
The :attr:`max_wire_version` attribute.
143145
.. versionadded:: 2.7
144146
The :attr:`details` attribute.
145147
"""
146148

147-
def __init__(self, error, code=None, details=None):
149+
def __init__(self, error, code=None, details=None, max_wire_version=None):
148150
error_labels = None
149151
if details is not None:
150152
error_labels = details.get('errorLabels')
151153
super(OperationFailure, self).__init__(
152154
error, error_labels=error_labels)
153155
self.__code = code
154156
self.__details = details
157+
self.__max_wire_version = max_wire_version
155158

156159
@property
157160
def code(self):
@@ -171,6 +174,15 @@ def details(self):
171174
"""
172175
return self.__details
173176

177+
@property
178+
def max_wire_version(self):
179+
"""The latest version of the wire protocol supported by the socket
180+
that was used to run the operation that raised this exception.
181+
182+
PyMongo does not always record this value and it may be None.
183+
"""
184+
return self.__max_wire_version
185+
174186
def __str__(self):
175187
output_str = "%s, full error: %s" % (self._message, self.__details)
176188
if sys.version_info[0] == 2 and isinstance(output_str, unicode):

pymongo/helpers.py

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -103,14 +103,16 @@ def _index_document(index_list):
103103

104104

105105
def _check_command_response(response, msg=None, allowable_errors=None,
106-
parse_write_concern_error=False):
106+
parse_write_concern_error=False,
107+
max_wire_version=None):
107108
"""Check the response to a command for errors.
108109
"""
109110
if "ok" not in response:
110111
# Server didn't recognize our message as a command.
111112
raise OperationFailure(response.get("$err"),
112113
response.get("code"),
113-
response)
114+
response,
115+
max_wire_version)
114116

115117
if parse_write_concern_error and 'writeConcernError' in response:
116118
_raise_write_concern_error(response['writeConcernError'])
@@ -146,19 +148,24 @@ def _check_command_response(response, msg=None, allowable_errors=None,
146148
details.get("assertion", ""))
147149
raise OperationFailure(errmsg,
148150
details.get("assertionCode"),
149-
response)
151+
response,
152+
max_wire_version)
150153

151154
# Other errors
152155
# findAndModify with upsert can raise duplicate key error
153156
if code in (11000, 11001, 12582):
154-
raise DuplicateKeyError(errmsg, code, response)
157+
raise DuplicateKeyError(errmsg, code, response,
158+
max_wire_version)
155159
elif code == 50:
156-
raise ExecutionTimeout(errmsg, code, response)
160+
raise ExecutionTimeout(errmsg, code, response,
161+
max_wire_version)
157162
elif code == 43:
158-
raise CursorNotFound(errmsg, code, response)
163+
raise CursorNotFound(errmsg, code, response,
164+
max_wire_version)
159165

160166
msg = msg or "%s"
161-
raise OperationFailure(msg % errmsg, code, response)
167+
raise OperationFailure(msg % errmsg, code, response,
168+
max_wire_version)
162169

163170

164171
def _check_gle_response(result):

pymongo/server.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,8 @@ def run_operation_with_response(
133133
first = docs[0]
134134
operation.client._process_response(
135135
first, operation.session)
136-
_check_command_response(first)
136+
_check_command_response(
137+
first, max_wire_version=sock_info.max_wire_version)
137138
except Exception as exc:
138139
if publish:
139140
duration = datetime.now() - start

test/change_streams/change-streams-errors.json

Lines changed: 48 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,6 @@
7575
{
7676
"description": "Change Stream should error when _id is projected out",
7777
"minServerVersion": "4.1.11",
78-
"maxServerVersion": "4.3.3",
7978
"target": "collection",
8079
"topology": [
8180
"replicaset",
@@ -103,10 +102,54 @@
103102
],
104103
"result": {
105104
"error": {
106-
"code": 280,
107-
"errorLabels": [
108-
"NonResumableChangeStreamError"
109-
]
105+
"code": 280
106+
}
107+
}
108+
},
109+
{
110+
"description": "change stream errors on MaxTimeMSExpired",
111+
"minServerVersion": "4.2",
112+
"failPoint": {
113+
"configureFailPoint": "failCommand",
114+
"mode": {
115+
"times": 1
116+
},
117+
"data": {
118+
"failCommands": [
119+
"getMore"
120+
],
121+
"errorCode": 50,
122+
"closeConnection": false
123+
}
124+
},
125+
"target": "collection",
126+
"topology": [
127+
"replicaset",
128+
"sharded"
129+
],
130+
"changeStreamPipeline": [
131+
{
132+
"$project": {
133+
"_id": 0
134+
}
135+
}
136+
],
137+
"changeStreamOptions": {},
138+
"operations": [
139+
{
140+
"database": "change-stream-tests",
141+
"collection": "test",
142+
"name": "insertOne",
143+
"arguments": {
144+
"document": {
145+
"z": 3
146+
}
147+
}
148+
}
149+
],
150+
"result": {
151+
"error": {
152+
"code": 50
110153
}
111154
}
112155
}

0 commit comments

Comments
 (0)