From 3160dc63c4e4bdc0208daac0fa64aa5182d49b19 Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Fri, 14 Nov 2025 15:44:28 -0500 Subject: [PATCH 01/15] switch over --- internal/verifier/change_reader.go | 28 ++++++++++++++-- internal/verifier/change_stream.go | 43 ++++++++++++++++-------- internal/verifier/check.go | 50 ++++++++-------------------- internal/verifier/compare.go | 38 +++++++++++---------- internal/verifier/recheck_persist.go | 3 -- 5 files changed, 89 insertions(+), 73 deletions(-) diff --git a/internal/verifier/change_reader.go b/internal/verifier/change_reader.go index 143f5f04..8fae036c 100644 --- a/internal/verifier/change_reader.go +++ b/internal/verifier/change_reader.go @@ -35,6 +35,7 @@ type changeReader interface { getWhichCluster() whichCluster getReadChannel() <-chan changeEventBatch getStartTimestamp() option.Option[bson.Timestamp] + getLatestTimestamp() option.Option[bson.Timestamp] getEventsPerSecond() option.Option[float64] getLag() option.Option[time.Duration] getBufferSaturation() float64 @@ -48,9 +49,8 @@ type changeReader interface { type ChangeReaderCommon struct { readerType whichCluster - lastChangeEventTime *bson.Timestamp - logger *logger.Logger - namespaces []string + logger *logger.Logger + namespaces []string metaDB *mongo.Database watcherClient *mongo.Client @@ -62,6 +62,8 @@ type ChangeReaderCommon struct { changeEventBatchChan chan changeEventBatch writesOffTs *util.Eventual[bson.Timestamp] + lastChangeEventTime *msync.TypedAtomic[option.Option[bson.Timestamp]] + startAtTs *bson.Timestamp lag *msync.TypedAtomic[option.Option[time.Duration]] @@ -70,6 +72,22 @@ type ChangeReaderCommon struct { onDDLEvent ddlEventHandling } +func newChangeReaderCommon(clusterName whichCluster) ChangeReaderCommon { + return ChangeReaderCommon{ + readerType: clusterName, + changeEventBatchChan: make(chan changeEventBatch, batchChanBufferSize), + writesOffTs: util.NewEventual[bson.Timestamp](), + lag: msync.NewTypedAtomic(option.None[time.Duration]()), + lastChangeEventTime: msync.NewTypedAtomic(option.None[bson.Timestamp]()), + batchSizeHistory: history.New[int](time.Minute), + onDDLEvent: lo.Ternary( + clusterName == dst, + onDDLEventAllow, + "", + ), + } +} + func (rc *ChangeReaderCommon) getWhichCluster() whichCluster { return rc.readerType } @@ -90,6 +108,10 @@ func (rc *ChangeReaderCommon) getReadChannel() <-chan changeEventBatch { return rc.changeEventBatchChan } +func (rc *ChangeReaderCommon) getLatestTimestamp() option.Option[bson.Timestamp] { + return rc.lastChangeEventTime.Load() +} + // getBufferSaturation returns the reader’s internal buffer’s saturation level // as a fraction. If saturation rises, that means we’re reading events faster // than we can persist them. diff --git a/internal/verifier/change_stream.go b/internal/verifier/change_stream.go index 76529256..86f59a4a 100644 --- a/internal/verifier/change_stream.go +++ b/internal/verifier/change_stream.go @@ -47,6 +47,28 @@ type ChangeStreamReader struct { var _ changeReader = &ChangeStreamReader{} +func (v *Verifier) newChangeStreamReader( + namespaces []string, + cluster whichCluster, + client *mongo.Client, + clusterInfo util.ClusterInfo, +) *ChangeStreamReader { + common := newChangeReaderCommon(cluster) + common.namespaces = namespaces + common.readerType = cluster + common.watcherClient = client + common.clusterInfo = clusterInfo + + common.logger = v.logger + common.metaDB = v.metaClient.Database(v.metaDBName) + + common.resumeTokenTSExtractor = extractTSFromChangeStreamResumeToken + + csr := &ChangeStreamReader{ChangeReaderCommon: common} + + return csr +} + // GetChangeStreamFilter returns an aggregation pipeline that filters // namespaces as per configuration. // @@ -193,11 +215,9 @@ func (csr *ChangeStreamReader) readAndHandleOneChangeEventBatch( return errors.Errorf("Change event lacks a namespace: %+v", changeEvents[eventsRead]) } - if changeEvents[eventsRead].ClusterTime != nil && - (csr.lastChangeEventTime == nil || - csr.lastChangeEventTime.Before(*changeEvents[eventsRead].ClusterTime)) { - - csr.lastChangeEventTime = changeEvents[eventsRead].ClusterTime + eventTime := changeEvents[eventsRead].ClusterTime + if eventTime != nil && csr.lastChangeEventTime.Load().OrZero().Before(*eventTime) { + csr.lastChangeEventTime.Store(option.Some(*eventTime)) latestEvent = option.Some(changeEvents[eventsRead]) } @@ -230,9 +250,6 @@ func (csr *ChangeStreamReader) readAndHandleOneChangeEventBatch( events: changeEvents, resumeToken: cs.ResumeToken(), - - // NB: We know by now that OperationTime is non-nil. - clusterTime: *sess.OperationTime(), }: } @@ -314,8 +331,8 @@ func (csr *ChangeStreamReader) iterateChangeStream( if gotwritesOffTimestamp { csr.running = false - if csr.lastChangeEventTime != nil { - csr.startAtTs = csr.lastChangeEventTime + if ts, has := csr.lastChangeEventTime.Load().Get(); has { + csr.startAtTs = &ts } break @@ -323,10 +340,10 @@ func (csr *ChangeStreamReader) iterateChangeStream( } infoLog := csr.logger.Info() - if csr.lastChangeEventTime == nil { - infoLog = infoLog.Str("lastEventTime", "none") + if ts, has := csr.lastChangeEventTime.Load().Get(); has { + infoLog = infoLog.Any("lastEventTime", ts) } else { - infoLog = infoLog.Any("lastEventTime", *csr.lastChangeEventTime) + infoLog = infoLog.Str("lastEventTime", "none") } infoLog. diff --git a/internal/verifier/check.go b/internal/verifier/check.go index 529c3e3f..0405ffcc 100644 --- a/internal/verifier/check.go +++ b/internal/verifier/check.go @@ -6,13 +6,9 @@ import ( "time" "github.com/10gen/migration-verifier/contextplus" - "github.com/10gen/migration-verifier/history" "github.com/10gen/migration-verifier/internal/logger" "github.com/10gen/migration-verifier/internal/retry" - "github.com/10gen/migration-verifier/internal/util" "github.com/10gen/migration-verifier/mslices" - "github.com/10gen/migration-verifier/msync" - "github.com/10gen/migration-verifier/option" mapset "github.com/deckarep/golang-set/v2" "github.com/goaux/timer" "github.com/pkg/errors" @@ -600,36 +596,18 @@ func (verifier *Verifier) work(ctx context.Context, workerNum int) error { } } -func (verifier *Verifier) initializeChangeReaders() { - srcReader := &ChangeStreamReader{ - ChangeReaderCommon: ChangeReaderCommon{ - readerType: src, - namespaces: verifier.srcNamespaces, - watcherClient: verifier.srcClient, - clusterInfo: *verifier.srcClusterInfo, - }, - } - verifier.srcChangeReader = srcReader - - dstReader := &ChangeStreamReader{ - ChangeReaderCommon: ChangeReaderCommon{ - readerType: dst, - namespaces: verifier.dstNamespaces, - watcherClient: verifier.dstClient, - clusterInfo: *verifier.dstClusterInfo, - onDDLEvent: onDDLEventAllow, - }, - } - verifier.dstChangeReader = dstReader - - // Common elements in both readers: - for _, csr := range mslices.Of(srcReader, dstReader) { - csr.logger = verifier.logger - csr.metaDB = verifier.metaClient.Database(verifier.metaDBName) - csr.changeEventBatchChan = make(chan changeEventBatch, batchChanBufferSize) - csr.writesOffTs = util.NewEventual[bson.Timestamp]() - csr.lag = msync.NewTypedAtomic(option.None[time.Duration]()) - csr.batchSizeHistory = history.New[int](time.Minute) - csr.resumeTokenTSExtractor = extractTSFromChangeStreamResumeToken - } +func (v *Verifier) initializeChangeReaders() { + v.srcChangeReader = v.newChangeStreamReader( + v.srcNamespaces, + src, + v.srcClient, + *v.srcClusterInfo, + ) + + v.dstChangeReader = v.newChangeStreamReader( + v.dstNamespaces, + dst, + v.dstClient, + *v.dstClusterInfo, + ) } diff --git a/internal/verifier/compare.go b/internal/verifier/compare.go index 85fbfca0..d5b8b94d 100644 --- a/internal/verifier/compare.go +++ b/internal/verifier/compare.go @@ -18,7 +18,6 @@ import ( "go.mongodb.org/mongo-driver/v2/bson" "go.mongodb.org/mongo-driver/v2/mongo" "go.mongodb.org/mongo-driver/v2/mongo/options" - "go.mongodb.org/mongo-driver/v2/mongo/readpref" "golang.org/x/exp/slices" ) @@ -467,7 +466,7 @@ func (verifier *Verifier) getFetcherChannelsAndCallbacks( sctx, verifier.srcClientCollection(task), verifier.srcClusterInfo, - verifier.srcChangeReader.getStartTimestamp().ToPointer(), + verifier.srcChangeReader.getLatestTimestamp(), task, ) @@ -500,7 +499,7 @@ func (verifier *Verifier) getFetcherChannelsAndCallbacks( sctx, verifier.dstClientCollection(task), verifier.dstClusterInfo, - verifier.dstChangeReader.getStartTimestamp().ToPointer(), + verifier.dstChangeReader.getLatestTimestamp(), task, ) @@ -573,8 +572,13 @@ func getMapKey(docKeyValues []bson.RawValue) string { return keyBuffer.String() } -func (verifier *Verifier) getDocumentsCursor(ctx context.Context, collection *mongo.Collection, clusterInfo *util.ClusterInfo, - startAtTs *bson.Timestamp, task *VerificationTask) (*mongo.Cursor, error) { +func (verifier *Verifier) getDocumentsCursor( + ctx context.Context, + collection *mongo.Collection, + clusterInfo *util.ClusterInfo, + readConcernTS option.Option[bson.Timestamp], + task *VerificationTask, +) (*mongo.Cursor, error) { var findOptions bson.D runCommandOptions := options.RunCmd() var andPredicates bson.A @@ -656,20 +660,18 @@ func (verifier *Verifier) getDocumentsCursor(ctx context.Context, collection *mo ) } - if verifier.readPreference.Mode() != readpref.PrimaryMode { - runCommandOptions = runCommandOptions.SetReadPreference(verifier.readPreference) - if startAtTs != nil { - readConcern := bson.D{ - {"afterClusterTime", *startAtTs}, - } - - // We never want to read before the change stream start time, - // or for the last generation, the change stream end time. - cmd = append( - cmd, - bson.E{"readConcern", readConcern}, - ) + runCommandOptions = runCommandOptions.SetReadPreference(verifier.readPreference) + if ts, has := readConcernTS.Get(); has { + readConcern := bson.D{ + {"afterClusterTime", ts}, } + + // We never want to read before the change stream start time, + // or for the last generation, the change stream end time. + cmd = append( + cmd, + bson.E{"readConcern", readConcern}, + ) } // Suppress this log for recheck tasks because the list of IDs can be diff --git a/internal/verifier/recheck_persist.go b/internal/verifier/recheck_persist.go index 564e33a2..82db17fc 100644 --- a/internal/verifier/recheck_persist.go +++ b/internal/verifier/recheck_persist.go @@ -13,7 +13,6 @@ import ( type changeEventBatch struct { events []ParsedEvent resumeToken bson.Raw - clusterTime bson.Timestamp } // RunChangeEventPersistor persists rechecks from change event batches. @@ -166,14 +165,12 @@ func (verifier *Verifier) PersistChangeEvents(ctx context.Context, batch changeE } latestTimestampTime := time.Unix(int64(latestTimestamp.T), 0) - lag := time.Unix(int64(batch.clusterTime.T), 0).Sub(latestTimestampTime) verifier.logger.Trace(). Str("origin", string(eventOrigin)). Int("count", len(docIDs)). Any("latestTimestamp", latestTimestamp). Time("latestTimestampTime", latestTimestampTime). - Stringer("lag", lag). Msg("Persisting rechecks for change events.") return verifier.insertRecheckDocs(ctx, dbNames, collNames, docIDs, dataSizes) From 263c9e70a125048835a03641312b11e24c0e8225 Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Fri, 14 Nov 2025 15:57:41 -0500 Subject: [PATCH 02/15] fall back to start ts as needed --- internal/verifier/change_reader.go | 4 ++-- internal/verifier/compare.go | 11 +++++++++-- 2 files changed, 11 insertions(+), 4 deletions(-) diff --git a/internal/verifier/change_reader.go b/internal/verifier/change_reader.go index 8fae036c..c9021ecf 100644 --- a/internal/verifier/change_reader.go +++ b/internal/verifier/change_reader.go @@ -35,7 +35,7 @@ type changeReader interface { getWhichCluster() whichCluster getReadChannel() <-chan changeEventBatch getStartTimestamp() option.Option[bson.Timestamp] - getLatestTimestamp() option.Option[bson.Timestamp] + getLastSeenClusterTime() option.Option[bson.Timestamp] getEventsPerSecond() option.Option[float64] getLag() option.Option[time.Duration] getBufferSaturation() float64 @@ -108,7 +108,7 @@ func (rc *ChangeReaderCommon) getReadChannel() <-chan changeEventBatch { return rc.changeEventBatchChan } -func (rc *ChangeReaderCommon) getLatestTimestamp() option.Option[bson.Timestamp] { +func (rc *ChangeReaderCommon) getLastSeenClusterTime() option.Option[bson.Timestamp] { return rc.lastChangeEventTime.Load() } diff --git a/internal/verifier/compare.go b/internal/verifier/compare.go index d5b8b94d..5e6103f8 100644 --- a/internal/verifier/compare.go +++ b/internal/verifier/compare.go @@ -2,6 +2,7 @@ package verifier import ( "bytes" + "cmp" "context" "fmt" "time" @@ -466,7 +467,10 @@ func (verifier *Verifier) getFetcherChannelsAndCallbacks( sctx, verifier.srcClientCollection(task), verifier.srcClusterInfo, - verifier.srcChangeReader.getLatestTimestamp(), + cmp.Or( + verifier.srcChangeReader.getLastSeenClusterTime(), + verifier.srcChangeReader.getStartTimestamp(), + ), task, ) @@ -499,7 +503,10 @@ func (verifier *Verifier) getFetcherChannelsAndCallbacks( sctx, verifier.dstClientCollection(task), verifier.dstClusterInfo, - verifier.dstChangeReader.getLatestTimestamp(), + cmp.Or( + verifier.dstChangeReader.getLastSeenClusterTime(), + verifier.dstChangeReader.getStartTimestamp(), + ), task, ) From f05a4c68585d42bcb5a630a8aa843827e251db83 Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Fri, 14 Nov 2025 17:06:41 -0500 Subject: [PATCH 03/15] use driver API --- internal/verifier/compare.go | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/internal/verifier/compare.go b/internal/verifier/compare.go index 5e6103f8..02b509e3 100644 --- a/internal/verifier/compare.go +++ b/internal/verifier/compare.go @@ -667,18 +667,17 @@ func (verifier *Verifier) getDocumentsCursor( ) } + sess := mongo.SessionFromContext(ctx) + + if sess == nil { + panic("No session?!?") + } + runCommandOptions = runCommandOptions.SetReadPreference(verifier.readPreference) if ts, has := readConcernTS.Get(); has { - readConcern := bson.D{ - {"afterClusterTime", ts}, + if err := sess.AdvanceOperationTime(&ts); err != nil { + return nil, errors.Wrapf(err, "advancing session operation time to %v", ts) } - - // We never want to read before the change stream start time, - // or for the last generation, the change stream end time. - cmd = append( - cmd, - bson.E{"readConcern", readConcern}, - ) } // Suppress this log for recheck tasks because the list of IDs can be From 22d577525e627c20859f8766bd57491388559844 Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Fri, 14 Nov 2025 17:15:23 -0500 Subject: [PATCH 04/15] revert --- internal/verifier/compare.go | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/internal/verifier/compare.go b/internal/verifier/compare.go index 02b509e3..f2e17b68 100644 --- a/internal/verifier/compare.go +++ b/internal/verifier/compare.go @@ -587,7 +587,6 @@ func (verifier *Verifier) getDocumentsCursor( task *VerificationTask, ) (*mongo.Cursor, error) { var findOptions bson.D - runCommandOptions := options.RunCmd() var andPredicates bson.A var aggOptions bson.D @@ -673,11 +672,19 @@ func (verifier *Verifier) getDocumentsCursor( panic("No session?!?") } - runCommandOptions = runCommandOptions.SetReadPreference(verifier.readPreference) + runCommandOptions := options.RunCmd().SetReadPreference(verifier.readPreference) + if ts, has := readConcernTS.Get(); has { - if err := sess.AdvanceOperationTime(&ts); err != nil { - return nil, errors.Wrapf(err, "advancing session operation time to %v", ts) + readConcern := bson.D{ + {"afterClusterTime", ts}, } + + // We never want to read before the change stream start time, + // or for the last generation, the change stream end time. + cmd = append( + cmd, + bson.E{"readConcern", readConcern}, + ) } // Suppress this log for recheck tasks because the list of IDs can be From f2ca1e6050c552ed51cb8b5df3d65a5fa8815917 Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Fri, 14 Nov 2025 20:35:31 -0500 Subject: [PATCH 05/15] compulsory timestamp --- internal/verifier/change_reader.go | 10 ++++++--- internal/verifier/change_stream_test.go | 18 +++++---------- internal/verifier/compare.go | 29 ++++++++++--------------- 3 files changed, 24 insertions(+), 33 deletions(-) diff --git a/internal/verifier/change_reader.go b/internal/verifier/change_reader.go index c9021ecf..a3d261f9 100644 --- a/internal/verifier/change_reader.go +++ b/internal/verifier/change_reader.go @@ -34,7 +34,7 @@ const ( type changeReader interface { getWhichCluster() whichCluster getReadChannel() <-chan changeEventBatch - getStartTimestamp() option.Option[bson.Timestamp] + getStartTimestamp() bson.Timestamp getLastSeenClusterTime() option.Option[bson.Timestamp] getEventsPerSecond() option.Option[float64] getLag() option.Option[time.Duration] @@ -92,8 +92,12 @@ func (rc *ChangeReaderCommon) getWhichCluster() whichCluster { return rc.readerType } -func (rc *ChangeReaderCommon) getStartTimestamp() option.Option[bson.Timestamp] { - return option.FromPointer(rc.startAtTs) +func (rc *ChangeReaderCommon) getStartTimestamp() bson.Timestamp { + if rc.startAtTs == nil { + panic("no start timestamp yet?!?") + } + + return *rc.startAtTs } func (rc *ChangeReaderCommon) setWritesOff(ts bson.Timestamp) { diff --git a/internal/verifier/change_stream_test.go b/internal/verifier/change_stream_test.go index 5f8e8395..62a39bca 100644 --- a/internal/verifier/change_stream_test.go +++ b/internal/verifier/change_stream_test.go @@ -441,9 +441,7 @@ func (suite *IntegrationTestSuite) TestChangeStreamResumability() { suite.startSrcChangeStreamReaderAndHandler(ctx, verifier2) - startAtTs, hasStartAtTs := verifier2.srcChangeReader.getStartTimestamp().Get() - - suite.Require().True(hasStartAtTs) + startAtTs := verifier2.srcChangeReader.getStartTimestamp() suite.Assert().False( startAtTs.After(newTime), @@ -631,14 +629,13 @@ func (suite *IntegrationTestSuite) TestStartAtTimeNoChanges() { eg := suite.startSrcChangeStreamReaderAndHandler(ctx, verifier) - startAtTs, hasStartAtTs := verifier.srcChangeReader.getStartTimestamp().Get() - suite.Require().True(hasStartAtTs, "startAtTs should be set") + startAtTs := verifier.srcChangeReader.getStartTimestamp() verifier.srcChangeReader.setWritesOff(insertTs) suite.Require().NoError(eg.Wait()) - startAtTs2 := verifier.srcChangeReader.getStartTimestamp().MustGet() + startAtTs2 := verifier.srcChangeReader.getStartTimestamp() suite.Require().False( startAtTs2.Before(startAtTs), @@ -663,8 +660,7 @@ func (suite *IntegrationTestSuite) TestStartAtTimeWithChanges() { suite.Require().NotNil(origSessionTime) eg := suite.startSrcChangeStreamReaderAndHandler(ctx, verifier) - startAtTs, hasStartAtTs := verifier.srcChangeReader.getStartTimestamp().Get() - suite.Require().True(hasStartAtTs, "startAtTs should be set") + startAtTs := verifier.srcChangeReader.getStartTimestamp() // srcStartAtTs derives from the change stream’s resume token, which can // postdate our session time but should not precede it. @@ -697,8 +693,7 @@ func (suite *IntegrationTestSuite) TestStartAtTimeWithChanges() { suite.Require().NoError(eg.Wait()) - startAtTs, hasStartAtTs = verifier.srcChangeReader.getStartTimestamp().Get() - suite.Require().True(hasStartAtTs, "startAtTs should be set") + startAtTs = verifier.srcChangeReader.getStartTimestamp() suite.Assert().Equal( *postEventsSessionTime, @@ -720,8 +715,7 @@ func (suite *IntegrationTestSuite) TestNoStartAtTime() { suite.Require().NotNil(origStartTs) suite.startSrcChangeStreamReaderAndHandler(ctx, verifier) - startAtTs, hasStartAtTs := verifier.srcChangeReader.getStartTimestamp().Get() - suite.Require().True(hasStartAtTs, "startAtTs should be set") + startAtTs := verifier.srcChangeReader.getStartTimestamp() suite.Require().NotNil(startAtTs) suite.Require().LessOrEqual(origStartTs.Compare(startAtTs), 0) diff --git a/internal/verifier/compare.go b/internal/verifier/compare.go index f2e17b68..da9988a1 100644 --- a/internal/verifier/compare.go +++ b/internal/verifier/compare.go @@ -2,7 +2,6 @@ package verifier import ( "bytes" - "cmp" "context" "fmt" "time" @@ -467,8 +466,7 @@ func (verifier *Verifier) getFetcherChannelsAndCallbacks( sctx, verifier.srcClientCollection(task), verifier.srcClusterInfo, - cmp.Or( - verifier.srcChangeReader.getLastSeenClusterTime(), + verifier.srcChangeReader.getLastSeenClusterTime().OrElse( verifier.srcChangeReader.getStartTimestamp(), ), task, @@ -503,8 +501,7 @@ func (verifier *Verifier) getFetcherChannelsAndCallbacks( sctx, verifier.dstClientCollection(task), verifier.dstClusterInfo, - cmp.Or( - verifier.dstChangeReader.getLastSeenClusterTime(), + verifier.dstChangeReader.getLastSeenClusterTime().OrElse( verifier.dstChangeReader.getStartTimestamp(), ), task, @@ -583,7 +580,7 @@ func (verifier *Verifier) getDocumentsCursor( ctx context.Context, collection *mongo.Collection, clusterInfo *util.ClusterInfo, - readConcernTS option.Option[bson.Timestamp], + readConcernTS bson.Timestamp, task *VerificationTask, ) (*mongo.Cursor, error) { var findOptions bson.D @@ -674,18 +671,14 @@ func (verifier *Verifier) getDocumentsCursor( runCommandOptions := options.RunCmd().SetReadPreference(verifier.readPreference) - if ts, has := readConcernTS.Get(); has { - readConcern := bson.D{ - {"afterClusterTime", ts}, - } - - // We never want to read before the change stream start time, - // or for the last generation, the change stream end time. - cmd = append( - cmd, - bson.E{"readConcern", readConcern}, - ) - } + // We never want to read before the change stream start time, + // or for the last generation, the change stream end time. + cmd = append( + cmd, + bson.E{"readConcern", bson.D{ + {"afterClusterTime", readConcernTS}, + }}, + ) // Suppress this log for recheck tasks because the list of IDs can be // quite long. From 22e5072efbfa531684515287578fdb5eb76980e7 Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Fri, 14 Nov 2025 20:47:14 -0500 Subject: [PATCH 06/15] start handling --- internal/verifier/check.go | 48 ++++++++++++++++++------------- internal/verifier/compare_test.go | 3 +- 2 files changed, 30 insertions(+), 21 deletions(-) diff --git a/internal/verifier/check.go b/internal/verifier/check.go index 0405ffcc..b606ee84 100644 --- a/internal/verifier/check.go +++ b/internal/verifier/check.go @@ -251,28 +251,10 @@ func (verifier *Verifier) CheckDriver(ctx context.Context, filter bson.D, testCh verifier.phase = Idle }() - changeReaderGroup, groupCtx := contextplus.ErrGroup(ctx) - for _, changeReader := range mslices.Of(verifier.srcChangeReader, verifier.dstChangeReader) { - if changeReader.isRunning() { - verifier.logger.Debug().Msgf("Check: %s already running.", changeReader) - } else { - verifier.logger.Debug().Msgf("%s not running; starting change reader", changeReader) - - err = changeReader.start(groupCtx, changeReaderGroup) - if err != nil { - return errors.Wrapf(err, "failed to start %s", changeReader) - } - changeReaderGroup.Go(func() error { - return verifier.RunChangeEventPersistor(groupCtx, changeReader) - }) - } + if err := verifier.startChangeHandling(ctx); err != nil { + return err } - changeHandlingErr := verifier.changeHandlingErr - go func() { - changeHandlingErr.Set(changeReaderGroup.Wait()) - }() - // Log the verification status when initially booting up so it's easy to see the current state verificationStatus, err := verifier.GetVerificationStatus(ctx) if err != nil { @@ -405,6 +387,32 @@ func (verifier *Verifier) CheckDriver(ctx context.Context, filter bson.D, testCh } } +func (verifier *Verifier) startChangeHandling(ctx context.Context) error { + changeReaderGroup, groupCtx := contextplus.ErrGroup(ctx) + for _, changeReader := range mslices.Of(verifier.srcChangeReader, verifier.dstChangeReader) { + if changeReader.isRunning() { + verifier.logger.Debug().Msgf("Check: %s already running.", changeReader) + } else { + verifier.logger.Debug().Msgf("%s not running; starting change reader", changeReader) + + err := changeReader.start(groupCtx, changeReaderGroup) + if err != nil { + return errors.Wrapf(err, "failed to start %s", changeReader) + } + changeReaderGroup.Go(func() error { + return verifier.RunChangeEventPersistor(groupCtx, changeReader) + }) + } + } + + changeHandlingErr := verifier.changeHandlingErr + go func() { + changeHandlingErr.Set(changeReaderGroup.Wait()) + }() + + return nil +} + func (verifier *Verifier) setupAllNamespaceList(ctx context.Context) error { // We want to check all user collections on both source and dest. srcNamespaces, err := ListAllUserNamespaces(ctx, verifier.logger, verifier.srcClient, verifier.metaDBName) diff --git a/internal/verifier/compare_test.go b/internal/verifier/compare_test.go index 884e38a6..15686ba6 100644 --- a/internal/verifier/compare_test.go +++ b/internal/verifier/compare_test.go @@ -14,7 +14,7 @@ import ( "go.mongodb.org/mongo-driver/v2/bson" ) -// TestFetchAndCompareDocuments_ContextCancellation ensures that nothing hangs +// TestFetchAndCompareDocuments_Context ensures that nothing hangs // when a context is canceled during FetchAndCompareDocuments(). func (s *IntegrationTestSuite) TestFetchAndCompareDocuments_Context() { ctx := s.Context() @@ -49,6 +49,7 @@ func (s *IntegrationTestSuite) TestFetchAndCompareDocuments_Context() { } verifier := s.BuildVerifier() + s.Require().NoError(verifier.startChangeHandling(ctx)) for range 100 { cancelableCtx, cancel := contextplus.WithCancelCause(ctx) From 88b065708c2291e0442ec3a846ea43ee591a3a7f Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Fri, 14 Nov 2025 20:51:55 -0500 Subject: [PATCH 07/15] another --- internal/verifier/migration_verifier_test.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/internal/verifier/migration_verifier_test.go b/internal/verifier/migration_verifier_test.go index 396077bf..631ea2d4 100644 --- a/internal/verifier/migration_verifier_test.go +++ b/internal/verifier/migration_verifier_test.go @@ -126,6 +126,8 @@ func (suite *IntegrationTestSuite) TestProcessVerifyTask_Failure() { ctx := suite.Context() t := suite.T() + suite.Require().NoError(verifier.startChangeHandling(ctx)) + dbName := suite.DBNameForTest() collName := "coll" From ae85756fe4c4ed3b635f62cb99ce58a0e6cc1f71 Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Fri, 14 Nov 2025 20:56:50 -0500 Subject: [PATCH 08/15] fix another --- internal/verifier/migration_verifier_test.go | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/internal/verifier/migration_verifier_test.go b/internal/verifier/migration_verifier_test.go index 631ea2d4..0e4050bb 100644 --- a/internal/verifier/migration_verifier_test.go +++ b/internal/verifier/migration_verifier_test.go @@ -379,8 +379,6 @@ func (suite *IntegrationTestSuite) TestVerifier_DocFilter_ObjectID() { } func (suite *IntegrationTestSuite) TestTypesBetweenBoundaries() { - verifier := suite.BuildVerifier() - ctx := suite.Context() task := &VerificationTask{ PrimaryKey: bson.NewObjectID(), @@ -396,20 +394,25 @@ func (suite *IntegrationTestSuite) TestTypesBetweenBoundaries() { }, } - _, err := verifier.srcClient.Database("keyhole").Collection("dealers").InsertMany(ctx, []any{ + _, err := suite.srcMongoClient.Database("keyhole").Collection("dealers").InsertMany(ctx, []any{ bson.D{{"_id", nil}}, bson.D{{"_id", int32(123)}}, bson.D{{"_id", bson.Symbol("oh yeah")}}, }) suite.Require().NoError(err) - _, err = verifier.dstClient.Database("keyhole").Collection("dealers").InsertMany(ctx, []any{ + _, err = suite.dstMongoClient.Database("keyhole").Collection("dealers").InsertMany(ctx, []any{ bson.D{{"_id", nil}}, bson.D{{"_id", int32(123)}}, bson.D{{"_id", "oh yeah"}}, }) suite.Require().NoError(err) + verifier := suite.BuildVerifier() + ctx := suite.Context() + + suite.Require().NoError(verifier.startChangeHandling(ctx)) + cases := []struct { label string lower, upper any From 37373ac1f0c373a8c87b53852db0ef31eb7c22d2 Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Fri, 14 Nov 2025 20:57:11 -0500 Subject: [PATCH 09/15] fix --- internal/verifier/migration_verifier_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/internal/verifier/migration_verifier_test.go b/internal/verifier/migration_verifier_test.go index 0e4050bb..b67e86cc 100644 --- a/internal/verifier/migration_verifier_test.go +++ b/internal/verifier/migration_verifier_test.go @@ -379,6 +379,7 @@ func (suite *IntegrationTestSuite) TestVerifier_DocFilter_ObjectID() { } func (suite *IntegrationTestSuite) TestTypesBetweenBoundaries() { + ctx := suite.Context() task := &VerificationTask{ PrimaryKey: bson.NewObjectID(), From b30e9a898c26f32251b60b0af88106d8431fbda0 Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Fri, 14 Nov 2025 20:59:57 -0500 Subject: [PATCH 10/15] fix again --- internal/verifier/migration_verifier_test.go | 1 - 1 file changed, 1 deletion(-) diff --git a/internal/verifier/migration_verifier_test.go b/internal/verifier/migration_verifier_test.go index b67e86cc..40f51f26 100644 --- a/internal/verifier/migration_verifier_test.go +++ b/internal/verifier/migration_verifier_test.go @@ -410,7 +410,6 @@ func (suite *IntegrationTestSuite) TestTypesBetweenBoundaries() { suite.Require().NoError(err) verifier := suite.BuildVerifier() - ctx := suite.Context() suite.Require().NoError(verifier.startChangeHandling(ctx)) From 0546f156ea3b50dbd418eaa8376dfbf79fc953a8 Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Fri, 14 Nov 2025 21:09:54 -0500 Subject: [PATCH 11/15] fix test --- internal/verifier/migration_verifier_test.go | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/internal/verifier/migration_verifier_test.go b/internal/verifier/migration_verifier_test.go index 40f51f26..c83de5c7 100644 --- a/internal/verifier/migration_verifier_test.go +++ b/internal/verifier/migration_verifier_test.go @@ -490,12 +490,12 @@ func (suite *IntegrationTestSuite) TestTypesBetweenBoundaries() { } func (suite *IntegrationTestSuite) TestVerifierFetchDocuments() { - verifier := suite.BuildVerifier() + ctx := suite.Context() drop := func() { - err := verifier.srcClient.Database("keyhole").Drop(ctx) + err := suite.srcMongoClient.Database("keyhole").Drop(ctx) suite.Require().NoError(err) - err = verifier.dstClient.Database("keyhole").Drop(ctx) + err = suite.dstMongoClient.Database("keyhole").Drop(ctx) suite.Require().NoError(err) } drop() @@ -511,12 +511,12 @@ func (suite *IntegrationTestSuite) TestVerifierFetchDocuments() { } id := rand.Intn(1000) - _, err := verifier.srcClient.Database("keyhole").Collection("dealers").InsertMany(ctx, []any{ + _, err := suite.srcMongoClient.Database("keyhole").Collection("dealers").InsertMany(ctx, []any{ bson.D{{"_id", id}, {"num", 99}, {"name", "srcTest"}}, bson.D{{"_id", id + 1}, {"num", 101}, {"name", "srcTest"}}, }) suite.Require().NoError(err) - _, err = verifier.dstClient.Database("keyhole").Collection("dealers").InsertMany(ctx, []any{ + _, err = suite.dstMongoClient.Database("keyhole").Collection("dealers").InsertMany(ctx, []any{ bson.D{{"_id", id}, {"num", 99}, {"name", "dstTest"}}, bson.D{{"_id", id + 1}, {"num", 101}, {"name", "dstTest"}}, }) @@ -528,6 +528,9 @@ func (suite *IntegrationTestSuite) TestVerifierFetchDocuments() { QueryFilter: basicQueryFilter("keyhole.dealers"), } + verifier := suite.BuildVerifier() + suite.Require().NoError(verifier.startChangeHandling(ctx)) + // Test fetchDocuments without global filter. verifier.globalFilter = nil results, docCount, byteCount, err := verifier.FetchAndCompareDocuments(ctx, 0, task) From 1ffb5f69a205b7a0407b58244f14c4b524e7ba93 Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Fri, 14 Nov 2025 21:16:32 -0500 Subject: [PATCH 12/15] more --- internal/verifier/migration_verifier_test.go | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/internal/verifier/migration_verifier_test.go b/internal/verifier/migration_verifier_test.go index c83de5c7..deb77e4f 100644 --- a/internal/verifier/migration_verifier_test.go +++ b/internal/verifier/migration_verifier_test.go @@ -332,15 +332,15 @@ func getShardIds(t *testing.T, client *mongo.Client) []string { } func (suite *IntegrationTestSuite) TestVerifier_DocFilter_ObjectID() { - verifier := suite.BuildVerifier() + ctx := suite.Context() t := suite.T() dbName := suite.DBNameForTest() collName := "coll" - srcColl := verifier.srcClient.Database(dbName).Collection(collName) - dstColl := verifier.dstClient.Database(dbName).Collection(collName) + srcColl := suite.srcMongoClient.Database(dbName).Collection(collName) + dstColl := suite.dstMongoClient.Database(dbName).Collection(collName) id1 := bson.NewObjectID() _, err := srcColl.InsertOne(ctx, bson.D{{"_id", id1}}) @@ -364,6 +364,9 @@ func (suite *IntegrationTestSuite) TestVerifier_DocFilter_ObjectID() { }, } + verifier := suite.BuildVerifier() + suite.Require().NoError(verifier.startChangeHandling(ctx)) + verifier.globalFilter = bson.D{{"_id", id1}} results, docCount, _, err := verifier.FetchAndCompareDocuments(ctx, 0, task) From a463151398b210223f266f00fd70c0f8582a5d91 Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Fri, 14 Nov 2025 21:21:35 -0500 Subject: [PATCH 13/15] move --- internal/verifier/migration_verifier_test.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/internal/verifier/migration_verifier_test.go b/internal/verifier/migration_verifier_test.go index deb77e4f..c8a4ddd7 100644 --- a/internal/verifier/migration_verifier_test.go +++ b/internal/verifier/migration_verifier_test.go @@ -384,6 +384,10 @@ func (suite *IntegrationTestSuite) TestVerifier_DocFilter_ObjectID() { func (suite *IntegrationTestSuite) TestTypesBetweenBoundaries() { ctx := suite.Context() + verifier := suite.BuildVerifier() + + suite.Require().NoError(verifier.startChangeHandling(ctx)) + task := &VerificationTask{ PrimaryKey: bson.NewObjectID(), QueryFilter: QueryFilter{ @@ -412,10 +416,6 @@ func (suite *IntegrationTestSuite) TestTypesBetweenBoundaries() { }) suite.Require().NoError(err) - verifier := suite.BuildVerifier() - - suite.Require().NoError(verifier.startChangeHandling(ctx)) - cases := []struct { label string lower, upper any From a5ab5031dd82d1df451a8b3f0fe6acef5d0f89e3 Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Fri, 14 Nov 2025 21:22:55 -0500 Subject: [PATCH 14/15] save --- internal/verifier/migration_verifier_test.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/internal/verifier/migration_verifier_test.go b/internal/verifier/migration_verifier_test.go index c8a4ddd7..6f51bb70 100644 --- a/internal/verifier/migration_verifier_test.go +++ b/internal/verifier/migration_verifier_test.go @@ -493,8 +493,11 @@ func (suite *IntegrationTestSuite) TestTypesBetweenBoundaries() { } func (suite *IntegrationTestSuite) TestVerifierFetchDocuments() { - ctx := suite.Context() + + verifier := suite.BuildVerifier() + suite.Require().NoError(verifier.startChangeHandling(ctx)) + drop := func() { err := suite.srcMongoClient.Database("keyhole").Drop(ctx) suite.Require().NoError(err) @@ -531,9 +534,6 @@ func (suite *IntegrationTestSuite) TestVerifierFetchDocuments() { QueryFilter: basicQueryFilter("keyhole.dealers"), } - verifier := suite.BuildVerifier() - suite.Require().NoError(verifier.startChangeHandling(ctx)) - // Test fetchDocuments without global filter. verifier.globalFilter = nil results, docCount, byteCount, err := verifier.FetchAndCompareDocuments(ctx, 0, task) From fe2224230a352f6f237b80ef2ef2930aeef2acf9 Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Fri, 14 Nov 2025 21:31:36 -0500 Subject: [PATCH 15/15] ctx --- internal/verifier/migration_verifier_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/internal/verifier/migration_verifier_test.go b/internal/verifier/migration_verifier_test.go index 6f51bb70..4d70ccd8 100644 --- a/internal/verifier/migration_verifier_test.go +++ b/internal/verifier/migration_verifier_test.go @@ -304,6 +304,7 @@ func (suite *IntegrationTestSuite) TestVerifier_Dotted_Shard_Key() { } verifier := suite.BuildVerifier() + suite.Require().NoError(verifier.startChangeHandling(ctx)) results, docCount, _, err := verifier.FetchAndCompareDocuments(ctx, 0, task) require.NoError(err, "should fetch & compare") assert.EqualValues(suite.T(), len(docs), docCount, "expected # of docs")