@@ -4,7 +4,7 @@ import { on, once } from 'events';
4
4
import { gte , lt } from 'semver' ;
5
5
import * as sinon from 'sinon' ;
6
6
import { PassThrough } from 'stream' ;
7
- import { setTimeout } from 'timers' ;
7
+ import { clearTimeout , setTimeout } from 'timers' ;
8
8
9
9
import {
10
10
type ChangeStream ,
@@ -773,7 +773,50 @@ describe('Change Streams', function () {
773
773
} ) ;
774
774
} ) ;
775
775
776
- describe ( 'when close is called while changes are pending' , function ( ) {
776
+ describe . only ( 'when close is called while changes are pending' , function ( ) {
777
+ let client ;
778
+ let db ;
779
+ let collection : Collection < { insertCount : number } > ;
780
+ let changeStream : ChangeStream < { insertCount : number } > ;
781
+ let insertInterval = undefined ;
782
+ let insertCount = 0 ;
783
+
784
+ /** insertOne every 300ms without running the next insert before the previous one completes */
785
+ function setInsertInterval ( ) {
786
+ // start an insert
787
+ // if first one, create a timeout and refresh
788
+ // if NOT first one, just refresh
789
+ collection ?. insertOne ( { insertCount : insertCount ++ } ) . then ( ( ) => {
790
+ insertInterval ??= setTimeout ( setInsertInterval , 300 ) ;
791
+ insertInterval . refresh ( ) ;
792
+ } ) ;
793
+ }
794
+
795
+ beforeEach ( async function ( ) {
796
+ client = this . configuration . newClient ( ) ;
797
+ await client . connect ( ) ;
798
+ db = client . db ( 'test' ) ;
799
+ collection = db . collection ( 'test_close' ) ;
800
+ await collection . drop ( ) . catch ( ( ) => null ) ;
801
+ changeStream = collection . watch ( ) ;
802
+
803
+ insertCount = 0 ;
804
+ setInsertInterval ( ) ;
805
+ } ) ;
806
+
807
+ afterEach ( async function ( ) {
808
+ clearTimeout ( insertInterval ) ;
809
+ await collection . drop ( ) . catch ( ( ) => null ) ;
810
+ await client . close ( ) ;
811
+
812
+ db = undefined ;
813
+ client = undefined ;
814
+ collection = undefined ;
815
+ changeStream = undefined ;
816
+ insertInterval = undefined ;
817
+ insertCount = 0 ;
818
+ } ) ;
819
+
777
820
it (
778
821
'rejects promises already returned by next' ,
779
822
{ requires : { topology : 'replicaset' } } ,
@@ -791,6 +834,47 @@ describe('Change Streams', function () {
791
834
}
792
835
}
793
836
) ;
837
+
838
+ it (
839
+ 'rejects promises already returned by next after awaiting the first one' ,
840
+ { requires : { topology : 'replicaset' } } ,
841
+ async function ( ) {
842
+ const changes = Array . from ( { length : 20 } , ( ) => changeStream . next ( ) ) ;
843
+ await changes [ 0 ] ;
844
+ const allChanges = Promise . allSettled ( changes ) ;
845
+
846
+ await changeStream . close ( ) ;
847
+
848
+ const results = await allChanges ;
849
+
850
+ const statuses = results . map ( ( { status } ) => status ) ;
851
+ expect ( statuses ) . to . deep . equal ( [
852
+ 'fulfilled' ,
853
+ ...Array . from ( { length : 19 } , ( ) => 'rejected' )
854
+ ] ) ;
855
+ }
856
+ ) ;
857
+
858
+ it (
859
+ 'rejects promises already returned by next after awaiting half of them' ,
860
+ { requires : { topology : 'replicaset' } } ,
861
+ async function ( ) {
862
+ const changes = Array . from ( { length : 20 } , ( ) => changeStream . next ( ) ) ;
863
+ const allChanges = Promise . allSettled ( changes ) ;
864
+
865
+ await Promise . allSettled ( changes . slice ( 10 ) ) ;
866
+
867
+ await changeStream . close ( ) ;
868
+
869
+ const results = await allChanges ;
870
+
871
+ const statuses = results . map ( ( { status } ) => status ) ;
872
+ expect ( statuses ) . to . deep . equal ( [
873
+ ...Array . from ( { length : 10 } , ( ) => 'fulfilled' ) ,
874
+ ...Array . from ( { length : 10 } , ( ) => 'rejected' )
875
+ ] ) ;
876
+ }
877
+ ) ;
794
878
} ) ;
795
879
796
880
describe ( 'iterator api' , function ( ) {
0 commit comments