Skip to content

Commit 92ddc09

Browse files
committed
PYTHON-1662 Add ChangeStream.try_next API
1 parent 11967eb commit 92ddc09

File tree

5 files changed

+226
-62
lines changed

5 files changed

+226
-62
lines changed

doc/api/pymongo/change_stream.rst

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
1-
:mod:`change_stream` -- Watch changes on a collection
2-
=====================================================
1+
:mod:`change_stream` -- Watch changes on a collection, database, or cluster
2+
===========================================================================
33

44
.. automodule:: pymongo.change_stream
55
:members:

doc/changelog.rst

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,9 @@ Changes in Version 3.8.0.dev0
8484
string.
8585
- Add the ``filter`` parameter to
8686
:meth:`~pymongo.database.Database.list_collection_names`.
87+
- Changes can now be requested from a ``ChangeStream`` cursor without blocking
88+
indefinitely using the new
89+
:meth:`pymongo.change_stream.ChangeStream.try_next` method.
8790

8891
Issues Resolved
8992
...............

pymongo/change_stream.py

Lines changed: 98 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
# implied. See the License for the specific language governing
1313
# permissions and limitations under the License.
1414

15-
"""ChangeStream cursor to iterate over changes on a collection."""
15+
"""Watch changes on a collection, a database, or the entire cluster."""
1616

1717
import copy
1818

