@@ -47,6 +47,19 @@ const pipeline = [
4747 { $addFields : { comment : 'The documentKey field has been projected out of this document.' } }
4848] ;
4949
50+ async function forcePrimaryStepDown ( client : MongoClient ) {
51+ await client
52+ . db ( 'admin' )
53+ . command ( { replSetFreeze : 0 } , { readPreference : ReadPreference . SECONDARY } ) ;
54+ await client
55+ . db ( 'admin' )
56+ . command ( { replSetStepDown : 15 , secondaryCatchUpPeriodSecs : 10 , force : true } ) ;
57+
58+ // wait for secondary to become primary but also allow previous primary to become next primary
59+ // in subsequent test runs
60+ await sleep ( 15_000 ) ;
61+ }
62+
5063describe ( 'Change Streams' , function ( ) {
5164 let client : MongoClient ;
5265 let collection : Collection ;
@@ -1822,6 +1835,7 @@ describe('Change Streams', function () {
18221835
18231836describe . only ( 'ChangeStream resumability' , function ( ) {
18241837 let client : MongoClient ;
1838+ let utilClient : MongoClient ;
18251839 let collection : Collection ;
18261840 let changeStream : ChangeStream ;
18271841 let aggregateEvents : CommandStartedEvent [ ] = [ ] ;
@@ -1875,14 +1889,15 @@ describe.only('ChangeStream resumability', function () {
18751889 beforeEach ( async function ( ) {
18761890 const dbName = 'resumabilty_tests' ;
18771891 const collectionName = 'foo' ;
1878- const utilClient = this . configuration . newClient ( ) ;
1892+
1893+ utilClient = this . configuration . newClient ( ) ;
1894+
18791895 // 3.6 servers do not support creating a change stream on a database that doesn't exist
18801896 await utilClient
18811897 . db ( dbName )
18821898 . dropDatabase ( )
18831899 . catch ( e => e ) ;
18841900 await utilClient . db ( dbName ) . createCollection ( collectionName ) ;
1885- await utilClient . close ( ) ;
18861901
18871902 // we are going to switch primary in tests and cleanup of failpoints is difficult,
18881903 // so generating unique appname instead of cleaning for each test is an easier solution
@@ -1892,8 +1907,8 @@ describe.only('ChangeStream resumability', function () {
18921907 { } ,
18931908 {
18941909 monitorCommands : true ,
1895- serverSelectionTimeoutMS : 5_000 ,
1896- heartbeatFrequencyMS : 500 ,
1910+ serverSelectionTimeoutMS : 10_000 ,
1911+ heartbeatFrequencyMS : 5_000 ,
18971912 appName : appName
18981913 }
18991914 ) ;
@@ -1903,6 +1918,7 @@ describe.only('ChangeStream resumability', function () {
19031918
19041919 afterEach ( async function ( ) {
19051920 await changeStream . close ( ) ;
1921+ await utilClient . close ( ) ;
19061922 await client . close ( ) ;
19071923 aggregateEvents = [ ] ;
19081924 } ) ;
@@ -2072,7 +2088,7 @@ describe.only('ChangeStream resumability', function () {
20722088
20732089 await collection . insertOne ( { a : 1 } ) ;
20742090
2075- await client . db ( 'admin' ) . command ( {
2091+ await utilClient . db ( 'admin' ) . command ( {
20762092 configureFailPoint : 'failCommand' ,
20772093 mode : 'alwaysOn' ,
20782094 data : {
@@ -2081,12 +2097,8 @@ describe.only('ChangeStream resumability', function () {
20812097 appName : appName
20822098 }
20832099 } as FailCommandFailPoint ) ;
2084- await client
2085- . db ( 'admin' )
2086- . command ( { replSetFreeze : 0 } , { readPreference : ReadPreference . SECONDARY } ) ;
2087- await client . db ( 'admin' ) . command ( { replSetStepDown : 5 , force : true } ) ;
20882100
2089- await sleep ( 500 ) ;
2101+ await forcePrimaryStepDown ( utilClient ) ;
20902102
20912103 const change = await changeStream . next ( ) ;
20922104 expect ( change ) . to . containSubset ( { operationType : 'insert' , fullDocument : { a : 1 } } ) ;
@@ -2422,7 +2434,7 @@ describe.only('ChangeStream resumability', function () {
24222434
24232435 await collection . insertOne ( { a : 1 } ) ;
24242436
2425- await client . db ( 'admin' ) . command ( {
2437+ await utilClient . db ( 'admin' ) . command ( {
24262438 configureFailPoint : 'failCommand' ,
24272439 mode : 'alwaysOn' ,
24282440 data : {
@@ -2431,12 +2443,7 @@ describe.only('ChangeStream resumability', function () {
24312443 appName : appName
24322444 }
24332445 } as FailCommandFailPoint ) ;
2434- await client
2435- . db ( 'admin' )
2436- . command ( { replSetFreeze : 0 } , { readPreference : ReadPreference . SECONDARY } ) ;
2437- await client . db ( 'admin' ) . command ( { replSetStepDown : 5 , force : true } ) ;
2438-
2439- await sleep ( 500 ) ;
2446+ await forcePrimaryStepDown ( utilClient ) ;
24402447
24412448 const change = await changeStream . tryNext ( ) ;
24422449 expect ( change ) . to . containSubset ( { operationType : 'insert' , fullDocument : { a : 1 } } ) ;
@@ -2596,7 +2603,7 @@ describe.only('ChangeStream resumability', function () {
25962603
25972604 await collection . insertOne ( { a : 1 } ) ;
25982605
2599- await client . db ( 'admin' ) . command ( {
2606+ await utilClient . db ( 'admin' ) . command ( {
26002607 configureFailPoint : 'failCommand' ,
26012608 mode : 'alwaysOn' ,
26022609 data : {
@@ -2605,12 +2612,7 @@ describe.only('ChangeStream resumability', function () {
26052612 appName : appName
26062613 }
26072614 } as FailCommandFailPoint ) ;
2608- await client
2609- . db ( 'admin' )
2610- . command ( { replSetFreeze : 0 } , { readPreference : ReadPreference . SECONDARY } ) ;
2611- await client . db ( 'admin' ) . command ( { replSetStepDown : 5 , force : true } ) ;
2612-
2613- await sleep ( 500 ) ;
2615+ await forcePrimaryStepDown ( utilClient ) ;
26142616
26152617 const change = await changeStreamIterator . next ( ) ;
26162618 expect ( change . value ) . to . containSubset ( {
@@ -2815,7 +2817,6 @@ describe.only('ChangeStream resumability', function () {
28152817 } ) ;
28162818
28172819 context ( 'when the error is not a server error' , function ( ) {
2818- // This test requires a replica set to call replSetFreeze command
28192820 it (
28202821 'should resume on ServerSelectionError' ,
28212822 { requires : { topology : [ 'replicaset' ] } } ,
@@ -2833,7 +2834,7 @@ describe.only('ChangeStream resumability', function () {
28332834 fullDocument : { a : 1 }
28342835 } ) ;
28352836
2836- await client . db ( 'admin' ) . command ( {
2837+ await utilClient . db ( 'admin' ) . command ( {
28372838 configureFailPoint : 'failCommand' ,
28382839 mode : 'alwaysOn' ,
28392840 data : {
@@ -2842,12 +2843,7 @@ describe.only('ChangeStream resumability', function () {
28422843 appName : appName
28432844 }
28442845 } as FailCommandFailPoint ) ;
2845- await client
2846- . db ( 'admin' )
2847- . command ( { replSetFreeze : 0 } , { readPreference : ReadPreference . SECONDARY } ) ;
2848- await client . db ( 'admin' ) . command ( { replSetStepDown : 5 , force : true } ) ;
2849-
2850- await sleep ( 500 ) ;
2846+ await forcePrimaryStepDown ( utilClient ) ;
28512847
28522848 await collection . insertOne ( { a : 2 } ) ;
28532849
0 commit comments