@@ -91,9 +91,9 @@ class AsyncChangeStream(Generic[_DocumentType]):
91
91
"""The internal abstract base class for change stream cursors.
92
92
93
93
Should not be called directly by application developers. Use
94
- :meth:`pymongo.collection.AsyncCollection.watch`,
95
- :meth:`pymongo.database.AsyncDatabase.watch`, or
96
- :meth:`pymongo.mongo_client.AsyncMongoClient.watch` instead.
94
+ :meth:`pymongo.asynchronous. collection.AsyncCollection.watch`,
95
+ :meth:`pymongo.asynchronous. database.AsyncDatabase.watch`, or
96
+ :meth:`pymongo.asynchronous. mongo_client.AsyncMongoClient.watch` instead.
97
97
98
98
.. versionadded:: 3.6
99
99
.. seealso:: The MongoDB documentation on `changeStreams <https://mongodb.com/docs/manual/changeStreams/>`_.
@@ -166,7 +166,7 @@ def _aggregation_command_class(self) -> Type[_AggregationCommand]:
166
166
@property
167
167
def _client (self ) -> AsyncMongoClient :
168
168
"""The client against which the aggregation commands for
169
- this ChangeStream will be run.
169
+ this AsyncChangeStream will be run.
170
170
"""
171
171
raise NotImplementedError
172
172
@@ -204,7 +204,7 @@ def _command_options(self) -> dict[str, Any]:
204
204
return options
205
205
206
206
def _aggregation_pipeline (self ) -> list [dict [str , Any ]]:
207
- """Return the full aggregation pipeline for this ChangeStream ."""
207
+ """Return the full aggregation pipeline for this AsyncChangeStream ."""
208
208
options = self ._change_stream_options ()
209
209
full_pipeline : list = [{"$changeStream" : options }]
210
210
full_pipeline .extend (self ._pipeline )
@@ -238,7 +238,7 @@ def _process_result(self, result: Mapping[str, Any], conn: AsyncConnection) -> N
238
238
async def _run_aggregation_cmd (
239
239
self , session : Optional [AsyncClientSession ], explicit_session : bool
240
240
) -> AsyncCommandCursor :
241
- """Run the full aggregation pipeline for this ChangeStream and return
241
+ """Run the full aggregation pipeline for this AsyncChangeStream and return
242
242
the corresponding AsyncCommandCursor.
243
243
"""
244
244
cmd = self ._aggregation_command_class (
@@ -272,7 +272,7 @@ async def _resume(self) -> None:
272
272
self ._cursor = await self ._create_cursor ()
273
273
274
274
async def close (self ) -> None :
275
- """Close this ChangeStream ."""
275
+ """Close this AsyncChangeStream ."""
276
276
self ._closed = True
277
277
await self ._cursor .close ()
278
278
@@ -299,27 +299,27 @@ async def next(self) -> _DocumentType:
299
299
try:
300
300
resume_token = None
301
301
pipeline = [{'$match': {'operationType': 'insert'}}]
302
- async with db.collection.watch(pipeline) as stream:
302
+ async with await db.collection.watch(pipeline) as stream:
303
303
async for insert_change in stream:
304
304
print(insert_change)
305
305
resume_token = stream.resume_token
306
306
except pymongo.errors.PyMongoError:
307
- # The ChangeStream encountered an unrecoverable error or the
307
+ # The AsyncChangeStream encountered an unrecoverable error or the
308
308
# resume attempt failed to recreate the cursor.
309
309
if resume_token is None:
310
310
# There is no usable resume token because there was a
311
- # failure during ChangeStream initialization.
311
+ # failure during AsyncChangeStream initialization.
312
312
logging.error('...')
313
313
else:
314
- # Use the interrupted ChangeStream 's resume token to create
315
- # a new ChangeStream . The new stream will continue from the
314
+ # Use the interrupted AsyncChangeStream 's resume token to create
315
+ # a new AsyncChangeStream . The new stream will continue from the
316
316
# last seen insert change without missing any events.
317
- async with db.collection.watch(
317
+ async with await db.collection.watch(
318
318
pipeline, resume_after=resume_token) as stream:
319
319
async for insert_change in stream:
320
320
print(insert_change)
321
321
322
- Raises :exc:`StopIteration` if this ChangeStream is closed.
322
+ Raises :exc:`StopIteration` if this AsyncChangeStream is closed.
323
323
"""
324
324
while self .alive :
325
325
doc = await self .try_next ()
@@ -348,10 +348,10 @@ async def try_next(self) -> Optional[_DocumentType]:
348
348
This method returns the next change document without waiting
349
349
indefinitely for the next change. For example::
350
350
351
- async with db.collection.watch() as stream:
351
+ async with await db.collection.watch() as stream:
352
352
while stream.alive:
353
353
change = await stream.try_next()
354
- # Note that the ChangeStream 's resume token may be updated
354
+ # Note that the AsyncChangeStream 's resume token may be updated
355
355
# even when no changes are returned.
356
356
print("Current resume token: %r" % (stream.resume_token,))
357
357
if change is not None:
@@ -447,7 +447,7 @@ class AsyncCollectionChangeStream(AsyncChangeStream[_DocumentType]):
447
447
"""A change stream that watches changes on a single collection.
448
448
449
449
Should not be called directly by application developers. Use
450
- helper method :meth:`pymongo.collection.AsyncCollection.watch` instead.
450
+ helper method :meth:`pymongo.asynchronous. collection.AsyncCollection.watch` instead.
451
451
452
452
.. versionadded:: 3.7
453
453
"""
@@ -467,7 +467,7 @@ class AsyncDatabaseChangeStream(AsyncChangeStream[_DocumentType]):
467
467
"""A change stream that watches changes on all collections in a database.
468
468
469
469
Should not be called directly by application developers. Use
470
- helper method :meth:`pymongo.database.AsyncDatabase.watch` instead.
470
+ helper method :meth:`pymongo.asynchronous. database.AsyncDatabase.watch` instead.
471
471
472
472
.. versionadded:: 3.7
473
473
"""
@@ -487,7 +487,7 @@ class AsyncClusterChangeStream(AsyncDatabaseChangeStream[_DocumentType]):
487
487
"""A change stream that watches changes on all collections in the cluster.
488
488
489
489
Should not be called directly by application developers. Use
490
- helper method :meth:`pymongo.mongo_client.AsyncMongoClient.watch` instead.
490
+ helper method :meth:`pymongo.asynchronous. mongo_client.AsyncMongoClient.watch` instead.
491
491
492
492
.. versionadded:: 3.7
493
493
"""
0 commit comments