@@ -41,14 +41,12 @@ class ChangeStream(object):
4141
"""The internal abstract base class for change stream cursors.
4242
4343
Should not be called directly by application developers. Use
44-
:meth:pymongo.collection.Collection.watch,
45-
:meth:pymongo.database.Database.watch, or
46-
:meth:pymongo.mongo_client.MongoClient.watch instead.
47-
48-
Defines the interface for change streams. Should be subclassed to
49-
implement the `ChangeStream._create_cursor` abstract method, and
50-
the `ChangeStream._database`and ChangeStream._aggregation_target`
51-
abstract properties.
44+
:meth:`pymongo.collection.Collection.watch`,
45+
:meth:`pymongo.database.Database.watch`, or
46+
:meth:`pymongo.mongo_client.MongoClient.watch` instead.
47+
48+
.. versionadded:: 3.6
49+
.. mongodoc:: changeStreams
5250
"""
5351
def __init__(self, target, pipeline, full_document, resume_after,
5452
max_await_time_ms, batch_size, collation,
@@ -176,34 +174,97 @@ def next(self):
176174
"""Advance the cursor.
177175
178176
This method blocks until the next change document is returned or an
179-
unrecoverable error is raised.
177+
unrecoverable error is raised. This method is used when iterating over
178+
all changes in the cursor. For example::
179+
180+
try:
181+
with db.collection.watch(
182+
[{'$match': {'operationType': 'insert'}}]) as stream:
183+
for insert_change in stream:
184+
print(insert_change)
185+
except pymongo.errors.PyMongoError:
186+
# The ChangeStream encountered an unrecoverable error or the
187+
# resume attempt failed to recreate the cursor.
188+
logging.error('...')
180189
181190
Raises :exc:`StopIteration` if this ChangeStream is closed.
182191
"""
183-
while True:
184-
try:
185-
change = self._cursor.next()
186-
except ConnectionFailure:
187-
self._resume()
188-
continue
189-
except OperationFailure as exc:
190-
if exc.code in _NON_RESUMABLE_GETMORE_ERRORS:
191-
raise
192-
self._resume()
193-
continue
194-
try:
195-
resume_token = change['_id']
196-
except KeyError:
197-
self.close()
198-
raise InvalidOperation(
199-
"Cannot provide resume functionality when the resume "
200-
"token is missing.")
201-
self._resume_token = copy.copy(resume_token)
202-
self._start_at_operation_time = None
203-
return change
192+
while self.alive:
193+
doc = self.try_next()
194+
if doc is not None:
195+
return doc
196+
197+
raise StopIteration
204198

205199
__next__ = next
206200

201+
@property
202+
def alive(self):
203+
"""Does this cursor have the potential to return more data?
204+
205+
.. note:: Even if :attr:`alive` is ``True``, :meth:`next` can raise
206+
:exc:`StopIteration` and :meth:`try_next` can return ``None``.
207+
208+
.. versionadded:: 3.8
209+
"""
210+
return self._cursor.alive
211+
212+
def try_next(self):
213+
"""Advance the cursor without blocking indefinitely.
214+
215+
This method returns the next change document without waiting
216+
indefinitely for the next change. For example::
217+
218+
with db.collection.watch() as stream:
219+
while stream.alive:
220+
change = stream.try_next()
221+
if change is not None:
222+
print(change)
223+
elif stream.alive:
224+
# We end up here when there are no recent changes.
225+
# Sleep for a while to avoid flooding the server with
226+
# getMore requests when no changes are available.
227+
time.sleep(10)
228+
229+
If no change document is cached locally then this method runs a single
230+
getMore command. If the getMore yields any documents, the next
231+
document is returned, otherwise, if the getMore returns no documents
232+
(because there have been no changes) then ``None`` is returned.
233+
234+
:Returns:
235+
The next change document or ``None`` when no document is available
236+
after running a single getMore or when the cursor is closed.
237+
238+
.. versionadded:: 3.8
239+
"""
240+
# Attempt to get the next change with at most one getMore and at most
241+
# one resume attempt.
242+
try:
243+
change = self._cursor._try_next(True)
244+
except ConnectionFailure:
245+
self._resume()
246+
change = self._cursor._try_next(False)
247+
except OperationFailure as exc:
248+
if exc.code in _NON_RESUMABLE_GETMORE_ERRORS:
249+
raise
250+
self._resume()
251+
change = self._cursor._try_next(False)
252+
253+
# No changes are available.
254+
if change is None:
255+
return None
256+
257+
try:
258+
resume_token = change['_id']
259+
except KeyError:
260+
self.close()
261+
raise InvalidOperation(
262+
"Cannot provide resume functionality when the resume "
263+
"token is missing.")
264+
self._resume_token = copy.copy(resume_token)
265+
self._start_at_operation_time = None
266+
return change
267+
207268
def __enter__(self):
208269
return self
209270

@@ -212,13 +273,12 @@ def __exit__(self, exc_type, exc_val, exc_tb):
212273

213274

214275
class CollectionChangeStream(ChangeStream):
215-
"""Class for creating a change stream on a collection.
276+
"""A change stream that watches changes on a single collection.
216277
217278
Should not be called directly by application developers. Use
218279
helper method :meth:`pymongo.collection.Collection.watch` instead.
219280
220-
.. versionadded: 3.6
221-
.. mongodoc:: changeStreams
281+
.. versionadded:: 3.7
222282
"""
223283
@property
224284
def _aggregation_target(self):
@@ -230,13 +290,12 @@ def _database(self):
230290

231291

232292
class DatabaseChangeStream(ChangeStream):
233-
"""Class for creating a change stream on all collections in a database.
293+
"""A change stream that watches changes on all collections in a database.
234294
235295
Should not be called directly by application developers. Use
236296
helper method :meth:`pymongo.database.Database.watch` instead.
237297
238-
.. versionadded: 3.7
239-
.. mongodoc:: changeStreams
298+
.. versionadded:: 3.7
240299
"""
241300
@property
242301
def _aggregation_target(self):
@@ -248,13 +307,12 @@ def _database(self):
248307

249308

250309
class ClusterChangeStream(DatabaseChangeStream):
251-
"""Class for creating a change stream on all collections on a cluster.
310+
"""A change stream that watches changes on all collections in the cluster.
252311
253312
Should not be called directly by application developers. Use
254313
helper method :meth:`pymongo.mongo_client.MongoClient.watch` instead.
255314
256-
.. versionadded: 3.7
257-
.. mongodoc:: changeStreams
315+
.. versionadded:: 3.7
258316
"""
259317
def _pipeline_options(self):
260318
options = super(ClusterChangeStream, self)._pipeline_options()

pymongo/command_cursor.py

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -285,15 +285,24 @@ def __iter__(self):
285285
def next(self):
286286
"""Advance the cursor."""
287287
# Block until a document is returnable.
288-
while not len(self.__data) and not self.__killed:
288+
while self.alive:
289+
doc = self._try_next(True)
290+
if doc is not None:
291+
return doc
292+
293+
raise StopIteration
294+
295+
__next__ = next
296+
297+
def _try_next(self, get_more_allowed):
298+
"""Advance the cursor blocking for at most one getMore command."""
299+
if not len(self.__data) and not self.__killed and get_more_allowed:
289300
self._refresh()
290301
if len(self.__data):
291302
coll = self.__collection
292303
return coll.database._fix_outgoing(self.__data.popleft(), coll)
293304
else:
294-
raise StopIteration
295-
296-
__next__ = next
305+
return None
297306

298307
def __enter__(self):
299308
return self

0 commit comments

Comments
 (0)