@@ -620,7 +620,12 @@ public function provideNonResumableErrorCodes()
620
620
];
621
621
}
622
622
623
- public function testNextResumeTokenNotFound ()
623
+ /**
624
+ * Prose test: "ChangeStream will throw an exception if the server response
625
+ * is missing the resume token (if wire version is < 8, this is a driver-
626
+ * side error; for 8+, this is a server-side error)"
627
+ */
628
+ public function testResumeTokenNotFoundClientSideError ()
624
629
{
625
630
if (version_compare ($ this ->getServerVersion (), '4.1.8 ' , '>= ' )) {
626
631
$ this ->markTestSkipped ('Server rejects change streams that modify resume token (SERVER-37786) ' );
@@ -631,16 +636,47 @@ public function testNextResumeTokenNotFound()
631
636
$ operation = new Watch ($ this ->manager , $ this ->getDatabaseName (), $ this ->getCollectionName (), $ pipeline , $ this ->defaultOptions );
632
637
$ changeStream = $ operation ->execute ($ this ->getPrimaryServer ());
633
638
634
- /* Note: we intentionally do not start iteration with rewind() to ensure
635
- * that we test extraction functionality within next(). */
639
+ $ changeStream ->rewind ();
640
+
641
+ /* Insert two documents to ensure the client does not ignore the first
642
+ * document's resume token in favor of a postBatchResumeToken */
636
643
$ this ->insertDocument (['x ' => 1 ]);
644
+ $ this ->insertDocument (['x ' => 2 ]);
637
645
638
646
$ this ->expectException (ResumeTokenException::class);
639
647
$ this ->expectExceptionMessage ('Resume token not found in change document ' );
640
648
$ changeStream ->next ();
641
649
}
642
650
643
- public function testNextResumeTokenInvalidType ()
651
+ /**
652
+ * Prose test: "ChangeStream will throw an exception if the server response
653
+ * is missing the resume token (if wire version is < 8, this is a driver-
654
+ * side error; for 8+, this is a server-side error)"
655
+ */
656
+ public function testResumeTokenNotFoundServerSideError ()
657
+ {
658
+ if (version_compare ($ this ->getServerVersion (), '4.1.8 ' , '< ' )) {
659
+ $ this ->markTestSkipped ('Server does not reject change streams that modify resume token ' );
660
+ }
661
+
662
+ $ pipeline = [['$project ' => ['_id ' => 0 ]]];
663
+
664
+ $ operation = new Watch ($ this ->manager , $ this ->getDatabaseName (), $ this ->getCollectionName (), $ pipeline , $ this ->defaultOptions );
665
+ $ changeStream = $ operation ->execute ($ this ->getPrimaryServer ());
666
+
667
+ $ changeStream ->rewind ();
668
+ $ this ->insertDocument (['x ' => 1 ]);
669
+
670
+ $ this ->expectException (ServerException::class);
671
+ $ changeStream ->next ();
672
+ }
673
+
674
+ /**
675
+ * Prose test: "ChangeStream will throw an exception if the server response
676
+ * is missing the resume token (if wire version is < 8, this is a driver-
677
+ * side error; for 8+, this is a server-side error)"
678
+ */
679
+ public function testResumeTokenInvalidTypeClientSideError ()
644
680
{
645
681
if (version_compare ($ this ->getServerVersion (), '4.1.8 ' , '>= ' )) {
646
682
$ this ->markTestSkipped ('Server rejects change streams that modify resume token (SERVER-37786) ' );
@@ -651,15 +687,41 @@ public function testNextResumeTokenInvalidType()
651
687
$ operation = new Watch ($ this ->manager , $ this ->getDatabaseName (), $ this ->getCollectionName (), $ pipeline , $ this ->defaultOptions );
652
688
$ changeStream = $ operation ->execute ($ this ->getPrimaryServer ());
653
689
654
- /* Note: we intentionally do not start iteration with rewind() to ensure
655
- * that we test extraction functionality within next(). */
690
+ $ changeStream ->rewind ();
691
+
692
+ /* Insert two documents to ensure the client does not ignore the first
693
+ * document's resume token in favor of a postBatchResumeToken */
656
694
$ this ->insertDocument (['x ' => 1 ]);
695
+ $ this ->insertDocument (['x ' => 2 ]);
657
696
658
697
$ this ->expectException (ResumeTokenException::class);
659
698
$ this ->expectExceptionMessage ('Expected resume token to have type "array or object" but found "string" ' );
660
699
$ changeStream ->next ();
661
700
}
662
701
702
+ /**
703
+ * Prose test: "ChangeStream will throw an exception if the server response
704
+ * is missing the resume token (if wire version is < 8, this is a driver-
705
+ * side error; for 8+, this is a server-side error)"
706
+ */
707
+ public function testResumeTokenInvalidTypeServerSideError ()
708
+ {
709
+ if (version_compare ($ this ->getServerVersion (), '4.1.8 ' , '< ' )) {
710
+ $ this ->markTestSkipped ('Server does not reject change streams that modify resume token ' );
711
+ }
712
+
713
+ $ pipeline = [['$project ' => ['_id ' => ['$literal ' => 'foo ' ]]]];
714
+
715
+ $ operation = new Watch ($ this ->manager , $ this ->getDatabaseName (), $ this ->getCollectionName (), $ pipeline , $ this ->defaultOptions );
716
+ $ changeStream = $ operation ->execute ($ this ->getPrimaryServer ());
717
+
718
+ $ changeStream ->rewind ();
719
+ $ this ->insertDocument (['x ' => 1 ]);
720
+
721
+ $ this ->expectException (ServerException::class);
722
+ $ changeStream ->next ();
723
+ }
724
+
663
725
public function testMaxAwaitTimeMS ()
664
726
{
665
727
/* On average, an acknowledged write takes about 20 ms to appear in a
@@ -908,10 +970,6 @@ public function testNextAdvancesKey()
908
970
909
971
public function testResumeTokenNotFoundDoesNotAdvanceKey ()
910
972
{
911
- if (version_compare ($ this ->getServerVersion (), '4.1.8 ' , '>= ' )) {
912
- $ this ->markTestSkipped ('Server rejects change streams that modify resume token (SERVER-37786) ' );
913
- }
914
-
915
973
$ pipeline = [['$project ' => ['_id ' => 0 ]]];
916
974
917
975
$ operation = new Watch ($ this ->manager , $ this ->getDatabaseName (), $ this ->getCollectionName (), $ pipeline , $ this ->defaultOptions );
@@ -923,20 +981,40 @@ public function testResumeTokenNotFoundDoesNotAdvanceKey()
923
981
924
982
$ changeStream ->rewind ();
925
983
$ this ->assertFalse ($ changeStream ->valid ());
984
+ $ this ->assertNull ($ changeStream ->key ());
926
985
927
986
try {
928
987
$ changeStream ->next ();
929
- $ this ->fail ('ResumeTokenException was not thrown ' );
930
- } catch (ResumeTokenException $ e ) {}
988
+ $ this ->fail ('Exception for missing resume token was not thrown ' );
989
+ } catch (ResumeTokenException $ e ) {
990
+ /* If a client-side error is thrown (server < 4.1.8), the tailable
991
+ * cursor's position is still valid. This may change once PHPLIB-456
992
+ * is implemented. */
993
+ $ expectedValid = true ;
994
+ $ expectedKey = 0 ;
995
+ } catch (ServerException $ e ) {
996
+ /* If a server-side error is thrown (server >= 4.1.8), the tailable
997
+ * cursor's position is not valid. */
998
+ $ expectedValid = false ;
999
+ $ expectedKey = null ;
1000
+ }
931
1001
932
- $ this ->assertSame (0 , $ changeStream ->key ());
1002
+ $ this ->assertSame ($ expectedValid , $ changeStream ->valid ());
1003
+ $ this ->assertSame ($ expectedKey , $ changeStream ->key ());
933
1004
934
1005
try {
935
1006
$ changeStream ->next ();
936
- $ this ->fail ('ResumeTokenException was not thrown ' );
937
- } catch (ResumeTokenException $ e ) {}
1007
+ $ this ->fail ('Exception for missing resume token was not thrown ' );
1008
+ } catch (ResumeTokenException $ e ) {
1009
+ $ expectedValid = true ;
1010
+ $ expectedKey = 0 ;
1011
+ } catch (ServerException $ e ) {
1012
+ $ expectedValid = false ;
1013
+ $ expectedKey = null ;
1014
+ }
938
1015
939
- $ this ->assertSame (0 , $ changeStream ->key ());
1016
+ $ this ->assertSame ($ expectedValid , $ changeStream ->valid ());
1017
+ $ this ->assertSame ($ expectedKey , $ changeStream ->key ());
940
1018
}
941
1019
942
1020
public function testSessionPersistsAfterResume ()
0 commit comments