Skip to content

Commit afbf18b

Browse files
committed
PYTHON-1720 Add start_after parameter to watch() methods
1 parent 4170d8a commit afbf18b

File tree

5 files changed

+103
-17
lines changed

5 files changed

+103
-17
lines changed

pymongo/change_stream.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ class ChangeStream(object):
5252
"""
5353
def __init__(self, target, pipeline, full_document, resume_after,
5454
max_await_time_ms, batch_size, collation,
55-
start_at_operation_time, session):
55+
start_at_operation_time, session, start_after):
5656
if pipeline is None:
5757
pipeline = []
5858
elif not isinstance(pipeline, list):
@@ -82,6 +82,7 @@ def __init__(self, target, pipeline, full_document, resume_after,
8282
self._collation = collation
8383
self._start_at_operation_time = start_at_operation_time
8484
self._session = session
85+
self._start_after = copy.deepcopy(start_after)
8586
self._cursor = self._create_cursor()
8687

8788
@property
@@ -101,6 +102,8 @@ def _pipeline_options(self):
101102
options['fullDocument'] = self._full_document
102103
if self._resume_token is not None:
103104
options['resumeAfter'] = self._resume_token
105+
if self._start_after is not None:
106+
options['startAfter'] = self._start_after
104107
if self._start_at_operation_time is not None:
105108
options['startAtOperationTime'] = self._start_at_operation_time
106109
return options
@@ -141,7 +144,7 @@ def _cmd(session, server, sock_info, slave_ok):
141144

142145
if (self._start_at_operation_time is None and
143146
self._resume_token is None and
144-
cursor.get("_id") is None and
147+
self._start_after is None and
145148
sock_info.max_wire_version >= 7):
146149
self._start_at_operation_time = result["operationTime"]
147150

@@ -277,6 +280,7 @@ def try_next(self):
277280
"token is missing.")
278281
self._resume_token = copy.copy(resume_token)
279282
self._start_at_operation_time = None
283+
self._start_after = None
280284

281285
if self._decode_custom:
282286
return _bson_to_dict(change.raw, self._orig_codec_options)

pymongo/collection.py

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2458,7 +2458,7 @@ def aggregate_raw_batches(self, pipeline, **kwargs):
24582458

24592459
def watch(self, pipeline=None, full_document='default', resume_after=None,
24602460
max_await_time_ms=None, batch_size=None, collation=None,
2461-
start_at_operation_time=None, session=None):
2461+
start_at_operation_time=None, session=None, start_after=None):
24622462
"""Watch changes on this collection.
24632463
24642464
Performs an aggregation with an implicit initial ``$changeStream``
@@ -2519,8 +2519,10 @@ def watch(self, pipeline=None, full_document='default', resume_after=None,
25192519
updates will include both a delta describing the changes to the
25202520
document, as well as a copy of the entire document that was
25212521
changed from some time after the change occurred.
2522-
- `resume_after` (optional): The logical starting point for this
2523-
change stream.
2522+
- `resume_after` (optional): A resume token. If provided, the
2523+
change stream will start returning changes that occur directly
2524+
after the operation specified in the resume token. A resume token
2525+
is the _id value of a change document.
25242526
- `max_await_time_ms` (optional): The maximum time in milliseconds
25252527
for the server to wait for changes before responding to a getMore
25262528
operation.
@@ -2534,10 +2536,16 @@ def watch(self, pipeline=None, full_document='default', resume_after=None,
25342536
MongoDB >= 4.0.
25352537
- `session` (optional): a
25362538
:class:`~pymongo.client_session.ClientSession`.
2539+
- `start_after` (optional): The same as `resume_after` except that
2540+
`start_after` can resume notifications after an invalidate event.
2541+
This option and `resume_after` are mutually exclusive.
25372542
25382543
:Returns:
25392544
A :class:`~pymongo.change_stream.CollectionChangeStream` cursor.
25402545
2546+
.. versionchanged:: 3.9
2547+
Added the ``start_after`` parameter.
2548+
25412549
.. versionchanged:: 3.7
25422550
Added the ``start_at_operation_time`` parameter.
25432551
@@ -2550,8 +2558,8 @@ def watch(self, pipeline=None, full_document='default', resume_after=None,
25502558
"""
25512559
return CollectionChangeStream(
25522560
self, pipeline, full_document, resume_after, max_await_time_ms,
2553-
batch_size, collation, start_at_operation_time, session
2554-
)
2561+
batch_size, collation, start_at_operation_time, session,
2562+
start_after)
25552563

25562564
def group(self, key, condition, initial, reduce, finalize=None, **kwargs):
25572565
"""Perform a query similar to an SQL *group by* operation.

pymongo/database.py

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -447,7 +447,7 @@ def _fix_outgoing(self, son, collection):
447447

448448
def watch(self, pipeline=None, full_document='default', resume_after=None,
449449
max_await_time_ms=None, batch_size=None, collation=None,
450-
start_at_operation_time=None, session=None):
450+
start_at_operation_time=None, session=None, start_after=None):
451451
"""Watch changes on this database.
452452
453453
Performs an aggregation with an implicit initial ``$changeStream``
@@ -499,8 +499,10 @@ def watch(self, pipeline=None, full_document='default', resume_after=None,
499499
updates will include both a delta describing the changes to the
500500
document, as well as a copy of the entire document that was
501501
changed from some time after the change occurred.
502-
- `resume_after` (optional): The logical starting point for this
503-
change stream.
502+
- `resume_after` (optional): A resume token. If provided, the
503+
change stream will start returning changes that occur directly
504+
after the operation specified in the resume token. A resume token
505+
is the _id value of a change document.
504506
- `max_await_time_ms` (optional): The maximum time in milliseconds
505507
for the server to wait for changes before responding to a getMore
506508
operation.
@@ -514,10 +516,16 @@ def watch(self, pipeline=None, full_document='default', resume_after=None,
514516
MongoDB >= 4.0.
515517
- `session` (optional): a
516518
:class:`~pymongo.client_session.ClientSession`.
519+
- `start_after` (optional): The same as `resume_after` except that
520+
`start_after` can resume notifications after an invalidate event.
521+
This option and `resume_after` are mutually exclusive.
517522
518523
:Returns:
519524
A :class:`~pymongo.change_stream.DatabaseChangeStream` cursor.
520525
526+
.. versionchanged:: 3.9
527+
Added the ``start_after`` parameter.
528+
521529
.. versionadded:: 3.7
522530
523531
.. mongodoc:: changeStreams
@@ -527,8 +535,8 @@ def watch(self, pipeline=None, full_document='default', resume_after=None,
527535
"""
528536
return DatabaseChangeStream(
529537
self, pipeline, full_document, resume_after, max_await_time_ms,
530-
batch_size, collation, start_at_operation_time, session
531-
)
538+
batch_size, collation, start_at_operation_time, session,
539+
start_after)
532540

533541
def _command(self, sock_info, command, slave_ok=False, value=1, check=True,
534542
allowable_errors=None, read_preference=ReadPreference.PRIMARY,

pymongo/mongo_client.py

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -802,7 +802,7 @@ def _server_property(self, attr_name):
802802

803803
def watch(self, pipeline=None, full_document='default', resume_after=None,
804804
max_await_time_ms=None, batch_size=None, collation=None,
805-
start_at_operation_time=None, session=None):
805+
start_at_operation_time=None, session=None, start_after=None):
806806
"""Watch changes on this cluster.
807807
808808
Performs an aggregation with an implicit initial ``$changeStream``
@@ -854,8 +854,10 @@ def watch(self, pipeline=None, full_document='default', resume_after=None,
854854
updates will include both a delta describing the changes to the
855855
document, as well as a copy of the entire document that was
856856
changed from some time after the change occurred.
857-
- `resume_after` (optional): The logical starting point for this
858-
change stream.
857+
- `resume_after` (optional): A resume token. If provided, the
858+
change stream will start returning changes that occur directly
859+
after the operation specified in the resume token. A resume token
860+
is the _id value of a change document.
859861
- `max_await_time_ms` (optional): The maximum time in milliseconds
860862
for the server to wait for changes before responding to a getMore
861863
operation.
@@ -869,10 +871,16 @@ def watch(self, pipeline=None, full_document='default', resume_after=None,
869871
MongoDB >= 4.0.
870872
- `session` (optional): a
871873
:class:`~pymongo.client_session.ClientSession`.
874+
- `start_after` (optional): The same as `resume_after` except that
875+
`start_after` can resume notifications after an invalidate event.
876+
This option and `resume_after` are mutually exclusive.
872877
873878
:Returns:
874879
A :class:`~pymongo.change_stream.ClusterChangeStream` cursor.
875880
881+
.. versionchanged:: 3.9
882+
Added the ``start_after`` parameter.
883+
876884
.. versionadded:: 3.7
877885
878886
.. mongodoc:: changeStreams
@@ -882,8 +890,8 @@ def watch(self, pipeline=None, full_document='default', resume_after=None,
882890
"""
883891
return ClusterChangeStream(
884892
self.admin, pipeline, full_document, resume_after, max_await_time_ms,
885-
batch_size, collation, start_at_operation_time, session
886-
)
893+
batch_size, collation, start_at_operation_time, session,
894+
start_after)
887895

