From 7a11e3e9b24214f8c8153399968287d393e1e535 Mon Sep 17 00:00:00 2001 From: Jian Guan <61915096+tdq45gj@users.noreply.github.com> Date: Thu, 7 Nov 2024 11:38:08 -0500 Subject: [PATCH 01/12] buffer by change stream batch --- internal/verifier/change_stream.go | 66 ++++++++++++++++--- internal/verifier/change_stream_test.go | 10 +-- internal/verifier/check.go | 2 +- internal/verifier/migration_verifier.go | 3 + internal/verifier/migration_verifier_test.go | 67 ++++++++++---------- internal/verifier/recheck.go | 19 ++---- internal/verifier/recheck_test.go | 5 +- 7 files changed, 106 insertions(+), 66 deletions(-) diff --git a/internal/verifier/change_stream.go b/internal/verifier/change_stream.go index f7ab8e97..ef3c757f 100644 --- a/internal/verifier/change_stream.go +++ b/internal/verifier/change_stream.go @@ -39,6 +39,9 @@ const ( metadataChangeStreamCollectionName = "changeStream" ) +// ChangeEventRecheckBuffer buffers change events recheck docs in memory as a map of namespace -> _ids. +type ChangeEventRecheckBuffer map[string][]interface{} + type UnknownEventError struct { Event *ParsedEvent } @@ -49,7 +52,7 @@ func (uee UnknownEventError) Error() string { // HandleChangeStreamEvent performs the necessary work for change stream events that occur during // operation. -func (verifier *Verifier) HandleChangeStreamEvent(ctx context.Context, changeEvent *ParsedEvent) error { +func (verifier *Verifier) HandleChangeStreamEvent(changeEvent *ParsedEvent) error { if changeEvent.ClusterTime != nil && (verifier.lastChangeEventTime == nil || verifier.lastChangeEventTime.Compare(*changeEvent.ClusterTime) < 0) { @@ -63,7 +66,8 @@ func (verifier *Verifier) HandleChangeStreamEvent(ctx context.Context, changeEve case "replace": fallthrough case "update": - return verifier.InsertChangeEventRecheckDoc(ctx, changeEvent) + verifier.changeEventRecheckBuf[changeEvent.Ns.String()] = append(verifier.changeEventRecheckBuf[changeEvent.Ns.String()], changeEvent.DocKey.ID) + return nil default: return UnknownEventError{Event: changeEvent} } @@ -100,19 +104,30 @@ func (verifier *Verifier) iterateChangeStream(ctx context.Context, cs *mongo.Cha return err } - readOneChangeEvent := func() (bool, error) { - gotEvent := cs.TryNext(ctx) - if gotEvent { + readAndHandleOneChangeEventBatch := func() (bool, error) { + eventsRead := 0 + for hasEventInBatch := true; hasEventInBatch; hasEventInBatch = cs.RemainingBatchLength() > 0 { + gotEvent := cs.TryNext(ctx) + if !gotEvent { + break + } + if err := cs.Decode(&changeEvent); err != nil { return false, errors.Wrap(err, "failed to decode change event") } - err := verifier.HandleChangeStreamEvent(ctx, &changeEvent) + err := verifier.HandleChangeStreamEvent(&changeEvent) if err != nil { return false, errors.Wrap(err, "failed to handle change event") } + + eventsRead++ } - return gotEvent, errors.Wrap(cs.Err(), "change stream iteration failed") + if err := verifier.flushAllBufferedChangeEventRechecks(ctx); err != nil { + return false, errors.Wrap(err, "failed to flush buffered change event rechecks") + } + + return eventsRead > 0, errors.Wrap(cs.Err(), "change stream iteration failed") } for { @@ -136,7 +151,7 @@ func (verifier *Verifier) iterateChangeStream(ctx context.Context, cs *mongo.Cha // Read all change events until the source reports no events. // (i.e., the `getMore` call returns empty) for { - gotEvent, err = readOneChangeEvent() + gotEvent, err = readAndHandleOneChangeEventBatch() if !gotEvent || err != nil { break @@ -148,7 +163,7 @@ func (verifier *Verifier) iterateChangeStream(ctx context.Context, cs *mongo.Cha } default: - _, err = readOneChangeEvent() + _, err = readAndHandleOneChangeEventBatch() } if err == nil { @@ -177,10 +192,14 @@ func (verifier *Verifier) iterateChangeStream(ctx context.Context, cs *mongo.Cha } // StartChangeStream starts the change stream. -func (verifier *Verifier) StartChangeStream(ctx context.Context) error { +func (verifier *Verifier) StartChangeStream(ctx context.Context, batchSize *int32) error { pipeline := verifier.GetChangeStreamFilter() opts := options.ChangeStream().SetMaxAwaitTime(1 * time.Second) + if batchSize != nil { + opts = opts.SetBatchSize(*batchSize) + } + savedResumeToken, err := verifier.loadChangeStreamResumeToken(ctx) if err != nil { return errors.Wrap(err, "failed to load persisted change stream resume token") @@ -340,3 +359,30 @@ func getClusterTimeFromSession(sess mongo.Session) (primitive.Timestamp, error) return ctStruct.ClusterTime.ClusterTime, nil } + +func (verifier *Verifier) flushAllBufferedChangeEventRechecks(ctx context.Context) error { + for namespace, ids := range verifier.changeEventRecheckBuf { + if len(ids) == 0 { + return nil + } + + // We don't know the document sizes for documents for all change events, + // so just be conservative and assume they are maximum size. + // + // Note that this prevents us from being able to report a meaningful + // total data size for noninitial generations in the log. + dataSizes := make([]int, len(ids)) + for i, _ := range ids { + dataSizes[i] = maxBSONObjSize + } + + dbName, collName := SplitNamespace(namespace) + if err := verifier.insertRecheckDocs(ctx, dbName, collName, ids, dataSizes); err != nil { + return errors.Wrapf(err, "failed to insert recheck docs for namespace %s", namespace) + } + + delete(verifier.changeEventRecheckBuf, namespace) + } + + return nil +} diff --git a/internal/verifier/change_stream_test.go b/internal/verifier/change_stream_test.go index 6412eb12..79f69efd 100644 --- a/internal/verifier/change_stream_test.go +++ b/internal/verifier/change_stream_test.go @@ -38,7 +38,7 @@ func (suite *MultiSourceVersionTestSuite) TestChangeStreamResumability() { verifier1 := buildVerifier(suite.T(), suite.srcMongoInstance, suite.dstMongoInstance, suite.metaMongoInstance) ctx, cancel := context.WithCancel(context.Background()) defer cancel() - err := verifier1.StartChangeStream(ctx) + err := verifier1.StartChangeStream(ctx, nil) suite.Require().NoError(err) }() @@ -63,7 +63,7 @@ func (suite *MultiSourceVersionTestSuite) TestChangeStreamResumability() { newTime := suite.getClusterTime(ctx, suite.srcMongoClient) - err = verifier2.StartChangeStream(ctx) + err = verifier2.StartChangeStream(ctx, nil) suite.Require().NoError(err) suite.Require().NotNil(verifier2.srcStartAtTs) @@ -138,7 +138,7 @@ func (suite *MultiSourceVersionTestSuite) TestStartAtTimeNoChanges() { suite.Require().NoError(err) origStartTs := sess.OperationTime() suite.Require().NotNil(origStartTs) - err = verifier.StartChangeStream(ctx) + err = verifier.StartChangeStream(ctx, nil) suite.Require().NoError(err) suite.Require().Equal(verifier.srcStartAtTs, origStartTs) verifier.changeStreamEnderChan <- struct{}{} @@ -158,7 +158,7 @@ func (suite *MultiSourceVersionTestSuite) TestStartAtTimeWithChanges() { suite.Require().NoError(err) origStartTs := sess.OperationTime() suite.Require().NotNil(origStartTs) - err = verifier.StartChangeStream(ctx) + err = verifier.StartChangeStream(ctx, nil) suite.Require().NoError(err) suite.Require().Equal(verifier.srcStartAtTs, origStartTs) _, err = suite.srcMongoClient.Database("testDb").Collection("testColl").InsertOne( @@ -193,7 +193,7 @@ func (suite *MultiSourceVersionTestSuite) TestNoStartAtTime() { suite.Require().NoError(err) origStartTs := sess.OperationTime() suite.Require().NotNil(origStartTs) - err = verifier.StartChangeStream(ctx) + err = verifier.StartChangeStream(ctx, nil) suite.Require().NoError(err) suite.Require().NotNil(verifier.srcStartAtTs) suite.Require().LessOrEqual(origStartTs.Compare(*verifier.srcStartAtTs), 0) diff --git a/internal/verifier/check.go b/internal/verifier/check.go index fa5b2fd0..3ac6bb02 100644 --- a/internal/verifier/check.go +++ b/internal/verifier/check.go @@ -162,7 +162,7 @@ func (verifier *Verifier) CheckDriver(ctx context.Context, filter map[string]any if !csRunning { verifier.logger.Debug().Msg("Change stream not running; starting change stream") - err = verifier.StartChangeStream(ctx) + err = verifier.StartChangeStream(ctx, nil) if err != nil { return errors.Wrap(err, "failed to start change stream on source") } diff --git a/internal/verifier/migration_verifier.go b/internal/verifier/migration_verifier.go index 47259eb4..794e8727 100644 --- a/internal/verifier/migration_verifier.go +++ b/internal/verifier/migration_verifier.go @@ -138,6 +138,8 @@ type Verifier struct { globalFilter map[string]any pprofInterval time.Duration + + changeEventRecheckBuf ChangeEventRecheckBuffer } // VerificationStatus holds the Verification Status @@ -197,6 +199,7 @@ func NewVerifier(settings VerifierSettings) *Verifier { changeStreamErrChan: make(chan error), changeStreamDoneChan: make(chan struct{}), readConcernSetting: readConcern, + changeEventRecheckBuf: make(ChangeEventRecheckBuffer), } } diff --git a/internal/verifier/migration_verifier_test.go b/internal/verifier/migration_verifier_test.go index 82f4eac8..9c7ed19a 100644 --- a/internal/verifier/migration_verifier_test.go +++ b/internal/verifier/migration_verifier_test.go @@ -231,33 +231,31 @@ func (suite *MultiMetaVersionTestSuite) TestGetNamespaceStatistics_Recheck() { ctx := context.Background() verifier := buildVerifier(suite.T(), suite.srcMongoInstance, suite.dstMongoInstance, suite.metaMongoInstance) - suite.Require().NoError( - verifier.InsertChangeEventRecheckDoc( - ctx, - &ParsedEvent{ - OpType: "insert", - Ns: &Namespace{DB: "mydb", Coll: "coll2"}, - DocKey: DocKey{ - ID: "heyhey", - }, + suite.handleAndFlushChangeEvent( + ctx, + verifier, + ParsedEvent{ + OpType: "insert", + Ns: &Namespace{DB: "mydb", Coll: "coll2"}, + DocKey: DocKey{ + ID: "heyhey", }, - ), + }, ) - suite.Require().NoError( - verifier.InsertChangeEventRecheckDoc( - ctx, - &ParsedEvent{ - ID: bson.M{ - "docID": "ID/docID", - }, - OpType: "insert", - Ns: &Namespace{DB: "mydb", Coll: "coll1"}, - DocKey: DocKey{ - ID: "hoohoo", - }, + suite.handleAndFlushChangeEvent( + ctx, + verifier, + ParsedEvent{ + ID: bson.M{ + "docID": "ID/docID", + }, + OpType: "insert", + Ns: &Namespace{DB: "mydb", Coll: "coll1"}, + DocKey: DocKey{ + ID: "hoohoo", }, - ), + }, ) verifier.generation++ @@ -480,6 +478,14 @@ func (suite *MultiMetaVersionTestSuite) TestGetNamespaceStatistics_Gen0() { ) } +func (suite *MultiMetaVersionTestSuite) handleAndFlushChangeEvent(ctx context.Context, verifier *Verifier, event ParsedEvent) { + err := verifier.HandleChangeStreamEvent(&event) + suite.Require().NoError(err) + + err = verifier.flushAllBufferedChangeEventRechecks(ctx) + suite.Require().NoError(err) +} + func (suite *MultiMetaVersionTestSuite) TestFailedVerificationTaskInsertions() { ctx := context.Background() verifier := buildVerifier(suite.T(), suite.srcMongoInstance, suite.dstMongoInstance, suite.metaMongoInstance) @@ -497,19 +503,16 @@ func (suite *MultiMetaVersionTestSuite) TestFailedVerificationTaskInsertions() { Coll: "bar2", }, } - err = verifier.HandleChangeStreamEvent(ctx, &event) - suite.Require().NoError(err) + + suite.handleAndFlushChangeEvent(ctx, verifier, event) event.OpType = "insert" - err = verifier.HandleChangeStreamEvent(ctx, &event) - suite.Require().NoError(err) + suite.handleAndFlushChangeEvent(ctx, verifier, event) event.OpType = "replace" - err = verifier.HandleChangeStreamEvent(ctx, &event) - suite.Require().NoError(err) + suite.handleAndFlushChangeEvent(ctx, verifier, event) event.OpType = "update" - err = verifier.HandleChangeStreamEvent(ctx, &event) - suite.Require().NoError(err) + suite.handleAndFlushChangeEvent(ctx, verifier, event) event.OpType = "flibbity" - err = verifier.HandleChangeStreamEvent(ctx, &event) + err = verifier.HandleChangeStreamEvent(&event) badEventErr := UnknownEventError{} suite.Require().ErrorAs(err, &badEventErr) suite.Assert().Equal("flibbity", badEventErr.Event.OpType) diff --git a/internal/verifier/recheck.go b/internal/verifier/recheck.go index c9ba7f8a..204415f9 100644 --- a/internal/verifier/recheck.go +++ b/internal/verifier/recheck.go @@ -38,20 +38,6 @@ func (verifier *Verifier) InsertFailedCompareRecheckDocs( dbName, collName, documentIDs, dataSizes) } -func (verifier *Verifier) InsertChangeEventRecheckDoc(ctx context.Context, changeEvent *ParsedEvent) error { - documentIDs := []interface{}{changeEvent.DocKey.ID} - - // We don't know the document sizes for documents for all change events, - // so just be conservative and assume they are maximum size. - // - // Note that this prevents us from being able to report a meaningful - // total data size for noninitial generations in the log. - dataSizes := []int{maxBSONObjSize} - - return verifier.insertRecheckDocs( - ctx, changeEvent.Ns.DB, changeEvent.Ns.Coll, documentIDs, dataSizes) -} - func (verifier *Verifier) insertRecheckDocs( ctx context.Context, dbName, collName string, documentIDs []interface{}, dataSizes []int) error { @@ -91,6 +77,11 @@ func (verifier *Verifier) insertRecheckDocs( verifier.logger.Debug().Msgf("Persisted %d recheck doc(s) for generation %d", len(models), generation) } + // Silence any duplicate key errors as recheck docs should have existed. + if mongo.IsDuplicateKeyError(err) { + err = nil + } + return err } diff --git a/internal/verifier/recheck_test.go b/internal/verifier/recheck_test.go index b3bcc9c3..0f11c923 100644 --- a/internal/verifier/recheck_test.go +++ b/internal/verifier/recheck_test.go @@ -49,10 +49,7 @@ func (suite *MultiMetaVersionTestSuite) TestFailedCompareThenReplace() { }, } - suite.Require().NoError( - verifier.InsertChangeEventRecheckDoc(ctx, &event), - "insert change event recheck", - ) + suite.handleAndFlushChangeEvent(ctx, verifier, event) recheckDocs = suite.fetchRecheckDocs(ctx, verifier) suite.Assert().Equal( From 6dfbc64743703b0065569073595b650da98c3adc Mon Sep 17 00:00:00 2001 From: Jian Guan <61915096+tdq45gj@users.noreply.github.com> Date: Thu, 7 Nov 2024 14:59:32 -0500 Subject: [PATCH 02/12] fix test --- internal/verifier/change_stream.go | 7 ++-- internal/verifier/change_stream_test.go | 36 ++++++++++++++++++++ internal/verifier/migration_verifier_test.go | 21 +++++++++--- 3 files changed, 57 insertions(+), 7 deletions(-) diff --git a/internal/verifier/change_stream.go b/internal/verifier/change_stream.go index ef3c757f..1018a302 100644 --- a/internal/verifier/change_stream.go +++ b/internal/verifier/change_stream.go @@ -66,7 +66,8 @@ func (verifier *Verifier) HandleChangeStreamEvent(changeEvent *ParsedEvent) erro case "replace": fallthrough case "update": - verifier.changeEventRecheckBuf[changeEvent.Ns.String()] = append(verifier.changeEventRecheckBuf[changeEvent.Ns.String()], changeEvent.DocKey.ID) + namespace := fmt.Sprintf("%s.%s", changeEvent.Ns.DB, changeEvent.Ns.Coll) + verifier.changeEventRecheckBuf[namespace] = append(verifier.changeEventRecheckBuf[changeEvent.Ns.String()], changeEvent.DocKey.ID) return nil default: return UnknownEventError{Event: changeEvent} @@ -123,6 +124,8 @@ func (verifier *Verifier) iterateChangeStream(ctx context.Context, cs *mongo.Cha eventsRead++ } + verifier.logger.Debug().Msgf("Received a batch of %d events", eventsRead) + if err := verifier.flushAllBufferedChangeEventRechecks(ctx); err != nil { return false, errors.Wrap(err, "failed to flush buffered change event rechecks") } @@ -372,7 +375,7 @@ func (verifier *Verifier) flushAllBufferedChangeEventRechecks(ctx context.Contex // Note that this prevents us from being able to report a meaningful // total data size for noninitial generations in the log. dataSizes := make([]int, len(ids)) - for i, _ := range ids { + for i := 0; i < len(ids); i++ { dataSizes[i] = maxBSONObjSize } diff --git a/internal/verifier/change_stream_test.go b/internal/verifier/change_stream_test.go index 79f69efd..653a2004 100644 --- a/internal/verifier/change_stream_test.go +++ b/internal/verifier/change_stream_test.go @@ -198,3 +198,39 @@ func (suite *MultiSourceVersionTestSuite) TestNoStartAtTime() { suite.Require().NotNil(verifier.srcStartAtTs) suite.Require().LessOrEqual(origStartTs.Compare(*verifier.srcStartAtTs), 0) } + +func (suite *MultiSourceVersionTestSuite) TestWithChangeStreamBatching() { + verifier := buildVerifier(suite.T(), suite.srcMongoInstance, suite.dstMongoInstance, suite.metaMongoInstance) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + batchSize := int32(2) + verifier.StartChangeStream(ctx, &batchSize) + + _, err := suite.srcMongoClient.Database("testDb").Collection("testColl").InsertOne(ctx, bson.D{{"_id", 1}}) + suite.Require().NoError(err) + + require.Eventually( + suite.T(), + func() bool { + return len(verifier.changeEventRecheckBuf["testDB.testColl"]) == 1 + }, + time.Minute, + 500*time.Millisecond, + "the verifier should buffer a recheck doc", + ) + suite.Require().Empty(suite.fetchVerifierRechecks(ctx, verifier)) + + _, err = suite.srcMongoClient.Database("testDb").Collection("testColl").InsertOne(ctx, bson.D{{"_id", 2}}) + suite.Require().NoError(err) + + require.Eventually( + suite.T(), + func() bool { + return len(suite.fetchVerifierRechecks(ctx, verifier)) == 2 + }, + time.Minute, + 500*time.Millisecond, + "the verifier should flush a recheck doc after a batch", + ) +} diff --git a/internal/verifier/migration_verifier_test.go b/internal/verifier/migration_verifier_test.go index 9c7ed19a..dc719b0c 100644 --- a/internal/verifier/migration_verifier_test.go +++ b/internal/verifier/migration_verifier_test.go @@ -9,6 +9,8 @@ package verifier import ( "context" "fmt" + "github.com/rs/zerolog/log" + "golang.org/x/sync/errgroup" "math/rand" "regexp" "sort" @@ -1350,7 +1352,7 @@ func (suite *MultiDataVersionTestSuite) TestVerificationStatus() { } func (suite *MultiDataVersionTestSuite) TestGenerationalRechecking() { - zerolog.SetGlobalLevel(zerolog.InfoLevel) + zerolog.SetGlobalLevel(zerolog.DebugLevel) verifier := buildVerifier(suite.T(), suite.srcMongoInstance, suite.dstMongoInstance, suite.metaMongoInstance) verifier.SetSrcNamespaces([]string{"testDb1.testColl1"}) verifier.SetDstNamespaces([]string{"testDb2.testColl3"}) @@ -1369,10 +1371,16 @@ func (suite *MultiDataVersionTestSuite) TestGenerationalRechecking() { checkDoneChan := make(chan struct{}) checkContinueChan := make(chan struct{}) - go func() { - err := verifier.CheckDriver(ctx, nil, checkDoneChan, checkContinueChan) - suite.Require().NoError(err) - }() + + errGroup, errGrpCtx := errgroup.WithContext(context.Background()) + errGroup.Go(func() error { + checkDriverErr := verifier.CheckDriver(errGrpCtx, nil, checkDoneChan, checkContinueChan) + // Log this as fatal error so that the test doesn't hang. + if checkDriverErr != nil { + log.Fatal().Err(checkDriverErr).Msg("check driver error") + } + return checkDriverErr + }) waitForTasks := func() *VerificationStatus { status, err := verifier.GetVerificationStatus() @@ -1446,6 +1454,9 @@ func (suite *MultiDataVersionTestSuite) TestGenerationalRechecking() { suite.Require().NoError(err) // there should be a failure from the src insert suite.Require().Equal(VerificationStatus{TotalTasks: 1, FailedTasks: 1}, *status) + + checkContinueChan <- struct{}{} + require.NoError(suite.T(), errGroup.Wait()) } func (suite *MultiDataVersionTestSuite) TestVerifierWithFilter() { From b201a20793aa514ae91866db71beff0cd3518340 Mon Sep 17 00:00:00 2001 From: Jian Guan <61915096+tdq45gj@users.noreply.github.com> Date: Thu, 7 Nov 2024 15:04:43 -0500 Subject: [PATCH 03/12] Update recheck_test.go --- internal/verifier/recheck_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/internal/verifier/recheck_test.go b/internal/verifier/recheck_test.go index 0f11c923..df84e276 100644 --- a/internal/verifier/recheck_test.go +++ b/internal/verifier/recheck_test.go @@ -47,6 +47,7 @@ func (suite *MultiMetaVersionTestSuite) TestFailedCompareThenReplace() { DB: "the", Coll: "namespace", }, + OpType: "update", } suite.handleAndFlushChangeEvent(ctx, verifier, event) From c4c7a6636864f62d6b3547189ddac769726133d1 Mon Sep 17 00:00:00 2001 From: Jian Guan <61915096+tdq45gj@users.noreply.github.com> Date: Thu, 7 Nov 2024 15:21:39 -0500 Subject: [PATCH 04/12] Update change_stream_test.go --- internal/verifier/change_stream_test.go | 24 +++++++++--------------- 1 file changed, 9 insertions(+), 15 deletions(-) diff --git a/internal/verifier/change_stream_test.go b/internal/verifier/change_stream_test.go index 653a2004..56408732 100644 --- a/internal/verifier/change_stream_test.go +++ b/internal/verifier/change_stream_test.go @@ -199,38 +199,32 @@ func (suite *MultiSourceVersionTestSuite) TestNoStartAtTime() { suite.Require().LessOrEqual(origStartTs.Compare(*verifier.srcStartAtTs), 0) } -func (suite *MultiSourceVersionTestSuite) TestWithChangeStreamBatching() { +func (suite *MultiSourceVersionTestSuite) TestWithChangeEventsBatching() { verifier := buildVerifier(suite.T(), suite.srcMongoInstance, suite.dstMongoInstance, suite.metaMongoInstance) ctx, cancel := context.WithCancel(context.Background()) defer cancel() - batchSize := int32(2) + batchSize := int32(3) verifier.StartChangeStream(ctx, &batchSize) - _, err := suite.srcMongoClient.Database("testDb").Collection("testColl").InsertOne(ctx, bson.D{{"_id", 1}}) + _, err := suite.srcMongoClient.Database("testDb").Collection("testColl1").InsertOne(ctx, bson.D{{"_id", 1}}) + suite.Require().NoError(err) + _, err = suite.srcMongoClient.Database("testDb").Collection("testColl1").InsertOne(ctx, bson.D{{"_id", 2}}) suite.Require().NoError(err) - require.Eventually( - suite.T(), - func() bool { - return len(verifier.changeEventRecheckBuf["testDB.testColl"]) == 1 - }, - time.Minute, - 500*time.Millisecond, - "the verifier should buffer a recheck doc", - ) - suite.Require().Empty(suite.fetchVerifierRechecks(ctx, verifier)) - - _, err = suite.srcMongoClient.Database("testDb").Collection("testColl").InsertOne(ctx, bson.D{{"_id", 2}}) + _, err = suite.srcMongoClient.Database("testDb").Collection("testColl2").InsertOne(ctx, bson.D{{"_id", 1}}) suite.Require().NoError(err) require.Eventually( suite.T(), func() bool { + // There should be 2 recheck docs due to batching, one for each namespace. return len(suite.fetchVerifierRechecks(ctx, verifier)) == 2 }, time.Minute, 500*time.Millisecond, "the verifier should flush a recheck doc after a batch", ) + suite.Require().Empty(verifier.changeEventRecheckBuf["testDB.testColl1"]) + suite.Require().Empty(verifier.changeEventRecheckBuf["testDB.testColl2"]) } From 59effc417d07410cf5c14b39f999c29b38116cae Mon Sep 17 00:00:00 2001 From: Jian Guan <61915096+tdq45gj@users.noreply.github.com> Date: Thu, 7 Nov 2024 15:25:50 -0500 Subject: [PATCH 05/12] Update change_stream_test.go --- internal/verifier/change_stream_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/verifier/change_stream_test.go b/internal/verifier/change_stream_test.go index 56408732..d1a664b7 100644 --- a/internal/verifier/change_stream_test.go +++ b/internal/verifier/change_stream_test.go @@ -205,7 +205,7 @@ func (suite *MultiSourceVersionTestSuite) TestWithChangeEventsBatching() { defer cancel() batchSize := int32(3) - verifier.StartChangeStream(ctx, &batchSize) + suite.Require().NoError(verifier.StartChangeStream(ctx, &batchSize)) _, err := suite.srcMongoClient.Database("testDb").Collection("testColl1").InsertOne(ctx, bson.D{{"_id", 1}}) suite.Require().NoError(err) From 84b6df6499540ee695c82390ffb0cd48aa6eebcb Mon Sep 17 00:00:00 2001 From: Jian Guan <61915096+tdq45gj@users.noreply.github.com> Date: Thu, 7 Nov 2024 15:26:32 -0500 Subject: [PATCH 06/12] Update change_stream.go --- internal/verifier/change_stream.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/verifier/change_stream.go b/internal/verifier/change_stream.go index 1018a302..c86809d5 100644 --- a/internal/verifier/change_stream.go +++ b/internal/verifier/change_stream.go @@ -39,7 +39,7 @@ const ( metadataChangeStreamCollectionName = "changeStream" ) -// ChangeEventRecheckBuffer buffers change events recheck docs in memory as a map of namespace -> _ids. +// ChangeEventRecheckBuffer buffers change events recheck docs in memory as a map of namespace -> doc keys. type ChangeEventRecheckBuffer map[string][]interface{} type UnknownEventError struct { From 3c6383b5c9dfb04f8bf149b0ead96a3e200d6ec2 Mon Sep 17 00:00:00 2001 From: Jian Guan <61915096+tdq45gj@users.noreply.github.com> Date: Thu, 7 Nov 2024 16:02:14 -0500 Subject: [PATCH 07/12] Update change_stream_test.go --- internal/verifier/change_stream_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/internal/verifier/change_stream_test.go b/internal/verifier/change_stream_test.go index d1a664b7..cfcd92ec 100644 --- a/internal/verifier/change_stream_test.go +++ b/internal/verifier/change_stream_test.go @@ -2,6 +2,7 @@ package verifier import ( "context" + "fmt" "testing" "time" @@ -218,8 +219,7 @@ func (suite *MultiSourceVersionTestSuite) TestWithChangeEventsBatching() { require.Eventually( suite.T(), func() bool { - // There should be 2 recheck docs due to batching, one for each namespace. - return len(suite.fetchVerifierRechecks(ctx, verifier)) == 2 + return len(suite.fetchVerifierRechecks(ctx, verifier)) == 3 }, time.Minute, 500*time.Millisecond, From 759a9bc2ac4a8d72fa89a6dadf58c909287a66df Mon Sep 17 00:00:00 2001 From: Jian Guan <61915096+tdq45gj@users.noreply.github.com> Date: Thu, 7 Nov 2024 16:03:49 -0500 Subject: [PATCH 08/12] Update change_stream_test.go --- internal/verifier/change_stream_test.go | 1 - 1 file changed, 1 deletion(-) diff --git a/internal/verifier/change_stream_test.go b/internal/verifier/change_stream_test.go index cfcd92ec..03045276 100644 --- a/internal/verifier/change_stream_test.go +++ b/internal/verifier/change_stream_test.go @@ -2,7 +2,6 @@ package verifier import ( "context" - "fmt" "testing" "time" From b5a170a2458530fa3e65185ee4d5667cf6022381 Mon Sep 17 00:00:00 2001 From: Jian Guan <61915096+tdq45gj@users.noreply.github.com> Date: Thu, 7 Nov 2024 16:07:17 -0500 Subject: [PATCH 09/12] Update migration_verifier_test.go --- internal/verifier/migration_verifier_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/internal/verifier/migration_verifier_test.go b/internal/verifier/migration_verifier_test.go index dc719b0c..ff5f0fe0 100644 --- a/internal/verifier/migration_verifier_test.go +++ b/internal/verifier/migration_verifier_test.go @@ -9,8 +9,6 @@ package verifier import ( "context" "fmt" - "github.com/rs/zerolog/log" - "golang.org/x/sync/errgroup" "math/rand" "regexp" "sort" @@ -22,6 +20,7 @@ import ( "github.com/10gen/migration-verifier/internal/testutil" "github.com/cespare/permute/v2" "github.com/rs/zerolog" + "github.com/rs/zerolog/log" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" @@ -29,6 +28,7 @@ import ( "go.mongodb.org/mongo-driver/bson/primitive" "go.mongodb.org/mongo-driver/mongo" "go.mongodb.org/mongo-driver/mongo/options" + "golang.org/x/sync/errgroup" ) var macArmMongoVersions []string = []string{ From 3ac0547c7d3fdf636ea86ac7a79778fed7265d38 Mon Sep 17 00:00:00 2001 From: Jian Guan <61915096+tdq45gj@users.noreply.github.com> Date: Fri, 8 Nov 2024 15:24:58 -0500 Subject: [PATCH 10/12] Felipe's review --- internal/verifier/change_stream.go | 125 +++++++++---------- internal/verifier/change_stream_test.go | 19 ++- internal/verifier/check.go | 2 +- internal/verifier/migration_verifier.go | 3 - internal/verifier/migration_verifier_test.go | 38 +++--- internal/verifier/recheck.go | 24 ++-- internal/verifier/recheck_test.go | 14 ++- 7 files changed, 109 insertions(+), 116 deletions(-) diff --git a/internal/verifier/change_stream.go b/internal/verifier/change_stream.go index e97d0dac..e4523605 100644 --- a/internal/verifier/change_stream.go +++ b/internal/verifier/change_stream.go @@ -39,9 +39,6 @@ const ( metadataChangeStreamCollectionName = "changeStream" ) -// ChangeEventRecheckBuffer buffers change events recheck docs in memory as a map of namespace -> doc keys. -type ChangeEventRecheckBuffer map[string][]interface{} - type UnknownEventError struct { Event *ParsedEvent } @@ -50,31 +47,53 @@ func (uee UnknownEventError) Error() string { return fmt.Sprintf("Received event with unknown optype: %+v", uee.Event) } -// HandleChangeStreamEvent performs the necessary work for change stream events that occur during -// operation. -func (verifier *Verifier) HandleChangeStreamEvent(changeEvent *ParsedEvent) error { - if changeEvent.ClusterTime != nil && - (verifier.lastChangeEventTime == nil || - verifier.lastChangeEventTime.Compare(*changeEvent.ClusterTime) < 0) { - verifier.lastChangeEventTime = changeEvent.ClusterTime +// HandleChangeStreamEvents performs the necessary work for change stream events after receiving a batch. +func (verifier *Verifier) HandleChangeStreamEvents(ctx context.Context, batch []ParsedEvent) error { + if len(batch) == 0 { + return nil } - switch changeEvent.OpType { - case "delete": - fallthrough - case "insert": - fallthrough - case "replace": - fallthrough - case "update": - if err := verifier.eventRecorder.AddEvent(changeEvent); err != nil { - return errors.Wrapf(err, "failed to augment stats with change event: %+v", *changeEvent) + + dbNames := make([]string, len(batch)) + collNames := make([]string, len(batch)) + docIDs := make([]interface{}, len(batch)) + dataSizes := make([]int, len(batch)) + + for i, changeEvent := range batch { + if changeEvent.ClusterTime != nil && + (verifier.lastChangeEventTime == nil || + verifier.lastChangeEventTime.Compare(*changeEvent.ClusterTime) < 0) { + verifier.lastChangeEventTime = changeEvent.ClusterTime + } + switch changeEvent.OpType { + case "delete": + fallthrough + case "insert": + fallthrough + case "replace": + fallthrough + case "update": + if err := verifier.eventRecorder.AddEvent(&changeEvent); err != nil { + return errors.Wrapf(err, "failed to augment stats with change event: %+v", changeEvent) + } + dbNames[i] = changeEvent.Ns.DB + collNames[i] = changeEvent.Ns.Coll + docIDs[i] = changeEvent.DocKey.ID + + // We don't know the document sizes for documents for all change events, + // so just be conservative and assume they are maximum size. + // + // Note that this prevents us from being able to report a meaningful + // total data size for noninitial generations in the log. + dataSizes[i] = maxBSONObjSize + default: + return UnknownEventError{Event: &changeEvent} } - namespace := fmt.Sprintf("%s.%s", changeEvent.Ns.DB, changeEvent.Ns.Coll) - verifier.changeEventRecheckBuf[namespace] = append(verifier.changeEventRecheckBuf[changeEvent.Ns.String()], changeEvent.DocKey.ID) - return nil - default: - return UnknownEventError{Event: changeEvent} } + + verifier.mux.Lock() + defer verifier.mux.Unlock() + + return verifier.insertRecheckDocsWhileLocked(ctx, dbNames, collNames, docIDs, dataSizes) } func (verifier *Verifier) GetChangeStreamFilter() []bson.D { @@ -108,30 +127,33 @@ func (verifier *Verifier) iterateChangeStream(ctx context.Context, cs *mongo.Cha readAndHandleOneChangeEventBatch := func() (bool, error) { eventsRead := 0 + var changeEventBatch []ParsedEvent + for hasEventInBatch := true; hasEventInBatch; hasEventInBatch = cs.RemainingBatchLength() > 0 { gotEvent := cs.TryNext(ctx) - if !gotEvent { + if !gotEvent || cs.Err() != nil { break } - var changeEvent ParsedEvent + if changeEventBatch == nil { + changeEventBatch = make([]ParsedEvent, cs.RemainingBatchLength()+1) + } - if err := cs.Decode(&changeEvent); err != nil { + if err := cs.Decode(&changeEventBatch[eventsRead]); err != nil { return false, errors.Wrap(err, "failed to decode change event") } - err := verifier.HandleChangeStreamEvent(&changeEvent) - if err != nil { - return false, errors.Wrap(err, "failed to handle change event") - } eventsRead++ } - verifier.logger.Debug().Msgf("Received a batch of %d events", eventsRead) + if eventsRead > 0 { + verifier.logger.Debug().Msgf("Received a batch of %d events", eventsRead) + } - if err := verifier.flushAllBufferedChangeEventRechecks(ctx); err != nil { - return false, errors.Wrap(err, "failed to flush buffered change event rechecks") + err := verifier.HandleChangeStreamEvents(ctx, changeEventBatch) + if err != nil { + return false, errors.Wrap(err, "failed to handle change events") } return eventsRead > 0, errors.Wrap(cs.Err(), "change stream iteration failed") @@ -212,14 +234,10 @@ func (verifier *Verifier) iterateChangeStream(ctx context.Context, cs *mongo.Cha } // StartChangeStream starts the change stream. -func (verifier *Verifier) StartChangeStream(ctx context.Context, batchSize *int32) error { +func (verifier *Verifier) StartChangeStream(ctx context.Context) error { pipeline := verifier.GetChangeStreamFilter() opts := options.ChangeStream().SetMaxAwaitTime(1 * time.Second) - if batchSize != nil { - opts = opts.SetBatchSize(*batchSize) - } - savedResumeToken, err := verifier.loadChangeStreamResumeToken(ctx) if err != nil { return errors.Wrap(err, "failed to load persisted change stream resume token") @@ -379,30 +397,3 @@ func getClusterTimeFromSession(sess mongo.Session) (primitive.Timestamp, error) return ctStruct.ClusterTime.ClusterTime, nil } - -func (verifier *Verifier) flushAllBufferedChangeEventRechecks(ctx context.Context) error { - for namespace, ids := range verifier.changeEventRecheckBuf { - if len(ids) == 0 { - return nil - } - - // We don't know the document sizes for documents for all change events, - // so just be conservative and assume they are maximum size. - // - // Note that this prevents us from being able to report a meaningful - // total data size for noninitial generations in the log. - dataSizes := make([]int, len(ids)) - for i := 0; i < len(ids); i++ { - dataSizes[i] = maxBSONObjSize - } - - dbName, collName := SplitNamespace(namespace) - if err := verifier.insertRecheckDocs(ctx, dbName, collName, ids, dataSizes); err != nil { - return errors.Wrapf(err, "failed to insert recheck docs for namespace %s", namespace) - } - - delete(verifier.changeEventRecheckBuf, namespace) - } - - return nil -} diff --git a/internal/verifier/change_stream_test.go b/internal/verifier/change_stream_test.go index 03045276..7170129b 100644 --- a/internal/verifier/change_stream_test.go +++ b/internal/verifier/change_stream_test.go @@ -38,7 +38,7 @@ func (suite *MultiSourceVersionTestSuite) TestChangeStreamResumability() { verifier1 := buildVerifier(suite.T(), suite.srcMongoInstance, suite.dstMongoInstance, suite.metaMongoInstance) ctx, cancel := context.WithCancel(context.Background()) defer cancel() - err := verifier1.StartChangeStream(ctx, nil) + err := verifier1.StartChangeStream(ctx) suite.Require().NoError(err) }() @@ -63,7 +63,7 @@ func (suite *MultiSourceVersionTestSuite) TestChangeStreamResumability() { newTime := suite.getClusterTime(ctx, suite.srcMongoClient) - err = verifier2.StartChangeStream(ctx, nil) + err = verifier2.StartChangeStream(ctx) suite.Require().NoError(err) suite.Require().NotNil(verifier2.srcStartAtTs) @@ -138,7 +138,7 @@ func (suite *MultiSourceVersionTestSuite) TestStartAtTimeNoChanges() { suite.Require().NoError(err) origStartTs := sess.OperationTime() suite.Require().NotNil(origStartTs) - err = verifier.StartChangeStream(ctx, nil) + err = verifier.StartChangeStream(ctx) suite.Require().NoError(err) suite.Require().Equal(verifier.srcStartAtTs, origStartTs) verifier.changeStreamEnderChan <- struct{}{} @@ -158,7 +158,7 @@ func (suite *MultiSourceVersionTestSuite) TestStartAtTimeWithChanges() { suite.Require().NoError(err) origStartTs := sess.OperationTime() suite.Require().NotNil(origStartTs) - err = verifier.StartChangeStream(ctx, nil) + err = verifier.StartChangeStream(ctx) suite.Require().NoError(err) suite.Require().Equal(verifier.srcStartAtTs, origStartTs) _, err = suite.srcMongoClient.Database("testDb").Collection("testColl").InsertOne( @@ -193,7 +193,7 @@ func (suite *MultiSourceVersionTestSuite) TestNoStartAtTime() { suite.Require().NoError(err) origStartTs := sess.OperationTime() suite.Require().NotNil(origStartTs) - err = verifier.StartChangeStream(ctx, nil) + err = verifier.StartChangeStream(ctx) suite.Require().NoError(err) suite.Require().NotNil(verifier.srcStartAtTs) suite.Require().LessOrEqual(origStartTs.Compare(*verifier.srcStartAtTs), 0) @@ -204,8 +204,7 @@ func (suite *MultiSourceVersionTestSuite) TestWithChangeEventsBatching() { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - batchSize := int32(3) - suite.Require().NoError(verifier.StartChangeStream(ctx, &batchSize)) + suite.Require().NoError(verifier.StartChangeStream(ctx)) _, err := suite.srcMongoClient.Database("testDb").Collection("testColl1").InsertOne(ctx, bson.D{{"_id", 1}}) suite.Require().NoError(err) @@ -215,15 +214,15 @@ func (suite *MultiSourceVersionTestSuite) TestWithChangeEventsBatching() { _, err = suite.srcMongoClient.Database("testDb").Collection("testColl2").InsertOne(ctx, bson.D{{"_id", 1}}) suite.Require().NoError(err) + var rechecks []bson.M require.Eventually( suite.T(), func() bool { - return len(suite.fetchVerifierRechecks(ctx, verifier)) == 3 + rechecks = suite.fetchVerifierRechecks(ctx, verifier) + return len(rechecks) == 3 }, time.Minute, 500*time.Millisecond, "the verifier should flush a recheck doc after a batch", ) - suite.Require().Empty(verifier.changeEventRecheckBuf["testDB.testColl1"]) - suite.Require().Empty(verifier.changeEventRecheckBuf["testDB.testColl2"]) } diff --git a/internal/verifier/check.go b/internal/verifier/check.go index cd57753a..008545bd 100644 --- a/internal/verifier/check.go +++ b/internal/verifier/check.go @@ -177,7 +177,7 @@ func (verifier *Verifier) CheckDriver(ctx context.Context, filter map[string]any if !csRunning { verifier.logger.Debug().Msg("Change stream not running; starting change stream") - err = verifier.StartChangeStream(ctx, nil) + err = verifier.StartChangeStream(ctx) if err != nil { return errors.Wrap(err, "failed to start change stream on source") } diff --git a/internal/verifier/migration_verifier.go b/internal/verifier/migration_verifier.go index a7f61b6b..6feea6fb 100644 --- a/internal/verifier/migration_verifier.go +++ b/internal/verifier/migration_verifier.go @@ -138,8 +138,6 @@ type Verifier struct { globalFilter map[string]any pprofInterval time.Duration - - changeEventRecheckBuf ChangeEventRecheckBuffer } // VerificationStatus holds the Verification Status @@ -199,7 +197,6 @@ func NewVerifier(settings VerifierSettings) *Verifier { changeStreamErrChan: make(chan error), changeStreamDoneChan: make(chan struct{}), readConcernSetting: readConcern, - changeEventRecheckBuf: make(ChangeEventRecheckBuffer), // This will get recreated once gen0 starts, but we want it // here in case the change streams gets an event before then. diff --git a/internal/verifier/migration_verifier_test.go b/internal/verifier/migration_verifier_test.go index f91cb64f..eab99d4b 100644 --- a/internal/verifier/migration_verifier_test.go +++ b/internal/verifier/migration_verifier_test.go @@ -230,22 +230,21 @@ func (suite *MultiMetaVersionTestSuite) TestGetNamespaceStatistics_Recheck() { ctx := context.Background() verifier := buildVerifier(suite.T(), suite.srcMongoInstance, suite.dstMongoInstance, suite.metaMongoInstance) - suite.handleAndFlushChangeEvent( + err := verifier.HandleChangeStreamEvents( ctx, - verifier, - ParsedEvent{ + []ParsedEvent{{ OpType: "insert", Ns: &Namespace{DB: "mydb", Coll: "coll2"}, DocKey: DocKey{ ID: "heyhey", }, - }, + }}, ) + suite.Require().NoError(err) - suite.handleAndFlushChangeEvent( + err = verifier.HandleChangeStreamEvents( ctx, - verifier, - ParsedEvent{ + []ParsedEvent{{ ID: bson.M{ "docID": "ID/docID", }, @@ -254,8 +253,9 @@ func (suite *MultiMetaVersionTestSuite) TestGetNamespaceStatistics_Recheck() { DocKey: DocKey{ ID: "hoohoo", }, - }, + }}, ) + suite.Require().NoError(err) verifier.generation++ @@ -477,14 +477,6 @@ func (suite *MultiMetaVersionTestSuite) TestGetNamespaceStatistics_Gen0() { ) } -func (suite *MultiMetaVersionTestSuite) handleAndFlushChangeEvent(ctx context.Context, verifier *Verifier, event ParsedEvent) { - err := verifier.HandleChangeStreamEvent(&event) - suite.Require().NoError(err) - - err = verifier.flushAllBufferedChangeEventRechecks(ctx) - suite.Require().NoError(err) -} - func (suite *MultiMetaVersionTestSuite) TestFailedVerificationTaskInsertions() { ctx := context.Background() verifier := buildVerifier(suite.T(), suite.srcMongoInstance, suite.dstMongoInstance, suite.metaMongoInstance) @@ -503,15 +495,19 @@ func (suite *MultiMetaVersionTestSuite) TestFailedVerificationTaskInsertions() { }, } - suite.handleAndFlushChangeEvent(ctx, verifier, event) + err = verifier.HandleChangeStreamEvents(ctx, []ParsedEvent{event}) + suite.Require().NoError(err) event.OpType = "insert" - suite.handleAndFlushChangeEvent(ctx, verifier, event) + err = verifier.HandleChangeStreamEvents(ctx, []ParsedEvent{event}) + suite.Require().NoError(err) event.OpType = "replace" - suite.handleAndFlushChangeEvent(ctx, verifier, event) + err = verifier.HandleChangeStreamEvents(ctx, []ParsedEvent{event}) + suite.Require().NoError(err) event.OpType = "update" - suite.handleAndFlushChangeEvent(ctx, verifier, event) + err = verifier.HandleChangeStreamEvents(ctx, []ParsedEvent{event}) + suite.Require().NoError(err) event.OpType = "flibbity" - err = verifier.HandleChangeStreamEvent(&event) + err = verifier.HandleChangeStreamEvents(ctx, []ParsedEvent{event}) badEventErr := UnknownEventError{} suite.Require().ErrorAs(err, &badEventErr) suite.Assert().Equal("flibbity", badEventErr.Event.OpType) diff --git a/internal/verifier/recheck.go b/internal/verifier/recheck.go index a02cdec6..e971ee05 100644 --- a/internal/verifier/recheck.go +++ b/internal/verifier/recheck.go @@ -35,16 +35,23 @@ func (verifier *Verifier) InsertFailedCompareRecheckDocs( namespace string, documentIDs []interface{}, dataSizes []int) error { dbName, collName := SplitNamespace(namespace) + dbNames := make([]string, len(documentIDs)) + collNames := make([]string, len(documentIDs)) + for i := range documentIDs { + dbNames[i] = dbName + collNames[i] = collName + } + verifier.mux.Lock() defer verifier.mux.Unlock() - return verifier.insertRecheckDocsUnderLock(context.Background(), - dbName, collName, documentIDs, dataSizes) + return verifier.insertRecheckDocsWhileLocked(context.Background(), + dbNames, collNames, documentIDs, dataSizes) } -func (verifier *Verifier) insertRecheckDocsUnderLock( +func (verifier *Verifier) insertRecheckDocsWhileLocked( ctx context.Context, - dbName, collName string, documentIDs []interface{}, dataSizes []int) error { + dbNames []string, collNames []string, documentIDs []interface{}, dataSizes []int) error { generation, _ := verifier.getGenerationWhileLocked() @@ -52,8 +59,8 @@ func (verifier *Verifier) insertRecheckDocsUnderLock( for i, documentID := range documentIDs { pk := RecheckPrimaryKey{ Generation: generation, - DatabaseName: dbName, - CollectionName: collName, + DatabaseName: dbNames[i], + CollectionName: collNames[i], DocumentID: documentID, } @@ -79,11 +86,6 @@ func (verifier *Verifier) insertRecheckDocsUnderLock( verifier.logger.Debug().Msgf("Persisted %d recheck doc(s) for generation %d", len(models), generation) } - // Silence any duplicate key errors as recheck docs should have existed. - if mongo.IsDuplicateKeyError(err) { - err = nil - } - return err } diff --git a/internal/verifier/recheck_test.go b/internal/verifier/recheck_test.go index 7dbf4b63..38bf6c38 100644 --- a/internal/verifier/recheck_test.go +++ b/internal/verifier/recheck_test.go @@ -48,10 +48,10 @@ func (suite *MultiMetaVersionTestSuite) TestFailedCompareThenReplace() { DB: "the", Coll: "namespace", }, - OpType: "update", } - suite.handleAndFlushChangeEvent(ctx, verifier, event) + err := verifier.HandleChangeStreamEvents(ctx, []ParsedEvent{event}) + suite.Require().NoError(err) recheckDocs = suite.fetchRecheckDocs(ctx, verifier) suite.Assert().Equal( @@ -336,5 +336,13 @@ func insertRecheckDocs( verifier.mux.Lock() defer verifier.mux.Unlock() - return verifier.insertRecheckDocsUnderLock(ctx, dbName, collName, documentIDs, dataSizes) + dbNames := make([]string, len(documentIDs)) + collNames := make([]string, len(documentIDs)) + + for i := range documentIDs { + dbNames[i] = dbName + collNames[i] = collName + } + + return verifier.insertRecheckDocsWhileLocked(ctx, dbNames, collNames, documentIDs, dataSizes) } From 86935c17ad6ee99ba711c53e8e74fb2790e5ae51 Mon Sep 17 00:00:00 2001 From: Jian Guan <61915096+tdq45gj@users.noreply.github.com> Date: Fri, 8 Nov 2024 16:00:06 -0500 Subject: [PATCH 11/12] Dave's review --- internal/verifier/change_stream.go | 5 +---- internal/verifier/recheck.go | 22 +++++++++++++++------- internal/verifier/recheck_test.go | 5 +---- 3 files changed, 17 insertions(+), 15 deletions(-) diff --git a/internal/verifier/change_stream.go b/internal/verifier/change_stream.go index e4523605..76060404 100644 --- a/internal/verifier/change_stream.go +++ b/internal/verifier/change_stream.go @@ -90,10 +90,7 @@ func (verifier *Verifier) HandleChangeStreamEvents(ctx context.Context, batch [] } } - verifier.mux.Lock() - defer verifier.mux.Unlock() - - return verifier.insertRecheckDocsWhileLocked(ctx, dbNames, collNames, docIDs, dataSizes) + return verifier.insertRecheckDocs(ctx, dbNames, collNames, docIDs, dataSizes) } func (verifier *Verifier) GetChangeStreamFilter() []bson.D { diff --git a/internal/verifier/recheck.go b/internal/verifier/recheck.go index e971ee05..b7143d7a 100644 --- a/internal/verifier/recheck.go +++ b/internal/verifier/recheck.go @@ -42,16 +42,24 @@ func (verifier *Verifier) InsertFailedCompareRecheckDocs( collNames[i] = collName } - verifier.mux.Lock() - defer verifier.mux.Unlock() - - return verifier.insertRecheckDocsWhileLocked(context.Background(), - dbNames, collNames, documentIDs, dataSizes) + return verifier.insertRecheckDocs( + context.Background(), + dbNames, + collNames, + documentIDs, + dataSizes, + ) } -func (verifier *Verifier) insertRecheckDocsWhileLocked( +func (verifier *Verifier) insertRecheckDocs( ctx context.Context, - dbNames []string, collNames []string, documentIDs []interface{}, dataSizes []int) error { + dbNames []string, + collNames []string, + documentIDs []interface{}, + dataSizes []int, +) error { + verifier.mux.Lock() + defer verifier.mux.Unlock() generation, _ := verifier.getGenerationWhileLocked() diff --git a/internal/verifier/recheck_test.go b/internal/verifier/recheck_test.go index 38bf6c38..fe4ff0ed 100644 --- a/internal/verifier/recheck_test.go +++ b/internal/verifier/recheck_test.go @@ -333,9 +333,6 @@ func insertRecheckDocs( documentIDs []any, dataSizes []int, ) error { - verifier.mux.Lock() - defer verifier.mux.Unlock() - dbNames := make([]string, len(documentIDs)) collNames := make([]string, len(documentIDs)) @@ -344,5 +341,5 @@ func insertRecheckDocs( collNames[i] = collName } - return verifier.insertRecheckDocsWhileLocked(ctx, dbNames, collNames, documentIDs, dataSizes) + return verifier.insertRecheckDocs(ctx, dbNames, collNames, documentIDs, dataSizes) } From 5c71523a3204e3efc98349071b179cfeac397684 Mon Sep 17 00:00:00 2001 From: Jian Guan <61915096+tdq45gj@users.noreply.github.com> Date: Mon, 11 Nov 2024 09:09:31 -0500 Subject: [PATCH 12/12] Update change_stream.go --- internal/verifier/change_stream.go | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/internal/verifier/change_stream.go b/internal/verifier/change_stream.go index 76060404..8821b603 100644 --- a/internal/verifier/change_stream.go +++ b/internal/verifier/change_stream.go @@ -145,12 +145,11 @@ func (verifier *Verifier) iterateChangeStream(ctx context.Context, cs *mongo.Cha } if eventsRead > 0 { - verifier.logger.Debug().Msgf("Received a batch of %d events", eventsRead) - } - - err := verifier.HandleChangeStreamEvents(ctx, changeEventBatch) - if err != nil { - return false, errors.Wrap(err, "failed to handle change events") + verifier.logger.Debug().Int("eventsCount", eventsRead).Msgf("Received a batch of events.") + err := verifier.HandleChangeStreamEvents(ctx, changeEventBatch) + if err != nil { + return false, errors.Wrap(err, "failed to handle change events") + } } return eventsRead > 0, errors.Wrap(cs.Err(), "change stream iteration failed")