@@ -209,68 +209,79 @@ describe('Change Streams', function () {
209209 }
210210 } ) ;
211211
212- it (
213- 'should support creating multiple simultaneous ChangeStreams' ,
214- { requires : { topology : 'replicaset' } } ,
215- async function ( ) {
216- const configuration = this . configuration ;
217- const client = configuration . newClient ( ) ;
212+ describe ( 'when creating multiple simultaneous ChangeStreams' , ( ) => {
213+ let client ;
214+ let changeStream1 ;
215+ let changeStream2 ;
216+ let changeStream3 ;
218217
219- await client . connect ( ) ;
220-
221- this . defer ( ( ) => client . close ( ) ) ;
222-
223- const database = client . db ( 'integration_tests' ) ;
224- const collection1 = database . collection ( 'simultaneous1' ) ;
225- const collection2 = database . collection ( 'simultaneous2' ) ;
226-
227- const changeStream1 = collection1 . watch ( [ { $addFields : { changeStreamNumber : 1 } } ] ) ;
228- this . defer ( ( ) => changeStream1 . close ( ) ) ;
229- const changeStream2 = collection2 . watch ( [ { $addFields : { changeStreamNumber : 2 } } ] ) ;
230- this . defer ( ( ) => changeStream2 . close ( ) ) ;
231- const changeStream3 = collection2 . watch ( [ { $addFields : { changeStreamNumber : 3 } } ] ) ;
232- this . defer ( ( ) => changeStream3 . close ( ) ) ;
233-
234- setTimeout ( ( ) => {
235- collection1 . insertMany ( [ { a : 1 } ] ) . then ( ( ) => collection2 . insertMany ( [ { a : 1 } ] ) ) ;
236- } , 50 ) ;
218+ beforeEach ( async function ( ) {
219+ client = this . configuration . newClient ( ) ;
220+ } ) ;
237221
238- await Promise . resolve ( )
239- . then ( ( ) =>
240- Promise . all ( [ changeStream1 . hasNext ( ) , changeStream2 . hasNext ( ) , changeStream3 . hasNext ( ) ] )
241- )
242- . then ( function ( hasNexts ) {
243- // Check all the Change Streams have a next item
244- assert . ok ( hasNexts [ 0 ] ) ;
245- assert . ok ( hasNexts [ 1 ] ) ;
246- assert . ok ( hasNexts [ 2 ] ) ;
222+ afterEach ( async function ( ) {
223+ await changeStream1 ?. close ( ) ;
224+ await changeStream2 ?. close ( ) ;
225+ await changeStream3 ?. close ( ) ;
226+ await client ?. close ( ) ;
227+ } ) ;
247228
248- return Promise . all ( [ changeStream1 . next ( ) , changeStream2 . next ( ) , changeStream3 . next ( ) ] ) ;
249- } )
250- . then ( function ( changes ) {
251- // Check the values of the change documents are correct
252- assert . equal ( changes [ 0 ] . operationType , 'insert' ) ;
253- assert . equal ( changes [ 1 ] . operationType , 'insert' ) ;
254- assert . equal ( changes [ 2 ] . operationType , 'insert' ) ;
255-
256- expect ( changes [ 0 ] ) . to . have . nested . property ( 'fullDocument.a' , 1 ) ;
257- expect ( changes [ 1 ] ) . to . have . nested . property ( 'fullDocument.a' , 1 ) ;
258- expect ( changes [ 2 ] ) . to . have . nested . property ( 'fullDocument.a' , 1 ) ;
259-
260- expect ( changes [ 0 ] ) . to . have . nested . property ( 'ns.db' , 'integration_tests' ) ;
261- expect ( changes [ 1 ] ) . to . have . nested . property ( 'ns.db' , 'integration_tests' ) ;
262- expect ( changes [ 2 ] ) . to . have . nested . property ( 'ns.db' , 'integration_tests' ) ;
263-
264- expect ( changes [ 0 ] ) . to . have . nested . property ( 'ns.coll' , 'simultaneous1' ) ;
265- expect ( changes [ 1 ] ) . to . have . nested . property ( 'ns.coll' , 'simultaneous2' ) ;
266- expect ( changes [ 2 ] ) . to . have . nested . property ( 'ns.coll' , 'simultaneous2' ) ;
267-
268- expect ( changes [ 0 ] ) . to . have . nested . property ( 'changeStreamNumber' , 1 ) ;
269- expect ( changes [ 1 ] ) . to . have . nested . property ( 'changeStreamNumber' , 2 ) ;
270- expect ( changes [ 2 ] ) . to . have . nested . property ( 'changeStreamNumber' , 3 ) ;
271- } ) ;
272- }
273- ) ;
229+ it (
230+ 'supports simultaneous parallel ChangeStream use' ,
231+ { requires : { topology : '!single' } } ,
232+ async function ( ) {
233+ const database = client . db ( 'integration_tests' ) ;
234+ const collection1 = database . collection ( 'simultaneous1' ) ;
235+ const collection2 = database . collection ( 'simultaneous2' ) ;
236+
237+ changeStream1 = collection1 . watch ( [ { $addFields : { changeStreamNumber : 1 } } ] ) ;
238+ changeStream2 = collection2 . watch ( [ { $addFields : { changeStreamNumber : 2 } } ] ) ;
239+ changeStream3 = collection2 . watch ( [ { $addFields : { changeStreamNumber : 3 } } ] ) ;
240+
241+ setTimeout ( ( ) => {
242+ collection1 . insertMany ( [ { a : 1 } ] ) . then ( ( ) => collection2 . insertMany ( [ { a : 1 } ] ) ) ;
243+ } , 50 ) ;
244+
245+ const hasNexts = await Promise . all ( [
246+ changeStream1 . hasNext ( ) ,
247+ changeStream2 . hasNext ( ) ,
248+ changeStream3 . hasNext ( )
249+ ] ) ;
250+
251+ // Check all the Change Streams have a next item
252+ expect ( hasNexts [ 0 ] ) . to . be . true ;
253+ expect ( hasNexts [ 1 ] ) . to . be . true ;
254+ expect ( hasNexts [ 2 ] ) . to . be . true ;
255+
256+ const changes = await Promise . all ( [
257+ changeStream1 . next ( ) ,
258+ changeStream2 . next ( ) ,
259+ changeStream3 . next ( )
260+ ] ) ;
261+
262+ // Check the values of the change documents are correct
263+ expect ( changes [ 0 ] . operationType ) . to . be . equal ( 'insert' ) ;
264+ expect ( changes [ 1 ] . operationType ) . to . be . equal ( 'insert' ) ;
265+ expect ( changes [ 2 ] . operationType ) . to . be . equal ( 'insert' ) ;
266+
267+ expect ( changes [ 0 ] ) . to . have . nested . property ( 'fullDocument.a' , 1 ) ;
268+ expect ( changes [ 1 ] ) . to . have . nested . property ( 'fullDocument.a' , 1 ) ;
269+ expect ( changes [ 2 ] ) . to . have . nested . property ( 'fullDocument.a' , 1 ) ;
270+
271+ expect ( changes [ 0 ] ) . to . have . nested . property ( 'ns.db' , 'integration_tests' ) ;
272+ expect ( changes [ 1 ] ) . to . have . nested . property ( 'ns.db' , 'integration_tests' ) ;
273+ expect ( changes [ 2 ] ) . to . have . nested . property ( 'ns.db' , 'integration_tests' ) ;
274+
275+ expect ( changes [ 0 ] ) . to . have . nested . property ( 'ns.coll' , 'simultaneous1' ) ;
276+ expect ( changes [ 1 ] ) . to . have . nested . property ( 'ns.coll' , 'simultaneous2' ) ;
277+ expect ( changes [ 2 ] ) . to . have . nested . property ( 'ns.coll' , 'simultaneous2' ) ;
278+
279+ expect ( changes [ 0 ] ) . to . have . nested . property ( 'changeStreamNumber' , 1 ) ;
280+ expect ( changes [ 1 ] ) . to . have . nested . property ( 'changeStreamNumber' , 2 ) ;
281+ expect ( changes [ 2 ] ) . to . have . nested . property ( 'changeStreamNumber' , 3 ) ;
282+ }
283+ ) ;
284+ } ) ;
274285
275286 it ( 'should properly close ChangeStream cursor' , {
276287 metadata : { requires : { topology : 'replicaset' } } ,
@@ -799,7 +810,7 @@ describe('Change Streams', function () {
799810 } ) ;
800811
801812 it ( 'when invoked with promises' , {
802- metadata : { requires : { topology : 'replicaset ' } } ,
813+ metadata : { requires : { topology : '!single ' } } ,
803814 test : async function ( ) {
804815 const read = async ( ) => {
805816 await changeStream . next ( ) ;
0 commit comments