From 687a57e2af6840d6448bc4c13cc24839a333b3c7 Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Tue, 11 Nov 2025 14:44:52 -0500 Subject: [PATCH 01/24] move to common --- internal/verifier/change_stream.go | 28 ++++++++++++++++++---------- 1 file changed, 18 insertions(+), 10 deletions(-) diff --git a/internal/verifier/change_stream.go b/internal/verifier/change_stream.go index 06aacca9..b53cbdbe 100644 --- a/internal/verifier/change_stream.go +++ b/internal/verifier/change_stream.go @@ -64,7 +64,7 @@ type changeEventBatch struct { clusterTime bson.Timestamp } -type ChangeStreamReader struct { +type ChangeReaderCommon struct { readerType whichCluster lastChangeEventTime *bson.Timestamp @@ -90,21 +90,29 @@ type ChangeStreamReader struct { onDDLEvent ddlEventHandling } +type ChangeStreamReader struct { + ChangeReaderCommon +} + func (verifier *Verifier) initializeChangeStreamReaders() { srcReader := &ChangeStreamReader{ - readerType: src, - namespaces: verifier.srcNamespaces, - watcherClient: verifier.srcClient, - clusterInfo: *verifier.srcClusterInfo, + ChangeReaderCommon: ChangeReaderCommon{ + readerType: src, + namespaces: verifier.srcNamespaces, + watcherClient: verifier.srcClient, + clusterInfo: *verifier.srcClusterInfo, + }, } verifier.srcChangeStreamReader = srcReader dstReader := &ChangeStreamReader{ - readerType: dst, - namespaces: verifier.dstNamespaces, - watcherClient: verifier.dstClient, - clusterInfo: *verifier.dstClusterInfo, - onDDLEvent: onDDLEventAllow, + ChangeReaderCommon: ChangeReaderCommon{ + readerType: dst, + namespaces: verifier.dstNamespaces, + watcherClient: verifier.dstClient, + clusterInfo: *verifier.dstClusterInfo, + onDDLEvent: onDDLEventAllow, + }, } verifier.dstChangeStreamReader = dstReader From 61b89e35c07f3a151ba4603211aa67897091d291 Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Tue, 11 Nov 2025 15:52:30 -0500 Subject: [PATCH 02/24] move --- internal/util/clusterinfo.go | 10 ++ internal/verifier/change_reader.go | 207 ++++++++++++++++++++++++ internal/verifier/change_stream.go | 37 +---- internal/verifier/change_stream_test.go | 84 +++++++--- internal/verifier/check.go | 20 +-- internal/verifier/compare.go | 4 +- internal/verifier/migration_verifier.go | 16 +- internal/verifier/summary.go | 8 +- 8 files changed, 305 insertions(+), 81 deletions(-) create mode 100644 internal/verifier/change_reader.go diff --git a/internal/util/clusterinfo.go b/internal/util/clusterinfo.go index 2364db93..7f66f8e7 100644 --- a/internal/util/clusterinfo.go +++ b/internal/util/clusterinfo.go @@ -19,6 +19,16 @@ type ClusterInfo struct { Topology ClusterTopology } +func ClusterHasBSONSize(va [2]int) bool { + major := va[0] + + if major == 4 { + return va[1] >= 4 + } + + return major > 4 +} + const ( TopologySharded ClusterTopology = "sharded" TopologyReplset ClusterTopology = "replset" diff --git a/internal/verifier/change_reader.go b/internal/verifier/change_reader.go new file mode 100644 index 00000000..cd6e30dd --- /dev/null +++ b/internal/verifier/change_reader.go @@ -0,0 +1,207 @@ +package verifier + +import ( + "context" + "time" + + "github.com/10gen/migration-verifier/history" + "github.com/10gen/migration-verifier/internal/logger" + "github.com/10gen/migration-verifier/internal/util" + "github.com/10gen/migration-verifier/msync" + "github.com/10gen/migration-verifier/option" + "github.com/pkg/errors" + "github.com/samber/lo" + "go.mongodb.org/mongo-driver/v2/bson" + "go.mongodb.org/mongo-driver/v2/mongo" + "go.mongodb.org/mongo-driver/v2/mongo/options" +) + +type changeReader interface { + getWhichCluster() whichCluster + getReadChannel() <-chan changeEventBatch + getError() *util.Eventual[error] + getStartTimestamp() option.Option[bson.Timestamp] + getEventsPerSecond() option.Option[float64] + getLag() option.Option[time.Duration] + getBufferSaturation() float64 + setWritesOff(bson.Timestamp) + setPersistorError(error) + StartChangeStream(context.Context) error + done() <-chan struct{} + persistChangeStreamResumeToken(context.Context, bson.Raw) error + isRunning() bool + String() string +} + +type ChangeReaderCommon struct { + readerType whichCluster + + lastChangeEventTime *bson.Timestamp + logger *logger.Logger + namespaces []string + + metaDB *mongo.Database + watcherClient *mongo.Client + clusterInfo util.ClusterInfo + + resumeTokenTSExtractor func(bson.Raw) (bson.Timestamp, error) + + changeStreamRunning bool + changeEventBatchChan chan changeEventBatch + writesOffTs *util.Eventual[bson.Timestamp] + readerError *util.Eventual[error] + handlerError *util.Eventual[error] + doneChan chan struct{} + + startAtTs *bson.Timestamp + + lag *msync.TypedAtomic[option.Option[time.Duration]] + batchSizeHistory *history.History[int] + + onDDLEvent ddlEventHandling +} + +func (rc ChangeReaderCommon) getWhichCluster() whichCluster { + return rc.readerType +} + +func (rc ChangeReaderCommon) setPersistorError(err error) { + rc.handlerError.Set(err) +} + +func (rc ChangeReaderCommon) getError() *util.Eventual[error] { + return rc.readerError +} + +func (rc ChangeReaderCommon) getStartTimestamp() option.Option[bson.Timestamp] { + return option.FromPointer(rc.startAtTs) +} + +func (rc ChangeReaderCommon) setWritesOff(ts bson.Timestamp) { + rc.writesOffTs.Set(ts) +} + +func (rc ChangeReaderCommon) isRunning() bool { + return rc.isRunning() +} + +func (rc ChangeReaderCommon) getReadChannel() <-chan changeEventBatch { + return rc.changeEventBatchChan +} + +func (rc ChangeReaderCommon) done() <-chan struct{} { + return rc.doneChan +} + +func (rc ChangeReaderCommon) getBufferSaturation() float64 { + return util.DivideToF64(len(rc.changeEventBatchChan), cap(rc.changeEventBatchChan)) +} + +func (rc ChangeReaderCommon) getLag() option.Option[time.Duration] { + return rc.lag.Load() +} + +// getEventsPerSecond returns the number of change events per second we’ve been +// seeing “recently”. (See implementation for the actual period over which we +// compile this metric.) +func (rc ChangeReaderCommon) getEventsPerSecond() option.Option[float64] { + logs := rc.batchSizeHistory.Get() + lastLog, hasLogs := lo.Last(logs) + + if hasLogs && lastLog.At != logs[0].At { + span := lastLog.At.Sub(logs[0].At) + + // Each log contains a time and a # of events that happened since + // the prior log. Thus, each log’s Datum is a count of events that + // happened before the timestamp. Since we want the # of events that + // happened between the first & last times, we only want events *after* + // the first time. Thus, we skip the first log entry here. + totalEvents := 0 + for _, log := range logs[1:] { + totalEvents += log.Datum + } + + return option.Some(util.DivideToF64(totalEvents, span.Seconds())) + } + + return option.None[float64]() +} + +func (rc ChangeReaderCommon) persistChangeStreamResumeToken(ctx context.Context, token bson.Raw) error { + coll := rc.metaDB.Collection(metadataChangeStreamCollectionName) + _, err := coll.ReplaceOne( + ctx, + bson.D{{"_id", rc.resumeTokenDocID()}}, + token, + options.Replace().SetUpsert(true), + ) + + if err == nil { + ts, err := rc.resumeTokenTSExtractor(token) + + logEvent := rc.logger.Debug() + + if err == nil { + logEvent = addTimestampToLogEvent(ts, logEvent) + } else { + rc.logger.Warn().Err(err). + Msg("failed to extract resume token timestamp") + } + + logEvent.Msgf("Persisted %s's resume token.", rc.readerType) + + return nil + } + + return errors.Wrapf(err, "failed to persist change stream resume token (%v)", token) +} + +func (rc ChangeReaderCommon) resumeTokenDocID() string { + switch rc.readerType { + case src: + return "srcResumeToken" + case dst: + return "dstResumeToken" + default: + panic("unknown readerType: " + rc.readerType) + } +} + +func (rc ChangeReaderCommon) getMetadataCollection() *mongo.Collection { + return rc.metaDB.Collection(metadataChangeStreamCollectionName) +} + +func (rc ChangeReaderCommon) loadResumeToken(ctx context.Context) (option.Option[bson.Raw], error) { + coll := rc.getMetadataCollection() + + token, err := coll.FindOne( + ctx, + bson.D{{"_id", rc.resumeTokenDocID()}}, + ).Raw() + + if errors.Is(err, mongo.ErrNoDocuments) { + return option.None[bson.Raw](), nil + } + + return option.Some(token), err +} + +func (rc *ChangeReaderCommon) updateLag(sess *mongo.Session, token bson.Raw) { + var tokenTs bson.Timestamp + tokenTs, err := rc.resumeTokenTSExtractor(token) + if err == nil { + lagSecs := int64(sess.OperationTime().T) - int64(tokenTs.T) + rc.lag.Store(option.Some(time.Second * time.Duration(lagSecs))) + } else { + rc.logger.Warn(). + Err(err). + Msgf("Failed to extract timestamp from %s's resume token to compute lag.", rc.readerType) + } +} + +func (rc *ChangeReaderCommon) logIgnoredDDL(rawEvent bson.Raw) { + rc.logger.Info(). + Str("reader", string(rc.readerType)). + Stringer("event", rawEvent). + Msg("Ignoring event with unrecognized type on destination. (It’s assumedly internal to the migration.)") +} diff --git a/internal/verifier/change_stream.go b/internal/verifier/change_stream.go index b53cbdbe..8254be70 100644 --- a/internal/verifier/change_stream.go +++ b/internal/verifier/change_stream.go @@ -7,7 +7,6 @@ import ( "github.com/10gen/migration-verifier/history" "github.com/10gen/migration-verifier/internal/keystring" - "github.com/10gen/migration-verifier/internal/logger" "github.com/10gen/migration-verifier/internal/retry" "github.com/10gen/migration-verifier/internal/util" "github.com/10gen/migration-verifier/mbson" @@ -64,36 +63,12 @@ type changeEventBatch struct { clusterTime bson.Timestamp } -type ChangeReaderCommon struct { - readerType whichCluster - - lastChangeEventTime *bson.Timestamp - logger *logger.Logger - namespaces []string - - metaDB *mongo.Database - watcherClient *mongo.Client - clusterInfo util.ClusterInfo - - changeStreamRunning bool - changeEventBatchChan chan changeEventBatch - writesOffTs *util.Eventual[bson.Timestamp] - readerError *util.Eventual[error] - handlerError *util.Eventual[error] - doneChan chan struct{} - - startAtTs *bson.Timestamp - - lag *msync.TypedAtomic[option.Option[time.Duration]] - batchSizeHistory *history.History[int] - - onDDLEvent ddlEventHandling -} - type ChangeStreamReader struct { ChangeReaderCommon } +var _ changeReader = &ChangeStreamReader{} + func (verifier *Verifier) initializeChangeStreamReaders() { srcReader := &ChangeStreamReader{ ChangeReaderCommon: ChangeReaderCommon{ @@ -133,7 +108,7 @@ func (verifier *Verifier) initializeChangeStreamReaders() { // RunChangeEventHandler handles change event batches from the reader. // It needs to be started after the reader starts and should run in its own // goroutine. -func (verifier *Verifier) RunChangeEventHandler(ctx context.Context, reader *ChangeStreamReader) error { +func (verifier *Verifier) RunChangeEventHandler(ctx context.Context, reader changeReader) error { var err error var lastPersistedTime time.Time @@ -161,7 +136,7 @@ HandlerLoop: Err(err). Stringer("changeStreamReader", reader). Msg("Change event handler failed.") - case batch, more := <-reader.changeEventBatchChan: + case batch, more := <-reader.getReadChannel(): if !more { verifier.logger.Debug(). Stringer("changeStreamReader", reader). @@ -177,7 +152,7 @@ HandlerLoop: Msg("Handling change event batch.") err = errors.Wrap( - verifier.HandleChangeStreamEvents(ctx, batch, reader.readerType), + verifier.HandleChangeStreamEvents(ctx, batch, reader.getWhichCluster()), "failed to handle change stream events", ) @@ -190,7 +165,7 @@ HandlerLoop: // This will prevent the reader from hanging because the reader checks // this along with checks for context expiry. if err != nil { - reader.handlerError.Set(err) + reader.setPersistorError(err) } return err diff --git a/internal/verifier/change_stream_test.go b/internal/verifier/change_stream_test.go index cbe3928e..35fa08a1 100644 --- a/internal/verifier/change_stream_test.go +++ b/internal/verifier/change_stream_test.go @@ -32,7 +32,12 @@ func (suite *IntegrationTestSuite) TestChangeStreamFilter_NoNamespaces() { verifier := suite.BuildVerifier() - filter := verifier.srcChangeStreamReader.GetChangeStreamFilter() + changeStreamReader, ok := verifier.srcChangeStreamReader.(*ChangeStreamReader) + if !ok { + suite.T().Skipf("source change reader is a %T; this test needs a %T", verifier.srcChangeStreamReader, changeStreamReader) + } + + filter := changeStreamReader.GetChangeStreamFilter() _, err := suite.srcMongoClient. Database("realUserDatabase"). @@ -96,18 +101,23 @@ func (suite *IntegrationTestSuite) TestChangeStreamFilter_BsonSize() { ctx := suite.Context() verifier := suite.BuildVerifier() - if !verifier.srcChangeStreamReader.hasBsonSize() { + if !util.ClusterHasBSONSize([2]int(verifier.srcClusterInfo.VersionArray)) { suite.T().Skip("Need a source version that has $bsonSize") } + changeStreamReader, ok := verifier.srcChangeStreamReader.(*ChangeStreamReader) + if !ok { + suite.T().Skipf("source change reader is a %T; this test needs a %T", verifier.srcChangeStreamReader, changeStreamReader) + } + srcColl := verifier.srcClient.Database(suite.DBNameForTest()).Collection("coll") _, err := srcColl.InsertOne(ctx, bson.D{{"_id", 123}}) suite.Require().NoError(err) - verifier.srcChangeStreamReader.namespaces = mslices.Of(FullName(srcColl)) + changeStreamReader.namespaces = mslices.Of(FullName(srcColl)) - filter := verifier.srcChangeStreamReader.GetChangeStreamFilter() + filter := changeStreamReader.GetChangeStreamFilter() cs, err := suite.srcMongoClient.Watch( ctx, @@ -157,14 +167,20 @@ func (suite *IntegrationTestSuite) TestChangeStreamFilter_WithNamespaces() { ctx := suite.Context() verifier := suite.BuildVerifier() - verifier.srcChangeStreamReader.namespaces = []string{ + + changeStreamReader, ok := verifier.srcChangeStreamReader.(*ChangeStreamReader) + if !ok { + suite.T().Skipf("source change reader is a %T; this test needs a %T", verifier.srcChangeStreamReader, changeStreamReader) + } + + changeStreamReader.namespaces = []string{ "foo.bar", "foo.baz", "test.car", "test.chaz", } - filter := verifier.srcChangeStreamReader.GetChangeStreamFilter() + filter := changeStreamReader.GetChangeStreamFilter() cs, err := suite.srcMongoClient.Watch(ctx, filter) suite.Require().NoError(err) @@ -188,7 +204,7 @@ func (suite *IntegrationTestSuite) TestChangeStreamFilter_WithNamespaces() { suite.Require().NoError(err) sctx := mongo.NewSessionContext(ctx, sess) - for _, ns := range verifier.srcChangeStreamReader.namespaces { + for _, ns := range changeStreamReader.namespaces { dbAndColl := strings.Split(ns, ".") _, err := suite.srcMongoClient. @@ -223,7 +239,7 @@ func (suite *IntegrationTestSuite) TestChangeStreamFilter_WithNamespaces() { suite.Assert().Len( events, - len(verifier.srcChangeStreamReader.namespaces), + len(changeStreamReader.namespaces), "should have 1 event per in-filter namespace", ) suite.Assert().True( @@ -266,7 +282,9 @@ func (suite *IntegrationTestSuite) TestChangeStream_Resume_NoSkip() { defer v1Cancel(ctx.Err()) suite.startSrcChangeStreamReaderAndHandler(v1Ctx, verifier1) - changeStreamMetaColl := verifier1.srcChangeStreamReader.getChangeStreamMetadataCollection() + changeStreamMetaColl := verifier1.metaClient. + Database(verifier1.metaDBName). + Collection(metadataChangeStreamCollectionName) var originalResumeToken bson.Raw @@ -417,10 +435,12 @@ func (suite *IntegrationTestSuite) TestChangeStreamResumability() { suite.startSrcChangeStreamReaderAndHandler(ctx, verifier2) - suite.Require().NotNil(verifier2.srcChangeStreamReader.startAtTs) + startAtTs, hasStartAtTs := verifier2.srcChangeStreamReader.getStartTimestamp().Get() + + suite.Require().True(hasStartAtTs) suite.Assert().False( - verifier2.srcChangeStreamReader.startAtTs.After(newTime), + startAtTs.After(newTime), "verifier2's change stream should be no later than this new session", ) @@ -569,7 +589,7 @@ func (suite *IntegrationTestSuite) TestChangeStreamLag() { verifierRunner.AwaitGenerationEnd(), ) - return verifier.srcChangeStreamReader.GetLag().IsSome() + return verifier.srcChangeStreamReader.getLag().IsSome() }, time.Minute, 100*time.Millisecond, @@ -578,7 +598,7 @@ func (suite *IntegrationTestSuite) TestChangeStreamLag() { // NB: The lag will include whatever time elapsed above before // verifier read the event, so it can be several seconds. suite.Assert().Less( - verifier.srcChangeStreamReader.GetLag().MustGet(), + verifier.srcChangeStreamReader.getLag().MustGet(), 10*time.Minute, "verifier lag is as expected", ) @@ -605,18 +625,20 @@ func (suite *IntegrationTestSuite) TestStartAtTimeNoChanges() { suite.startSrcChangeStreamReaderAndHandler(ctx, verifier) - startAtTs := verifier.srcChangeStreamReader.startAtTs - suite.Require().NotNil(startAtTs, "startAtTs should be set") + startAtTs, hasStartAtTs := verifier.srcChangeStreamReader.getStartTimestamp().Get() + suite.Require().True(hasStartAtTs, "startAtTs should be set") + + verifier.srcChangeStreamReader.setWritesOff(insertTs) - verifier.srcChangeStreamReader.writesOffTs.Set(insertTs) + <-verifier.srcChangeStreamReader.done() - <-verifier.srcChangeStreamReader.doneChan + startAtTs2 := verifier.srcChangeStreamReader.getStartTimestamp().MustGet() suite.Require().False( - verifier.srcChangeStreamReader.startAtTs.Before(*startAtTs), + startAtTs2.Before(startAtTs), "new startAtTs (%+v) should be no earlier than last one (%+v)", - verifier.srcChangeStreamReader.startAtTs, - *startAtTs, + startAtTs2, + startAtTs, ) } } @@ -635,10 +657,13 @@ func (suite *IntegrationTestSuite) TestStartAtTimeWithChanges() { suite.Require().NotNil(origSessionTime) suite.startSrcChangeStreamReaderAndHandler(ctx, verifier) + startAtTs, hasStartAtTs := verifier.srcChangeStreamReader.getStartTimestamp().Get() + suite.Require().True(hasStartAtTs, "startAtTs should be set") + // srcStartAtTs derives from the change stream’s resume token, which can // postdate our session time but should not precede it. suite.Require().False( - verifier.srcChangeStreamReader.startAtTs.Before(*origSessionTime), + startAtTs.Before(*origSessionTime), "srcStartAtTs should be >= the insert’s optime", ) @@ -662,12 +687,15 @@ func (suite *IntegrationTestSuite) TestStartAtTimeWithChanges() { "session time after events should exceed the original", ) - verifier.srcChangeStreamReader.writesOffTs.Set(*postEventsSessionTime) - <-verifier.srcChangeStreamReader.doneChan + verifier.srcChangeStreamReader.setWritesOff(*postEventsSessionTime) + <-verifier.srcChangeStreamReader.done() + + startAtTs, hasStartAtTs = verifier.srcChangeStreamReader.getStartTimestamp().Get() + suite.Require().True(hasStartAtTs, "startAtTs should be set") suite.Assert().Equal( *postEventsSessionTime, - *verifier.srcChangeStreamReader.startAtTs, + startAtTs, "verifier.srcStartAtTs should now be our session timestamp", ) } @@ -684,8 +712,12 @@ func (suite *IntegrationTestSuite) TestNoStartAtTime() { origStartTs := sess.OperationTime() suite.Require().NotNil(origStartTs) suite.startSrcChangeStreamReaderAndHandler(ctx, verifier) - suite.Require().NotNil(verifier.srcChangeStreamReader.startAtTs) - suite.Require().LessOrEqual(origStartTs.Compare(*verifier.srcChangeStreamReader.startAtTs), 0) + + startAtTs, hasStartAtTs := verifier.srcChangeStreamReader.getStartTimestamp().Get() + suite.Require().True(hasStartAtTs, "startAtTs should be set") + + suite.Require().NotNil(startAtTs) + suite.Require().LessOrEqual(origStartTs.Compare(startAtTs), 0) } func (suite *IntegrationTestSuite) TestWithChangeEventsBatching() { diff --git a/internal/verifier/check.go b/internal/verifier/check.go index e8269ecd..29b97670 100644 --- a/internal/verifier/check.go +++ b/internal/verifier/check.go @@ -50,16 +50,16 @@ func (verifier *Verifier) Check(ctx context.Context, filter bson.D) { verifier.MaybeStartPeriodicHeapProfileCollection(ctx) } -func (verifier *Verifier) waitForChangeStream(ctx context.Context, csr *ChangeStreamReader) error { +func (verifier *Verifier) waitForChangeStream(ctx context.Context, csr changeReader) error { select { case <-ctx.Done(): return util.WrapCtxErrWithCause(ctx) - case <-csr.readerError.Ready(): - err := csr.readerError.Get() + case <-csr.getError().Ready(): + err := csr.getError().Get() verifier.logger.Warn().Err(err). Msgf("Received error from %s.", csr) return err - case <-csr.doneChan: + case <-csr.done(): verifier.logger.Debug(). Msgf("Received completion signal from %s.", csr) break @@ -93,11 +93,11 @@ func (verifier *Verifier) CheckWorker(ctxIn context.Context) error { // If the change stream fails, everything should stop. eg.Go(func() error { select { - case <-verifier.srcChangeStreamReader.readerError.Ready(): - err := verifier.srcChangeStreamReader.readerError.Get() + case <-verifier.srcChangeStreamReader.getError().Ready(): + err := verifier.srcChangeStreamReader.getError().Get() return errors.Wrapf(err, "%s failed", verifier.srcChangeStreamReader) - case <-verifier.dstChangeStreamReader.readerError.Ready(): - err := verifier.dstChangeStreamReader.readerError.Get() + case <-verifier.dstChangeStreamReader.getError().Ready(): + err := verifier.dstChangeStreamReader.getError().Get() return errors.Wrapf(err, "%s failed", verifier.dstChangeStreamReader) case <-ctx.Done(): return nil @@ -270,8 +270,8 @@ func (verifier *Verifier) CheckDriver(ctx context.Context, filter bson.D, testCh }() ceHandlerGroup, groupCtx := contextplus.ErrGroup(ctx) - for _, csReader := range []*ChangeStreamReader{verifier.srcChangeStreamReader, verifier.dstChangeStreamReader} { - if csReader.changeStreamRunning { + for _, csReader := range mslices.Of(verifier.srcChangeStreamReader, verifier.dstChangeStreamReader) { + if csReader.isRunning() { verifier.logger.Debug().Msgf("Check: %s already running.", csReader) } else { verifier.logger.Debug().Msgf("%s not running; starting change stream", csReader) diff --git a/internal/verifier/compare.go b/internal/verifier/compare.go index fc49aaa4..c21fe2d0 100644 --- a/internal/verifier/compare.go +++ b/internal/verifier/compare.go @@ -467,7 +467,7 @@ func (verifier *Verifier) getFetcherChannelsAndCallbacks( sctx, verifier.srcClientCollection(task), verifier.srcClusterInfo, - verifier.srcChangeStreamReader.startAtTs, + verifier.srcChangeStreamReader.getStartTimestamp().ToPointer(), task, ) @@ -500,7 +500,7 @@ func (verifier *Verifier) getFetcherChannelsAndCallbacks( sctx, verifier.dstClientCollection(task), verifier.dstClusterInfo, - verifier.dstChangeStreamReader.startAtTs, + verifier.dstChangeStreamReader.getStartTimestamp().ToPointer(), task, ) diff --git a/internal/verifier/migration_verifier.go b/internal/verifier/migration_verifier.go index c7c1102b..ef239dfb 100644 --- a/internal/verifier/migration_verifier.go +++ b/internal/verifier/migration_verifier.go @@ -132,8 +132,8 @@ type Verifier struct { mux sync.RWMutex - srcChangeStreamReader *ChangeStreamReader - dstChangeStreamReader *ChangeStreamReader + srcChangeStreamReader changeReader + dstChangeStreamReader changeReader readConcernSetting ReadConcernSetting @@ -273,19 +273,19 @@ func (verifier *Verifier) WritesOff(ctx context.Context) error { // might be inserting docs into the recheck queue, which happens // under the lock. select { - case <-verifier.srcChangeStreamReader.readerError.Ready(): - err := verifier.srcChangeStreamReader.readerError.Get() + case <-verifier.srcChangeStreamReader.getError().Ready(): + err := verifier.srcChangeStreamReader.getError().Get() return errors.Wrapf(err, "tried to send writes-off timestamp to %s, but change stream already failed", verifier.srcChangeStreamReader) default: - verifier.srcChangeStreamReader.writesOffTs.Set(srcFinalTs) + verifier.srcChangeStreamReader.setWritesOff(srcFinalTs) } select { - case <-verifier.dstChangeStreamReader.readerError.Ready(): - err := verifier.dstChangeStreamReader.readerError.Get() + case <-verifier.dstChangeStreamReader.getError().Ready(): + err := verifier.dstChangeStreamReader.getError().Get() return errors.Wrapf(err, "tried to send writes-off timestamp to %s, but change stream already failed", verifier.dstChangeStreamReader) default: - verifier.dstChangeStreamReader.writesOffTs.Set(dstFinalTs) + verifier.dstChangeStreamReader.setWritesOff(dstFinalTs) } return nil diff --git a/internal/verifier/summary.go b/internal/verifier/summary.go index f04607da..7079b294 100644 --- a/internal/verifier/summary.go +++ b/internal/verifier/summary.go @@ -557,7 +557,7 @@ func (verifier *Verifier) printChangeEventStatistics(builder io.Writer) { for _, cluster := range []struct { title string eventRecorder *EventRecorder - csReader *ChangeStreamReader + csReader changeReader }{ {"Source", verifier.srcEventRecorder, verifier.srcChangeStreamReader}, {"Destination", verifier.dstEventRecorder, verifier.dstChangeStreamReader}, @@ -584,16 +584,16 @@ func (verifier *Verifier) printChangeEventStatistics(builder io.Writer) { fmt.Fprintf(builder, "%s change events this generation: %s\n", cluster.title, eventsDescr) - if eventsPerSec, has := cluster.csReader.GetEventsPerSecond().Get(); has { + if eventsPerSec, has := cluster.csReader.getEventsPerSecond().Get(); has { var lagNote string - lag, hasLag := cluster.csReader.GetLag().Get() + lag, hasLag := cluster.csReader.getLag().Get() if hasLag { lagNote = fmt.Sprintf("lag: %s; ", reportutils.DurationToHMS(lag)) } - saturation := cluster.csReader.GetSaturation() + saturation := cluster.csReader.getBufferSaturation() fmt.Fprintf( builder, From 3d35d297118b13bb906d5409fe8cb206ba81d64a Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Tue, 11 Nov 2025 16:05:26 -0500 Subject: [PATCH 03/24] fix lint --- internal/verifier/change_reader.go | 2 +- internal/verifier/change_stream.go | 71 +++++------------------------- 2 files changed, 11 insertions(+), 62 deletions(-) diff --git a/internal/verifier/change_reader.go b/internal/verifier/change_reader.go index cd6e30dd..7e260ef7 100644 --- a/internal/verifier/change_reader.go +++ b/internal/verifier/change_reader.go @@ -82,7 +82,7 @@ func (rc ChangeReaderCommon) setWritesOff(ts bson.Timestamp) { } func (rc ChangeReaderCommon) isRunning() bool { - return rc.isRunning() + return rc.changeStreamRunning } func (rc ChangeReaderCommon) getReadChannel() <-chan changeEventBatch { diff --git a/internal/verifier/change_stream.go b/internal/verifier/change_stream.go index 8254be70..204ee2d9 100644 --- a/internal/verifier/change_stream.go +++ b/internal/verifier/change_stream.go @@ -102,6 +102,7 @@ func (verifier *Verifier) initializeChangeStreamReaders() { csr.doneChan = make(chan struct{}) csr.lag = msync.NewTypedAtomic(option.None[time.Duration]()) csr.batchSizeHistory = history.New[int](time.Minute) + csr.resumeTokenTSExtractor = extractTimestampFromResumeToken } } @@ -407,10 +408,7 @@ func (csr *ChangeStreamReader) readAndHandleOneChangeEventBatch( // indexes are created after initial sync. if csr.onDDLEvent == onDDLEventAllow { - csr.logger.Info(). - Stringer("changeStream", csr). - Stringer("event", cs.Current). - Msg("Ignoring event with unrecognized type on destination. (It’s assumedly internal to the migration.)") + csr.logIgnoredDDL(cs.Current) // Discard this event, then keep reading. changeEvents = changeEvents[:len(changeEvents)-1] @@ -437,16 +435,7 @@ func (csr *ChangeStreamReader) readAndHandleOneChangeEventBatch( eventsRead++ } - var tokenTs bson.Timestamp - tokenTs, err := extractTimestampFromResumeToken(cs.ResumeToken()) - if err == nil { - lagSecs := int64(sess.OperationTime().T) - int64(tokenTs.T) - csr.lag.Store(option.Some(time.Second * time.Duration(lagSecs))) - } else { - csr.logger.Warn(). - Err(err). - Msgf("Failed to extract timestamp from %s's resume token to compute change stream lag.", csr) - } + csr.updateLag(sess, cs.ResumeToken()) if eventsRead == 0 { ri.NoteSuccess("received an empty change stream response") @@ -535,7 +524,7 @@ func (csr *ChangeStreamReader) iterateChangeStream( // (i.e., the `getMore` call returns empty) for { var curTs bson.Timestamp - curTs, err = extractTimestampFromResumeToken(cs.ResumeToken()) + curTs, err = csr.resumeTokenTSExtractor(cs.ResumeToken()) if err != nil { return errors.Wrap(err, "failed to extract timestamp from change stream's resume token") } @@ -608,18 +597,18 @@ func (csr *ChangeStreamReader) createChangeStream( ) } - savedResumeToken, err := csr.loadChangeStreamResumeToken(ctx) + savedResumeToken, err := csr.loadResumeToken(ctx) if err != nil { return nil, nil, bson.Timestamp{}, errors.Wrap(err, "failed to load persisted change stream resume token") } csStartLogEvent := csr.logger.Info() - if savedResumeToken != nil { + if token, hasToken := savedResumeToken.Get(); hasToken { logEvent := csStartLogEvent. - Stringer(csr.resumeTokenDocID(), savedResumeToken) + Stringer(csr.resumeTokenDocID(), token) - ts, err := extractTimestampFromResumeToken(savedResumeToken) + ts, err := csr.resumeTokenTSExtractor(token) if err == nil { logEvent = addTimestampToLogEvent(ts, logEvent) } else { @@ -630,7 +619,7 @@ func (csr *ChangeStreamReader) createChangeStream( logEvent.Msg("Starting change stream from persisted resume token.") - opts = opts.SetStartAfter(savedResumeToken) + opts = opts.SetStartAfter(token) } else { csStartLogEvent.Msgf("Starting change stream from current %s cluster time.", csr.readerType) } @@ -650,7 +639,7 @@ func (csr *ChangeStreamReader) createChangeStream( return nil, nil, bson.Timestamp{}, err } - startTs, err := extractTimestampFromResumeToken(changeStream.ResumeToken()) + startTs, err := csr.resumeTokenTSExtractor(changeStream.ResumeToken()) if err != nil { return nil, nil, bson.Timestamp{}, errors.Wrap(err, "failed to extract timestamp from change stream's resume token") } @@ -827,46 +816,6 @@ func (csr *ChangeStreamReader) String() string { return fmt.Sprintf("%s change stream reader", csr.readerType) } -func (csr *ChangeStreamReader) resumeTokenDocID() string { - switch csr.readerType { - case src: - return "srcResumeToken" - case dst: - return "dstResumeToken" - default: - panic("unknown readerType: " + csr.readerType) - } -} - -func (csr *ChangeStreamReader) persistChangeStreamResumeToken(ctx context.Context, token bson.Raw) error { - coll := csr.getChangeStreamMetadataCollection() - _, err := coll.ReplaceOne( - ctx, - bson.D{{"_id", csr.resumeTokenDocID()}}, - token, - options.Replace().SetUpsert(true), - ) - - if err == nil { - ts, err := extractTimestampFromResumeToken(token) - - logEvent := csr.logger.Debug() - - if err == nil { - logEvent = addTimestampToLogEvent(ts, logEvent) - } else { - csr.logger.Warn().Err(err). - Msg("failed to extract resume token timestamp") - } - - logEvent.Msgf("Persisted %s's resume token.", csr) - - return nil - } - - return errors.Wrapf(err, "failed to persist change stream resume token (%v)", token) -} - func extractTimestampFromResumeToken(resumeToken bson.Raw) (bson.Timestamp, error) { // Change stream token is always a V1 keystring in the _data field tokenDataRV, err := resumeToken.LookupErr("_data") From 1c672f9b34834232ba5cb7bc701430d82e995370 Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Tue, 11 Nov 2025 16:06:32 -0500 Subject: [PATCH 04/24] unused --- internal/verifier/change_stream.go | 19 ------------------- 1 file changed, 19 deletions(-) diff --git a/internal/verifier/change_stream.go b/internal/verifier/change_stream.go index 204ee2d9..b11548d6 100644 --- a/internal/verifier/change_stream.go +++ b/internal/verifier/change_stream.go @@ -793,25 +793,6 @@ func addTimestampToLogEvent(ts bson.Timestamp, event *zerolog.Event) *zerolog.Ev Time("time", time.Unix(int64(ts.T), int64(0))) } -func (csr *ChangeStreamReader) getChangeStreamMetadataCollection() *mongo.Collection { - return csr.metaDB.Collection(metadataChangeStreamCollectionName) -} - -func (csr *ChangeStreamReader) loadChangeStreamResumeToken(ctx context.Context) (bson.Raw, error) { - coll := csr.getChangeStreamMetadataCollection() - - token, err := coll.FindOne( - ctx, - bson.D{{"_id", csr.resumeTokenDocID()}}, - ).Raw() - - if errors.Is(err, mongo.ErrNoDocuments) { - return nil, nil - } - - return token, err -} - func (csr *ChangeStreamReader) String() string { return fmt.Sprintf("%s change stream reader", csr.readerType) } From aefe4d6fe80056fd7c9bc1fcf3a6268ac0febcb8 Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Tue, 11 Nov 2025 16:15:21 -0500 Subject: [PATCH 05/24] pointer receivers --- internal/verifier/change_reader.go | 30 +++++++++++++++--------------- 1 file changed, 15 insertions(+), 15 deletions(-) diff --git a/internal/verifier/change_reader.go b/internal/verifier/change_reader.go index 7e260ef7..1168c422 100644 --- a/internal/verifier/change_reader.go +++ b/internal/verifier/change_reader.go @@ -61,50 +61,50 @@ type ChangeReaderCommon struct { onDDLEvent ddlEventHandling } -func (rc ChangeReaderCommon) getWhichCluster() whichCluster { +func (rc *ChangeReaderCommon) getWhichCluster() whichCluster { return rc.readerType } -func (rc ChangeReaderCommon) setPersistorError(err error) { +func (rc *ChangeReaderCommon) setPersistorError(err error) { rc.handlerError.Set(err) } -func (rc ChangeReaderCommon) getError() *util.Eventual[error] { +func (rc *ChangeReaderCommon) getError() *util.Eventual[error] { return rc.readerError } -func (rc ChangeReaderCommon) getStartTimestamp() option.Option[bson.Timestamp] { +func (rc *ChangeReaderCommon) getStartTimestamp() option.Option[bson.Timestamp] { return option.FromPointer(rc.startAtTs) } -func (rc ChangeReaderCommon) setWritesOff(ts bson.Timestamp) { +func (rc *ChangeReaderCommon) setWritesOff(ts bson.Timestamp) { rc.writesOffTs.Set(ts) } -func (rc ChangeReaderCommon) isRunning() bool { +func (rc *ChangeReaderCommon) isRunning() bool { return rc.changeStreamRunning } -func (rc ChangeReaderCommon) getReadChannel() <-chan changeEventBatch { +func (rc *ChangeReaderCommon) getReadChannel() <-chan changeEventBatch { return rc.changeEventBatchChan } -func (rc ChangeReaderCommon) done() <-chan struct{} { +func (rc *ChangeReaderCommon) done() <-chan struct{} { return rc.doneChan } -func (rc ChangeReaderCommon) getBufferSaturation() float64 { +func (rc *ChangeReaderCommon) getBufferSaturation() float64 { return util.DivideToF64(len(rc.changeEventBatchChan), cap(rc.changeEventBatchChan)) } -func (rc ChangeReaderCommon) getLag() option.Option[time.Duration] { +func (rc *ChangeReaderCommon) getLag() option.Option[time.Duration] { return rc.lag.Load() } // getEventsPerSecond returns the number of change events per second we’ve been // seeing “recently”. (See implementation for the actual period over which we // compile this metric.) -func (rc ChangeReaderCommon) getEventsPerSecond() option.Option[float64] { +func (rc *ChangeReaderCommon) getEventsPerSecond() option.Option[float64] { logs := rc.batchSizeHistory.Get() lastLog, hasLogs := lo.Last(logs) @@ -127,7 +127,7 @@ func (rc ChangeReaderCommon) getEventsPerSecond() option.Option[float64] { return option.None[float64]() } -func (rc ChangeReaderCommon) persistChangeStreamResumeToken(ctx context.Context, token bson.Raw) error { +func (rc *ChangeReaderCommon) persistChangeStreamResumeToken(ctx context.Context, token bson.Raw) error { coll := rc.metaDB.Collection(metadataChangeStreamCollectionName) _, err := coll.ReplaceOne( ctx, @@ -156,7 +156,7 @@ func (rc ChangeReaderCommon) persistChangeStreamResumeToken(ctx context.Context, return errors.Wrapf(err, "failed to persist change stream resume token (%v)", token) } -func (rc ChangeReaderCommon) resumeTokenDocID() string { +func (rc *ChangeReaderCommon) resumeTokenDocID() string { switch rc.readerType { case src: return "srcResumeToken" @@ -167,11 +167,11 @@ func (rc ChangeReaderCommon) resumeTokenDocID() string { } } -func (rc ChangeReaderCommon) getMetadataCollection() *mongo.Collection { +func (rc *ChangeReaderCommon) getMetadataCollection() *mongo.Collection { return rc.metaDB.Collection(metadataChangeStreamCollectionName) } -func (rc ChangeReaderCommon) loadResumeToken(ctx context.Context) (option.Option[bson.Raw], error) { +func (rc *ChangeReaderCommon) loadResumeToken(ctx context.Context) (option.Option[bson.Raw], error) { coll := rc.getMetadataCollection() token, err := coll.FindOne( From 1cb493b476540ac69d3b6cfe2ccbbe4fb9890f1f Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Tue, 11 Nov 2025 16:24:43 -0500 Subject: [PATCH 06/24] renames/refacotrs --- internal/verifier/change_reader.go | 2 +- internal/verifier/change_stream.go | 170 +----------------- internal/verifier/change_stream_test.go | 44 ++--- internal/verifier/check.go | 42 ++--- internal/verifier/compare.go | 4 +- internal/verifier/integration_test_suite.go | 2 +- internal/verifier/migration_verifier.go | 24 +-- internal/verifier/migration_verifier_test.go | 14 +- internal/verifier/recheck_persist.go | 180 +++++++++++++++++++ internal/verifier/summary.go | 8 +- 10 files changed, 254 insertions(+), 236 deletions(-) create mode 100644 internal/verifier/recheck_persist.go diff --git a/internal/verifier/change_reader.go b/internal/verifier/change_reader.go index 1168c422..663eb89f 100644 --- a/internal/verifier/change_reader.go +++ b/internal/verifier/change_reader.go @@ -26,7 +26,7 @@ type changeReader interface { getBufferSaturation() float64 setWritesOff(bson.Timestamp) setPersistorError(error) - StartChangeStream(context.Context) error + start(context.Context) error done() <-chan struct{} persistChangeStreamResumeToken(context.Context, bson.Raw) error isRunning() bool diff --git a/internal/verifier/change_stream.go b/internal/verifier/change_stream.go index b11548d6..b69254b7 100644 --- a/internal/verifier/change_stream.go +++ b/internal/verifier/change_stream.go @@ -69,7 +69,7 @@ type ChangeStreamReader struct { var _ changeReader = &ChangeStreamReader{} -func (verifier *Verifier) initializeChangeStreamReaders() { +func (verifier *Verifier) initializeChangeReaders() { srcReader := &ChangeStreamReader{ ChangeReaderCommon: ChangeReaderCommon{ readerType: src, @@ -78,7 +78,7 @@ func (verifier *Verifier) initializeChangeStreamReaders() { clusterInfo: *verifier.srcClusterInfo, }, } - verifier.srcChangeStreamReader = srcReader + verifier.srcChangeReader = srcReader dstReader := &ChangeStreamReader{ ChangeReaderCommon: ChangeReaderCommon{ @@ -89,7 +89,7 @@ func (verifier *Verifier) initializeChangeStreamReaders() { onDDLEvent: onDDLEventAllow, }, } - verifier.dstChangeStreamReader = dstReader + verifier.dstChangeReader = dstReader // Common elements in both readers: for _, csr := range mslices.Of(srcReader, dstReader) { @@ -106,168 +106,6 @@ func (verifier *Verifier) initializeChangeStreamReaders() { } } -// RunChangeEventHandler handles change event batches from the reader. -// It needs to be started after the reader starts and should run in its own -// goroutine. -func (verifier *Verifier) RunChangeEventHandler(ctx context.Context, reader changeReader) error { - var err error - - var lastPersistedTime time.Time - persistResumeTokenIfNeeded := func(ctx context.Context, token bson.Raw) { - if time.Since(lastPersistedTime) >= minChangeStreamPersistInterval { - persistErr := reader.persistChangeStreamResumeToken(ctx, token) - if persistErr != nil { - verifier.logger.Warn(). - Stringer("changeReader", reader). - Err(persistErr). - Msg("Failed to persist resume token. Because of this, if the verifier restarts, it will have to re-process already-handled change events. This error may be transient, but if it recurs, investigate.") - } else { - lastPersistedTime = time.Now() - } - } - } - -HandlerLoop: - for err == nil { - select { - case <-ctx.Done(): - err = util.WrapCtxErrWithCause(ctx) - - verifier.logger.Debug(). - Err(err). - Stringer("changeStreamReader", reader). - Msg("Change event handler failed.") - case batch, more := <-reader.getReadChannel(): - if !more { - verifier.logger.Debug(). - Stringer("changeStreamReader", reader). - Msg("Change event batch channel has been closed.") - - break HandlerLoop - } - - verifier.logger.Trace(). - Stringer("changeStreamReader", reader). - Int("batchSize", len(batch.events)). - Any("batch", batch). - Msg("Handling change event batch.") - - err = errors.Wrap( - verifier.HandleChangeStreamEvents(ctx, batch, reader.getWhichCluster()), - "failed to handle change stream events", - ) - - if err == nil && batch.resumeToken != nil { - persistResumeTokenIfNeeded(ctx, batch.resumeToken) - } - } - } - - // This will prevent the reader from hanging because the reader checks - // this along with checks for context expiry. - if err != nil { - reader.setPersistorError(err) - } - - return err -} - -// HandleChangeStreamEvents performs the necessary work for change stream events after receiving a batch. -func (verifier *Verifier) HandleChangeStreamEvents(ctx context.Context, batch changeEventBatch, eventOrigin whichCluster) error { - if len(batch.events) == 0 { - return nil - } - - dbNames := make([]string, len(batch.events)) - collNames := make([]string, len(batch.events)) - docIDs := make([]bson.RawValue, len(batch.events)) - dataSizes := make([]int32, len(batch.events)) - - latestTimestamp := bson.Timestamp{} - - for i, changeEvent := range batch.events { - if !supportedEventOpTypes.Contains(changeEvent.OpType) { - panic(fmt.Sprintf("Unsupported optype in event; should have failed already! event=%+v", changeEvent)) - } - - if changeEvent.ClusterTime == nil { - verifier.logger.Warn(). - Any("event", changeEvent). - Msg("Change event unexpectedly lacks a clusterTime?!?") - } else if changeEvent.ClusterTime.After(latestTimestamp) { - latestTimestamp = *changeEvent.ClusterTime - } - - var srcDBName, srcCollName string - - var eventRecorder EventRecorder - - // Recheck Docs are keyed by source namespaces. - // We need to retrieve the source namespaces if change events are from the destination. - switch eventOrigin { - case dst: - eventRecorder = *verifier.dstEventRecorder - - if verifier.nsMap.Len() == 0 { - // Namespace is not remapped. Source namespace is the same as the destination. - srcDBName = changeEvent.Ns.DB - srcCollName = changeEvent.Ns.Coll - } else { - dstNs := fmt.Sprintf("%s.%s", changeEvent.Ns.DB, changeEvent.Ns.Coll) - srcNs, exist := verifier.nsMap.GetSrcNamespace(dstNs) - if !exist { - return errors.Errorf("no source namespace corresponding to the destination namepsace %s", dstNs) - } - srcDBName, srcCollName = SplitNamespace(srcNs) - } - case src: - eventRecorder = *verifier.srcEventRecorder - - srcDBName = changeEvent.Ns.DB - srcCollName = changeEvent.Ns.Coll - default: - panic(fmt.Sprintf("unknown event origin: %s", eventOrigin)) - } - - dbNames[i] = srcDBName - collNames[i] = srcCollName - docIDs[i] = changeEvent.DocID - - if changeEvent.FullDocLen.OrZero() > 0 { - dataSizes[i] = int32(changeEvent.FullDocLen.OrZero()) - } else if changeEvent.FullDocument == nil { - // This happens for deletes and for some updates. - // The document is probably, but not necessarily, deleted. - dataSizes[i] = fauxDocSizeForDeleteEvents - } else { - // This happens for inserts, replaces, and most updates. - dataSizes[i] = int32(len(changeEvent.FullDocument)) - } - - if err := eventRecorder.AddEvent(&changeEvent); err != nil { - return errors.Wrapf( - err, - "failed to augment stats with %s change event (%+v)", - eventOrigin, - changeEvent, - ) - } - } - - latestTimestampTime := time.Unix(int64(latestTimestamp.T), 0) - lag := time.Unix(int64(batch.clusterTime.T), 0).Sub(latestTimestampTime) - - verifier.logger.Trace(). - Str("origin", string(eventOrigin)). - Int("count", len(docIDs)). - Any("latestTimestamp", latestTimestamp). - Time("latestTimestampTime", latestTimestampTime). - Stringer("lag", lag). - Msg("Persisting rechecks for change events.") - - return verifier.insertRecheckDocs(ctx, dbNames, collNames, docIDs, dataSizes) -} - // GetChangeStreamFilter returns an aggregation pipeline that filters // namespaces as per configuration. // @@ -666,7 +504,7 @@ func (csr *ChangeStreamReader) createChangeStream( } // StartChangeStream starts the change stream. -func (csr *ChangeStreamReader) StartChangeStream(ctx context.Context) error { +func (csr *ChangeStreamReader) start(ctx context.Context) error { // This channel holds the first change stream creation's result, whether // success or failure. Rather than using a Result we could make separate // Timestamp and error channels, but the single channel is cleaner since diff --git a/internal/verifier/change_stream_test.go b/internal/verifier/change_stream_test.go index 35fa08a1..c69e38c5 100644 --- a/internal/verifier/change_stream_test.go +++ b/internal/verifier/change_stream_test.go @@ -32,9 +32,9 @@ func (suite *IntegrationTestSuite) TestChangeStreamFilter_NoNamespaces() { verifier := suite.BuildVerifier() - changeStreamReader, ok := verifier.srcChangeStreamReader.(*ChangeStreamReader) + changeStreamReader, ok := verifier.srcChangeReader.(*ChangeStreamReader) if !ok { - suite.T().Skipf("source change reader is a %T; this test needs a %T", verifier.srcChangeStreamReader, changeStreamReader) + suite.T().Skipf("source change reader is a %T; this test needs a %T", verifier.srcChangeReader, changeStreamReader) } filter := changeStreamReader.GetChangeStreamFilter() @@ -105,9 +105,9 @@ func (suite *IntegrationTestSuite) TestChangeStreamFilter_BsonSize() { suite.T().Skip("Need a source version that has $bsonSize") } - changeStreamReader, ok := verifier.srcChangeStreamReader.(*ChangeStreamReader) + changeStreamReader, ok := verifier.srcChangeReader.(*ChangeStreamReader) if !ok { - suite.T().Skipf("source change reader is a %T; this test needs a %T", verifier.srcChangeStreamReader, changeStreamReader) + suite.T().Skipf("source change reader is a %T; this test needs a %T", verifier.srcChangeReader, changeStreamReader) } srcColl := verifier.srcClient.Database(suite.DBNameForTest()).Collection("coll") @@ -168,9 +168,9 @@ func (suite *IntegrationTestSuite) TestChangeStreamFilter_WithNamespaces() { verifier := suite.BuildVerifier() - changeStreamReader, ok := verifier.srcChangeStreamReader.(*ChangeStreamReader) + changeStreamReader, ok := verifier.srcChangeReader.(*ChangeStreamReader) if !ok { - suite.T().Skipf("source change reader is a %T; this test needs a %T", verifier.srcChangeStreamReader, changeStreamReader) + suite.T().Skipf("source change reader is a %T; this test needs a %T", verifier.srcChangeReader, changeStreamReader) } changeStreamReader.namespaces = []string{ @@ -254,10 +254,10 @@ func (suite *IntegrationTestSuite) TestChangeStreamFilter_WithNamespaces() { } func (suite *IntegrationTestSuite) startSrcChangeStreamReaderAndHandler(ctx context.Context, verifier *Verifier) { - err := verifier.srcChangeStreamReader.StartChangeStream(ctx) + err := verifier.srcChangeReader.start(ctx) suite.Require().NoError(err) go func() { - err := verifier.RunChangeEventHandler(ctx, verifier.srcChangeStreamReader) + err := verifier.RunChangeEventPersistor(ctx, verifier.srcChangeReader) if errors.Is(err, context.Canceled) { return } @@ -435,7 +435,7 @@ func (suite *IntegrationTestSuite) TestChangeStreamResumability() { suite.startSrcChangeStreamReaderAndHandler(ctx, verifier2) - startAtTs, hasStartAtTs := verifier2.srcChangeStreamReader.getStartTimestamp().Get() + startAtTs, hasStartAtTs := verifier2.srcChangeReader.getStartTimestamp().Get() suite.Require().True(hasStartAtTs) @@ -589,7 +589,7 @@ func (suite *IntegrationTestSuite) TestChangeStreamLag() { verifierRunner.AwaitGenerationEnd(), ) - return verifier.srcChangeStreamReader.getLag().IsSome() + return verifier.srcChangeReader.getLag().IsSome() }, time.Minute, 100*time.Millisecond, @@ -598,7 +598,7 @@ func (suite *IntegrationTestSuite) TestChangeStreamLag() { // NB: The lag will include whatever time elapsed above before // verifier read the event, so it can be several seconds. suite.Assert().Less( - verifier.srcChangeStreamReader.getLag().MustGet(), + verifier.srcChangeReader.getLag().MustGet(), 10*time.Minute, "verifier lag is as expected", ) @@ -625,14 +625,14 @@ func (suite *IntegrationTestSuite) TestStartAtTimeNoChanges() { suite.startSrcChangeStreamReaderAndHandler(ctx, verifier) - startAtTs, hasStartAtTs := verifier.srcChangeStreamReader.getStartTimestamp().Get() + startAtTs, hasStartAtTs := verifier.srcChangeReader.getStartTimestamp().Get() suite.Require().True(hasStartAtTs, "startAtTs should be set") - verifier.srcChangeStreamReader.setWritesOff(insertTs) + verifier.srcChangeReader.setWritesOff(insertTs) - <-verifier.srcChangeStreamReader.done() + <-verifier.srcChangeReader.done() - startAtTs2 := verifier.srcChangeStreamReader.getStartTimestamp().MustGet() + startAtTs2 := verifier.srcChangeReader.getStartTimestamp().MustGet() suite.Require().False( startAtTs2.Before(startAtTs), @@ -657,7 +657,7 @@ func (suite *IntegrationTestSuite) TestStartAtTimeWithChanges() { suite.Require().NotNil(origSessionTime) suite.startSrcChangeStreamReaderAndHandler(ctx, verifier) - startAtTs, hasStartAtTs := verifier.srcChangeStreamReader.getStartTimestamp().Get() + startAtTs, hasStartAtTs := verifier.srcChangeReader.getStartTimestamp().Get() suite.Require().True(hasStartAtTs, "startAtTs should be set") // srcStartAtTs derives from the change stream’s resume token, which can @@ -687,10 +687,10 @@ func (suite *IntegrationTestSuite) TestStartAtTimeWithChanges() { "session time after events should exceed the original", ) - verifier.srcChangeStreamReader.setWritesOff(*postEventsSessionTime) - <-verifier.srcChangeStreamReader.done() + verifier.srcChangeReader.setWritesOff(*postEventsSessionTime) + <-verifier.srcChangeReader.done() - startAtTs, hasStartAtTs = verifier.srcChangeStreamReader.getStartTimestamp().Get() + startAtTs, hasStartAtTs = verifier.srcChangeReader.getStartTimestamp().Get() suite.Require().True(hasStartAtTs, "startAtTs should be set") suite.Assert().Equal( @@ -713,7 +713,7 @@ func (suite *IntegrationTestSuite) TestNoStartAtTime() { suite.Require().NotNil(origStartTs) suite.startSrcChangeStreamReaderAndHandler(ctx, verifier) - startAtTs, hasStartAtTs := verifier.srcChangeStreamReader.getStartTimestamp().Get() + startAtTs, hasStartAtTs := verifier.srcChangeReader.getStartTimestamp().Get() suite.Require().True(hasStartAtTs, "startAtTs should be set") suite.Require().NotNil(startAtTs) @@ -1063,9 +1063,9 @@ func (suite *IntegrationTestSuite) TestRecheckDocsWithDstChangeEvents() { verifier.SetDstNamespaces([]string{dstDBName + ".dstColl1", dstDBName + ".dstColl2"}) verifier.SetNamespaceMap() - suite.Require().NoError(verifier.dstChangeStreamReader.StartChangeStream(ctx)) + suite.Require().NoError(verifier.dstChangeReader.start(ctx)) go func() { - err := verifier.RunChangeEventHandler(ctx, verifier.dstChangeStreamReader) + err := verifier.RunChangeEventPersistor(ctx, verifier.dstChangeReader) if errors.Is(err, context.Canceled) { return } diff --git a/internal/verifier/check.go b/internal/verifier/check.go index 29b97670..44f80519 100644 --- a/internal/verifier/check.go +++ b/internal/verifier/check.go @@ -50,7 +50,7 @@ func (verifier *Verifier) Check(ctx context.Context, filter bson.D) { verifier.MaybeStartPeriodicHeapProfileCollection(ctx) } -func (verifier *Verifier) waitForChangeStream(ctx context.Context, csr changeReader) error { +func (verifier *Verifier) waitForChangeReader(ctx context.Context, csr changeReader) error { select { case <-ctx.Done(): return util.WrapCtxErrWithCause(ctx) @@ -90,15 +90,15 @@ func (verifier *Verifier) CheckWorker(ctxIn context.Context) error { cancelableCtx, canceler := contextplus.WithCancelCause(ctxIn) eg, ctx := contextplus.ErrGroup(cancelableCtx) - // If the change stream fails, everything should stop. + // If the change reader fails, everything should stop. eg.Go(func() error { select { - case <-verifier.srcChangeStreamReader.getError().Ready(): - err := verifier.srcChangeStreamReader.getError().Get() - return errors.Wrapf(err, "%s failed", verifier.srcChangeStreamReader) - case <-verifier.dstChangeStreamReader.getError().Ready(): - err := verifier.dstChangeStreamReader.getError().Get() - return errors.Wrapf(err, "%s failed", verifier.dstChangeStreamReader) + case <-verifier.srcChangeReader.getError().Ready(): + err := verifier.srcChangeReader.getError().Get() + return errors.Wrapf(err, "%s failed", verifier.srcChangeReader) + case <-verifier.dstChangeReader.getError().Ready(): + err := verifier.dstChangeReader.getError().Get() + return errors.Wrapf(err, "%s failed", verifier.dstChangeReader) case <-ctx.Done(): return nil } @@ -229,11 +229,11 @@ func (verifier *Verifier) CheckDriver(ctx context.Context, filter bson.D, testCh } } - verifier.logger.Info().Msg("Starting change streams.") + verifier.logger.Info().Msg("Starting change readers.") // Now that we’ve initialized verifier.generation we can // start the change stream readers. - verifier.initializeChangeStreamReaders() + verifier.initializeChangeReaders() verifier.mux.Unlock() err = retry.New().WithCallback( @@ -270,18 +270,18 @@ func (verifier *Verifier) CheckDriver(ctx context.Context, filter bson.D, testCh }() ceHandlerGroup, groupCtx := contextplus.ErrGroup(ctx) - for _, csReader := range mslices.Of(verifier.srcChangeStreamReader, verifier.dstChangeStreamReader) { - if csReader.isRunning() { - verifier.logger.Debug().Msgf("Check: %s already running.", csReader) + for _, changeReader := range mslices.Of(verifier.srcChangeReader, verifier.dstChangeReader) { + if changeReader.isRunning() { + verifier.logger.Debug().Msgf("Check: %s already running.", changeReader) } else { - verifier.logger.Debug().Msgf("%s not running; starting change stream", csReader) + verifier.logger.Debug().Msgf("%s not running; starting change reader", changeReader) - err = csReader.StartChangeStream(ctx) + err = changeReader.start(ctx) if err != nil { - return errors.Wrapf(err, "failed to start %s", csReader) + return errors.Wrapf(err, "failed to start %s", changeReader) } ceHandlerGroup.Go(func() error { - return verifier.RunChangeEventHandler(groupCtx, csReader) + return verifier.RunChangeEventPersistor(groupCtx, changeReader) }) } } @@ -364,14 +364,14 @@ func (verifier *Verifier) CheckDriver(ctx context.Context, filter bson.D, testCh // caught again on the next iteration. if verifier.writesOff { verifier.logger.Debug(). - Msg("Waiting for change streams to end.") + Msg("Waiting for change readers to end.") - // It's necessary to wait for the change stream to finish before incrementing the + // It's necessary to wait for the change reader to finish before incrementing the // generation number, or the last changes will not be checked. verifier.mux.Unlock() - for _, csr := range mslices.Of(verifier.srcChangeStreamReader, verifier.dstChangeStreamReader) { - if err = verifier.waitForChangeStream(ctx, csr); err != nil { + for _, csr := range mslices.Of(verifier.srcChangeReader, verifier.dstChangeReader) { + if err = verifier.waitForChangeReader(ctx, csr); err != nil { return errors.Wrapf( err, "an error interrupted the wait for closure of %s", diff --git a/internal/verifier/compare.go b/internal/verifier/compare.go index c21fe2d0..3172c0d4 100644 --- a/internal/verifier/compare.go +++ b/internal/verifier/compare.go @@ -467,7 +467,7 @@ func (verifier *Verifier) getFetcherChannelsAndCallbacks( sctx, verifier.srcClientCollection(task), verifier.srcClusterInfo, - verifier.srcChangeStreamReader.getStartTimestamp().ToPointer(), + verifier.srcChangeReader.getStartTimestamp().ToPointer(), task, ) @@ -500,7 +500,7 @@ func (verifier *Verifier) getFetcherChannelsAndCallbacks( sctx, verifier.dstClientCollection(task), verifier.dstClusterInfo, - verifier.dstChangeStreamReader.getStartTimestamp().ToPointer(), + verifier.dstChangeReader.getStartTimestamp().ToPointer(), task, ) diff --git a/internal/verifier/integration_test_suite.go b/internal/verifier/integration_test_suite.go index 897678da..19aff0ab 100644 --- a/internal/verifier/integration_test_suite.go +++ b/internal/verifier/integration_test_suite.go @@ -191,7 +191,7 @@ func (suite *IntegrationTestSuite) BuildVerifier() *Verifier { "should set metadata connection string", ) verifier.SetMetaDBName(metaDBName) - verifier.initializeChangeStreamReaders() + verifier.initializeChangeReaders() suite.Require().NoError(verifier.srcClientCollection(&task).Drop(ctx)) suite.Require().NoError(verifier.dstClientCollection(&task).Drop(ctx)) diff --git a/internal/verifier/migration_verifier.go b/internal/verifier/migration_verifier.go index ef239dfb..9e75bf18 100644 --- a/internal/verifier/migration_verifier.go +++ b/internal/verifier/migration_verifier.go @@ -132,8 +132,8 @@ type Verifier struct { mux sync.RWMutex - srcChangeStreamReader changeReader - dstChangeStreamReader changeReader + srcChangeReader changeReader + dstChangeReader changeReader readConcernSetting ReadConcernSetting @@ -188,7 +188,7 @@ func NewVerifier(settings VerifierSettings, logPath string) *Verifier { readConcernSetting: readConcern, // This will get recreated once gen0 starts, but we want it - // here in case the change streams gets an event before then. + // here in case the change readers get an event before then. srcEventRecorder: NewEventRecorder(), dstEventRecorder: NewEventRecorder(), @@ -269,23 +269,23 @@ func (verifier *Verifier) WritesOff(ctx context.Context) error { return err } - // This has to happen outside the lock because the change streams + // This has to happen outside the lock because the change readers // might be inserting docs into the recheck queue, which happens // under the lock. select { - case <-verifier.srcChangeStreamReader.getError().Ready(): - err := verifier.srcChangeStreamReader.getError().Get() - return errors.Wrapf(err, "tried to send writes-off timestamp to %s, but change stream already failed", verifier.srcChangeStreamReader) + case <-verifier.srcChangeReader.getError().Ready(): + err := verifier.srcChangeReader.getError().Get() + return errors.Wrapf(err, "tried to send writes-off timestamp to %s, but change reader already failed", verifier.srcChangeReader) default: - verifier.srcChangeStreamReader.setWritesOff(srcFinalTs) + verifier.srcChangeReader.setWritesOff(srcFinalTs) } select { - case <-verifier.dstChangeStreamReader.getError().Ready(): - err := verifier.dstChangeStreamReader.getError().Get() - return errors.Wrapf(err, "tried to send writes-off timestamp to %s, but change stream already failed", verifier.dstChangeStreamReader) + case <-verifier.dstChangeReader.getError().Ready(): + err := verifier.dstChangeReader.getError().Get() + return errors.Wrapf(err, "tried to send writes-off timestamp to %s, but change reader already failed", verifier.dstChangeReader) default: - verifier.dstChangeStreamReader.setWritesOff(dstFinalTs) + verifier.dstChangeReader.setWritesOff(dstFinalTs) } return nil diff --git a/internal/verifier/migration_verifier_test.go b/internal/verifier/migration_verifier_test.go index caa2b714..4519f341 100644 --- a/internal/verifier/migration_verifier_test.go +++ b/internal/verifier/migration_verifier_test.go @@ -681,7 +681,7 @@ func (suite *IntegrationTestSuite) TestGetPersistedNamespaceStatistics_Recheck() ctx := suite.Context() verifier := suite.BuildVerifier() - err := verifier.HandleChangeStreamEvents( + err := verifier.PersistChangeEvents( ctx, changeEventBatch{ events: []ParsedEvent{{ @@ -697,7 +697,7 @@ func (suite *IntegrationTestSuite) TestGetPersistedNamespaceStatistics_Recheck() ) suite.Require().NoError(err) - err = verifier.HandleChangeStreamEvents( + err = verifier.PersistChangeEvents( ctx, changeEventBatch{ events: []ParsedEvent{{ @@ -972,23 +972,23 @@ func (suite *IntegrationTestSuite) TestFailedVerificationTaskInsertions() { events: mslices.Of(event), } - err = verifier.HandleChangeStreamEvents(ctx, batch, src) + err = verifier.PersistChangeEvents(ctx, batch, src) suite.Require().NoError(err) event.OpType = "insert" - err = verifier.HandleChangeStreamEvents(ctx, batch, src) + err = verifier.PersistChangeEvents(ctx, batch, src) suite.Require().NoError(err) event.OpType = "replace" - err = verifier.HandleChangeStreamEvents(ctx, batch, src) + err = verifier.PersistChangeEvents(ctx, batch, src) suite.Require().NoError(err) event.OpType = "update" - err = verifier.HandleChangeStreamEvents(ctx, batch, src) + err = verifier.PersistChangeEvents(ctx, batch, src) suite.Require().NoError(err) batch.events[0].OpType = "flibbity" suite.Assert().Panics( func() { - _ = verifier.HandleChangeStreamEvents(ctx, batch, src) + _ = verifier.PersistChangeEvents(ctx, batch, src) }, "HandleChangeStreamEvents should panic if it gets an unknown optype", ) diff --git a/internal/verifier/recheck_persist.go b/internal/verifier/recheck_persist.go new file mode 100644 index 00000000..ae6b9728 --- /dev/null +++ b/internal/verifier/recheck_persist.go @@ -0,0 +1,180 @@ +package verifier + +import ( + "context" + "fmt" + "time" + + "github.com/10gen/migration-verifier/internal/util" + "github.com/pkg/errors" + "go.mongodb.org/mongo-driver/v2/bson" +) + +// RunChangeEventPersistor persists rechecks from change event batches. +// It needs to be started after the reader starts and should run in its own +// goroutine. +func (verifier *Verifier) RunChangeEventPersistor( + ctx context.Context, + reader changeReader, +) error { + clusterName := reader.getWhichCluster() + persistCallback := reader.persistChangeStreamResumeToken + in := reader.getReadChannel() + + var err error + + var lastPersistedTime time.Time + persistResumeTokenIfNeeded := func(ctx context.Context, token bson.Raw) { + if time.Since(lastPersistedTime) >= minChangeStreamPersistInterval { + persistErr := persistCallback(ctx, token) + if persistErr != nil { + verifier.logger.Warn(). + Str("changeReader", string(clusterName)). + Err(persistErr). + Msg("Failed to persist resume token. Because of this, if the verifier restarts, it will have to re-process already-handled change events. This error may be transient, but if it recurs, investigate.") + } else { + lastPersistedTime = time.Now() + } + } + } + +HandlerLoop: + for err == nil { + select { + case <-ctx.Done(): + err = util.WrapCtxErrWithCause(ctx) + + verifier.logger.Debug(). + Err(err). + Str("changeReader", string(clusterName)). + Msg("Change event handler failed.") + case batch, more := <-in: + if !more { + verifier.logger.Debug(). + Str("changeReader", string(clusterName)). + Msg("Change event batch channel has been closed.") + + break HandlerLoop + } + + verifier.logger.Trace(). + Str("changeReader", string(clusterName)). + Int("batchSize", len(batch.events)). + Any("batch", batch). + Msg("Handling change event batch.") + + err = errors.Wrap( + verifier.PersistChangeEvents(ctx, batch, clusterName), + "failed to handle change stream events", + ) + + if err == nil && batch.resumeToken != nil { + persistResumeTokenIfNeeded(ctx, batch.resumeToken) + } + } + } + + // This will prevent the reader from hanging because the reader checks + // this along with checks for context expiry. + if err != nil { + reader.setPersistorError(err) + } + + return err +} + +// PersistChangeEvents performs the necessary work for change events after receiving a batch. +func (verifier *Verifier) PersistChangeEvents(ctx context.Context, batch changeEventBatch, eventOrigin whichCluster) error { + if len(batch.events) == 0 { + return nil + } + + dbNames := make([]string, len(batch.events)) + collNames := make([]string, len(batch.events)) + docIDs := make([]bson.RawValue, len(batch.events)) + dataSizes := make([]int32, len(batch.events)) + + latestTimestamp := bson.Timestamp{} + + for i, changeEvent := range batch.events { + if !supportedEventOpTypes.Contains(changeEvent.OpType) { + panic(fmt.Sprintf("Unsupported optype in event; should have failed already! event=%+v", changeEvent)) + } + + if changeEvent.ClusterTime == nil { + verifier.logger.Warn(). + Any("event", changeEvent). + Msg("Change event unexpectedly lacks a clusterTime?!?") + } else if changeEvent.ClusterTime.After(latestTimestamp) { + latestTimestamp = *changeEvent.ClusterTime + } + + var srcDBName, srcCollName string + + var eventRecorder EventRecorder + + // Recheck Docs are keyed by source namespaces. + // We need to retrieve the source namespaces if change events are from the destination. + switch eventOrigin { + case dst: + eventRecorder = *verifier.dstEventRecorder + + if verifier.nsMap.Len() == 0 { + // Namespace is not remapped. Source namespace is the same as the destination. + srcDBName = changeEvent.Ns.DB + srcCollName = changeEvent.Ns.Coll + } else { + dstNs := fmt.Sprintf("%s.%s", changeEvent.Ns.DB, changeEvent.Ns.Coll) + srcNs, exist := verifier.nsMap.GetSrcNamespace(dstNs) + if !exist { + return errors.Errorf("no source namespace corresponding to the destination namepsace %s", dstNs) + } + srcDBName, srcCollName = SplitNamespace(srcNs) + } + case src: + eventRecorder = *verifier.srcEventRecorder + + srcDBName = changeEvent.Ns.DB + srcCollName = changeEvent.Ns.Coll + default: + panic(fmt.Sprintf("unknown event origin: %s", eventOrigin)) + } + + dbNames[i] = srcDBName + collNames[i] = srcCollName + docIDs[i] = changeEvent.DocID + + if changeEvent.FullDocLen.OrZero() > 0 { + dataSizes[i] = int32(changeEvent.FullDocLen.OrZero()) + } else if changeEvent.FullDocument == nil { + // This happens for deletes and for some updates. + // The document is probably, but not necessarily, deleted. + dataSizes[i] = fauxDocSizeForDeleteEvents + } else { + // This happens for inserts, replaces, and most updates. + dataSizes[i] = int32(len(changeEvent.FullDocument)) + } + + if err := eventRecorder.AddEvent(&changeEvent); err != nil { + return errors.Wrapf( + err, + "failed to augment stats with %s change event (%+v)", + eventOrigin, + changeEvent, + ) + } + } + + latestTimestampTime := time.Unix(int64(latestTimestamp.T), 0) + lag := time.Unix(int64(batch.clusterTime.T), 0).Sub(latestTimestampTime) + + verifier.logger.Trace(). + Str("origin", string(eventOrigin)). + Int("count", len(docIDs)). + Any("latestTimestamp", latestTimestamp). + Time("latestTimestampTime", latestTimestampTime). + Stringer("lag", lag). + Msg("Persisting rechecks for change events.") + + return verifier.insertRecheckDocs(ctx, dbNames, collNames, docIDs, dataSizes) +} diff --git a/internal/verifier/summary.go b/internal/verifier/summary.go index 7079b294..d204ed88 100644 --- a/internal/verifier/summary.go +++ b/internal/verifier/summary.go @@ -559,8 +559,8 @@ func (verifier *Verifier) printChangeEventStatistics(builder io.Writer) { eventRecorder *EventRecorder csReader changeReader }{ - {"Source", verifier.srcEventRecorder, verifier.srcChangeStreamReader}, - {"Destination", verifier.dstEventRecorder, verifier.dstChangeStreamReader}, + {"Source", verifier.srcEventRecorder, verifier.srcChangeReader}, + {"Destination", verifier.dstEventRecorder, verifier.dstChangeReader}, } { nsStats := cluster.eventRecorder.Read() @@ -619,13 +619,13 @@ func (verifier *Verifier) printChangeEventStatistics(builder io.Writer) { } } - if cluster.csReader == verifier.srcChangeStreamReader { + if cluster.csReader == verifier.srcChangeReader { fmt.Fprint(builder, "\n") } // We only print event breakdowns for the source because we assume that // events on the destination will largely mirror the source’s. - if totalEvents > 0 && cluster.csReader == verifier.srcChangeStreamReader { + if totalEvents > 0 && cluster.csReader == verifier.srcChangeReader { reverseSortedNamespaces := maps.Keys(nsTotals) sort.Slice( reverseSortedNamespaces, From 08d56336dc747a0e8cef516b0feac443e54ea51d Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Tue, 11 Nov 2025 16:28:50 -0500 Subject: [PATCH 07/24] cleanup --- internal/verifier/change_reader.go | 18 +++++++++++------- internal/verifier/change_stream.go | 11 +++++------ internal/verifier/change_stream_test.go | 2 +- internal/verifier/check.go | 10 +++++----- internal/verifier/recheck_persist.go | 2 +- internal/verifier/recheck_test.go | 2 +- 6 files changed, 24 insertions(+), 21 deletions(-) diff --git a/internal/verifier/change_reader.go b/internal/verifier/change_reader.go index 663eb89f..84a4f3e9 100644 --- a/internal/verifier/change_reader.go +++ b/internal/verifier/change_reader.go @@ -16,6 +16,10 @@ import ( "go.mongodb.org/mongo-driver/v2/mongo/options" ) +const ( + changeReaderCollectionName = "changeReader" +) + type changeReader interface { getWhichCluster() whichCluster getReadChannel() <-chan changeEventBatch @@ -28,7 +32,7 @@ type changeReader interface { setPersistorError(error) start(context.Context) error done() <-chan struct{} - persistChangeStreamResumeToken(context.Context, bson.Raw) error + persistResumeToken(context.Context, bson.Raw) error isRunning() bool String() string } @@ -46,7 +50,7 @@ type ChangeReaderCommon struct { resumeTokenTSExtractor func(bson.Raw) (bson.Timestamp, error) - changeStreamRunning bool + running bool changeEventBatchChan chan changeEventBatch writesOffTs *util.Eventual[bson.Timestamp] readerError *util.Eventual[error] @@ -82,7 +86,7 @@ func (rc *ChangeReaderCommon) setWritesOff(ts bson.Timestamp) { } func (rc *ChangeReaderCommon) isRunning() bool { - return rc.changeStreamRunning + return rc.running } func (rc *ChangeReaderCommon) getReadChannel() <-chan changeEventBatch { @@ -127,8 +131,8 @@ func (rc *ChangeReaderCommon) getEventsPerSecond() option.Option[float64] { return option.None[float64]() } -func (rc *ChangeReaderCommon) persistChangeStreamResumeToken(ctx context.Context, token bson.Raw) error { - coll := rc.metaDB.Collection(metadataChangeStreamCollectionName) +func (rc *ChangeReaderCommon) persistResumeToken(ctx context.Context, token bson.Raw) error { + coll := rc.metaDB.Collection(changeReaderCollectionName) _, err := coll.ReplaceOne( ctx, bson.D{{"_id", rc.resumeTokenDocID()}}, @@ -153,7 +157,7 @@ func (rc *ChangeReaderCommon) persistChangeStreamResumeToken(ctx context.Context return nil } - return errors.Wrapf(err, "failed to persist change stream resume token (%v)", token) + return errors.Wrapf(err, "failed to persist %s resume token (%v)", rc.readerType, token) } func (rc *ChangeReaderCommon) resumeTokenDocID() string { @@ -168,7 +172,7 @@ func (rc *ChangeReaderCommon) resumeTokenDocID() string { } func (rc *ChangeReaderCommon) getMetadataCollection() *mongo.Collection { - return rc.metaDB.Collection(metadataChangeStreamCollectionName) + return rc.metaDB.Collection(changeReaderCollectionName) } func (rc *ChangeReaderCommon) loadResumeToken(ctx context.Context) (option.Option[bson.Raw], error) { diff --git a/internal/verifier/change_stream.go b/internal/verifier/change_stream.go index b69254b7..f83814a5 100644 --- a/internal/verifier/change_stream.go +++ b/internal/verifier/change_stream.go @@ -44,9 +44,8 @@ var supportedEventOpTypes = mapset.NewSet( ) const ( - minChangeStreamPersistInterval = time.Second * 10 - maxChangeStreamAwaitTime = time.Second - metadataChangeStreamCollectionName = "changeStream" + minChangeStreamPersistInterval = time.Second * 10 + maxChangeStreamAwaitTime = time.Second ) type UnknownEventError struct { @@ -394,7 +393,7 @@ func (csr *ChangeStreamReader) iterateChangeStream( } if gotwritesOffTimestamp { - csr.changeStreamRunning = false + csr.running = false if csr.lastChangeEventTime != nil { csr.startAtTs = csr.lastChangeEventTime } @@ -472,7 +471,7 @@ func (csr *ChangeStreamReader) createChangeStream( return nil, nil, bson.Timestamp{}, errors.Wrap(err, "failed to open change stream") } - err = csr.persistChangeStreamResumeToken(ctx, changeStream.ResumeToken()) + err = csr.persistResumeToken(ctx, changeStream.ResumeToken()) if err != nil { return nil, nil, bson.Timestamp{}, err } @@ -581,7 +580,7 @@ func (csr *ChangeStreamReader) start(ctx context.Context) error { csr.startAtTs = &startTs - csr.changeStreamRunning = true + csr.running = true return nil } diff --git a/internal/verifier/change_stream_test.go b/internal/verifier/change_stream_test.go index c69e38c5..793426cf 100644 --- a/internal/verifier/change_stream_test.go +++ b/internal/verifier/change_stream_test.go @@ -284,7 +284,7 @@ func (suite *IntegrationTestSuite) TestChangeStream_Resume_NoSkip() { changeStreamMetaColl := verifier1.metaClient. Database(verifier1.metaDBName). - Collection(metadataChangeStreamCollectionName) + Collection(changeReaderCollectionName) var originalResumeToken bson.Raw diff --git a/internal/verifier/check.go b/internal/verifier/check.go index 44f80519..794a7dbf 100644 --- a/internal/verifier/check.go +++ b/internal/verifier/check.go @@ -232,7 +232,7 @@ func (verifier *Verifier) CheckDriver(ctx context.Context, filter bson.D, testCh verifier.logger.Info().Msg("Starting change readers.") // Now that we’ve initialized verifier.generation we can - // start the change stream readers. + // start the change readers. verifier.initializeChangeReaders() verifier.mux.Unlock() @@ -380,8 +380,8 @@ func (verifier *Verifier) CheckDriver(ctx context.Context, filter bson.D, testCh } verifier.logger.Debug(). - Stringer("changeStreamReader", csr). - Msg("Change stream reader finished.") + Stringer("changeReader", csr). + Msg("Change reader finished.") } if err = ceHandlerGroup.Wait(); err != nil { @@ -391,9 +391,9 @@ func (verifier *Verifier) CheckDriver(ctx context.Context, filter bson.D, testCh verifier.lastGeneration = true } - // Increment the in-memory generation so that the change streams will + // Increment the in-memory generation so that the change readers will // mark rechecks for the next generation. For example, if we just - // finished generation 2, the change streams need to mark generation 3 + // finished generation 2, the change readers need to mark generation 3 // on enqueued rechecks. Meanwhile, generaiton 3’s recheck tasks will // derive from rechecks enqueued during generation 2. verifier.generation++ diff --git a/internal/verifier/recheck_persist.go b/internal/verifier/recheck_persist.go index ae6b9728..6ed9b173 100644 --- a/internal/verifier/recheck_persist.go +++ b/internal/verifier/recheck_persist.go @@ -18,7 +18,7 @@ func (verifier *Verifier) RunChangeEventPersistor( reader changeReader, ) error { clusterName := reader.getWhichCluster() - persistCallback := reader.persistChangeStreamResumeToken + persistCallback := reader.persistResumeToken in := reader.getReadChannel() var err error diff --git a/internal/verifier/recheck_test.go b/internal/verifier/recheck_test.go index 1a213dba..0da76289 100644 --- a/internal/verifier/recheck_test.go +++ b/internal/verifier/recheck_test.go @@ -62,7 +62,7 @@ func (suite *IntegrationTestSuite) TestFailedCompareThenReplace() { }, } - err := verifier.HandleChangeStreamEvents( + err := verifier.PersistChangeEvents( ctx, changeEventBatch{events: mslices.Of(event)}, src, From c9e4610cd9d63ad5abd7f21c63d927873d22cff1 Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Tue, 11 Nov 2025 16:29:17 -0500 Subject: [PATCH 08/24] tidy --- internal/verifier/change_reader.go | 1 - 1 file changed, 1 deletion(-) diff --git a/internal/verifier/change_reader.go b/internal/verifier/change_reader.go index 84a4f3e9..8c0accd3 100644 --- a/internal/verifier/change_reader.go +++ b/internal/verifier/change_reader.go @@ -191,7 +191,6 @@ func (rc *ChangeReaderCommon) loadResumeToken(ctx context.Context) (option.Optio } func (rc *ChangeReaderCommon) updateLag(sess *mongo.Session, token bson.Raw) { - var tokenTs bson.Timestamp tokenTs, err := rc.resumeTokenTSExtractor(token) if err == nil { lagSecs := int64(sess.OperationTime().T) - int64(tokenTs.T) From c7d9edd8b74d2cc2d88b31dd9aa8bffd74cf19a3 Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Tue, 11 Nov 2025 16:29:55 -0500 Subject: [PATCH 09/24] string --- internal/verifier/migration_verifier_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/verifier/migration_verifier_test.go b/internal/verifier/migration_verifier_test.go index 4519f341..8159c19c 100644 --- a/internal/verifier/migration_verifier_test.go +++ b/internal/verifier/migration_verifier_test.go @@ -990,7 +990,7 @@ func (suite *IntegrationTestSuite) TestFailedVerificationTaskInsertions() { func() { _ = verifier.PersistChangeEvents(ctx, batch, src) }, - "HandleChangeStreamEvents should panic if it gets an unknown optype", + "PersistChangeEvents should panic if it gets an unknown optype", ) verifier.generation++ From 196f8877096ff3ff4c7baf889a83195cc4c1bfda Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Tue, 11 Nov 2025 16:39:56 -0500 Subject: [PATCH 10/24] more renames --- internal/verifier/change_reader.go | 18 +++++- internal/verifier/change_stream.go | 86 +++---------------------- internal/verifier/change_stream_test.go | 2 +- internal/verifier/recheck_persist.go | 6 ++ 4 files changed, 31 insertions(+), 81 deletions(-) diff --git a/internal/verifier/change_reader.go b/internal/verifier/change_reader.go index 8c0accd3..aa6e4926 100644 --- a/internal/verifier/change_reader.go +++ b/internal/verifier/change_reader.go @@ -10,6 +10,7 @@ import ( "github.com/10gen/migration-verifier/msync" "github.com/10gen/migration-verifier/option" "github.com/pkg/errors" + "github.com/rs/zerolog" "github.com/samber/lo" "go.mongodb.org/mongo-driver/v2/bson" "go.mongodb.org/mongo-driver/v2/mongo" @@ -54,7 +55,7 @@ type ChangeReaderCommon struct { changeEventBatchChan chan changeEventBatch writesOffTs *util.Eventual[bson.Timestamp] readerError *util.Eventual[error] - handlerError *util.Eventual[error] + persistorError *util.Eventual[error] doneChan chan struct{} startAtTs *bson.Timestamp @@ -70,7 +71,7 @@ func (rc *ChangeReaderCommon) getWhichCluster() whichCluster { } func (rc *ChangeReaderCommon) setPersistorError(err error) { - rc.handlerError.Set(err) + rc.persistorError.Set(err) } func (rc *ChangeReaderCommon) getError() *util.Eventual[error] { @@ -208,3 +209,16 @@ func (rc *ChangeReaderCommon) logIgnoredDDL(rawEvent bson.Raw) { Stringer("event", rawEvent). Msg("Ignoring event with unrecognized type on destination. (It’s assumedly internal to the migration.)") } + +func (rc *ChangeReaderCommon) wrapPersistorErrorForReader() error { + return errors.Wrap( + rc.persistorError.Get(), + "event persistor failed, so no more events can be processed", + ) +} + +func addTimestampToLogEvent(ts bson.Timestamp, event *zerolog.Event) *zerolog.Event { + return event. + Any("timestamp", ts). + Time("time", time.Unix(int64(ts.T), int64(0))) +} diff --git a/internal/verifier/change_stream.go b/internal/verifier/change_stream.go index f83814a5..b8d9f22b 100644 --- a/internal/verifier/change_stream.go +++ b/internal/verifier/change_stream.go @@ -16,8 +16,6 @@ import ( mapset "github.com/deckarep/golang-set/v2" clone "github.com/huandu/go-clone/generic" "github.com/pkg/errors" - "github.com/rs/zerolog" - "github.com/samber/lo" "github.com/samber/mo" "go.mongodb.org/mongo-driver/v2/bson" "go.mongodb.org/mongo-driver/v2/mongo" @@ -56,12 +54,6 @@ func (uee UnknownEventError) Error() string { return fmt.Sprintf("received event with unknown optype: %+v", uee.Event) } -type changeEventBatch struct { - events []ParsedEvent - resumeToken bson.Raw - clusterTime bson.Timestamp -} - type ChangeStreamReader struct { ChangeReaderCommon } @@ -97,11 +89,11 @@ func (verifier *Verifier) initializeChangeReaders() { csr.changeEventBatchChan = make(chan changeEventBatch, batchChanBufferSize) csr.writesOffTs = util.NewEventual[bson.Timestamp]() csr.readerError = util.NewEventual[error]() - csr.handlerError = util.NewEventual[error]() + csr.persistorError = util.NewEventual[error]() csr.doneChan = make(chan struct{}) csr.lag = msync.NewTypedAtomic(option.None[time.Duration]()) csr.batchSizeHistory = history.New[int](time.Minute) - csr.resumeTokenTSExtractor = extractTimestampFromResumeToken + csr.resumeTokenTSExtractor = extractTSFromChangeStreamResumeToken } } @@ -157,7 +149,7 @@ func (csr *ChangeStreamReader) GetChangeStreamFilter() (pipeline mongo.Pipeline) }, ) - if csr.hasBsonSize() { + if util.ClusterHasBSONSize([2]int(csr.clusterInfo.VersionArray)) { pipeline = append( pipeline, bson.D{ @@ -172,16 +164,6 @@ func (csr *ChangeStreamReader) GetChangeStreamFilter() (pipeline mongo.Pipeline) return pipeline } -func (csr *ChangeStreamReader) hasBsonSize() bool { - major := csr.clusterInfo.VersionArray[0] - - if major == 4 { - return csr.clusterInfo.VersionArray[1] >= 4 - } - - return major > 4 -} - // This function reads a single `getMore` response into a slice. // // Note that this doesn’t care about the writesOff timestamp. Thus, @@ -294,8 +276,8 @@ func (csr *ChangeStreamReader) readAndHandleOneChangeEventBatch( select { case <-ctx.Done(): return util.WrapCtxErrWithCause(ctx) - case <-csr.handlerError.Ready(): - return csr.wrapHandlerErrorForReader() + case <-csr.persistorError.Ready(): + return csr.wrapPersistorErrorForReader() case csr.changeEventBatchChan <- changeEventBatch{ events: changeEvents, @@ -311,13 +293,6 @@ func (csr *ChangeStreamReader) readAndHandleOneChangeEventBatch( return nil } -func (csr *ChangeStreamReader) wrapHandlerErrorForReader() error { - return errors.Wrap( - csr.handlerError.Get(), - "event handler failed, so no more events can be processed", - ) -} - func (csr *ChangeStreamReader) iterateChangeStream( ctx context.Context, ri *retry.FuncInfo, @@ -341,8 +316,8 @@ func (csr *ChangeStreamReader) iterateChangeStream( return err - case <-csr.handlerError.Ready(): - return csr.wrapHandlerErrorForReader() + case <-csr.persistorError.Ready(): + return csr.wrapPersistorErrorForReader() // If the ChangeStreamEnderChan has a message, the user has indicated that // source writes are ended and the migration tool is finished / committed. @@ -585,56 +560,11 @@ func (csr *ChangeStreamReader) start(ctx context.Context) error { return nil } -// GetLag returns the observed change stream lag (i.e., the delta between -// cluster time and the most-recently-seen change event). -func (csr *ChangeStreamReader) GetLag() option.Option[time.Duration] { - return csr.lag.Load() -} - -// GetSaturation returns the reader’s internal buffer’s saturation level as -// a fraction. If saturation rises, that means we’re reading events faster than -// we can persist them. -func (csr *ChangeStreamReader) GetSaturation() float64 { - return util.DivideToF64(len(csr.changeEventBatchChan), cap(csr.changeEventBatchChan)) -} - -// GetEventsPerSecond returns the number of change events per second we’ve been -// seeing “recently”. (See implementation for the actual period over which we -// compile this metric.) -func (csr *ChangeStreamReader) GetEventsPerSecond() option.Option[float64] { - logs := csr.batchSizeHistory.Get() - lastLog, hasLogs := lo.Last(logs) - - if hasLogs && lastLog.At != logs[0].At { - span := lastLog.At.Sub(logs[0].At) - - // Each log contains a time and a # of events that happened since - // the prior log. Thus, each log’s Datum is a count of events that - // happened before the timestamp. Since we want the # of events that - // happened between the first & last times, we only want events *after* - // the first time. Thus, we skip the first log entry here. - totalEvents := 0 - for _, log := range logs[1:] { - totalEvents += log.Datum - } - - return option.Some(util.DivideToF64(totalEvents, span.Seconds())) - } - - return option.None[float64]() -} - -func addTimestampToLogEvent(ts bson.Timestamp, event *zerolog.Event) *zerolog.Event { - return event. - Any("timestamp", ts). - Time("time", time.Unix(int64(ts.T), int64(0))) -} - func (csr *ChangeStreamReader) String() string { return fmt.Sprintf("%s change stream reader", csr.readerType) } -func extractTimestampFromResumeToken(resumeToken bson.Raw) (bson.Timestamp, error) { +func extractTSFromChangeStreamResumeToken(resumeToken bson.Raw) (bson.Timestamp, error) { // Change stream token is always a V1 keystring in the _data field tokenDataRV, err := resumeToken.LookupErr("_data") diff --git a/internal/verifier/change_stream_test.go b/internal/verifier/change_stream_test.go index 793426cf..0c12a64f 100644 --- a/internal/verifier/change_stream_test.go +++ b/internal/verifier/change_stream_test.go @@ -223,7 +223,7 @@ func (suite *IntegrationTestSuite) TestChangeStreamFilter_WithNamespaces() { for { gotEvent := cs.TryNext(ctx) suite.Require().NoError(cs.Err()) - csOpTime, err := extractTimestampFromResumeToken(cs.ResumeToken()) + csOpTime, err := extractTSFromChangeStreamResumeToken(cs.ResumeToken()) suite.Require().NoError(err, "should get timestamp from resume token") if gotEvent { diff --git a/internal/verifier/recheck_persist.go b/internal/verifier/recheck_persist.go index 6ed9b173..799040be 100644 --- a/internal/verifier/recheck_persist.go +++ b/internal/verifier/recheck_persist.go @@ -10,6 +10,12 @@ import ( "go.mongodb.org/mongo-driver/v2/bson" ) +type changeEventBatch struct { + events []ParsedEvent + resumeToken bson.Raw + clusterTime bson.Timestamp +} + // RunChangeEventPersistor persists rechecks from change event batches. // It needs to be started after the reader starts and should run in its own // goroutine. From 06f7794a7a87f8adec4ea34b051767cc6ca7d49e Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Tue, 11 Nov 2025 16:52:39 -0500 Subject: [PATCH 11/24] move --- internal/verifier/change_reader.go | 9 +++++++++ internal/verifier/change_stream.go | 11 ----------- internal/verifier/metadata.go | 1 + 3 files changed, 10 insertions(+), 11 deletions(-) diff --git a/internal/verifier/change_reader.go b/internal/verifier/change_reader.go index aa6e4926..40e8ccc7 100644 --- a/internal/verifier/change_reader.go +++ b/internal/verifier/change_reader.go @@ -17,7 +17,16 @@ import ( "go.mongodb.org/mongo-driver/v2/mongo/options" ) +type ddlEventHandling string + const ( + fauxDocSizeForDeleteEvents = 1024 + + // The number of batches we’ll hold in memory at once. + batchChanBufferSize = 100 + + onDDLEventAllow ddlEventHandling = "allow" + changeReaderCollectionName = "changeReader" ) diff --git a/internal/verifier/change_stream.go b/internal/verifier/change_stream.go index b8d9f22b..c3841a86 100644 --- a/internal/verifier/change_stream.go +++ b/internal/verifier/change_stream.go @@ -23,17 +23,6 @@ import ( "golang.org/x/exp/slices" ) -type ddlEventHandling string - -const ( - fauxDocSizeForDeleteEvents = 1024 - - // The number of batches we’ll hold in memory at once. - batchChanBufferSize = 100 - - onDDLEventAllow ddlEventHandling = "allow" -) - var supportedEventOpTypes = mapset.NewSet( "insert", "update", diff --git a/internal/verifier/metadata.go b/internal/verifier/metadata.go index 7c317a2c..dbd87300 100644 --- a/internal/verifier/metadata.go +++ b/internal/verifier/metadata.go @@ -5,5 +5,6 @@ package verifier // 2: Split failed-task discrepancies into separate collection. // 3: Enqueued rechecks now reference the generation in which they’ll be // rechecked rather than the generation during which they were enqueued. +// 4: Use “changeReader” instead of “changeStream” collection name. const verifierMetadataVersion = 3 From 0b13627539d07ff1dfc0a19ed6c60d9f31d98fbe Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Tue, 11 Nov 2025 16:52:47 -0500 Subject: [PATCH 12/24] metadata --- internal/verifier/metadata.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/verifier/metadata.go b/internal/verifier/metadata.go index dbd87300..906117c9 100644 --- a/internal/verifier/metadata.go +++ b/internal/verifier/metadata.go @@ -7,4 +7,4 @@ package verifier // rechecked rather than the generation during which they were enqueued. // 4: Use “changeReader” instead of “changeStream” collection name. -const verifierMetadataVersion = 3 +const verifierMetadataVersion = 4 From 25b7e61c5f01c71b52a0f9cc23dc1ec9f3c716a4 Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Tue, 11 Nov 2025 16:59:23 -0500 Subject: [PATCH 13/24] comment --- internal/util/clusterinfo.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/internal/util/clusterinfo.go b/internal/util/clusterinfo.go index 7f66f8e7..b2354e96 100644 --- a/internal/util/clusterinfo.go +++ b/internal/util/clusterinfo.go @@ -19,6 +19,8 @@ type ClusterInfo struct { Topology ClusterTopology } +// ClusterHasBSONSize indicates whether a cluster with the given +// major & minor version numbers supports the $bsonSize aggregation operator. func ClusterHasBSONSize(va [2]int) bool { major := va[0] From 3284468d4a3454afbfb90772a8cf9bb2956b5d9d Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Tue, 11 Nov 2025 17:01:09 -0500 Subject: [PATCH 14/24] comments --- internal/verifier/change_reader.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/internal/verifier/change_reader.go b/internal/verifier/change_reader.go index 40e8ccc7..01579434 100644 --- a/internal/verifier/change_reader.go +++ b/internal/verifier/change_reader.go @@ -107,10 +107,15 @@ func (rc *ChangeReaderCommon) done() <-chan struct{} { return rc.doneChan } +// getBufferSaturation returns the reader’s internal buffer’s saturation level +// as a fraction. If saturation rises, that means we’re reading events faster +// than we can persist them. func (rc *ChangeReaderCommon) getBufferSaturation() float64 { return util.DivideToF64(len(rc.changeEventBatchChan), cap(rc.changeEventBatchChan)) } +// getLag returns the observed change stream lag (i.e., the delta between +// cluster time and the most-recently-seen change event). func (rc *ChangeReaderCommon) getLag() option.Option[time.Duration] { return rc.lag.Load() } From 15a7636394d4681e85e0a538aa394ca681df7453 Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Wed, 12 Nov 2025 09:13:40 -0500 Subject: [PATCH 15/24] remove persistor error --- internal/verifier/change_reader.go | 16 +--- internal/verifier/change_stream.go | 106 ++++++++++++------------ internal/verifier/change_stream_test.go | 8 +- internal/verifier/check.go | 2 +- internal/verifier/recheck_persist.go | 6 -- 5 files changed, 60 insertions(+), 78 deletions(-) diff --git a/internal/verifier/change_reader.go b/internal/verifier/change_reader.go index 01579434..13ea1e62 100644 --- a/internal/verifier/change_reader.go +++ b/internal/verifier/change_reader.go @@ -15,6 +15,7 @@ import ( "go.mongodb.org/mongo-driver/v2/bson" "go.mongodb.org/mongo-driver/v2/mongo" "go.mongodb.org/mongo-driver/v2/mongo/options" + "golang.org/x/sync/errgroup" ) type ddlEventHandling string @@ -39,8 +40,7 @@ type changeReader interface { getLag() option.Option[time.Duration] getBufferSaturation() float64 setWritesOff(bson.Timestamp) - setPersistorError(error) - start(context.Context) error + start(context.Context, *errgroup.Group) error done() <-chan struct{} persistResumeToken(context.Context, bson.Raw) error isRunning() bool @@ -64,7 +64,6 @@ type ChangeReaderCommon struct { changeEventBatchChan chan changeEventBatch writesOffTs *util.Eventual[bson.Timestamp] readerError *util.Eventual[error] - persistorError *util.Eventual[error] doneChan chan struct{} startAtTs *bson.Timestamp @@ -79,10 +78,6 @@ func (rc *ChangeReaderCommon) getWhichCluster() whichCluster { return rc.readerType } -func (rc *ChangeReaderCommon) setPersistorError(err error) { - rc.persistorError.Set(err) -} - func (rc *ChangeReaderCommon) getError() *util.Eventual[error] { return rc.readerError } @@ -224,13 +219,6 @@ func (rc *ChangeReaderCommon) logIgnoredDDL(rawEvent bson.Raw) { Msg("Ignoring event with unrecognized type on destination. (It’s assumedly internal to the migration.)") } -func (rc *ChangeReaderCommon) wrapPersistorErrorForReader() error { - return errors.Wrap( - rc.persistorError.Get(), - "event persistor failed, so no more events can be processed", - ) -} - func addTimestampToLogEvent(ts bson.Timestamp, event *zerolog.Event) *zerolog.Event { return event. Any("timestamp", ts). diff --git a/internal/verifier/change_stream.go b/internal/verifier/change_stream.go index c3841a86..ae477b14 100644 --- a/internal/verifier/change_stream.go +++ b/internal/verifier/change_stream.go @@ -21,6 +21,7 @@ import ( "go.mongodb.org/mongo-driver/v2/mongo" "go.mongodb.org/mongo-driver/v2/mongo/options" "golang.org/x/exp/slices" + "golang.org/x/sync/errgroup" ) var supportedEventOpTypes = mapset.NewSet( @@ -78,7 +79,6 @@ func (verifier *Verifier) initializeChangeReaders() { csr.changeEventBatchChan = make(chan changeEventBatch, batchChanBufferSize) csr.writesOffTs = util.NewEventual[bson.Timestamp]() csr.readerError = util.NewEventual[error]() - csr.persistorError = util.NewEventual[error]() csr.doneChan = make(chan struct{}) csr.lag = msync.NewTypedAtomic(option.None[time.Duration]()) csr.batchSizeHistory = history.New[int](time.Minute) @@ -265,8 +265,6 @@ func (csr *ChangeStreamReader) readAndHandleOneChangeEventBatch( select { case <-ctx.Done(): return util.WrapCtxErrWithCause(ctx) - case <-csr.persistorError.Ready(): - return csr.wrapPersistorErrorForReader() case csr.changeEventBatchChan <- changeEventBatch{ events: changeEvents, @@ -305,9 +303,6 @@ func (csr *ChangeStreamReader) iterateChangeStream( return err - case <-csr.persistorError.Ready(): - return csr.wrapPersistorErrorForReader() - // If the ChangeStreamEnderChan has a message, the user has indicated that // source writes are ended and the migration tool is finished / committed. // This means we should exit rather than continue reading the change stream @@ -467,79 +462,80 @@ func (csr *ChangeStreamReader) createChangeStream( } // StartChangeStream starts the change stream. -func (csr *ChangeStreamReader) start(ctx context.Context) error { +func (csr *ChangeStreamReader) start( + ctx context.Context, + eg *errgroup.Group, +) error { // This channel holds the first change stream creation's result, whether // success or failure. Rather than using a Result we could make separate // Timestamp and error channels, but the single channel is cleaner since // there's no chance of "nonsense" like both channels returning a payload. initialCreateResultChan := make(chan mo.Result[bson.Timestamp]) - go func() { - // Closing changeEventBatchChan at the end of change stream goroutine - // notifies the verifier's change event handler to exit. - defer func() { - csr.logger.Debug(). - Stringer("changeStreamReader", csr). - Msg("Closing change event batch channel.") + eg.Go( + func() error { + // Closing changeEventBatchChan at the end of change stream goroutine + // notifies the verifier's change event handler to exit. + defer func() { + csr.logger.Debug(). + Stringer("changeStreamReader", csr). + Msg("Closing change event batch channel.") - close(csr.changeEventBatchChan) - }() + close(csr.changeEventBatchChan) + }() - retryer := retry.New().WithErrorCodes(util.CursorKilledErrCode) + retryer := retry.New().WithErrorCodes(util.CursorKilledErrCode) - parentThreadWaiting := true + parentThreadWaiting := true - err := retryer.WithCallback( - func(ctx context.Context, ri *retry.FuncInfo) error { - changeStream, sess, startTs, err := csr.createChangeStream(ctx) - if err != nil { - logEvent := csr.logger.Debug(). - Err(err). - Stringer("changeStreamReader", csr) + return retryer.WithCallback( + func(ctx context.Context, ri *retry.FuncInfo) error { + changeStream, sess, startTs, err := csr.createChangeStream(ctx) + if err != nil { + logEvent := csr.logger.Debug(). + Err(err). + Stringer("changeStreamReader", csr) - if parentThreadWaiting { - logEvent.Msg("First change stream open failed.") + if parentThreadWaiting { + logEvent.Msg("First change stream open failed.") - initialCreateResultChan <- mo.Err[bson.Timestamp](err) - return nil - } + initialCreateResultChan <- mo.Err[bson.Timestamp](err) + return nil + } - logEvent.Msg("Retried change stream open failed.") + logEvent.Msg("Retried change stream open failed.") - return err - } - - defer changeStream.Close(ctx) + return err + } - logEvent := csr.logger.Debug(). - Stringer("changeStreamReader", csr). - Any("startTimestamp", startTs) + defer changeStream.Close(ctx) - if parentThreadWaiting { - logEvent.Msg("First change stream open succeeded.") + logEvent := csr.logger.Debug(). + Stringer("changeStreamReader", csr). + Any("startTimestamp", startTs) - initialCreateResultChan <- mo.Ok(startTs) - close(initialCreateResultChan) - parentThreadWaiting = false - } else { - logEvent.Msg("Retried change stream open succeeded.") - } + if parentThreadWaiting { + logEvent.Msg("First change stream open succeeded.") - return csr.iterateChangeStream(ctx, ri, changeStream, sess) - }, - "running %s", csr, - ).Run(ctx, csr.logger) + initialCreateResultChan <- mo.Ok(startTs) + close(initialCreateResultChan) + parentThreadWaiting = false + } else { + logEvent.Msg("Retried change stream open succeeded.") + } - if err != nil { - csr.readerError.Set(err) - } - }() + return csr.iterateChangeStream(ctx, ri, changeStream, sess) + }, + "running %s", csr, + ).Run(ctx, csr.logger) + }, + ) result := <-initialCreateResultChan startTs, err := result.Get() if err != nil { - return err + return errors.Wrapf(err, "creating change stream") } csr.startAtTs = &startTs diff --git a/internal/verifier/change_stream_test.go b/internal/verifier/change_stream_test.go index 0c12a64f..d3fb166c 100644 --- a/internal/verifier/change_stream_test.go +++ b/internal/verifier/change_stream_test.go @@ -254,7 +254,9 @@ func (suite *IntegrationTestSuite) TestChangeStreamFilter_WithNamespaces() { } func (suite *IntegrationTestSuite) startSrcChangeStreamReaderAndHandler(ctx context.Context, verifier *Verifier) { - err := verifier.srcChangeReader.start(ctx) + eg, egCtx := contextplus.ErrGroup(ctx) + + err := verifier.srcChangeReader.start(egCtx, eg) suite.Require().NoError(err) go func() { err := verifier.RunChangeEventPersistor(ctx, verifier.srcChangeReader) @@ -1063,7 +1065,9 @@ func (suite *IntegrationTestSuite) TestRecheckDocsWithDstChangeEvents() { verifier.SetDstNamespaces([]string{dstDBName + ".dstColl1", dstDBName + ".dstColl2"}) verifier.SetNamespaceMap() - suite.Require().NoError(verifier.dstChangeReader.start(ctx)) + eg, egCtx := contextplus.ErrGroup(ctx) + + suite.Require().NoError(verifier.dstChangeReader.start(egCtx, eg)) go func() { err := verifier.RunChangeEventPersistor(ctx, verifier.dstChangeReader) if errors.Is(err, context.Canceled) { diff --git a/internal/verifier/check.go b/internal/verifier/check.go index 794a7dbf..66c36993 100644 --- a/internal/verifier/check.go +++ b/internal/verifier/check.go @@ -276,7 +276,7 @@ func (verifier *Verifier) CheckDriver(ctx context.Context, filter bson.D, testCh } else { verifier.logger.Debug().Msgf("%s not running; starting change reader", changeReader) - err = changeReader.start(ctx) + err = changeReader.start(groupCtx, ceHandlerGroup) if err != nil { return errors.Wrapf(err, "failed to start %s", changeReader) } diff --git a/internal/verifier/recheck_persist.go b/internal/verifier/recheck_persist.go index 799040be..564e33a2 100644 --- a/internal/verifier/recheck_persist.go +++ b/internal/verifier/recheck_persist.go @@ -80,12 +80,6 @@ HandlerLoop: } } - // This will prevent the reader from hanging because the reader checks - // this along with checks for context expiry. - if err != nil { - reader.setPersistorError(err) - } - return err } From df29cdef625a5a29b657225c528590311e91781f Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Wed, 12 Nov 2025 09:18:29 -0500 Subject: [PATCH 16/24] remove doneChan --- internal/verifier/change_reader.go | 6 ------ internal/verifier/change_stream.go | 11 ++++++----- internal/verifier/change_stream_test.go | 6 ++++-- internal/verifier/check.go | 15 +++++++++------ 4 files changed, 19 insertions(+), 19 deletions(-) diff --git a/internal/verifier/change_reader.go b/internal/verifier/change_reader.go index 13ea1e62..56bf5e0e 100644 --- a/internal/verifier/change_reader.go +++ b/internal/verifier/change_reader.go @@ -41,7 +41,6 @@ type changeReader interface { getBufferSaturation() float64 setWritesOff(bson.Timestamp) start(context.Context, *errgroup.Group) error - done() <-chan struct{} persistResumeToken(context.Context, bson.Raw) error isRunning() bool String() string @@ -64,7 +63,6 @@ type ChangeReaderCommon struct { changeEventBatchChan chan changeEventBatch writesOffTs *util.Eventual[bson.Timestamp] readerError *util.Eventual[error] - doneChan chan struct{} startAtTs *bson.Timestamp @@ -98,10 +96,6 @@ func (rc *ChangeReaderCommon) getReadChannel() <-chan changeEventBatch { return rc.changeEventBatchChan } -func (rc *ChangeReaderCommon) done() <-chan struct{} { - return rc.doneChan -} - // getBufferSaturation returns the reader’s internal buffer’s saturation level // as a fraction. If saturation rises, that means we’re reading events faster // than we can persist them. diff --git a/internal/verifier/change_stream.go b/internal/verifier/change_stream.go index ae477b14..4c0d6c0a 100644 --- a/internal/verifier/change_stream.go +++ b/internal/verifier/change_stream.go @@ -79,7 +79,6 @@ func (verifier *Verifier) initializeChangeReaders() { csr.changeEventBatchChan = make(chan changeEventBatch, batchChanBufferSize) csr.writesOffTs = util.NewEventual[bson.Timestamp]() csr.readerError = util.NewEventual[error]() - csr.doneChan = make(chan struct{}) csr.lag = msync.NewTypedAtomic(option.None[time.Duration]()) csr.batchSizeHistory = history.New[int](time.Minute) csr.resumeTokenTSExtractor = extractTSFromChangeStreamResumeToken @@ -356,9 +355,7 @@ func (csr *ChangeStreamReader) iterateChangeStream( if csr.lastChangeEventTime != nil { csr.startAtTs = csr.lastChangeEventTime } - // since we have started Recheck, we must signal that we have - // finished the change stream changes so that Recheck can continue. - close(csr.doneChan) + break } } @@ -488,7 +485,7 @@ func (csr *ChangeStreamReader) start( parentThreadWaiting := true - return retryer.WithCallback( + err := retryer.WithCallback( func(ctx context.Context, ri *retry.FuncInfo) error { changeStream, sess, startTs, err := csr.createChangeStream(ctx) if err != nil { @@ -528,6 +525,10 @@ func (csr *ChangeStreamReader) start( }, "running %s", csr, ).Run(ctx, csr.logger) + + csr.readerError.Set(err) + + return err }, ) diff --git a/internal/verifier/change_stream_test.go b/internal/verifier/change_stream_test.go index d3fb166c..1eef5e87 100644 --- a/internal/verifier/change_stream_test.go +++ b/internal/verifier/change_stream_test.go @@ -632,7 +632,8 @@ func (suite *IntegrationTestSuite) TestStartAtTimeNoChanges() { verifier.srcChangeReader.setWritesOff(insertTs) - <-verifier.srcChangeReader.done() + <-verifier.srcChangeReader.getError().Ready() + suite.Require().NoError(verifier.srcChangeReader.getError().Get()) startAtTs2 := verifier.srcChangeReader.getStartTimestamp().MustGet() @@ -690,7 +691,8 @@ func (suite *IntegrationTestSuite) TestStartAtTimeWithChanges() { ) verifier.srcChangeReader.setWritesOff(*postEventsSessionTime) - <-verifier.srcChangeReader.done() + <-verifier.srcChangeReader.getError().Ready() + suite.Require().NoError(verifier.srcChangeReader.getError().Get()) startAtTs, hasStartAtTs = verifier.srcChangeReader.getStartTimestamp().Get() suite.Require().True(hasStartAtTs, "startAtTs should be set") diff --git a/internal/verifier/check.go b/internal/verifier/check.go index 66c36993..6ad7261e 100644 --- a/internal/verifier/check.go +++ b/internal/verifier/check.go @@ -56,13 +56,16 @@ func (verifier *Verifier) waitForChangeReader(ctx context.Context, csr changeRea return util.WrapCtxErrWithCause(ctx) case <-csr.getError().Ready(): err := csr.getError().Get() - verifier.logger.Warn().Err(err). - Msgf("Received error from %s.", csr) + + if err != nil { + verifier.logger.Warn().Err(err). + Msgf("Received error from %s.", csr) + } else { + verifier.logger.Debug(). + Msgf("Received completion signal from %s.", csr) + } + return err - case <-csr.done(): - verifier.logger.Debug(). - Msgf("Received completion signal from %s.", csr) - break } return nil From fc6c8ced58e4bfdcd024cccae5d117f81cc0d49d Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Wed, 12 Nov 2025 09:53:25 -0500 Subject: [PATCH 17/24] remove reader error channel --- internal/verifier/change_reader.go | 6 -- internal/verifier/change_stream.go | 3 - internal/verifier/change_stream_test.go | 31 +++++----- internal/verifier/check.go | 75 ++++++++++--------------- internal/verifier/migration_verifier.go | 33 ++++++----- 5 files changed, 66 insertions(+), 82 deletions(-) diff --git a/internal/verifier/change_reader.go b/internal/verifier/change_reader.go index 56bf5e0e..143f5f04 100644 --- a/internal/verifier/change_reader.go +++ b/internal/verifier/change_reader.go @@ -34,7 +34,6 @@ const ( type changeReader interface { getWhichCluster() whichCluster getReadChannel() <-chan changeEventBatch - getError() *util.Eventual[error] getStartTimestamp() option.Option[bson.Timestamp] getEventsPerSecond() option.Option[float64] getLag() option.Option[time.Duration] @@ -62,7 +61,6 @@ type ChangeReaderCommon struct { running bool changeEventBatchChan chan changeEventBatch writesOffTs *util.Eventual[bson.Timestamp] - readerError *util.Eventual[error] startAtTs *bson.Timestamp @@ -76,10 +74,6 @@ func (rc *ChangeReaderCommon) getWhichCluster() whichCluster { return rc.readerType } -func (rc *ChangeReaderCommon) getError() *util.Eventual[error] { - return rc.readerError -} - func (rc *ChangeReaderCommon) getStartTimestamp() option.Option[bson.Timestamp] { return option.FromPointer(rc.startAtTs) } diff --git a/internal/verifier/change_stream.go b/internal/verifier/change_stream.go index 4c0d6c0a..02f63620 100644 --- a/internal/verifier/change_stream.go +++ b/internal/verifier/change_stream.go @@ -78,7 +78,6 @@ func (verifier *Verifier) initializeChangeReaders() { csr.metaDB = verifier.metaClient.Database(verifier.metaDBName) csr.changeEventBatchChan = make(chan changeEventBatch, batchChanBufferSize) csr.writesOffTs = util.NewEventual[bson.Timestamp]() - csr.readerError = util.NewEventual[error]() csr.lag = msync.NewTypedAtomic(option.None[time.Duration]()) csr.batchSizeHistory = history.New[int](time.Minute) csr.resumeTokenTSExtractor = extractTSFromChangeStreamResumeToken @@ -526,8 +525,6 @@ func (csr *ChangeStreamReader) start( "running %s", csr, ).Run(ctx, csr.logger) - csr.readerError.Set(err) - return err }, ) diff --git a/internal/verifier/change_stream_test.go b/internal/verifier/change_stream_test.go index 1eef5e87..5f746f14 100644 --- a/internal/verifier/change_stream_test.go +++ b/internal/verifier/change_stream_test.go @@ -25,6 +25,7 @@ import ( "go.mongodb.org/mongo-driver/v2/bson" "go.mongodb.org/mongo-driver/v2/mongo" "go.mongodb.org/mongo-driver/v2/mongo/options" + "golang.org/x/sync/errgroup" ) func (suite *IntegrationTestSuite) TestChangeStreamFilter_NoNamespaces() { @@ -253,18 +254,21 @@ func (suite *IntegrationTestSuite) TestChangeStreamFilter_WithNamespaces() { ) } -func (suite *IntegrationTestSuite) startSrcChangeStreamReaderAndHandler(ctx context.Context, verifier *Verifier) { +func (suite *IntegrationTestSuite) startSrcChangeStreamReaderAndHandler( + ctx context.Context, + verifier *Verifier, +) *errgroup.Group { eg, egCtx := contextplus.ErrGroup(ctx) err := verifier.srcChangeReader.start(egCtx, eg) suite.Require().NoError(err) - go func() { - err := verifier.RunChangeEventPersistor(ctx, verifier.srcChangeReader) - if errors.Is(err, context.Canceled) { - return - } - suite.Require().NoError(err) - }() + eg.Go( + func() error { + return verifier.RunChangeEventPersistor(egCtx, verifier.srcChangeReader) + }, + ) + + return eg } func (suite *IntegrationTestSuite) TestChangeStream_Resume_NoSkip() { @@ -625,15 +629,14 @@ func (suite *IntegrationTestSuite) TestStartAtTimeNoChanges() { insertTs, err := util.GetClusterTimeFromSession(sess) suite.Require().NoError(err, "should get cluster time") - suite.startSrcChangeStreamReaderAndHandler(ctx, verifier) + eg := suite.startSrcChangeStreamReaderAndHandler(ctx, verifier) startAtTs, hasStartAtTs := verifier.srcChangeReader.getStartTimestamp().Get() suite.Require().True(hasStartAtTs, "startAtTs should be set") verifier.srcChangeReader.setWritesOff(insertTs) - <-verifier.srcChangeReader.getError().Ready() - suite.Require().NoError(verifier.srcChangeReader.getError().Get()) + suite.Require().NoError(eg.Wait()) startAtTs2 := verifier.srcChangeReader.getStartTimestamp().MustGet() @@ -658,7 +661,7 @@ func (suite *IntegrationTestSuite) TestStartAtTimeWithChanges() { origSessionTime := sess.OperationTime() suite.Require().NotNil(origSessionTime) - suite.startSrcChangeStreamReaderAndHandler(ctx, verifier) + eg := suite.startSrcChangeStreamReaderAndHandler(ctx, verifier) startAtTs, hasStartAtTs := verifier.srcChangeReader.getStartTimestamp().Get() suite.Require().True(hasStartAtTs, "startAtTs should be set") @@ -691,8 +694,8 @@ func (suite *IntegrationTestSuite) TestStartAtTimeWithChanges() { ) verifier.srcChangeReader.setWritesOff(*postEventsSessionTime) - <-verifier.srcChangeReader.getError().Ready() - suite.Require().NoError(verifier.srcChangeReader.getError().Get()) + + suite.Require().NoError(eg.Wait()) startAtTs, hasStartAtTs = verifier.srcChangeReader.getStartTimestamp().Get() suite.Require().True(hasStartAtTs, "startAtTs should be set") diff --git a/internal/verifier/check.go b/internal/verifier/check.go index 6ad7261e..42ce141d 100644 --- a/internal/verifier/check.go +++ b/internal/verifier/check.go @@ -1,6 +1,7 @@ package verifier import ( + "cmp" "context" "fmt" "time" @@ -27,9 +28,11 @@ const ( findTaskTimeWarnThreshold = 5 * time.Second ) -var failedStatuses = mapset.NewSet( - verificationTaskFailed, - verificationTaskMetadataMismatch, +var ( + failedStatuses = mapset.NewSet( + verificationTaskFailed, + verificationTaskMetadataMismatch, + ) ) // Check is the asynchronous entry point to Check, should only be called by the web server. Use @@ -50,27 +53,6 @@ func (verifier *Verifier) Check(ctx context.Context, filter bson.D) { verifier.MaybeStartPeriodicHeapProfileCollection(ctx) } -func (verifier *Verifier) waitForChangeReader(ctx context.Context, csr changeReader) error { - select { - case <-ctx.Done(): - return util.WrapCtxErrWithCause(ctx) - case <-csr.getError().Ready(): - err := csr.getError().Get() - - if err != nil { - verifier.logger.Warn().Err(err). - Msgf("Received error from %s.", csr) - } else { - verifier.logger.Debug(). - Msgf("Received completion signal from %s.", csr) - } - - return err - } - - return nil -} - func (verifier *Verifier) CheckWorker(ctxIn context.Context) error { generation := verifier.generation @@ -96,12 +78,14 @@ func (verifier *Verifier) CheckWorker(ctxIn context.Context) error { // If the change reader fails, everything should stop. eg.Go(func() error { select { - case <-verifier.srcChangeReader.getError().Ready(): - err := verifier.srcChangeReader.getError().Get() - return errors.Wrapf(err, "%s failed", verifier.srcChangeReader) - case <-verifier.dstChangeReader.getError().Ready(): - err := verifier.dstChangeReader.getError().Get() - return errors.Wrapf(err, "%s failed", verifier.dstChangeReader) + case <-verifier.changeReaderErr.Ready(): + return errors.Wrap( + cmp.Or( + verifier.changeReaderErr.Get(), + fmt.Errorf("change handling stopped prematurely"), + ), + verifier.dstChangeReader.String(), + ) case <-ctx.Done(): return nil } @@ -272,23 +256,28 @@ func (verifier *Verifier) CheckDriver(ctx context.Context, filter bson.D, testCh verifier.phase = Idle }() - ceHandlerGroup, groupCtx := contextplus.ErrGroup(ctx) + changeReaderGroup, groupCtx := contextplus.ErrGroup(ctx) for _, changeReader := range mslices.Of(verifier.srcChangeReader, verifier.dstChangeReader) { if changeReader.isRunning() { verifier.logger.Debug().Msgf("Check: %s already running.", changeReader) } else { verifier.logger.Debug().Msgf("%s not running; starting change reader", changeReader) - err = changeReader.start(groupCtx, ceHandlerGroup) + err = changeReader.start(groupCtx, changeReaderGroup) if err != nil { return errors.Wrapf(err, "failed to start %s", changeReader) } - ceHandlerGroup.Go(func() error { + changeReaderGroup.Go(func() error { return verifier.RunChangeEventPersistor(groupCtx, changeReader) }) } } + verifier.changeReaderErr = util.NewEventual[error]() + go func() { + verifier.changeReaderErr.Set(changeReaderGroup.Wait()) + }() + // Log the verification status when initially booting up so it's easy to see the current state verificationStatus, err := verifier.GetVerificationStatus(ctx) if err != nil { @@ -373,23 +362,19 @@ func (verifier *Verifier) CheckDriver(ctx context.Context, filter bson.D, testCh // generation number, or the last changes will not be checked. verifier.mux.Unlock() - for _, csr := range mslices.Of(verifier.srcChangeReader, verifier.dstChangeReader) { - if err = verifier.waitForChangeReader(ctx, csr); err != nil { - return errors.Wrapf( - err, - "an error interrupted the wait for closure of %s", - csr, - ) + select { + case <-ctx.Done(): + return ctx.Err() + case <-verifier.changeReaderErr.Ready(): + err := verifier.changeReaderErr.Get() + if err != nil { + return errors.Wrap(err, "handling change events") } verifier.logger.Debug(). - Stringer("changeReader", csr). - Msg("Change reader finished.") + Msg("Change readers finished.") } - if err = ceHandlerGroup.Wait(); err != nil { - return err - } verifier.mux.Lock() verifier.lastGeneration = true } diff --git a/internal/verifier/migration_verifier.go b/internal/verifier/migration_verifier.go index 9e75bf18..76f98e61 100644 --- a/internal/verifier/migration_verifier.go +++ b/internal/verifier/migration_verifier.go @@ -103,6 +103,8 @@ type Verifier struct { srcEventRecorder *EventRecorder dstEventRecorder *EventRecorder + changeReaderErr *util.Eventual[error] + // Used only with generation 0 to defer the first // progress report until after we’ve finished partitioning // every collection. @@ -272,20 +274,23 @@ func (verifier *Verifier) WritesOff(ctx context.Context) error { // This has to happen outside the lock because the change readers // might be inserting docs into the recheck queue, which happens // under the lock. - select { - case <-verifier.srcChangeReader.getError().Ready(): - err := verifier.srcChangeReader.getError().Get() - return errors.Wrapf(err, "tried to send writes-off timestamp to %s, but change reader already failed", verifier.srcChangeReader) - default: - verifier.srcChangeReader.setWritesOff(srcFinalTs) - } - - select { - case <-verifier.dstChangeReader.getError().Ready(): - err := verifier.dstChangeReader.getError().Get() - return errors.Wrapf(err, "tried to send writes-off timestamp to %s, but change reader already failed", verifier.dstChangeReader) - default: - verifier.dstChangeReader.setWritesOff(dstFinalTs) + for _, readerAndTS := range []struct { + reader changeReader + ts bson.Timestamp + }{ + {verifier.srcChangeReader, srcFinalTs}, + {verifier.dstChangeReader, dstFinalTs}, + } { + select { + case <-ctx.Done(): + return ctx.Err() + case <-verifier.changeReaderErr.Ready(): + return errors.Wrapf( + verifier.changeReaderErr.Get(), + "tried to send writes-off timestamp to %s, but change handling already failed", readerAndTS.reader) + default: + readerAndTS.reader.setWritesOff(readerAndTS.ts) + } } return nil From d47c40a5bd9d17d9fe5c35a9c38fd5e80c8fdb25 Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Wed, 12 Nov 2025 10:56:39 -0500 Subject: [PATCH 18/24] allow eventual to accept nil -- MUST TEST! --- internal/util/eventual.go | 20 ++++++++++---------- internal/util/eventual_test.go | 24 ++++++++++++++++++++++-- 2 files changed, 32 insertions(+), 12 deletions(-) diff --git a/internal/util/eventual.go b/internal/util/eventual.go index ad2c6dd7..4bd37271 100644 --- a/internal/util/eventual.go +++ b/internal/util/eventual.go @@ -2,8 +2,6 @@ package util import ( "sync" - - "github.com/10gen/migration-verifier/option" ) // Eventual solves the “one writer, many readers” problem: a value gets @@ -14,7 +12,7 @@ import ( // generalized to any data type. type Eventual[T any] struct { ready chan struct{} - val option.Option[T] + val T mux sync.RWMutex } @@ -37,12 +35,12 @@ func (e *Eventual[T]) Get() T { e.mux.RLock() defer e.mux.RUnlock() - val, has := e.val.Get() - if has { - return val + select { + case <-e.ready: + return e.val + default: + panic("Eventual's Get() called before value was ready.") } - - panic("Eventual's Get() called before value was ready.") } // Set sets the Eventual’s value. It may be called only once; @@ -51,13 +49,15 @@ func (e *Eventual[T]) Set(val T) { e.mux.Lock() defer e.mux.Unlock() - if e.val.IsSome() { + select { + case <-e.ready: panic("Tried to set an eventual twice!") + default: } // NB: This *must* happen before the close(), or else a fast reader may // not see this value. - e.val = option.Some(val) + e.val = val close(e.ready) } diff --git a/internal/util/eventual_test.go b/internal/util/eventual_test.go index 17f4c4b7..6196e4e2 100644 --- a/internal/util/eventual_test.go +++ b/internal/util/eventual_test.go @@ -15,14 +15,14 @@ func (s *UnitTestSuite) TestEventual() { select { case <-eventual.Ready(): s.Require().Fail("should not be ready") - case <-time.NewTimer(time.Second).C: + case <-time.NewTimer(time.Millisecond).C: } eventual.Set(123) select { case <-eventual.Ready(): - case <-time.NewTimer(time.Second).C: + case <-time.NewTimer(time.Millisecond).C: s.Require().Fail("should be ready") } @@ -32,3 +32,23 @@ func (s *UnitTestSuite) TestEventual() { "Get() should return the value", ) } + +func (s *UnitTestSuite) TestEventualNil() { + eventual := NewEventual[error]() + + select { + case <-eventual.Ready(): + s.Require().Fail("should not be ready") + case <-time.NewTimer(time.Millisecond).C: + } + + eventual.Set(nil) + + select { + case <-eventual.Ready(): + case <-time.NewTimer(time.Millisecond).C: + s.Require().Fail("should be ready") + } + + s.Assert().Nil(eventual.Get()) +} From 4e38f0429ee93ea9f6cd606c4ccfc0b804487464 Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Wed, 12 Nov 2025 11:39:23 -0500 Subject: [PATCH 19/24] allow premature exit --- internal/verifier/check.go | 6 +----- internal/verifier/timeseries_test.go | 3 ++- 2 files changed, 3 insertions(+), 6 deletions(-) diff --git a/internal/verifier/check.go b/internal/verifier/check.go index 42ce141d..12296f0d 100644 --- a/internal/verifier/check.go +++ b/internal/verifier/check.go @@ -1,7 +1,6 @@ package verifier import ( - "cmp" "context" "fmt" "time" @@ -80,10 +79,7 @@ func (verifier *Verifier) CheckWorker(ctxIn context.Context) error { select { case <-verifier.changeReaderErr.Ready(): return errors.Wrap( - cmp.Or( - verifier.changeReaderErr.Get(), - fmt.Errorf("change handling stopped prematurely"), - ), + verifier.changeReaderErr.Get(), verifier.dstChangeReader.String(), ) case <-ctx.Done(): diff --git a/internal/verifier/timeseries_test.go b/internal/verifier/timeseries_test.go index ae8548d9..9497b0a9 100644 --- a/internal/verifier/timeseries_test.go +++ b/internal/verifier/timeseries_test.go @@ -298,7 +298,8 @@ func (suite *IntegrationTestSuite) TestTimeSeries_Simple() { suite.Assert().Equal( 0, verificationStatus.FailedTasks, - "should be no failed tasks", + "should be no failed tasks (status: %+v)", + verificationStatus, ) suite.Assert().Equal( 3, From 0d4e40607133b90aa19a32705c24623023416436 Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Wed, 12 Nov 2025 12:43:14 -0500 Subject: [PATCH 20/24] rename & move --- internal/verifier/check.go | 13 ++++++------- internal/verifier/migration_verifier.go | 8 +++++--- 2 files changed, 11 insertions(+), 10 deletions(-) diff --git a/internal/verifier/check.go b/internal/verifier/check.go index 12296f0d..3930fccc 100644 --- a/internal/verifier/check.go +++ b/internal/verifier/check.go @@ -8,7 +8,6 @@ import ( "github.com/10gen/migration-verifier/contextplus" "github.com/10gen/migration-verifier/internal/logger" "github.com/10gen/migration-verifier/internal/retry" - "github.com/10gen/migration-verifier/internal/util" "github.com/10gen/migration-verifier/mslices" mapset "github.com/deckarep/golang-set/v2" "github.com/goaux/timer" @@ -77,9 +76,9 @@ func (verifier *Verifier) CheckWorker(ctxIn context.Context) error { // If the change reader fails, everything should stop. eg.Go(func() error { select { - case <-verifier.changeReaderErr.Ready(): + case <-verifier.changeHandlingErr.Ready(): return errors.Wrap( - verifier.changeReaderErr.Get(), + verifier.changeHandlingErr.Get(), verifier.dstChangeReader.String(), ) case <-ctx.Done(): @@ -269,9 +268,9 @@ func (verifier *Verifier) CheckDriver(ctx context.Context, filter bson.D, testCh } } - verifier.changeReaderErr = util.NewEventual[error]() + changeHandlingErr := verifier.changeHandlingErr go func() { - verifier.changeReaderErr.Set(changeReaderGroup.Wait()) + changeHandlingErr.Set(changeReaderGroup.Wait()) }() // Log the verification status when initially booting up so it's easy to see the current state @@ -361,8 +360,8 @@ func (verifier *Verifier) CheckDriver(ctx context.Context, filter bson.D, testCh select { case <-ctx.Done(): return ctx.Err() - case <-verifier.changeReaderErr.Ready(): - err := verifier.changeReaderErr.Get() + case <-verifier.changeHandlingErr.Ready(): + err := verifier.changeHandlingErr.Get() if err != nil { return errors.Wrap(err, "handling change events") } diff --git a/internal/verifier/migration_verifier.go b/internal/verifier/migration_verifier.go index 76f98e61..e23445cb 100644 --- a/internal/verifier/migration_verifier.go +++ b/internal/verifier/migration_verifier.go @@ -103,7 +103,7 @@ type Verifier struct { srcEventRecorder *EventRecorder dstEventRecorder *EventRecorder - changeReaderErr *util.Eventual[error] + changeHandlingErr *util.Eventual[error] // Used only with generation 0 to defer the first // progress report until after we’ve finished partitioning @@ -198,6 +198,8 @@ func NewVerifier(settings VerifierSettings, logPath string) *Verifier { verificationStatusCheckInterval: 2 * time.Second, nsMap: NewNSMap(), + + changeHandlingErr: util.NewEventual[error](), } } @@ -284,9 +286,9 @@ func (verifier *Verifier) WritesOff(ctx context.Context) error { select { case <-ctx.Done(): return ctx.Err() - case <-verifier.changeReaderErr.Ready(): + case <-verifier.changeHandlingErr.Ready(): return errors.Wrapf( - verifier.changeReaderErr.Get(), + verifier.changeHandlingErr.Get(), "tried to send writes-off timestamp to %s, but change handling already failed", readerAndTS.reader) default: readerAndTS.reader.setWritesOff(readerAndTS.ts) From e23a50334c229aa996ddc1f15b3c402714b3f44a Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Wed, 12 Nov 2025 13:02:19 -0500 Subject: [PATCH 21/24] move --- internal/verifier/change_stream.go | 37 ----------------------------- internal/verifier/check.go | 38 ++++++++++++++++++++++++++++++ 2 files changed, 38 insertions(+), 37 deletions(-) diff --git a/internal/verifier/change_stream.go b/internal/verifier/change_stream.go index 02f63620..4a91a3c3 100644 --- a/internal/verifier/change_stream.go +++ b/internal/verifier/change_stream.go @@ -5,13 +5,10 @@ import ( "fmt" "time" - "github.com/10gen/migration-verifier/history" "github.com/10gen/migration-verifier/internal/keystring" "github.com/10gen/migration-verifier/internal/retry" "github.com/10gen/migration-verifier/internal/util" "github.com/10gen/migration-verifier/mbson" - "github.com/10gen/migration-verifier/mslices" - "github.com/10gen/migration-verifier/msync" "github.com/10gen/migration-verifier/option" mapset "github.com/deckarep/golang-set/v2" clone "github.com/huandu/go-clone/generic" @@ -50,40 +47,6 @@ type ChangeStreamReader struct { var _ changeReader = &ChangeStreamReader{} -func (verifier *Verifier) initializeChangeReaders() { - srcReader := &ChangeStreamReader{ - ChangeReaderCommon: ChangeReaderCommon{ - readerType: src, - namespaces: verifier.srcNamespaces, - watcherClient: verifier.srcClient, - clusterInfo: *verifier.srcClusterInfo, - }, - } - verifier.srcChangeReader = srcReader - - dstReader := &ChangeStreamReader{ - ChangeReaderCommon: ChangeReaderCommon{ - readerType: dst, - namespaces: verifier.dstNamespaces, - watcherClient: verifier.dstClient, - clusterInfo: *verifier.dstClusterInfo, - onDDLEvent: onDDLEventAllow, - }, - } - verifier.dstChangeReader = dstReader - - // Common elements in both readers: - for _, csr := range mslices.Of(srcReader, dstReader) { - csr.logger = verifier.logger - csr.metaDB = verifier.metaClient.Database(verifier.metaDBName) - csr.changeEventBatchChan = make(chan changeEventBatch, batchChanBufferSize) - csr.writesOffTs = util.NewEventual[bson.Timestamp]() - csr.lag = msync.NewTypedAtomic(option.None[time.Duration]()) - csr.batchSizeHistory = history.New[int](time.Minute) - csr.resumeTokenTSExtractor = extractTSFromChangeStreamResumeToken - } -} - // GetChangeStreamFilter returns an aggregation pipeline that filters // namespaces as per configuration. // diff --git a/internal/verifier/check.go b/internal/verifier/check.go index 3930fccc..3fea7dcf 100644 --- a/internal/verifier/check.go +++ b/internal/verifier/check.go @@ -6,9 +6,13 @@ import ( "time" "github.com/10gen/migration-verifier/contextplus" + "github.com/10gen/migration-verifier/history" "github.com/10gen/migration-verifier/internal/logger" "github.com/10gen/migration-verifier/internal/retry" + "github.com/10gen/migration-verifier/internal/util" "github.com/10gen/migration-verifier/mslices" + "github.com/10gen/migration-verifier/msync" + "github.com/10gen/migration-verifier/option" mapset "github.com/deckarep/golang-set/v2" "github.com/goaux/timer" "github.com/pkg/errors" @@ -595,3 +599,37 @@ func (verifier *Verifier) work(ctx context.Context, workerNum int) error { } } } + +func (verifier *Verifier) initializeChangeReaders() { + srcReader := &ChangeStreamReader{ + ChangeReaderCommon: ChangeReaderCommon{ + readerType: src, + namespaces: verifier.srcNamespaces, + watcherClient: verifier.srcClient, + clusterInfo: *verifier.srcClusterInfo, + }, + } + verifier.srcChangeReader = srcReader + + dstReader := &ChangeStreamReader{ + ChangeReaderCommon: ChangeReaderCommon{ + readerType: dst, + namespaces: verifier.dstNamespaces, + watcherClient: verifier.dstClient, + clusterInfo: *verifier.dstClusterInfo, + onDDLEvent: onDDLEventAllow, + }, + } + verifier.dstChangeReader = dstReader + + // Common elements in both readers: + for _, csr := range mslices.Of(srcReader, dstReader) { + csr.logger = verifier.logger + csr.metaDB = verifier.metaClient.Database(verifier.metaDBName) + csr.changeEventBatchChan = make(chan changeEventBatch, batchChanBufferSize) + csr.writesOffTs = util.NewEventual[bson.Timestamp]() + csr.lag = msync.NewTypedAtomic(option.None[time.Duration]()) + csr.batchSizeHistory = history.New[int](time.Minute) + csr.resumeTokenTSExtractor = extractTSFromChangeStreamResumeToken + } +} From d61a8f1966fe2274d681a267d8ffd78bf3f3eb2b Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Wed, 12 Nov 2025 13:07:07 -0500 Subject: [PATCH 22/24] handling --- internal/verifier/check.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/verifier/check.go b/internal/verifier/check.go index 3fea7dcf..529c3e3f 100644 --- a/internal/verifier/check.go +++ b/internal/verifier/check.go @@ -371,7 +371,7 @@ func (verifier *Verifier) CheckDriver(ctx context.Context, filter bson.D, testCh } verifier.logger.Debug(). - Msg("Change readers finished.") + Msg("Change handling finished.") } verifier.mux.Lock() From eb0464da02ca361c6dd07066367beb4d578e3395 Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Wed, 12 Nov 2025 15:34:41 -0500 Subject: [PATCH 23/24] test 2nd time --- internal/util/eventual_test.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/internal/util/eventual_test.go b/internal/util/eventual_test.go index 6196e4e2..4bcc209e 100644 --- a/internal/util/eventual_test.go +++ b/internal/util/eventual_test.go @@ -31,6 +31,12 @@ func (s *UnitTestSuite) TestEventual() { eventual.Get(), "Get() should return the value", ) + + s.Assert().Equal( + 123, + eventual.Get(), + "Get() should return the value a 2nd time", + ) } func (s *UnitTestSuite) TestEventualNil() { From 4df242e13f56ff35bf6d5848a68822ac7c79143a Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Wed, 12 Nov 2025 16:11:41 -0500 Subject: [PATCH 24/24] add comment --- internal/util/eventual.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/internal/util/eventual.go b/internal/util/eventual.go index 4bd37271..901d2a1f 100644 --- a/internal/util/eventual.go +++ b/internal/util/eventual.go @@ -35,6 +35,8 @@ func (e *Eventual[T]) Get() T { e.mux.RLock() defer e.mux.RUnlock() + // If the ready channel is still open then there’s no value yet, + // which means this method should not have been called. select { case <-e.ready: return e.val @@ -59,5 +61,6 @@ func (e *Eventual[T]) Set(val T) { // not see this value. e.val = val + // This allows Get() to work: close(e.ready) }