888896
@property
889897
def event_listeners(self):

test/test_change_stream.py

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -597,6 +597,64 @@ def test_read_concern(self):
597597
with coll.watch():
598598
pass
599599

600+
def invalidate_resume_token(self):
601+
with self.coll.watch(
602+
[{'$match': {'operationType': 'invalidate'}}]) as cs:
603+
self.coll.insert_one({'_id': 1})
604+
self.coll.drop()
605+
resume_token = cs.next()['_id']
606+
self.assertFalse(cs.alive)
607+
return resume_token
608+
609+
@client_context.require_version_min(4, 1, 1)
610+
def test_start_after(self):
611+
resume_token = self.invalidate_resume_token()
612+
613+
# resume_after cannot resume after invalidate.
614+
with self.assertRaises(OperationFailure):
615+
self.coll.watch(resume_after=resume_token)
616+
617+
# start_after can resume after invalidate.
618+
with self.coll.watch(start_after=resume_token) as change_stream:
619+
self.coll.insert_one({'_id': 2})
620+
change = change_stream.next()
621+
self.assertEqual(change['operationType'], 'insert')
622+
self.assertEqual(change['fullDocument'], {'_id': 2})
623+
624+
@client_context.require_version_min(4, 1, 1)
625+
def test_start_after_resume_process_with_changes(self):
626+
resume_token = self.invalidate_resume_token()
627+
628+
with self.coll.watch(start_after=resume_token,
629+
max_await_time_ms=250) as change_stream:
630+
self.coll.insert_one({'_id': 2})
631+
change = change_stream.next()
632+
self.assertEqual(change['operationType'], 'insert')
633+
self.assertEqual(change['fullDocument'], {'_id': 2})
634+
635+
self.assertIsNone(change_stream.try_next())
636+
self.kill_change_stream_cursor(change_stream)
637+
638+
self.coll.insert_one({'_id': 3})
639+
change = change_stream.next()
640+
self.assertEqual(change['operationType'], 'insert')
641+
self.assertEqual(change['fullDocument'], {'_id': 3})
642+
643+
@client_context.require_no_mongos # Remove after SERVER-41196
644+
@client_context.require_version_min(4, 1, 1)
645+
def test_start_after_resume_process_without_changes(self):
646+
resume_token = self.invalidate_resume_token()
647+
648+
with self.coll.watch(start_after=resume_token,
649+
max_await_time_ms=250) as change_stream:
650+
self.assertIsNone(change_stream.try_next())
651+
self.kill_change_stream_cursor(change_stream)
652+
653+
self.coll.insert_one({'_id': 2})
654+
change = change_stream.next()
655+
self.assertEqual(change['operationType'], 'insert')
656+
self.assertEqual(change['fullDocument'], {'_id': 2})
657+
600658

601659
class TestAllScenarios(unittest.TestCase):
602660

0 commit comments

Comments
 (0)