@@ -780,115 +780,24 @@ describe('Change Streams', function () {
780
780
} ) ;
781
781
} ) ;
782
782
783
- describe ( 'should properly handle a changeStream event being processed mid-close' , function ( ) {
784
- let client , coll , changeStream ;
785
-
786
- function write ( ) {
787
- return Promise . resolve ( )
788
- . then ( ( ) => coll . insertOne ( { a : 1 } ) )
789
- . then ( ( ) => coll . insertOne ( { b : 2 } ) ) ;
790
- }
791
-
792
- function lastWrite ( ) {
793
- return coll . insertOne ( { c : 3 } ) ;
794
- }
795
-
796
- beforeEach ( function ( ) {
797
- client = this . configuration . newClient ( ) ;
798
- return client . connect ( ) . then ( _client => {
799
- client = _client ;
800
- coll = client . db ( this . configuration . db ) . collection ( 'tester' ) ;
801
- changeStream = coll . watch ( ) ;
802
- } ) ;
803
- } ) ;
804
-
805
- afterEach ( async function ( ) {
806
- await changeStream ?. close ( ) ;
807
- await client ?. close ( ) ;
808
- coll = undefined ;
809
- changeStream = undefined ;
810
- client = undefined ;
811
- } ) ;
812
-
813
- it ( 'when invoked with promises' , {
814
- metadata : { requires : { topology : 'replicaset' } } ,
815
- test : function ( ) {
816
- const read = ( ) => {
817
- return Promise . resolve ( )
818
- . then ( ( ) => changeStream . next ( ) )
819
- . then ( ( ) => changeStream . next ( ) )
820
- . then ( ( ) => {
821
- this . defer ( lastWrite ( ) ) ;
822
- const nextP = changeStream . next ( ) ;
823
- return changeStream . close ( ) . then ( ( ) => nextP ) ;
824
- } ) ;
825
- } ;
826
-
827
- return Promise . all ( [ read ( ) , write ( ) ] ) . then (
828
- ( ) => Promise . reject ( new Error ( 'Expected operation to fail with error' ) ) ,
829
- err => expect ( err . message ) . to . equal ( 'ChangeStream is closed' )
830
- ) ;
831
- }
832
- } ) ;
833
-
834
- it ( 'when invoked with callbacks' , {
835
- metadata : { requires : { topology : 'replicaset' } } ,
836
- test : function ( done ) {
837
- const ops = [ ] ;
838
- changeStream . next ( ( ) => {
839
- changeStream . next ( ( ) => {
840
- ops . push ( lastWrite ( ) ) ;
841
-
842
- // explicitly close the change stream after the write has begun
843
- ops . push ( changeStream . close ( ) ) ;
844
-
845
- changeStream . next ( err => {
846
- try {
847
- expect ( err )
848
- . property ( 'message' )
849
- . to . match ( / C h a n g e S t r e a m i s c l o s e d / ) ;
850
- Promise . all ( ops ) . then ( ( ) => done ( ) , done ) ;
851
- } catch ( e ) {
852
- done ( e ) ;
853
- }
854
- } ) ;
855
- } ) ;
856
- } ) ;
857
-
858
- ops . push (
859
- write ( ) . catch ( ( ) => {
860
- // ignore
861
- } )
862
- ) ;
863
- }
864
- } ) ;
865
-
866
- it . skip ( 'when invoked using eventEmitter API' , {
867
- metadata : {
868
- requires : { topology : 'replicaset' }
869
- } ,
870
- async test ( ) {
871
- const changes = on ( changeStream , 'change' ) ;
872
- await once ( changeStream . cursor , 'init' ) ;
873
-
874
- await write ( ) ;
875
- await lastWrite ( ) . catch ( ( ) => null ) ;
876
-
877
- let counter = 0 ;
878
-
879
- for await ( const _ of changes ) {
880
- counter += 1 ;
881
- if ( counter === 2 ) {
882
- await changeStream . close ( ) ;
883
- break ;
884
- }
783
+ describe ( 'when close is called while changes are pending' , function ( ) {
784
+ it (
785
+ 'rejects promises already returned by next' ,
786
+ { requires : { topology : 'replicaset' } } ,
787
+ async function ( ) {
788
+ const changes = Array . from ( { length : 20 } , ( ) => changeStream . next ( ) ) ;
789
+ await changeStream . close ( ) ;
790
+ const results = await Promise . allSettled ( changes ) ;
791
+
792
+ for ( const i of changes . keys ( ) ) {
793
+ expect ( results )
794
+ . to . have . nested . property ( `[${ i } ].reason` )
795
+ . that . is . instanceOf ( MongoAPIError ) ;
796
+ const message = / C h a n g e S t r e a m i s c l o s e d / i;
797
+ expect ( results ) . nested . property ( `[${ i } ].reason` ) . to . match ( message ) ;
885
798
}
886
-
887
- const result = await Promise . race ( [ changes . next ( ) , sleep ( 800 ) . then ( ( ) => 42 ) ] ) ;
888
- expect ( result , 'should not have recieved a third event' ) . to . equal ( 42 ) ;
889
799
}
890
- } ) . skipReason =
891
- 'This test only worked because of timing, changeStream.close does not remove the change listener' ;
800
+ ) ;
892
801
} ) ;
893
802
894
803
describe ( 'iterator api' , function ( ) {
0 commit comments