@@ -4,7 +4,7 @@ import { on, once } from 'events';
4
4
import { gte , lt } from 'semver' ;
5
5
import * as sinon from 'sinon' ;
6
6
import { PassThrough } from 'stream' ;
7
- import { setTimeout } from 'timers' ;
7
+ import { clearTimeout , setTimeout } from 'timers' ;
8
8
9
9
import {
10
10
type ChangeStream ,
@@ -780,7 +780,50 @@ describe('Change Streams', function () {
780
780
} ) ;
781
781
} ) ;
782
782
783
- describe ( 'when close is called while changes are pending' , function ( ) {
783
+ describe . only ( 'when close is called while changes are pending' , function ( ) {
784
+ let client ;
785
+ let db ;
786
+ let collection : Collection < { insertCount : number } > ;
787
+ let changeStream : ChangeStream < { insertCount : number } > ;
788
+ let insertInterval = undefined ;
789
+ let insertCount = 0 ;
790
+
791
+ /** insertOne every 300ms without running the next insert before the previous one completes */
792
+ function setInsertInterval ( ) {
793
+ // start an insert
794
+ // if first one, create a timeout and refresh
795
+ // if NOT first one, just refresh
796
+ collection ?. insertOne ( { insertCount : insertCount ++ } ) . then ( ( ) => {
797
+ insertInterval ??= setTimeout ( setInsertInterval , 300 ) ;
798
+ insertInterval . refresh ( ) ;
799
+ } ) ;
800
+ }
801
+
802
+ beforeEach ( async function ( ) {
803
+ client = this . configuration . newClient ( ) ;
804
+ await client . connect ( ) ;
805
+ db = client . db ( 'test' ) ;
806
+ collection = db . collection ( 'test_close' ) ;
807
+ await collection . drop ( ) . catch ( ( ) => null ) ;
808
+ changeStream = collection . watch ( ) ;
809
+
810
+ insertCount = 0 ;
811
+ setInsertInterval ( ) ;
812
+ } ) ;
813
+
814
+ afterEach ( async function ( ) {
815
+ clearTimeout ( insertInterval ) ;
816
+ await collection . drop ( ) . catch ( ( ) => null ) ;
817
+ await client . close ( ) ;
818
+
819
+ db = undefined ;
820
+ client = undefined ;
821
+ collection = undefined ;
822
+ changeStream = undefined ;
823
+ insertInterval = undefined ;
824
+ insertCount = 0 ;
825
+ } ) ;
826
+
784
827
it (
785
828
'rejects promises already returned by next' ,
786
829
{ requires : { topology : 'replicaset' } } ,
@@ -798,6 +841,47 @@ describe('Change Streams', function () {
798
841
}
799
842
}
800
843
) ;
844
+
845
+ it (
846
+ 'rejects promises already returned by next after awaiting the first one' ,
847
+ { requires : { topology : 'replicaset' } } ,
848
+ async function ( ) {
849
+ const changes = Array . from ( { length : 20 } , ( ) => changeStream . next ( ) ) ;
850
+ await changes [ 0 ] ;
851
+ const allChanges = Promise . allSettled ( changes ) ;
852
+
853
+ await changeStream . close ( ) ;
854
+
855
+ const results = await allChanges ;
856
+
857
+ const statuses = results . map ( ( { status } ) => status ) ;
858
+ expect ( statuses ) . to . deep . equal ( [
859
+ 'fulfilled' ,
860
+ ...Array . from ( { length : 19 } , ( ) => 'rejected' )
861
+ ] ) ;
862
+ }
863
+ ) ;
864
+
865
+ it (
866
+ 'rejects promises already returned by next after awaiting half of them' ,
867
+ { requires : { topology : 'replicaset' } } ,
868
+ async function ( ) {
869
+ const changes = Array . from ( { length : 20 } , ( ) => changeStream . next ( ) ) ;
870
+ const allChanges = Promise . allSettled ( changes ) ;
871
+
872
+ await Promise . allSettled ( changes . slice ( 10 ) ) ;
873
+
874
+ await changeStream . close ( ) ;
875
+
876
+ const results = await allChanges ;
877
+
878
+ const statuses = results . map ( ( { status } ) => status ) ;
879
+ expect ( statuses ) . to . deep . equal ( [
880
+ ...Array . from ( { length : 10 } , ( ) => 'fulfilled' ) ,
881
+ ...Array . from ( { length : 10 } , ( ) => 'rejected' )
882
+ ] ) ;
883
+ }
884
+ ) ;
801
885
} ) ;
802
886
803
887
describe ( 'iterator api' , function ( ) {
0 commit comments