@@ -267,7 +267,7 @@ async def test_batch_size_is_honored(self):
267
267
268
268
# $changeStream.startAtOperationTime was added in 4.0.0.
269
269
@no_type_check
270
- @async_client_context .require_version_min (4 , 0 , 0 )
270
+ @async_client_context .require_version_min (4 , 2 , 0 )
271
271
async def test_start_at_operation_time (self ):
272
272
optime = await self .get_start_at_operation_time ()
273
273
@@ -436,7 +436,7 @@ async def test_change_operations(self):
436
436
await self ._test_get_invalidate_event (change_stream )
437
437
438
438
@no_type_check
439
- @async_client_context .require_version_min (4 , 1 , 1 )
439
+ @async_client_context .require_version_min (4 , 2 , 0 )
440
440
async def test_start_after (self ):
441
441
resume_token = await self .get_resume_token (invalidate = True )
442
442
@@ -452,7 +452,7 @@ async def test_start_after(self):
452
452
self .assertEqual (change ["fullDocument" ], {"_id" : 2 })
453
453
454
454
@no_type_check
455
- @async_client_context .require_version_min (4 , 1 , 1 )
455
+ @async_client_context .require_version_min (4 , 2 , 0 )
456
456
async def test_start_after_resume_process_with_changes (self ):
457
457
resume_token = await self .get_resume_token (invalidate = True )
458
458
@@ -563,27 +563,16 @@ async def _test_update_resume_token(self, expected_rt_getter):
563
563
)
564
564
565
565
# Prose test no. 1
566
- @async_client_context .require_version_min (4 , 0 , 7 )
566
+ @async_client_context .require_version_min (4 , 2 , 0 )
567
567
async def test_update_resume_token (self ):
568
568
await self ._test_update_resume_token (self ._get_expected_resume_token )
569
569
570
- # Prose test no. 1
571
- @async_client_context .require_version_max (4 , 0 , 7 )
572
- async def test_update_resume_token_legacy (self ):
573
- await self ._test_update_resume_token (self ._get_expected_resume_token_legacy )
574
-
575
570
# Prose test no. 2
576
- @async_client_context .require_version_min (4 , 1 , 8 )
571
+ @async_client_context .require_version_min (4 , 2 , 0 )
577
572
async def test_raises_error_on_missing_id_418plus (self ):
578
573
# Server returns an error on 4.1.8+
579
574
await self ._test_raises_error_on_missing_id (OperationFailure )
580
575
581
- # Prose test no. 2
582
- @async_client_context .require_version_max (4 , 1 , 8 )
583
- async def test_raises_error_on_missing_id_418minus (self ):
584
- # PyMongo raises an error
585
- await self ._test_raises_error_on_missing_id (InvalidOperation )
586
-
587
576
# Prose test no. 3
588
577
@no_type_check
589
578
async def test_resume_on_error (self ):
@@ -642,40 +631,12 @@ def raise_error():
642
631
cursor .close = raise_error
643
632
await self .insert_one_and_check (change_stream , {"_id" : 2 })
644
633
645
- # Prose test no. 9
646
- @no_type_check
647
- @async_client_context .require_version_min (4 , 0 , 0 )
648
- @async_client_context .require_version_max (4 , 0 , 7 )
649
- async def test_start_at_operation_time_caching (self ):
650
- # Case 1: change stream not started with startAtOperationTime
651
- client , listener = self .client_with_listener ("aggregate" )
652
- async with await self .change_stream_with_client (client ) as cs :
653
- await self .kill_change_stream_cursor (cs )
654
- await cs .try_next ()
655
- cmd = listener .started_events [- 1 ].command
656
- self .assertIsNotNone (cmd ["pipeline" ][0 ]["$changeStream" ].get ("startAtOperationTime" ))
657
-
658
- # Case 2: change stream started with startAtOperationTime
659
- listener .reset ()
660
- optime = await self .get_start_at_operation_time ()
661
- async with await self .change_stream_with_client (
662
- client , start_at_operation_time = optime
663
- ) as cs :
664
- await self .kill_change_stream_cursor (cs )
665
- await cs .try_next ()
666
- cmd = listener .started_events [- 1 ].command
667
- self .assertEqual (
668
- cmd ["pipeline" ][0 ]["$changeStream" ].get ("startAtOperationTime" ),
669
- optime ,
670
- str ([k .command for k in listener .started_events ]),
671
- )
672
-
673
634
# Prose test no. 10 - SKIPPED
674
635
# This test is identical to prose test no. 3.
675
636
676
637
# Prose test no. 11
677
638
@no_type_check
678
- @async_client_context .require_version_min (4 , 0 , 7 )
639
+ @async_client_context .require_version_min (4 , 2 , 0 )
679
640
async def test_resumetoken_empty_batch (self ):
680
641
client , listener = await self ._client_with_listener ("getMore" )
681
642
async with await self .change_stream_with_client (client ) as change_stream :
@@ -687,7 +648,7 @@ async def test_resumetoken_empty_batch(self):
687
648
688
649
# Prose test no. 11
689
650
@no_type_check
690
- @async_client_context .require_version_min (4 , 0 , 7 )
651
+ @async_client_context .require_version_min (4 , 2 , 0 )
691
652
async def test_resumetoken_exhausted_batch (self ):
692
653
client , listener = await self ._client_with_listener ("getMore" )
693
654
async with await self .change_stream_with_client (client ) as change_stream :
@@ -697,38 +658,6 @@ async def test_resumetoken_exhausted_batch(self):
697
658
response = listener .succeeded_events [- 1 ].reply
698
659
self .assertEqual (resume_token , response ["cursor" ]["postBatchResumeToken" ])
699
660
700
- # Prose test no. 12
701
- @no_type_check
702
- @async_client_context .require_version_max (4 , 0 , 7 )
703
- async def test_resumetoken_empty_batch_legacy (self ):
704
- resume_point = await self .get_resume_token ()
705
-
706
- # Empty resume token when neither resumeAfter or startAfter specified.
707
- async with await self .change_stream () as change_stream :
708
- await change_stream .try_next ()
709
- self .assertIsNone (change_stream .resume_token )
710
-
711
- # Resume token value is same as resumeAfter.
712
- async with await self .change_stream (resume_after = resume_point ) as change_stream :
713
- await change_stream .try_next ()
714
- resume_token = change_stream .resume_token
715
- self .assertEqual (resume_token , resume_point )
716
-
717
- # Prose test no. 12
718
- @no_type_check
719
- @async_client_context .require_version_max (4 , 0 , 7 )
720
- async def test_resumetoken_exhausted_batch_legacy (self ):
721
- # Resume token is _id of last change.
722
- async with await self .change_stream () as change_stream :
723
- change = await self ._populate_and_exhaust_change_stream (change_stream )
724
- self .assertEqual (change_stream .resume_token , change ["_id" ])
725
- resume_point = change ["_id" ]
726
-
727
- # Resume token is _id of last change even if resumeAfter is specified.
728
- async with await self .change_stream (resume_after = resume_point ) as change_stream :
729
- change = await self ._populate_and_exhaust_change_stream (change_stream )
730
- self .assertEqual (change_stream .resume_token , change ["_id" ])
731
-
732
661
# Prose test no. 13
733
662
@no_type_check
734
663
async def test_resumetoken_partially_iterated_batch (self ):
@@ -770,13 +699,13 @@ async def test_resumetoken_uniterated_nonempty_batch_resumeafter(self):
770
699
# Prose test no. 14
771
700
@no_type_check
772
701
@async_client_context .require_no_mongos
773
- @async_client_context .require_version_min (4 , 1 , 1 )
702
+ @async_client_context .require_version_min (4 , 2 , 0 )
774
703
async def test_resumetoken_uniterated_nonempty_batch_startafter (self ):
775
704
await self ._test_resumetoken_uniterated_nonempty_batch ("start_after" )
776
705
777
706
# Prose test no. 17
778
707
@no_type_check
779
- @async_client_context .require_version_min (4 , 1 , 1 )
708
+ @async_client_context .require_version_min (4 , 2 , 0 )
780
709
async def test_startafter_resume_uses_startafter_after_empty_getMore (self ):
781
710
# Resume should use startAfter after no changes have been returned.
782
711
resume_point = await self .get_resume_token ()
@@ -796,7 +725,7 @@ async def test_startafter_resume_uses_startafter_after_empty_getMore(self):
796
725
797
726
# Prose test no. 18
798
727
@no_type_check
799
- @async_client_context .require_version_min (4 , 1 , 1 )
728
+ @async_client_context .require_version_min (4 , 2 , 0 )
800
729
async def test_startafter_resume_uses_resumeafter_after_nonempty_getMore (self ):
801
730
# Resume should use resumeAfter after some changes have been returned.
802
731
resume_point = await self .get_resume_token ()
@@ -843,7 +772,7 @@ async def test_split_large_change(self):
843
772
class TestClusterAsyncChangeStream (TestAsyncChangeStreamBase , APITestsMixin ):
844
773
dbs : list
845
774
846
- @async_client_context .require_version_min (4 , 0 , 0 , - 1 )
775
+ @async_client_context .require_version_min (4 , 2 , 0 )
847
776
@async_client_context .require_change_streams
848
777
async def asyncSetUp (self ) -> None :
849
778
await super ().asyncSetUp ()
@@ -903,7 +832,7 @@ async def test_full_pipeline(self):
903
832
904
833
905
834
class TestAsyncDatabaseAsyncChangeStream (TestAsyncChangeStreamBase , APITestsMixin ):
906
- @async_client_context .require_version_min (4 , 0 , 0 , - 1 )
835
+ @async_client_context .require_version_min (4 , 2 , 0 )
907
836
@async_client_context .require_change_streams
908
837
async def asyncSetUp (self ) -> None :
909
838
await super ().asyncSetUp ()
0 commit comments