@@ -773,115 +773,24 @@ describe('Change Streams', function () {
773
773
} ) ;
774
774
} ) ;
775
775
776
- describe ( 'should properly handle a changeStream event being processed mid-close' , function ( ) {
777
- let client , coll , changeStream ;
778
-
779
- function write ( ) {
780
- return Promise . resolve ( )
781
- . then ( ( ) => coll . insertOne ( { a : 1 } ) )
782
- . then ( ( ) => coll . insertOne ( { b : 2 } ) ) ;
783
- }
784
-
785
- function lastWrite ( ) {
786
- return coll . insertOne ( { c : 3 } ) ;
787
- }
788
-
789
- beforeEach ( function ( ) {
790
- client = this . configuration . newClient ( ) ;
791
- return client . connect ( ) . then ( _client => {
792
- client = _client ;
793
- coll = client . db ( this . configuration . db ) . collection ( 'tester' ) ;
794
- changeStream = coll . watch ( ) ;
795
- } ) ;
796
- } ) ;
797
-
798
- afterEach ( async function ( ) {
799
- await changeStream ?. close ( ) ;
800
- await client ?. close ( ) ;
801
- coll = undefined ;
802
- changeStream = undefined ;
803
- client = undefined ;
804
- } ) ;
805
-
806
- it ( 'when invoked with promises' , {
807
- metadata : { requires : { topology : 'replicaset' } } ,
808
- test : function ( ) {
809
- const read = ( ) => {
810
- return Promise . resolve ( )
811
- . then ( ( ) => changeStream . next ( ) )
812
- . then ( ( ) => changeStream . next ( ) )
813
- . then ( ( ) => {
814
- this . defer ( lastWrite ( ) ) ;
815
- const nextP = changeStream . next ( ) ;
816
- return changeStream . close ( ) . then ( ( ) => nextP ) ;
817
- } ) ;
818
- } ;
819
-
820
- return Promise . all ( [ read ( ) , write ( ) ] ) . then (
821
- ( ) => Promise . reject ( new Error ( 'Expected operation to fail with error' ) ) ,
822
- err => expect ( err . message ) . to . equal ( 'ChangeStream is closed' )
823
- ) ;
824
- }
825
- } ) ;
826
-
827
- it ( 'when invoked with callbacks' , {
828
- metadata : { requires : { topology : 'replicaset' } } ,
829
- test : function ( done ) {
830
- const ops = [ ] ;
831
- changeStream . next ( ( ) => {
832
- changeStream . next ( ( ) => {
833
- ops . push ( lastWrite ( ) ) ;
834
-
835
- // explicitly close the change stream after the write has begun
836
- ops . push ( changeStream . close ( ) ) ;
837
-
838
- changeStream . next ( err => {
839
- try {
840
- expect ( err )
841
- . property ( 'message' )
842
- . to . match ( / C h a n g e S t r e a m i s c l o s e d / ) ;
843
- Promise . all ( ops ) . then ( ( ) => done ( ) , done ) ;
844
- } catch ( e ) {
845
- done ( e ) ;
846
- }
847
- } ) ;
848
- } ) ;
849
- } ) ;
850
-
851
- ops . push (
852
- write ( ) . catch ( ( ) => {
853
- // ignore
854
- } )
855
- ) ;
856
- }
857
- } ) ;
858
-
859
- it . skip ( 'when invoked using eventEmitter API' , {
860
- metadata : {
861
- requires : { topology : 'replicaset' }
862
- } ,
863
- async test ( ) {
864
- const changes = on ( changeStream , 'change' ) ;
865
- await once ( changeStream . cursor , 'init' ) ;
866
-
867
- await write ( ) ;
868
- await lastWrite ( ) . catch ( ( ) => null ) ;
869
-
870
- let counter = 0 ;
871
-
872
- for await ( const _ of changes ) {
873
- counter += 1 ;
874
- if ( counter === 2 ) {
875
- await changeStream . close ( ) ;
876
- break ;
877
- }
776
+ describe ( 'when close is called while changes are pending' , function ( ) {
777
+ it (
778
+ 'rejects promises already returned by next' ,
779
+ { requires : { topology : 'replicaset' } } ,
780
+ async function ( ) {
781
+ const changes = Array . from ( { length : 20 } , ( ) => changeStream . next ( ) ) ;
782
+ await changeStream . close ( ) ;
783
+ const results = await Promise . allSettled ( changes ) ;
784
+
785
+ for ( const i of changes . keys ( ) ) {
786
+ expect ( results )
787
+ . to . have . nested . property ( `[${ i } ].reason` )
788
+ . that . is . instanceOf ( MongoAPIError ) ;
789
+ const message = / C h a n g e S t r e a m i s c l o s e d / i;
790
+ expect ( results ) . nested . property ( `[${ i } ].reason` ) . to . match ( message ) ;
878
791
}
879
-
880
- const result = await Promise . race ( [ changes . next ( ) , sleep ( 800 ) . then ( ( ) => 42 ) ] ) ;
881
- expect ( result , 'should not have recieved a third event' ) . to . equal ( 42 ) ;
882
792
}
883
- } ) . skipReason =
884
- 'This test only worked because of timing, changeStream.close does not remove the change listener' ;
793
+ ) ;
885
794
} ) ;
886
795
887
796
describe ( 'iterator api' , function ( ) {
0 commit comments