@@ -198,14 +198,27 @@ def next(self):
198
198
all changes in the cursor. For example::
199
199
200
200
try:
201
- with db.collection.watch(
202
- [{'$match': {'operationType': 'insert'}}]) as stream:
201
+ resume_token = None
202
+ pipeline = [{'$match': {'operationType': 'insert'}}]
203
+ with db.collection.watch(pipeline) as stream:
203
204
for insert_change in stream:
204
205
print(insert_change)
206
+ resume_token = stream.resume_token
205
207
except pymongo.errors.PyMongoError:
206
208
# The ChangeStream encountered an unrecoverable error or the
207
209
# resume attempt failed to recreate the cursor.
208
- logging.error('...')
210
+ if resume_token is None:
211
+ # There is no usable resume token because there was a
212
+ # failure during ChangeStream initialization.
213
+ logging.error('...')
214
+ else:
215
+ # Use the interrupted ChangeStream's resume token to create
216
+ # a new ChangeStream. The new stream will continue from the
217
+ # last seen insert change without missing any events.
218
+ with db.collection.watch(
219
+ pipeline, resume_after=resume_token) as stream:
220
+ for insert_change in stream:
221
+ print(insert_change)
209
222
210
223
Raises :exc:`StopIteration` if this ChangeStream is closed.
211
224
"""
@@ -238,13 +251,17 @@ def try_next(self):
238
251
with db.collection.watch() as stream:
239
252
while stream.alive:
240
253
change = stream.try_next()
254
+ # Note that the ChangeStream's resume token may be updated
255
+ # even when no changes are returned.
256
+ print("Current resume token: %r" % (stream.resume_token,))
241
257
if change is not None:
242
- print(change)
243
- elif stream.alive:
244
- # We end up here when there are no recent changes.
245
- # Sleep for a while to avoid flooding the server with
246
- # getMore requests when no changes are available.
247
- time.sleep(10)
258
+ print("Change document: %r" % (change,))
259
+ continue
260
+ # We end up here when there are no recent changes.
261
+ # Sleep for a while before trying again to avoid flooding
262
+ # the server with getMore requests when no changes are
263
+ # available.
264
+ time.sleep(10)
248
265
249
266
If no change document is cached locally then this method runs a single
250
267
getMore command. If the getMore yields any documents, the next
0 commit comments