Skip to content

Commit 7c19ff7

Browse files
authored
PYTHON-3389 Close ChangeStream after non-resumable non-timeout errors (#1029)
1 parent fbb8dde commit 7c19ff7

File tree

2 files changed

+35
-20
lines changed

2 files changed

+35
-20
lines changed

pymongo/change_stream.py

Lines changed: 28 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,19 @@
6868
from pymongo.mongo_client import MongoClient
6969

7070

71+
def _resumable(exc: PyMongoError) -> bool:
72+
"""Return True if given a resumable change stream error."""
73+
if isinstance(exc, (ConnectionFailure, CursorNotFound)):
74+
return True
75+
if isinstance(exc, OperationFailure):
76+
if exc._max_wire_version is None:
77+
return False
78+
return (
79+
exc._max_wire_version >= 9 and exc.has_error_label("ResumableChangeStreamError")
80+
) or (exc._max_wire_version < 9 and exc.code in _RESUMABLE_GETMORE_ERRORS)
81+
return False
82+
83+
7184
class ChangeStream(Generic[_DocumentType]):
7285
"""The internal abstract base class for change stream cursors.
7386
@@ -343,20 +356,21 @@ def try_next(self) -> Optional[_DocumentType]:
343356
# Attempt to get the next change with at most one getMore and at most
344357
# one resume attempt.
345358
try:
346-
change = self._cursor._try_next(True)
347-
except (ConnectionFailure, CursorNotFound):
348-
self._resume()
349-
change = self._cursor._try_next(False)
350-
except OperationFailure as exc:
351-
if exc._max_wire_version is None:
352-
raise
353-
is_resumable = (
354-
exc._max_wire_version >= 9 and exc.has_error_label("ResumableChangeStreamError")
355-
) or (exc._max_wire_version < 9 and exc.code in _RESUMABLE_GETMORE_ERRORS)
356-
if not is_resumable:
357-
raise
358-
self._resume()
359-
change = self._cursor._try_next(False)
359+
try:
360+
change = self._cursor._try_next(True)
361+
except PyMongoError as exc:
362+
if not _resumable(exc):
363+
raise
364+
self._resume()
365+
change = self._cursor._try_next(False)
366+
except PyMongoError as exc:
367+
# Close the stream after a fatal error.
368+
if not _resumable(exc) and not exc.timeout:
369+
self.close()
370+
raise
371+
except Exception:
372+
self.close()
373+
raise
360374

361375
# Check if the cursor was invalidated.
362376
if not self._cursor.alive:

test/test_change_stream.py

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -486,15 +486,16 @@ def _get_expected_resume_token(self, stream, listener, previous_change=None):
486486
return response["cursor"]["postBatchResumeToken"]
487487

488488
@no_type_check
489-
def _test_raises_error_on_missing_id(self, expected_exception, expected_exception2):
489+
def _test_raises_error_on_missing_id(self, expected_exception):
490490
"""ChangeStream will raise an exception if the server response is
491491
missing the resume token.
492492
"""
493493
with self.change_stream([{"$project": {"_id": 0}}]) as change_stream:
494494
self.watched_collection().insert_one({})
495495
with self.assertRaises(expected_exception):
496496
next(change_stream)
497-
with self.assertRaises(expected_exception2):
497+
# The cursor should now be closed.
498+
with self.assertRaises(StopIteration):
498499
next(change_stream)
499500

500501
@no_type_check
@@ -526,14 +527,14 @@ def test_update_resume_token_legacy(self):
526527
# Prose test no. 2
527528
@client_context.require_version_min(4, 1, 8)
528529
def test_raises_error_on_missing_id_418plus(self):
529-
# Server returns an error on 4.1.8+, subsequent next() resumes and gets the same error.
530-
self._test_raises_error_on_missing_id(OperationFailure, OperationFailure)
530+
# Server returns an error on 4.1.8+
531+
self._test_raises_error_on_missing_id(OperationFailure)
531532

532533
# Prose test no. 2
533534
@client_context.require_version_max(4, 1, 8)
534535
def test_raises_error_on_missing_id_418minus(self):
535-
# PyMongo raises an error, closes the cursor, subsequent next() raises StopIteration.
536-
self._test_raises_error_on_missing_id(InvalidOperation, StopIteration)
536+
# PyMongo raises an error
537+
self._test_raises_error_on_missing_id(InvalidOperation)
537538

538539
# Prose test no. 3
539540
@no_type_check

0 commit comments

Comments
 (0)