@@ -17,11 +17,13 @@ import (
1717
1818func TestChangeStreamFilter (t * testing.T ) {
1919 verifier := Verifier {}
20- verifier .initializeChangeStreamReaders ()
2120 verifier .SetMetaDBName ("metadb" )
21+ verifier .srcNamespaces = []string {"foo.bar" , "foo.baz" , "test.car" , "test.chaz" }
22+
23+ verifier .initializeChangeStreamReaders ()
24+
2225 require .Equal (t , []bson.D {{{"$match" , bson.D {{"ns.db" , bson.D {{"$ne" , "metadb" }}}}}}},
2326 verifier .srcChangeStreamReader .GetChangeStreamFilter ())
24- verifier .srcNamespaces = []string {"foo.bar" , "foo.baz" , "test.car" , "test.chaz" }
2527 require .Equal (t , []bson.D {
2628 {{"$match" , bson.D {
2729 {"$or" , bson.A {
@@ -48,6 +50,7 @@ func (suite *IntegrationTestSuite) TestChangeStreamResumability() {
4850 verifier1 := suite .BuildVerifier ()
4951 ctx , cancel := context .WithCancel (context .Background ())
5052 defer cancel ()
53+ verifier1 .StartChangeEventHandler (ctx , verifier1 .srcChangeStreamReader )
5154 err := verifier1 .srcChangeStreamReader .StartChangeStream (ctx )
5255 suite .Require ().NoError (err )
5356 }()
@@ -73,6 +76,7 @@ func (suite *IntegrationTestSuite) TestChangeStreamResumability() {
7376
7477 newTime := suite .getClusterTime (ctx , suite .srcMongoClient )
7578
79+ verifier2 .StartChangeEventHandler (ctx , verifier2 .srcChangeStreamReader )
7680 err = verifier2 .srcChangeStreamReader .StartChangeStream (ctx )
7781 suite .Require ().NoError (err )
7882
@@ -148,6 +152,7 @@ func (suite *IntegrationTestSuite) TestStartAtTimeNoChanges() {
148152 suite .Require ().NoError (err )
149153 origStartTs := sess .OperationTime ()
150154 suite .Require ().NotNil (origStartTs )
155+ verifier .StartChangeEventHandler (ctx , verifier .srcChangeStreamReader )
151156 err = verifier .srcChangeStreamReader .StartChangeStream (ctx )
152157 suite .Require ().NoError (err )
153158 suite .Require ().Equal (verifier .srcChangeStreamReader .startAtTs , origStartTs )
@@ -169,6 +174,7 @@ func (suite *IntegrationTestSuite) TestStartAtTimeWithChanges() {
169174
170175 origSessionTime := sess .OperationTime ()
171176 suite .Require ().NotNil (origSessionTime )
177+ verifier .StartChangeEventHandler (ctx , verifier .srcChangeStreamReader )
172178 err = verifier .srcChangeStreamReader .StartChangeStream (ctx )
173179 suite .Require ().NoError (err )
174180
@@ -221,6 +227,7 @@ func (suite *IntegrationTestSuite) TestNoStartAtTime() {
221227 suite .Require ().NoError (err )
222228 origStartTs := sess .OperationTime ()
223229 suite .Require ().NotNil (origStartTs )
230+ verifier .StartChangeEventHandler (ctx , verifier .srcChangeStreamReader )
224231 err = verifier .srcChangeStreamReader .StartChangeStream (ctx )
225232 suite .Require ().NoError (err )
226233 suite .Require ().NotNil (verifier .srcChangeStreamReader .startAtTs )
@@ -240,6 +247,7 @@ func (suite *IntegrationTestSuite) TestWithChangeEventsBatching() {
240247
241248 verifier := suite .BuildVerifier ()
242249
250+ verifier .StartChangeEventHandler (ctx , verifier .srcChangeStreamReader )
243251 suite .Require ().NoError (verifier .srcChangeStreamReader .StartChangeStream (ctx ))
244252
245253 _ , err := coll1 .InsertOne (ctx , bson.D {{"_id" , 1 }})
@@ -261,7 +269,6 @@ func (suite *IntegrationTestSuite) TestWithChangeEventsBatching() {
261269 500 * time .Millisecond ,
262270 "the verifier should flush a recheck doc after a batch" ,
263271 )
264-
265272}
266273
267274func (suite * IntegrationTestSuite ) TestManyInsertsBeforeWritesOff () {
@@ -358,3 +365,65 @@ func (suite *IntegrationTestSuite) TestCreateForbidden() {
358365 suite .Require ().ErrorAs (err , & eventErr )
359366 suite .Assert ().Equal ("create" , eventErr .Event .OpType )
360367}
368+
369+ func (suite * IntegrationTestSuite ) TestRecheckDocsWithDstChangeEvents () {
370+ ctx := suite .Context ()
371+
372+ db := suite .dstMongoClient .Database ("dstDB" )
373+ coll1 := db .Collection ("dstColl1" )
374+ coll2 := db .Collection ("dstColl2" )
375+
376+ for _ , coll := range mslices .Of (coll1 , coll2 ) {
377+ suite .Require ().NoError (db .CreateCollection (ctx , coll .Name ()))
378+ }
379+
380+ verifier := suite .BuildVerifier ()
381+ verifier .SetSrcNamespaces ([]string {"srcDB.srcColl1" , "srcDB.srcColl2" })
382+ verifier .SetDstNamespaces ([]string {"dstDB.dstColl1" , "dstDB.dstColl2" })
383+ verifier .SetNamespaceMap ()
384+
385+ verifier .StartChangeEventHandler (ctx , verifier .dstChangeStreamReader )
386+ suite .Require ().NoError (verifier .dstChangeStreamReader .StartChangeStream (ctx ))
387+
388+ _ , err := coll1 .InsertOne (ctx , bson.D {{"_id" , 1 }})
389+ suite .Require ().NoError (err )
390+ _ , err = coll1 .InsertOne (ctx , bson.D {{"_id" , 2 }})
391+ suite .Require ().NoError (err )
392+
393+ _ , err = coll2 .InsertOne (ctx , bson.D {{"_id" , 1 }})
394+ suite .Require ().NoError (err )
395+
396+ var rechecks []RecheckDoc
397+ require .Eventually (
398+ suite .T (),
399+ func () bool {
400+ recheckColl := verifier .verificationDatabase ().Collection (recheckQueue )
401+ cursor , err := recheckColl .Find (ctx , bson.D {})
402+ if errors .Is (err , mongo .ErrNoDocuments ) {
403+ return false
404+ }
405+
406+ suite .Require ().NoError (err )
407+ suite .Require ().NoError (cursor .All (ctx , & rechecks ))
408+ return len (rechecks ) == 3
409+ },
410+ time .Minute ,
411+ 500 * time .Millisecond ,
412+ "the verifier should flush a recheck doc after a batch" ,
413+ )
414+
415+ coll1RecheckCount , coll2RecheckCount := 0 , 0
416+ for _ , recheck := range rechecks {
417+ suite .Require ().Equal ("srcDB" , recheck .PrimaryKey .DatabaseName )
418+ switch recheck .PrimaryKey .CollectionName {
419+ case "srcColl1" :
420+ coll1RecheckCount ++
421+ case "srcColl2" :
422+ coll2RecheckCount ++
423+ default :
424+ suite .T ().Fatalf ("unknown collection name: %v" , recheck .PrimaryKey .CollectionName )
425+ }
426+ }
427+ suite .Require ().Equal (2 , coll1RecheckCount )
428+ suite .Require ().Equal (1 , coll2RecheckCount )
429+ }
0 commit comments