diff --git a/internal/verifier/change_reader.go b/internal/verifier/change_reader.go index 143f5f04..a3d261f9 100644 --- a/internal/verifier/change_reader.go +++ b/internal/verifier/change_reader.go @@ -34,7 +34,8 @@ const ( type changeReader interface { getWhichCluster() whichCluster getReadChannel() <-chan changeEventBatch - getStartTimestamp() option.Option[bson.Timestamp] + getStartTimestamp() bson.Timestamp + getLastSeenClusterTime() option.Option[bson.Timestamp] getEventsPerSecond() option.Option[float64] getLag() option.Option[time.Duration] getBufferSaturation() float64 @@ -48,9 +49,8 @@ type changeReader interface { type ChangeReaderCommon struct { readerType whichCluster - lastChangeEventTime *bson.Timestamp - logger *logger.Logger - namespaces []string + logger *logger.Logger + namespaces []string metaDB *mongo.Database watcherClient *mongo.Client @@ -62,6 +62,8 @@ type ChangeReaderCommon struct { changeEventBatchChan chan changeEventBatch writesOffTs *util.Eventual[bson.Timestamp] + lastChangeEventTime *msync.TypedAtomic[option.Option[bson.Timestamp]] + startAtTs *bson.Timestamp lag *msync.TypedAtomic[option.Option[time.Duration]] @@ -70,12 +72,32 @@ type ChangeReaderCommon struct { onDDLEvent ddlEventHandling } +func newChangeReaderCommon(clusterName whichCluster) ChangeReaderCommon { + return ChangeReaderCommon{ + readerType: clusterName, + changeEventBatchChan: make(chan changeEventBatch, batchChanBufferSize), + writesOffTs: util.NewEventual[bson.Timestamp](), + lag: msync.NewTypedAtomic(option.None[time.Duration]()), + lastChangeEventTime: msync.NewTypedAtomic(option.None[bson.Timestamp]()), + batchSizeHistory: history.New[int](time.Minute), + onDDLEvent: lo.Ternary( + clusterName == dst, + onDDLEventAllow, + "", + ), + } +} + func (rc *ChangeReaderCommon) getWhichCluster() whichCluster { return rc.readerType } -func (rc *ChangeReaderCommon) getStartTimestamp() option.Option[bson.Timestamp] { - return option.FromPointer(rc.startAtTs) +func (rc *ChangeReaderCommon) getStartTimestamp() bson.Timestamp { + if rc.startAtTs == nil { + panic("no start timestamp yet?!?") + } + + return *rc.startAtTs } func (rc *ChangeReaderCommon) setWritesOff(ts bson.Timestamp) { @@ -90,6 +112,10 @@ func (rc *ChangeReaderCommon) getReadChannel() <-chan changeEventBatch { return rc.changeEventBatchChan } +func (rc *ChangeReaderCommon) getLastSeenClusterTime() option.Option[bson.Timestamp] { + return rc.lastChangeEventTime.Load() +} + // getBufferSaturation returns the reader’s internal buffer’s saturation level // as a fraction. If saturation rises, that means we’re reading events faster // than we can persist them. diff --git a/internal/verifier/change_stream.go b/internal/verifier/change_stream.go index 76529256..86f59a4a 100644 --- a/internal/verifier/change_stream.go +++ b/internal/verifier/change_stream.go @@ -47,6 +47,28 @@ type ChangeStreamReader struct { var _ changeReader = &ChangeStreamReader{} +func (v *Verifier) newChangeStreamReader( + namespaces []string, + cluster whichCluster, + client *mongo.Client, + clusterInfo util.ClusterInfo, +) *ChangeStreamReader { + common := newChangeReaderCommon(cluster) + common.namespaces = namespaces + common.readerType = cluster + common.watcherClient = client + common.clusterInfo = clusterInfo + + common.logger = v.logger + common.metaDB = v.metaClient.Database(v.metaDBName) + + common.resumeTokenTSExtractor = extractTSFromChangeStreamResumeToken + + csr := &ChangeStreamReader{ChangeReaderCommon: common} + + return csr +} + // GetChangeStreamFilter returns an aggregation pipeline that filters // namespaces as per configuration. // @@ -193,11 +215,9 @@ func (csr *ChangeStreamReader) readAndHandleOneChangeEventBatch( return errors.Errorf("Change event lacks a namespace: %+v", changeEvents[eventsRead]) } - if changeEvents[eventsRead].ClusterTime != nil && - (csr.lastChangeEventTime == nil || - csr.lastChangeEventTime.Before(*changeEvents[eventsRead].ClusterTime)) { - - csr.lastChangeEventTime = changeEvents[eventsRead].ClusterTime + eventTime := changeEvents[eventsRead].ClusterTime + if eventTime != nil && csr.lastChangeEventTime.Load().OrZero().Before(*eventTime) { + csr.lastChangeEventTime.Store(option.Some(*eventTime)) latestEvent = option.Some(changeEvents[eventsRead]) } @@ -230,9 +250,6 @@ func (csr *ChangeStreamReader) readAndHandleOneChangeEventBatch( events: changeEvents, resumeToken: cs.ResumeToken(), - - // NB: We know by now that OperationTime is non-nil. - clusterTime: *sess.OperationTime(), }: } @@ -314,8 +331,8 @@ func (csr *ChangeStreamReader) iterateChangeStream( if gotwritesOffTimestamp { csr.running = false - if csr.lastChangeEventTime != nil { - csr.startAtTs = csr.lastChangeEventTime + if ts, has := csr.lastChangeEventTime.Load().Get(); has { + csr.startAtTs = &ts } break @@ -323,10 +340,10 @@ func (csr *ChangeStreamReader) iterateChangeStream( } infoLog := csr.logger.Info() - if csr.lastChangeEventTime == nil { - infoLog = infoLog.Str("lastEventTime", "none") + if ts, has := csr.lastChangeEventTime.Load().Get(); has { + infoLog = infoLog.Any("lastEventTime", ts) } else { - infoLog = infoLog.Any("lastEventTime", *csr.lastChangeEventTime) + infoLog = infoLog.Str("lastEventTime", "none") } infoLog. diff --git a/internal/verifier/change_stream_test.go b/internal/verifier/change_stream_test.go index 5f8e8395..62a39bca 100644 --- a/internal/verifier/change_stream_test.go +++ b/internal/verifier/change_stream_test.go @@ -441,9 +441,7 @@ func (suite *IntegrationTestSuite) TestChangeStreamResumability() { suite.startSrcChangeStreamReaderAndHandler(ctx, verifier2) - startAtTs, hasStartAtTs := verifier2.srcChangeReader.getStartTimestamp().Get() - - suite.Require().True(hasStartAtTs) + startAtTs := verifier2.srcChangeReader.getStartTimestamp() suite.Assert().False( startAtTs.After(newTime), @@ -631,14 +629,13 @@ func (suite *IntegrationTestSuite) TestStartAtTimeNoChanges() { eg := suite.startSrcChangeStreamReaderAndHandler(ctx, verifier) - startAtTs, hasStartAtTs := verifier.srcChangeReader.getStartTimestamp().Get() - suite.Require().True(hasStartAtTs, "startAtTs should be set") + startAtTs := verifier.srcChangeReader.getStartTimestamp() verifier.srcChangeReader.setWritesOff(insertTs) suite.Require().NoError(eg.Wait()) - startAtTs2 := verifier.srcChangeReader.getStartTimestamp().MustGet() + startAtTs2 := verifier.srcChangeReader.getStartTimestamp() suite.Require().False( startAtTs2.Before(startAtTs), @@ -663,8 +660,7 @@ func (suite *IntegrationTestSuite) TestStartAtTimeWithChanges() { suite.Require().NotNil(origSessionTime) eg := suite.startSrcChangeStreamReaderAndHandler(ctx, verifier) - startAtTs, hasStartAtTs := verifier.srcChangeReader.getStartTimestamp().Get() - suite.Require().True(hasStartAtTs, "startAtTs should be set") + startAtTs := verifier.srcChangeReader.getStartTimestamp() // srcStartAtTs derives from the change stream’s resume token, which can // postdate our session time but should not precede it. @@ -697,8 +693,7 @@ func (suite *IntegrationTestSuite) TestStartAtTimeWithChanges() { suite.Require().NoError(eg.Wait()) - startAtTs, hasStartAtTs = verifier.srcChangeReader.getStartTimestamp().Get() - suite.Require().True(hasStartAtTs, "startAtTs should be set") + startAtTs = verifier.srcChangeReader.getStartTimestamp() suite.Assert().Equal( *postEventsSessionTime, @@ -720,8 +715,7 @@ func (suite *IntegrationTestSuite) TestNoStartAtTime() { suite.Require().NotNil(origStartTs) suite.startSrcChangeStreamReaderAndHandler(ctx, verifier) - startAtTs, hasStartAtTs := verifier.srcChangeReader.getStartTimestamp().Get() - suite.Require().True(hasStartAtTs, "startAtTs should be set") + startAtTs := verifier.srcChangeReader.getStartTimestamp() suite.Require().NotNil(startAtTs) suite.Require().LessOrEqual(origStartTs.Compare(startAtTs), 0) diff --git a/internal/verifier/check.go b/internal/verifier/check.go index 529c3e3f..b606ee84 100644 --- a/internal/verifier/check.go +++ b/internal/verifier/check.go @@ -6,13 +6,9 @@ import ( "time" "github.com/10gen/migration-verifier/contextplus" - "github.com/10gen/migration-verifier/history" "github.com/10gen/migration-verifier/internal/logger" "github.com/10gen/migration-verifier/internal/retry" - "github.com/10gen/migration-verifier/internal/util" "github.com/10gen/migration-verifier/mslices" - "github.com/10gen/migration-verifier/msync" - "github.com/10gen/migration-verifier/option" mapset "github.com/deckarep/golang-set/v2" "github.com/goaux/timer" "github.com/pkg/errors" @@ -255,28 +251,10 @@ func (verifier *Verifier) CheckDriver(ctx context.Context, filter bson.D, testCh verifier.phase = Idle }() - changeReaderGroup, groupCtx := contextplus.ErrGroup(ctx) - for _, changeReader := range mslices.Of(verifier.srcChangeReader, verifier.dstChangeReader) { - if changeReader.isRunning() { - verifier.logger.Debug().Msgf("Check: %s already running.", changeReader) - } else { - verifier.logger.Debug().Msgf("%s not running; starting change reader", changeReader) - - err = changeReader.start(groupCtx, changeReaderGroup) - if err != nil { - return errors.Wrapf(err, "failed to start %s", changeReader) - } - changeReaderGroup.Go(func() error { - return verifier.RunChangeEventPersistor(groupCtx, changeReader) - }) - } + if err := verifier.startChangeHandling(ctx); err != nil { + return err } - changeHandlingErr := verifier.changeHandlingErr - go func() { - changeHandlingErr.Set(changeReaderGroup.Wait()) - }() - // Log the verification status when initially booting up so it's easy to see the current state verificationStatus, err := verifier.GetVerificationStatus(ctx) if err != nil { @@ -409,6 +387,32 @@ func (verifier *Verifier) CheckDriver(ctx context.Context, filter bson.D, testCh } } +func (verifier *Verifier) startChangeHandling(ctx context.Context) error { + changeReaderGroup, groupCtx := contextplus.ErrGroup(ctx) + for _, changeReader := range mslices.Of(verifier.srcChangeReader, verifier.dstChangeReader) { + if changeReader.isRunning() { + verifier.logger.Debug().Msgf("Check: %s already running.", changeReader) + } else { + verifier.logger.Debug().Msgf("%s not running; starting change reader", changeReader) + + err := changeReader.start(groupCtx, changeReaderGroup) + if err != nil { + return errors.Wrapf(err, "failed to start %s", changeReader) + } + changeReaderGroup.Go(func() error { + return verifier.RunChangeEventPersistor(groupCtx, changeReader) + }) + } + } + + changeHandlingErr := verifier.changeHandlingErr + go func() { + changeHandlingErr.Set(changeReaderGroup.Wait()) + }() + + return nil +} + func (verifier *Verifier) setupAllNamespaceList(ctx context.Context) error { // We want to check all user collections on both source and dest. srcNamespaces, err := ListAllUserNamespaces(ctx, verifier.logger, verifier.srcClient, verifier.metaDBName) @@ -600,36 +604,18 @@ func (verifier *Verifier) work(ctx context.Context, workerNum int) error { } } -func (verifier *Verifier) initializeChangeReaders() { - srcReader := &ChangeStreamReader{ - ChangeReaderCommon: ChangeReaderCommon{ - readerType: src, - namespaces: verifier.srcNamespaces, - watcherClient: verifier.srcClient, - clusterInfo: *verifier.srcClusterInfo, - }, - } - verifier.srcChangeReader = srcReader - - dstReader := &ChangeStreamReader{ - ChangeReaderCommon: ChangeReaderCommon{ - readerType: dst, - namespaces: verifier.dstNamespaces, - watcherClient: verifier.dstClient, - clusterInfo: *verifier.dstClusterInfo, - onDDLEvent: onDDLEventAllow, - }, - } - verifier.dstChangeReader = dstReader - - // Common elements in both readers: - for _, csr := range mslices.Of(srcReader, dstReader) { - csr.logger = verifier.logger - csr.metaDB = verifier.metaClient.Database(verifier.metaDBName) - csr.changeEventBatchChan = make(chan changeEventBatch, batchChanBufferSize) - csr.writesOffTs = util.NewEventual[bson.Timestamp]() - csr.lag = msync.NewTypedAtomic(option.None[time.Duration]()) - csr.batchSizeHistory = history.New[int](time.Minute) - csr.resumeTokenTSExtractor = extractTSFromChangeStreamResumeToken - } +func (v *Verifier) initializeChangeReaders() { + v.srcChangeReader = v.newChangeStreamReader( + v.srcNamespaces, + src, + v.srcClient, + *v.srcClusterInfo, + ) + + v.dstChangeReader = v.newChangeStreamReader( + v.dstNamespaces, + dst, + v.dstClient, + *v.dstClusterInfo, + ) } diff --git a/internal/verifier/compare.go b/internal/verifier/compare.go index 85fbfca0..da9988a1 100644 --- a/internal/verifier/compare.go +++ b/internal/verifier/compare.go @@ -18,7 +18,6 @@ import ( "go.mongodb.org/mongo-driver/v2/bson" "go.mongodb.org/mongo-driver/v2/mongo" "go.mongodb.org/mongo-driver/v2/mongo/options" - "go.mongodb.org/mongo-driver/v2/mongo/readpref" "golang.org/x/exp/slices" ) @@ -467,7 +466,9 @@ func (verifier *Verifier) getFetcherChannelsAndCallbacks( sctx, verifier.srcClientCollection(task), verifier.srcClusterInfo, - verifier.srcChangeReader.getStartTimestamp().ToPointer(), + verifier.srcChangeReader.getLastSeenClusterTime().OrElse( + verifier.srcChangeReader.getStartTimestamp(), + ), task, ) @@ -500,7 +501,9 @@ func (verifier *Verifier) getFetcherChannelsAndCallbacks( sctx, verifier.dstClientCollection(task), verifier.dstClusterInfo, - verifier.dstChangeReader.getStartTimestamp().ToPointer(), + verifier.dstChangeReader.getLastSeenClusterTime().OrElse( + verifier.dstChangeReader.getStartTimestamp(), + ), task, ) @@ -573,10 +576,14 @@ func getMapKey(docKeyValues []bson.RawValue) string { return keyBuffer.String() } -func (verifier *Verifier) getDocumentsCursor(ctx context.Context, collection *mongo.Collection, clusterInfo *util.ClusterInfo, - startAtTs *bson.Timestamp, task *VerificationTask) (*mongo.Cursor, error) { +func (verifier *Verifier) getDocumentsCursor( + ctx context.Context, + collection *mongo.Collection, + clusterInfo *util.ClusterInfo, + readConcernTS bson.Timestamp, + task *VerificationTask, +) (*mongo.Cursor, error) { var findOptions bson.D - runCommandOptions := options.RunCmd() var andPredicates bson.A var aggOptions bson.D @@ -656,22 +663,23 @@ func (verifier *Verifier) getDocumentsCursor(ctx context.Context, collection *mo ) } - if verifier.readPreference.Mode() != readpref.PrimaryMode { - runCommandOptions = runCommandOptions.SetReadPreference(verifier.readPreference) - if startAtTs != nil { - readConcern := bson.D{ - {"afterClusterTime", *startAtTs}, - } + sess := mongo.SessionFromContext(ctx) - // We never want to read before the change stream start time, - // or for the last generation, the change stream end time. - cmd = append( - cmd, - bson.E{"readConcern", readConcern}, - ) - } + if sess == nil { + panic("No session?!?") } + runCommandOptions := options.RunCmd().SetReadPreference(verifier.readPreference) + + // We never want to read before the change stream start time, + // or for the last generation, the change stream end time. + cmd = append( + cmd, + bson.E{"readConcern", bson.D{ + {"afterClusterTime", readConcernTS}, + }}, + ) + // Suppress this log for recheck tasks because the list of IDs can be // quite long. if !task.IsRecheck() { diff --git a/internal/verifier/compare_test.go b/internal/verifier/compare_test.go index 884e38a6..15686ba6 100644 --- a/internal/verifier/compare_test.go +++ b/internal/verifier/compare_test.go @@ -14,7 +14,7 @@ import ( "go.mongodb.org/mongo-driver/v2/bson" ) -// TestFetchAndCompareDocuments_ContextCancellation ensures that nothing hangs +// TestFetchAndCompareDocuments_Context ensures that nothing hangs // when a context is canceled during FetchAndCompareDocuments(). func (s *IntegrationTestSuite) TestFetchAndCompareDocuments_Context() { ctx := s.Context() @@ -49,6 +49,7 @@ func (s *IntegrationTestSuite) TestFetchAndCompareDocuments_Context() { } verifier := s.BuildVerifier() + s.Require().NoError(verifier.startChangeHandling(ctx)) for range 100 { cancelableCtx, cancel := contextplus.WithCancelCause(ctx) diff --git a/internal/verifier/migration_verifier_test.go b/internal/verifier/migration_verifier_test.go index 396077bf..4d70ccd8 100644 --- a/internal/verifier/migration_verifier_test.go +++ b/internal/verifier/migration_verifier_test.go @@ -126,6 +126,8 @@ func (suite *IntegrationTestSuite) TestProcessVerifyTask_Failure() { ctx := suite.Context() t := suite.T() + suite.Require().NoError(verifier.startChangeHandling(ctx)) + dbName := suite.DBNameForTest() collName := "coll" @@ -302,6 +304,7 @@ func (suite *IntegrationTestSuite) TestVerifier_Dotted_Shard_Key() { } verifier := suite.BuildVerifier() + suite.Require().NoError(verifier.startChangeHandling(ctx)) results, docCount, _, err := verifier.FetchAndCompareDocuments(ctx, 0, task) require.NoError(err, "should fetch & compare") assert.EqualValues(suite.T(), len(docs), docCount, "expected # of docs") @@ -330,15 +333,15 @@ func getShardIds(t *testing.T, client *mongo.Client) []string { } func (suite *IntegrationTestSuite) TestVerifier_DocFilter_ObjectID() { - verifier := suite.BuildVerifier() + ctx := suite.Context() t := suite.T() dbName := suite.DBNameForTest() collName := "coll" - srcColl := verifier.srcClient.Database(dbName).Collection(collName) - dstColl := verifier.dstClient.Database(dbName).Collection(collName) + srcColl := suite.srcMongoClient.Database(dbName).Collection(collName) + dstColl := suite.dstMongoClient.Database(dbName).Collection(collName) id1 := bson.NewObjectID() _, err := srcColl.InsertOne(ctx, bson.D{{"_id", id1}}) @@ -362,6 +365,9 @@ func (suite *IntegrationTestSuite) TestVerifier_DocFilter_ObjectID() { }, } + verifier := suite.BuildVerifier() + suite.Require().NoError(verifier.startChangeHandling(ctx)) + verifier.globalFilter = bson.D{{"_id", id1}} results, docCount, _, err := verifier.FetchAndCompareDocuments(ctx, 0, task) @@ -377,9 +383,12 @@ func (suite *IntegrationTestSuite) TestVerifier_DocFilter_ObjectID() { } func (suite *IntegrationTestSuite) TestTypesBetweenBoundaries() { - verifier := suite.BuildVerifier() ctx := suite.Context() + verifier := suite.BuildVerifier() + + suite.Require().NoError(verifier.startChangeHandling(ctx)) + task := &VerificationTask{ PrimaryKey: bson.NewObjectID(), QueryFilter: QueryFilter{ @@ -394,14 +403,14 @@ func (suite *IntegrationTestSuite) TestTypesBetweenBoundaries() { }, } - _, err := verifier.srcClient.Database("keyhole").Collection("dealers").InsertMany(ctx, []any{ + _, err := suite.srcMongoClient.Database("keyhole").Collection("dealers").InsertMany(ctx, []any{ bson.D{{"_id", nil}}, bson.D{{"_id", int32(123)}}, bson.D{{"_id", bson.Symbol("oh yeah")}}, }) suite.Require().NoError(err) - _, err = verifier.dstClient.Database("keyhole").Collection("dealers").InsertMany(ctx, []any{ + _, err = suite.dstMongoClient.Database("keyhole").Collection("dealers").InsertMany(ctx, []any{ bson.D{{"_id", nil}}, bson.D{{"_id", int32(123)}}, bson.D{{"_id", "oh yeah"}}, @@ -485,12 +494,15 @@ func (suite *IntegrationTestSuite) TestTypesBetweenBoundaries() { } func (suite *IntegrationTestSuite) TestVerifierFetchDocuments() { - verifier := suite.BuildVerifier() ctx := suite.Context() + + verifier := suite.BuildVerifier() + suite.Require().NoError(verifier.startChangeHandling(ctx)) + drop := func() { - err := verifier.srcClient.Database("keyhole").Drop(ctx) + err := suite.srcMongoClient.Database("keyhole").Drop(ctx) suite.Require().NoError(err) - err = verifier.dstClient.Database("keyhole").Drop(ctx) + err = suite.dstMongoClient.Database("keyhole").Drop(ctx) suite.Require().NoError(err) } drop() @@ -506,12 +518,12 @@ func (suite *IntegrationTestSuite) TestVerifierFetchDocuments() { } id := rand.Intn(1000) - _, err := verifier.srcClient.Database("keyhole").Collection("dealers").InsertMany(ctx, []any{ + _, err := suite.srcMongoClient.Database("keyhole").Collection("dealers").InsertMany(ctx, []any{ bson.D{{"_id", id}, {"num", 99}, {"name", "srcTest"}}, bson.D{{"_id", id + 1}, {"num", 101}, {"name", "srcTest"}}, }) suite.Require().NoError(err) - _, err = verifier.dstClient.Database("keyhole").Collection("dealers").InsertMany(ctx, []any{ + _, err = suite.dstMongoClient.Database("keyhole").Collection("dealers").InsertMany(ctx, []any{ bson.D{{"_id", id}, {"num", 99}, {"name", "dstTest"}}, bson.D{{"_id", id + 1}, {"num", 101}, {"name", "dstTest"}}, }) diff --git a/internal/verifier/recheck_persist.go b/internal/verifier/recheck_persist.go index 564e33a2..82db17fc 100644 --- a/internal/verifier/recheck_persist.go +++ b/internal/verifier/recheck_persist.go @@ -13,7 +13,6 @@ import ( type changeEventBatch struct { events []ParsedEvent resumeToken bson.Raw - clusterTime bson.Timestamp } // RunChangeEventPersistor persists rechecks from change event batches. @@ -166,14 +165,12 @@ func (verifier *Verifier) PersistChangeEvents(ctx context.Context, batch changeE } latestTimestampTime := time.Unix(int64(latestTimestamp.T), 0) - lag := time.Unix(int64(batch.clusterTime.T), 0).Sub(latestTimestampTime) verifier.logger.Trace(). Str("origin", string(eventOrigin)). Int("count", len(docIDs)). Any("latestTimestamp", latestTimestamp). Time("latestTimestampTime", latestTimestampTime). - Stringer("lag", lag). Msg("Persisting rechecks for change events.") return verifier.insertRecheckDocs(ctx, dbNames, collNames, docIDs, dataSizes)