Skip to content

Commit be93d05

Browse files
committed
wait for recheck docs creation
1 parent f14962f commit be93d05

File tree

2 files changed

+23
-1
lines changed

2 files changed

+23
-1
lines changed

internal/verifier/change_stream.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,7 @@ func (verifier *Verifier) StartChangeEventHandler(ctx context.Context, reader *C
114114
// Change stream reader has closed the event batch channel because it has finished.
115115
return nil
116116
}
117+
verifier.logger.Trace().Msgf("Verifier is handling a change event batch from %s: %v", reader, batch)
117118
err := verifier.HandleChangeStreamEvents(ctx, batch, reader.readerType)
118119
if err != nil {
119120
reader.ChangeStreamErrChan <- err
@@ -277,6 +278,8 @@ func (csr *ChangeStreamReader) readAndHandleOneChangeEventBatch(
277278
return errors.Wrapf(err, "failed to decode change event to %T", changeEventBatch[eventsRead])
278279
}
279280

281+
csr.logger.Trace().Msgf("%s received a change event: %v", csr, changeEventBatch[eventsRead])
282+
280283
if changeEventBatch[eventsRead].ClusterTime != nil &&
281284
(csr.lastChangeEventTime == nil ||
282285
csr.lastChangeEventTime.Before(*changeEventBatch[eventsRead].ClusterTime)) {
@@ -606,7 +609,7 @@ func (csr *ChangeStreamReader) persistChangeStreamResumeToken(ctx context.Contex
606609
Msg("failed to extract resume token timestamp")
607610
}
608611

609-
logEvent.Msg("Persisted change stream resume token.")
612+
logEvent.Msgf("Persisted %s's resume token.", csr)
610613

611614
return nil
612615
}

internal/verifier/migration_verifier_test.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1553,8 +1553,22 @@ func (suite *IntegrationTestSuite) TestVerifierWithFilter() {
15531553
<-checkDoneChan
15541554
}
15551555

1556+
func (suite *IntegrationTestSuite) waitForRecheckDocs(verifier *Verifier) {
1557+
suite.Eventually(func() bool {
1558+
cursor, err := suite.metaMongoClient.Database(verifier.metaDBName).Collection(recheckQueue).Find(suite.Context(), bson.D{})
1559+
var docs []bson.D
1560+
suite.Require().NoError(err)
1561+
suite.Require().NoError(cursor.All(suite.Context(), &docs))
1562+
return len(docs) > 0
1563+
}, 1*time.Minute, 100*time.Millisecond)
1564+
}
1565+
15561566
func (suite *IntegrationTestSuite) TestChangesOnDstBeforeSrc() {
1567+
zerolog.SetGlobalLevel(zerolog.TraceLevel)
1568+
defer zerolog.SetGlobalLevel(zerolog.DebugLevel)
1569+
15571570
ctx := suite.Context()
1571+
15581572
collName := "mycoll"
15591573

15601574
srcDB := suite.srcMongoClient.Database(suite.DBNameForTest())
@@ -1574,7 +1588,9 @@ func (suite *IntegrationTestSuite) TestChangesOnDstBeforeSrc() {
15741588
suite.Require().NoError(err)
15751589
_, err = dstDB.Collection(collName).InsertOne(ctx, bson.D{{"_id", 2}})
15761590
suite.Require().NoError(err)
1591+
15771592
suite.Require().NoError(runner.AwaitGenerationEnd())
1593+
suite.waitForRecheckDocs(verifier)
15781594

15791595
// Run generation 2 and get verification status.
15801596
suite.Require().NoError(runner.StartNextGeneration())
@@ -1591,6 +1607,8 @@ func (suite *IntegrationTestSuite) TestChangesOnDstBeforeSrc() {
15911607
_, err = srcDB.Collection(collName).InsertOne(ctx, bson.D{{"_id", 1}})
15921608
suite.Require().NoError(err)
15931609
suite.Require().NoError(runner.AwaitGenerationEnd())
1610+
suite.waitForRecheckDocs(verifier)
1611+
15941612
status, err = verifier.GetVerificationStatus(ctx)
15951613
suite.Require().NoError(err)
15961614
suite.Assert().Equal(
@@ -1603,6 +1621,7 @@ func (suite *IntegrationTestSuite) TestChangesOnDstBeforeSrc() {
16031621
_, err = srcDB.Collection(collName).InsertOne(ctx, bson.D{{"_id", 2}})
16041622
suite.Require().NoError(err)
16051623
suite.Require().NoError(runner.AwaitGenerationEnd())
1624+
suite.waitForRecheckDocs(verifier)
16061625

16071626
// Everything should match by the end of it.
16081627
status, err = verifier.GetVerificationStatus(ctx)

0 commit comments

Comments
 (0)