Skip to content

Commit 065001e

Browse files
PYTHON-2305 Cache postBatchResumeToken when an aggregate command returns an empty firstBatch (#456)
1 parent 1c29c1a commit 065001e

File tree

3 files changed

+20
-10
lines changed

3 files changed

+20
-10
lines changed

doc/changelog.rst

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,8 @@ Changelog
44
Changes in Version 3.11.0b2.dev0
55
--------------------------------
66

7-
Version 3.11 adds support for MongoDB 4.4. Highlights include:
7+
Version 3.11 adds support for MongoDB 4.4 and includes a number of bug fixes.
8+
Highlights include:
89

910
- Support for :ref:`OCSP` (Online Certificate Status Protocol).
1011
- Support for `PyOpenSSL <https://pypi.org/project/pyOpenSSL/>`_ as an
@@ -52,6 +53,9 @@ Version 3.11 adds support for MongoDB 4.4. Highlights include:
5253
- Deprecated :meth:`pymongo.collection.Collection.reindex`. Use
5354
:meth:`~pymongo.database.Database.command` to run the ``reIndex`` command
5455
instead.
56+
- Fixed a bug in change streams that could cause PyMongo to miss some change
57+
documents when resuming a stream that was started without a resume token and
58+
whose first batch did not contain any change documents.
5559

5660
Unavoidable breaking changes:
5761

pymongo/change_stream.py

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,7 @@ def _change_stream_options(self):
125125
if resume_token is not None:
126126
if self._uses_start_after:
127127
options['startAfter'] = resume_token
128-
if self._uses_resume_after:
128+
else:
129129
options['resumeAfter'] = resume_token
130130

131131
if self._start_at_operation_time is not None:
@@ -149,17 +149,20 @@ def _aggregation_pipeline(self):
149149
return full_pipeline
150150

151151
def _process_result(self, result, session, server, sock_info, slave_ok):
152-
"""Callback that caches the startAtOperationTime from a changeStream
153-
aggregate command response containing an empty batch of change
154-
documents.
152+
"""Callback that caches the postBatchResumeToken or
153+
startAtOperationTime from a changeStream aggregate command response
154+
containing an empty batch of change documents.
155155
156156
This is implemented as a callback because we need access to the wire
157157
version in order to determine whether to cache this value.
158158
"""
159159
if not result['cursor']['firstBatch']:
160-
if (self._start_at_operation_time is None and
161-
self.resume_token is None and
162-
sock_info.max_wire_version >= 7):
160+
if 'postBatchResumeToken' in result['cursor']:
161+
self._resume_token = result['cursor']['postBatchResumeToken']
162+
elif (self._start_at_operation_time is None and
163+
self._uses_resume_after is False and
164+
self._uses_start_after is False and
165+
sock_info.max_wire_version >= 7):
163166
self._start_at_operation_time = result.get("operationTime")
164167
# PYTHON-2181: informative error on missing operationTime.
165168
if self._start_at_operation_time is None:

test/test_change_stream.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,6 @@ def test_watch(self):
127127
self.assertEqual([{'$project': {'foo': 0}}],
128128
change_stream._pipeline)
129129
self.assertEqual('updateLookup', change_stream._full_document)
130-
self.assertIsNone(change_stream.resume_token)
131130
self.assertEqual(1000, change_stream._max_await_time_ms)
132131
self.assertEqual(100, change_stream._batch_size)
133132
self.assertIsInstance(change_stream._cursor, CommandCursor)
@@ -472,8 +471,10 @@ def _get_expected_resume_token(self, stream, listener,
472471
listener is a WhiteListEventListener that listens for aggregate and
473472
getMore commands."""
474473
if previous_change is None or stream._cursor._has_next():
475-
return self._get_expected_resume_token_legacy(
474+
token = self._get_expected_resume_token_legacy(
476475
stream, listener, previous_change)
476+
if token is not None:
477+
return token
477478

478479
response = listener.results['succeeded'][-1].reply
479480
return response['cursor']['postBatchResumeToken']
@@ -1061,6 +1062,8 @@ def setFailPoint(self, scenario_dict):
10611062
fail_point = scenario_dict.get("failPoint")
10621063
if fail_point is None:
10631064
return
1065+
elif not client_context.test_commands_enabled:
1066+
self.skipTest("Test commands must be enabled")
10641067

10651068
fail_cmd = SON([('configureFailPoint', 'failCommand')])
10661069
fail_cmd.update(fail_point)

0 commit comments

Comments
 (0)