@@ -47,6 +47,19 @@ const pipeline = [
4747 { $addFields : { comment : 'The documentKey field has been projected out of this document.' } }
4848] ;
4949
50+ async function forcePrimaryStepDown ( client : MongoClient ) {
51+ await client
52+ . db ( 'admin' )
53+ . command ( { replSetFreeze : 0 } , { readPreference : ReadPreference . SECONDARY } ) ;
54+ await client
55+ . db ( 'admin' )
56+ . command ( { replSetStepDown : 15 , secondaryCatchUpPeriodSecs : 10 , force : true } ) ;
57+
58+ // wait for secondary to become primary but also allow previous primary to become next primary
59+ // in subsequent test runs
60+ await sleep ( 15_000 ) ;
61+ }
62+
5063describe ( 'Change Streams' , function ( ) {
5164 let client : MongoClient ;
5265 let collection : Collection ;
@@ -2005,6 +2018,7 @@ describe('Change Streams', function () {
20052018
20062019describe . only ( 'ChangeStream resumability' , function ( ) {
20072020 let client : MongoClient ;
2021+ let utilClient : MongoClient ;
20082022 let collection : Collection ;
20092023 let changeStream : ChangeStream ;
20102024 let aggregateEvents : CommandStartedEvent [ ] = [ ] ;
@@ -2058,14 +2072,15 @@ describe.only('ChangeStream resumability', function () {
20582072 beforeEach ( async function ( ) {
20592073 const dbName = 'resumabilty_tests' ;
20602074 const collectionName = 'foo' ;
2061- const utilClient = this . configuration . newClient ( ) ;
2075+
2076+ utilClient = this . configuration . newClient ( ) ;
2077+
20622078 // 3.6 servers do not support creating a change stream on a database that doesn't exist
20632079 await utilClient
20642080 . db ( dbName )
20652081 . dropDatabase ( )
20662082 . catch ( e => e ) ;
20672083 await utilClient . db ( dbName ) . createCollection ( collectionName ) ;
2068- await utilClient . close ( ) ;
20692084
20702085 // we are going to switch primary in tests and cleanup of failpoints is difficult,
20712086 // so generating unique appname instead of cleaning for each test is an easier solution
@@ -2075,8 +2090,8 @@ describe.only('ChangeStream resumability', function () {
20752090 { } ,
20762091 {
20772092 monitorCommands : true ,
2078- serverSelectionTimeoutMS : 5_000 ,
2079- heartbeatFrequencyMS : 500 ,
2093+ serverSelectionTimeoutMS : 10_000 ,
2094+ heartbeatFrequencyMS : 5_000 ,
20802095 appName : appName
20812096 }
20822097 ) ;
@@ -2086,6 +2101,7 @@ describe.only('ChangeStream resumability', function () {
20862101
20872102 afterEach ( async function ( ) {
20882103 await changeStream . close ( ) ;
2104+ await utilClient . close ( ) ;
20892105 await client . close ( ) ;
20902106 aggregateEvents = [ ] ;
20912107 } ) ;
@@ -2255,7 +2271,7 @@ describe.only('ChangeStream resumability', function () {
22552271
22562272 await collection . insertOne ( { a : 1 } ) ;
22572273
2258- await client . db ( 'admin' ) . command ( {
2274+ await utilClient . db ( 'admin' ) . command ( {
22592275 configureFailPoint : 'failCommand' ,
22602276 mode : 'alwaysOn' ,
22612277 data : {
@@ -2264,12 +2280,8 @@ describe.only('ChangeStream resumability', function () {
22642280 appName : appName
22652281 }
22662282 } as FailCommandFailPoint ) ;
2267- await client
2268- . db ( 'admin' )
2269- . command ( { replSetFreeze : 0 } , { readPreference : ReadPreference . SECONDARY } ) ;
2270- await client . db ( 'admin' ) . command ( { replSetStepDown : 5 , force : true } ) ;
22712283
2272- await sleep ( 500 ) ;
2284+ await forcePrimaryStepDown ( utilClient ) ;
22732285
22742286 const change = await changeStream . next ( ) ;
22752287 expect ( change ) . to . containSubset ( { operationType : 'insert' , fullDocument : { a : 1 } } ) ;
@@ -2605,7 +2617,7 @@ describe.only('ChangeStream resumability', function () {
26052617
26062618 await collection . insertOne ( { a : 1 } ) ;
26072619
2608- await client . db ( 'admin' ) . command ( {
2620+ await utilClient . db ( 'admin' ) . command ( {
26092621 configureFailPoint : 'failCommand' ,
26102622 mode : 'alwaysOn' ,
26112623 data : {
@@ -2614,12 +2626,7 @@ describe.only('ChangeStream resumability', function () {
26142626 appName : appName
26152627 }
26162628 } as FailCommandFailPoint ) ;
2617- await client
2618- . db ( 'admin' )
2619- . command ( { replSetFreeze : 0 } , { readPreference : ReadPreference . SECONDARY } ) ;
2620- await client . db ( 'admin' ) . command ( { replSetStepDown : 5 , force : true } ) ;
2621-
2622- await sleep ( 500 ) ;
2629+ await forcePrimaryStepDown ( utilClient ) ;
26232630
26242631 const change = await changeStream . tryNext ( ) ;
26252632 expect ( change ) . to . containSubset ( { operationType : 'insert' , fullDocument : { a : 1 } } ) ;
@@ -2779,7 +2786,7 @@ describe.only('ChangeStream resumability', function () {
27792786
27802787 await collection . insertOne ( { a : 1 } ) ;
27812788
2782- await client . db ( 'admin' ) . command ( {
2789+ await utilClient . db ( 'admin' ) . command ( {
27832790 configureFailPoint : 'failCommand' ,
27842791 mode : 'alwaysOn' ,
27852792 data : {
@@ -2788,12 +2795,7 @@ describe.only('ChangeStream resumability', function () {
27882795 appName : appName
27892796 }
27902797 } as FailCommandFailPoint ) ;
2791- await client
2792- . db ( 'admin' )
2793- . command ( { replSetFreeze : 0 } , { readPreference : ReadPreference . SECONDARY } ) ;
2794- await client . db ( 'admin' ) . command ( { replSetStepDown : 5 , force : true } ) ;
2795-
2796- await sleep ( 500 ) ;
2798+ await forcePrimaryStepDown ( utilClient ) ;
27972799
27982800 const change = await changeStreamIterator . next ( ) ;
27992801 expect ( change . value ) . to . containSubset ( {
@@ -2998,7 +3000,6 @@ describe.only('ChangeStream resumability', function () {
29983000 } ) ;
29993001
30003002 context ( 'when the error is not a server error' , function ( ) {
3001- // This test requires a replica set to call replSetFreeze command
30023003 it (
30033004 'should resume on ServerSelectionError' ,
30043005 { requires : { topology : [ 'replicaset' ] } } ,
@@ -3016,7 +3017,7 @@ describe.only('ChangeStream resumability', function () {
30163017 fullDocument : { a : 1 }
30173018 } ) ;
30183019
3019- await client . db ( 'admin' ) . command ( {
3020+ await utilClient . db ( 'admin' ) . command ( {
30203021 configureFailPoint : 'failCommand' ,
30213022 mode : 'alwaysOn' ,
30223023 data : {
@@ -3025,12 +3026,7 @@ describe.only('ChangeStream resumability', function () {
30253026 appName : appName
30263027 }
30273028 } as FailCommandFailPoint ) ;
3028- await client
3029- . db ( 'admin' )
3030- . command ( { replSetFreeze : 0 } , { readPreference : ReadPreference . SECONDARY } ) ;
3031- await client . db ( 'admin' ) . command ( { replSetStepDown : 5 , force : true } ) ;
3032-
3033- await sleep ( 500 ) ;
3029+ await forcePrimaryStepDown ( utilClient ) ;
30343030
30353031 await collection . insertOne ( { a : 2 } ) ;
30363032
0 commit comments