diff --git a/internal/verifier/change_stream.go b/internal/verifier/change_stream.go
index 113357c3..8821b603 100644
--- a/internal/verifier/change_stream.go
+++ b/internal/verifier/change_stream.go
@@ -47,30 +47,59 @@ func (uee UnknownEventError) Error() string {
 	return fmt.Sprintf("Received event with unknown optype: %+v", uee.Event)
 }
 
-// HandleChangeStreamEvent performs the necessary work for change stream events that occur during
-// operation.
-func (verifier *Verifier) HandleChangeStreamEvent(ctx context.Context, changeEvent *ParsedEvent) error {
-	if changeEvent.ClusterTime != nil &&
-		(verifier.lastChangeEventTime == nil ||
-			verifier.lastChangeEventTime.Compare(*changeEvent.ClusterTime) < 0) {
-		verifier.lastChangeEventTime = changeEvent.ClusterTime
+// HandleChangeStreamEvents performs the necessary work for change stream events after receiving a batch.
+//
+// Each event is passed to the event recorder, and one recheck entry per event
+// is then submitted through a single insertRecheckDocs call. An empty batch is
+// a no-op. An event with an unrecognized OpType aborts the batch with an
+// UnknownEventError before insertRecheckDocs is called.
+func (verifier *Verifier) HandleChangeStreamEvents(ctx context.Context, batch []ParsedEvent) error {
+	if len(batch) == 0 {
+		return nil
 	}
 
-	switch changeEvent.OpType {
-	case "delete":
-		fallthrough
-	case "insert":
-		fallthrough
-	case "replace":
-		fallthrough
-	case "update":
-		if err := verifier.eventRecorder.AddEvent(changeEvent); err != nil {
-			return errors.Wrapf(err, "failed to augment stats with change event: %+v", *changeEvent)
-		}
-		return verifier.InsertChangeEventRecheckDoc(ctx, changeEvent)
-	default:
-		return UnknownEventError{Event: changeEvent}
+	dbNames := make([]string, len(batch))
+	collNames := make([]string, len(batch))
+	docIDs := make([]interface{}, len(batch))
+	dataSizes := make([]int, len(batch))
+
+	for i := range batch {
+		// Point at the slice element instead of a per-iteration copy, so the
+		// pointers handed to the event recorder and to UnknownEventError each
+		// refer to a distinct event regardless of the Go version in use.
+		changeEvent := &batch[i]
+		if changeEvent.ClusterTime != nil &&
+			(verifier.lastChangeEventTime == nil ||
+				verifier.lastChangeEventTime.Compare(*changeEvent.ClusterTime) < 0) {
+			verifier.lastChangeEventTime = changeEvent.ClusterTime
+		}
+		switch changeEvent.OpType {
+		case "delete":
+			fallthrough
+		case "insert":
+			fallthrough
+		case "replace":
+			fallthrough
+		case "update":
+			if err := verifier.eventRecorder.AddEvent(changeEvent); err != nil {
+				return errors.Wrapf(err, "failed to augment stats with change event: %+v", *changeEvent)
+			}
+			dbNames[i] = changeEvent.Ns.DB
+			collNames[i] = changeEvent.Ns.Coll
+			docIDs[i] = changeEvent.DocKey.ID
+
+			// We don't know the document sizes for documents for all change events,
+			// so just be conservative and assume they are maximum size.
+			//
+			// Note that this prevents us from being able to report a meaningful
+			// total data size for noninitial generations in the log.
+			dataSizes[i] = maxBSONObjSize
+		default:
+			return UnknownEventError{Event: changeEvent}
+		}
 	}
+
+	return verifier.insertRecheckDocs(ctx, dbNames, collNames, docIDs, dataSizes)
 }
 
 func (verifier *Verifier) GetChangeStreamFilter() []bson.D {
@@ -102,20 +131,43 @@ func (verifier *Verifier) iterateChangeStream(ctx context.Context, cs *mongo.Cha
 		return err
 	}
 
-	readOneChangeEvent := func() (bool, error) {
-		gotEvent := cs.TryNext(ctx)
-		if gotEvent {
-			var changeEvent ParsedEvent
-			if err := cs.Decode(&changeEvent); err != nil {
+	// readAndHandleOneChangeEventBatch reads events until the change stream's
+	// current batch is exhausted, then passes them to HandleChangeStreamEvents
+	// together. It reports whether at least one event was read.
+	readAndHandleOneChangeEventBatch := func() (bool, error) {
+		eventsRead := 0
+		var changeEventBatch []ParsedEvent
+
+		for hasEventInBatch := true; hasEventInBatch; hasEventInBatch = cs.RemainingBatchLength() > 0 {
+			gotEvent := cs.TryNext(ctx)
+
+			if !gotEvent || cs.Err() != nil {
+				break
+			}
+
+			if changeEventBatch == nil {
+				changeEventBatch = make([]ParsedEvent, cs.RemainingBatchLength()+1)
+			}
+
+			if err := cs.Decode(&changeEventBatch[eventsRead]); err != nil {
 				return false, errors.Wrap(err, "failed to decode change event")
 			}
-			err := verifier.HandleChangeStreamEvent(ctx, &changeEvent)
+
+			eventsRead++
+		}
+
+		if eventsRead > 0 {
+			verifier.logger.Debug().Int("eventsCount", eventsRead).Msg("Received a batch of events.")
+			// Only hand over the events that were actually decoded: if the
+			// loop stopped early, the tail of changeEventBatch is still
+			// zero-valued and would be rejected as unknown events.
+			err := verifier.HandleChangeStreamEvents(ctx, changeEventBatch[:eventsRead])
 			if err != nil {
-				return false, errors.Wrap(err, "failed to handle change event")
+				return false, errors.Wrap(err, "failed to handle change events")
 			}
 		}
 
-		return gotEvent, errors.Wrap(cs.Err(), "change stream iteration failed")
+		return eventsRead > 0, errors.Wrap(cs.Err(), "change stream iteration failed")
 	}
 
 	for {
@@ -141,7 +193,7 @@ func (verifier *Verifier) iterateChangeStream(ctx context.Context, cs *mongo.Cha
 			// (i.e., the `getMore` call returns empty)
 			for {
 				var gotEvent bool
-				gotEvent, err = readOneChangeEvent()
+				gotEvent, err = readAndHandleOneChangeEventBatch()
 
 				if !gotEvent || err != nil {
 					break
@@ -149,7 +201,7 @@ func (verifier *Verifier) iterateChangeStream(ctx context.Context, cs *mongo.Cha
 			}
 
 		default:
-			_, err = readOneChangeEvent()
+			_, err = readAndHandleOneChangeEventBatch()
 		}
 
 		if err == nil {
diff --git a/internal/verifier/change_stream_test.go b/internal/verifier/change_stream_test.go
index 6412eb12..7170129b 100644
--- a/internal/verifier/change_stream_test.go
+++ b/internal/verifier/change_stream_test.go
@@ -198,3 +198,31 @@ func (suite *MultiSourceVersionTestSuite) TestNoStartAtTime() {
 	suite.Require().NotNil(verifier.srcStartAtTs)
 	suite.Require().LessOrEqual(origStartTs.Compare(*verifier.srcStartAtTs), 0)
 }
+
+func (suite *MultiSourceVersionTestSuite) TestWithChangeEventsBatching() {
+	verifier := buildVerifier(suite.T(), suite.srcMongoInstance, suite.dstMongoInstance, suite.metaMongoInstance)
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	suite.Require().NoError(verifier.StartChangeStream(ctx))
+
+	_, err := suite.srcMongoClient.Database("testDb").Collection("testColl1").InsertOne(ctx, bson.D{{"_id", 1}})
+	suite.Require().NoError(err)
+	_, err = suite.srcMongoClient.Database("testDb").Collection("testColl1").InsertOne(ctx, bson.D{{"_id", 2}})
+	suite.Require().NoError(err)
+
+	_, err = suite.srcMongoClient.Database("testDb").Collection("testColl2").InsertOne(ctx, bson.D{{"_id", 1}})
+	suite.Require().NoError(err)
+
+	var rechecks []bson.M
+	require.Eventually(
+		suite.T(),
+		func() bool {
+			rechecks = suite.fetchVerifierRechecks(ctx, verifier)
+			return len(rechecks) == 3
+		},
+		time.Minute,
+		500*time.Millisecond,
+		"the verifier should flush a recheck doc after a batch",
+	)
+}
diff --git a/internal/verifier/migration_verifier_test.go b/internal/verifier/migration_verifier_test.go
index 5a9417f6..eab99d4b 100644
--- a/internal/verifier/migration_verifier_test.go
+++ b/internal/verifier/migration_verifier_test.go
@@ -18,6 +18,7 @@ import (
 	"github.com/10gen/migration-verifier/internal/testutil"
 	"github.com/cespare/permute/v2"
 	"github.com/rs/zerolog"
+	"github.com/rs/zerolog/log"
 	"github.com/samber/lo"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
@@ -26,6 +27,7 @@ import (
 	"go.mongodb.org/mongo-driver/bson/primitive"
 	"go.mongodb.org/mongo-driver/mongo"
 	"go.mongodb.org/mongo-driver/mongo/options"
+	"golang.org/x/sync/errgroup"
 )
 
 var macArmMongoVersions []string = []string{
@@ -228,34 +230,32 @@ func (suite *MultiMetaVersionTestSuite) TestGetNamespaceStatistics_Recheck() {
 	ctx := context.Background()
 	verifier := buildVerifier(suite.T(), suite.srcMongoInstance, suite.dstMongoInstance, suite.metaMongoInstance)
 
-	suite.Require().NoError(
-		verifier.InsertChangeEventRecheckDoc(
-			ctx,
-			&ParsedEvent{
-				OpType: "insert",
-				Ns:     &Namespace{DB: "mydb", Coll: "coll2"},
-				DocKey: DocKey{
-					ID: "heyhey",
-				},
+	err := verifier.HandleChangeStreamEvents(
+		ctx,
+		[]ParsedEvent{{
+			OpType: "insert",
+			Ns:     &Namespace{DB: "mydb", Coll: "coll2"},
+			DocKey: DocKey{
+				ID: "heyhey",
 			},
-		),
+		}},
 	)
+	suite.Require().NoError(err)
 
-	suite.Require().NoError(
-		verifier.InsertChangeEventRecheckDoc(
-			ctx,
-			&ParsedEvent{
-				ID: bson.M{
-					"docID": "ID/docID",
-				},
-				OpType: "insert",
-				Ns:     &Namespace{DB: "mydb", Coll: "coll1"},
-				DocKey: DocKey{
-					ID: "hoohoo",
-				},
+	err = verifier.HandleChangeStreamEvents(
+		ctx,
+		[]ParsedEvent{{
+			ID: bson.M{
+				"docID": "ID/docID",
 			},
-		),
+			OpType: "insert",
+			Ns:     &Namespace{DB: "mydb", Coll: "coll1"},
+			DocKey: DocKey{
+				ID: "hoohoo",
+			},
+		}},
 	)
+	suite.Require().NoError(err)
 
 	verifier.generation++
 
@@ -494,19 +494,20 @@ func (suite *MultiMetaVersionTestSuite) TestFailedVerificationTaskInsertions() {
 			Coll: "bar2",
 		},
 	}
-	err = verifier.HandleChangeStreamEvent(ctx, &event)
+
+	err = verifier.HandleChangeStreamEvents(ctx, []ParsedEvent{event})
 	suite.Require().NoError(err)
 	event.OpType = "insert"
-	err = verifier.HandleChangeStreamEvent(ctx, &event)
+	err = verifier.HandleChangeStreamEvents(ctx, []ParsedEvent{event})
 	suite.Require().NoError(err)
 	event.OpType = "replace"
-	err = verifier.HandleChangeStreamEvent(ctx, &event)
+	err = verifier.HandleChangeStreamEvents(ctx, []ParsedEvent{event})
 	suite.Require().NoError(err)
 	event.OpType = "update"
-	err = verifier.HandleChangeStreamEvent(ctx, &event)
+	err = verifier.HandleChangeStreamEvents(ctx, []ParsedEvent{event})
 	suite.Require().NoError(err)
 	event.OpType = "flibbity"
-	err = verifier.HandleChangeStreamEvent(ctx, &event)
+	err = verifier.HandleChangeStreamEvents(ctx, []ParsedEvent{event})
 	badEventErr := UnknownEventError{}
 	suite.Require().ErrorAs(err, &badEventErr)
 	suite.Assert().Equal("flibbity", badEventErr.Event.OpType)
@@ -1363,7 +1364,7 @@ func (suite *MultiDataVersionTestSuite) TestVerificationStatus() {
 }
 
 func (suite *MultiDataVersionTestSuite) TestGenerationalRechecking() {
-	zerolog.SetGlobalLevel(zerolog.InfoLevel)
+	zerolog.SetGlobalLevel(zerolog.DebugLevel)
 	verifier := buildVerifier(suite.T(), suite.srcMongoInstance, suite.dstMongoInstance, suite.metaMongoInstance)
 	verifier.SetSrcNamespaces([]string{"testDb1.testColl1"})
 	verifier.SetDstNamespaces([]string{"testDb2.testColl3"})
@@ -1382,10 +1383,16 @@ func (suite *MultiDataVersionTestSuite) TestGenerationalRechecking() {
 
 	checkDoneChan := make(chan struct{})
 	checkContinueChan := make(chan struct{})
-	go func() {
-		err := verifier.CheckDriver(ctx, nil, checkDoneChan, checkContinueChan)
-		suite.Require().NoError(err)
-	}()
+
+	errGroup, errGrpCtx := errgroup.WithContext(context.Background())
+	errGroup.Go(func() error {
+		checkDriverErr := verifier.CheckDriver(errGrpCtx, nil, checkDoneChan, checkContinueChan)
+		// Log this as fatal error so that the test doesn't hang.
+		if checkDriverErr != nil {
+			log.Fatal().Err(checkDriverErr).Msg("check driver error")
+		}
+		return checkDriverErr
+	})
 
 	waitForTasks := func() *VerificationStatus {
 		status, err := verifier.GetVerificationStatus()
@@ -1459,6 +1466,9 @@ func (suite *MultiDataVersionTestSuite) TestGenerationalRechecking() {
 	suite.Require().NoError(err)
 	// there should be a failure from the src insert
 	suite.Require().Equal(VerificationStatus{TotalTasks: 1, FailedTasks: 1}, *status)
+
+	checkContinueChan <- struct{}{}
+	require.NoError(suite.T(), errGroup.Wait())
 }
 
 func (suite *MultiDataVersionTestSuite) TestVerifierWithFilter() {
diff --git a/internal/verifier/recheck.go b/internal/verifier/recheck.go
index a6208b03..b7143d7a 100644
--- a/internal/verifier/recheck.go
+++ b/internal/verifier/recheck.go
@@ -4,7 +4,6 @@ import (
 	"context"
 
 	"github.com/10gen/migration-verifier/internal/types"
-	"github.com/pkg/errors"
 	"go.mongodb.org/mongo-driver/bson"
 	"go.mongodb.org/mongo-driver/mongo"
 	"go.mongodb.org/mongo-driver/mongo/options"
@@ -36,37 +35,31 @@ func (verifier *Verifier) InsertFailedCompareRecheckDocs(
 	namespace string, documentIDs []interface{}, dataSizes []int) error {
 	dbName, collName := SplitNamespace(namespace)
 
-	verifier.mux.Lock()
-	defer verifier.mux.Unlock()
-
-	return verifier.insertRecheckDocsUnderLock(context.Background(),
-		dbName, collName, documentIDs, dataSizes)
-}
-
-func (verifier *Verifier) InsertChangeEventRecheckDoc(ctx context.Context, changeEvent *ParsedEvent) error {
-	documentIDs := []interface{}{changeEvent.DocKey.ID}
-
-	// We don't know the document sizes for documents for all change events,
-	// so just be conservative and assume they are maximum size.
-	//
-	// Note that this prevents us from being able to report a meaningful
-	// total data size for noninitial generations in the log.
-	dataSizes := []int{maxBSONObjSize}
-
-	verifier.mux.Lock()
-	defer verifier.mux.Unlock()
-
-	if err := verifier.eventRecorder.AddEvent(changeEvent); err != nil {
-		return errors.Wrapf(err, "failed to augment stats with change event: %+v", *changeEvent)
+	dbNames := make([]string, len(documentIDs))
+	collNames := make([]string, len(documentIDs))
+	for i := range documentIDs {
+		dbNames[i] = dbName
+		collNames[i] = collName
 	}
 
-	return verifier.insertRecheckDocsUnderLock(
-		ctx, changeEvent.Ns.DB, changeEvent.Ns.Coll, documentIDs, dataSizes)
+	return verifier.insertRecheckDocs(
+		context.Background(),
+		dbNames,
+		collNames,
+		documentIDs,
+		dataSizes,
+	)
 }
 
-func (verifier *Verifier) insertRecheckDocsUnderLock(
+func (verifier *Verifier) insertRecheckDocs(
 	ctx context.Context,
-	dbName, collName string, documentIDs []interface{}, dataSizes []int) error {
+	dbNames []string,
+	collNames []string,
+	documentIDs []interface{},
+	dataSizes []int,
+) error {
+	verifier.mux.Lock()
+	defer verifier.mux.Unlock()
 
 	generation, _ := verifier.getGenerationWhileLocked()
 
@@ -74,8 +67,8 @@ func (verifier *Verifier) insertRecheckDocsUnderLock(
 	for i, documentID := range documentIDs {
 		pk := RecheckPrimaryKey{
 			Generation:     generation,
-			DatabaseName:   dbName,
-			CollectionName: collName,
+			DatabaseName:   dbNames[i],
+			CollectionName: collNames[i],
 			DocumentID:     documentID,
 		}
 
diff --git a/internal/verifier/recheck_test.go b/internal/verifier/recheck_test.go
index 6eb6ac78..fe4ff0ed 100644
--- a/internal/verifier/recheck_test.go
+++ b/internal/verifier/recheck_test.go
@@ -50,10 +50,8 @@ func (suite *MultiMetaVersionTestSuite) TestFailedCompareThenReplace() {
 		},
 	}
 
-	suite.Require().NoError(
-		verifier.InsertChangeEventRecheckDoc(ctx, &event),
-		"insert change event recheck",
-	)
+	err := verifier.HandleChangeStreamEvents(ctx, []ParsedEvent{event})
+	suite.Require().NoError(err)
 
 	recheckDocs = suite.fetchRecheckDocs(ctx, verifier)
 	suite.Assert().Equal(
@@ -335,8 +333,13 @@ func insertRecheckDocs(
 	documentIDs []any,
 	dataSizes []int,
 ) error {
-	verifier.mux.Lock()
-	defer verifier.mux.Unlock()
+	dbNames := make([]string, len(documentIDs))
+	collNames := make([]string, len(documentIDs))
+
+	for i := range documentIDs {
+		dbNames[i] = dbName
+		collNames[i] = collName
+	}
 
-	return verifier.insertRecheckDocsUnderLock(ctx, dbName, collName, documentIDs, dataSizes)
+	return verifier.insertRecheckDocs(ctx, dbNames, collNames, documentIDs, dataSizes)
 }