diff --git a/.github/workflows/all.yml b/.github/workflows/all.yml index ced7c10d..1ede5caf 100644 --- a/.github/workflows/all.yml +++ b/.github/workflows/all.yml @@ -7,6 +7,12 @@ on: pull_request: workflow_dispatch: +env: + replsetSrcConnStr: mongodb://localhost:27020,localhost:27021,localhost:27022 + replsetDstConnStr: mongodb://localhost:27030,localhost:27031,localhost:27032 + shardedSrcConnStr: mongodb://localhost:27020 + shardedDstConnStr: mongodb://localhost:27030 + jobs: basics: strategy: @@ -17,10 +23,37 @@ jobs: # Testing fallback when `hello` isn’t implemented # (but appendOplogNote is). - mongodb_versions: [ '4.2.5', '6.0' ] - topology: - name: replset - srcConnStr: mongodb://localhost:27020,localhost:27021,localhost:27022 - dstConnStr: mongodb://localhost:27030,localhost:27031,localhost:27032 + topology: replset + + - mongodb_versions: [ '4.2', '8.0' ] + topology: replset + srcChangeReader: tailOplog + dstChangeReader: tailOplog + + - mongodb_versions: [ '4.4', '8.0' ] + topology: replset + srcChangeReader: tailOplog + dstChangeReader: tailOplog + + - mongodb_versions: [ '5.0', '8.0' ] + topology: replset + srcChangeReader: tailOplog + dstChangeReader: tailOplog + + - mongodb_versions: [ '6.0', '8.0' ] + topology: replset + srcChangeReader: tailOplog + dstChangeReader: tailOplog + + - mongodb_versions: [ '7.0', '8.0' ] + topology: replset + srcChangeReader: tailOplog + dstChangeReader: tailOplog + + - mongodb_versions: [ '8.0', '8.0' ] + topology: replset + srcChangeReader: tailOplog + dstChangeReader: tailOplog exclude: - mongodb_versions: [ '4.2', '4.2' ] @@ -31,6 +64,8 @@ jobs: toHashedIndexKey: true - mongodb_versions: [ '4.2', '6.0' ] toHashedIndexKey: true + - mongodb_versions: [ '4.2', '8.0' ] + toHashedIndexKey: true # versions are: source, destination mongodb_versions: @@ -38,6 +73,7 @@ jobs: - [ '4.2', '4.4' ] - [ '4.2', '5.0' ] - [ '4.2', '6.0' ] + - [ '4.2', '8.0' ] - [ '4.4', '4.4' ] - [ '4.4', '5.0' ] @@ -60,27 +96,19 @@ jobs: toHashedIndexKey: [true, false] - topology: - - name: replset - srcConnStr: mongodb://localhost:27020,localhost:27021,localhost:27022 - dstConnStr: mongodb://localhost:27030,localhost:27031,localhost:27032 - - - name: replset-to-sharded - dstArgs: --sharded 2 - srcConnStr: mongodb://localhost:27020,localhost:27021,localhost:27022 - dstConnStr: mongodb://localhost:27030 + srcChangeReader: [changeStream] + dstChangeReader: [changeStream] - - name: sharded - srcArgs: --sharded 2 - dstArgs: --sharded 2 - srcConnStr: mongodb://localhost:27020 - dstConnStr: mongodb://localhost:27030 + topology: + - replset + - replset-to-sharded + - sharded # Ubuntu 24 lacks OpenSSL 1.1.1’s libcrypto, which pre-v6 MongoDB # versions need. 
runs-on: ubuntu-22.04 - name: ${{ matrix.mongodb_versions[0] }} to ${{ matrix.mongodb_versions[1] }}, ${{ matrix.topology.name }}${{ matrix.toHashedIndexKey && ', hashed doc compare' || '' }} + name: ${{ matrix.mongodb_versions[0] }} to ${{ matrix.mongodb_versions[1] }}, ${{ matrix.topology }}${{ matrix.toHashedIndexKey && ', hashed doc compare' || '' }}, srcChangeReader=${{ matrix.srcChangeReader }}, dstChangeReader=${{ matrix.dstChangeReader }} steps: - run: uname -a @@ -115,8 +143,8 @@ jobs: run: |- { echo ./build.sh - echo mlaunch init --binarypath $(cat .srcpath) --port 27020 --dir src --replicaset ${{ matrix.topology.srcArgs }} - echo mlaunch init --binarypath $(cat .dstpath) --port 27030 --dir dst --replicaset ${{ matrix.topology.dstArgs }} + echo mlaunch init --binarypath $(cat .srcpath) --port 27020 --dir src --replicaset ${{ (matrix.topology == 'sharded') && '--sharded 2' || '' }} + echo mlaunch init --binarypath $(cat .dstpath) --port 27030 --dir dst --replicaset ${{ (matrix.topology == 'sharded' || matrix.topology == 'replset-to-sharded') && '--sharded 2' || '' }} echo mlaunch init --binarypath $(cat .metapath) --port 27040 --dir meta --replicaset --nodes 1 } | parallel @@ -124,6 +152,11 @@ jobs: run: go test -v ./... -race env: MVTEST_DOC_COMPARE_METHOD: ${{matrix.toHashedIndexKey && 'toHashedIndexKey' || ''}} - MVTEST_SRC: ${{matrix.topology.srcConnStr}} - MVTEST_DST: ${{matrix.topology.dstConnStr}} + + MVTEST_SRC_CHANGE_READER: ${{matrix.srcChangeReader}} + MVTEST_DST_CHANGE_READER: ${{matrix.dstChangeReader}} + + MVTEST_SRC: ${{ (matrix.topology == 'sharded') && env.shardedSrcConnStr || env.replsetSrcConnStr }} + MVTEST_DST: ${{ (matrix.topology == 'sharded' || matrix.topology == 'replset-to-sharded') && env.shardedDstConnStr || env.replsetDstConnStr }} + MVTEST_META: mongodb://localhost:27040 diff --git a/agg/agg.go b/agg/agg.go new file mode 100644 index 00000000..f1202cf7 --- /dev/null +++ b/agg/agg.go @@ -0,0 +1,185 @@ +package agg + +import ( + "go.mongodb.org/mongo-driver/v2/bson" +) + +func Eq(comparands ...any) bson.D { + return bson.D{{"$eq", comparands}} +} + +func In[T any](needle any, haystack ...T) bson.D { + return bson.D{{"$in", bson.A{needle, haystack}}} +} + +func BSONSize(ref any) bson.D { + return bson.D{{"$bsonSize", ref}} +} + +func Type(ref any) bson.D { + return bson.D{{"$type", ref}} +} + +func Concat(refs ...any) bson.D { + return bson.D{{"$concat", refs}} +} + +// --------------------------------------------- + +type Not struct { + Ref any +} + +var _ bson.Marshaler = Not{} + +func (n Not) MarshalBSON() ([]byte, error) { + return bson.Marshal(bson.D{ + {"$not", n.Ref}, + }) +} + +// --------------------------------------------- + +type And []any + +var _ bson.Marshaler = And{} + +func (a And) MarshalBSON() ([]byte, error) { + return bson.Marshal(bson.D{ + {"$and", []any(a)}, + }) +} + +// --------------------------------------------- + +type Or []any + +var _ bson.Marshaler = Or{} + +func (o Or) MarshalBSON() ([]byte, error) { + return bson.Marshal(bson.D{ + {"$or", []any(o)}, + }) +} + +// --------------------------------------------- + +type MergeObjects []any + +var _ bson.Marshaler = MergeObjects{} + +func (m MergeObjects) MarshalBSON() ([]byte, error) { + return bson.Marshal(bson.D{ + {"$mergeObjects", []any(m)}, + }) +} + +// --------------------------------------------- + +type Cond struct { + If, Then, Else any +} + +var _ bson.Marshaler = Cond{} + +func (c Cond) D() bson.D { + return bson.D{ + {"$cond", bson.D{ + {"if", c.If}, + {"then", 
c.Then}, + {"else", c.Else}, + }}, + } +} + +func (c Cond) MarshalBSON() ([]byte, error) { + return bson.Marshal(c.D()) +} + +// --------------------------------------------- + +type Switch struct { + Branches []SwitchCase + Default any +} + +type SwitchCase struct { + Case any + Then any +} + +func (s Switch) D() bson.D { + return bson.D{{"$switch", bson.D{ + {"branches", s.Branches}, + {"default", s.Default}, + }}} +} + +func (s Switch) MarshalBSON() ([]byte, error) { + return bson.Marshal(s.D()) +} + +// --------------------------------------------- + +type ArrayElemAt struct { + Array any + Index int +} + +func (a ArrayElemAt) D() bson.D { + return bson.D{{"$arrayElemAt", bson.A{ + a.Array, + a.Index, + }}} +} + +func (a ArrayElemAt) MarshalBSON() ([]byte, error) { + return bson.Marshal(a.D()) +} + +// --------------------------------------------- + +type Map struct { + Input, As, In any +} + +var _ bson.Marshaler = Map{} + +func (m Map) D() bson.D { + return bson.D{ + {"$map", bson.D{ + {"input", m.Input}, + {"as", m.As}, + {"in", m.In}, + }}, + } +} + +func (m Map) MarshalBSON() ([]byte, error) { + return bson.Marshal(m.D()) +} + +// ------------------------------------------ + +type Filter struct { + Input, As, Cond, Limit any +} + +var _ bson.Marshaler = Filter{} + +func (f Filter) D() bson.D { + d := bson.D{ + {"input", f.Input}, + {"as", f.As}, + {"cond", f.Cond}, + } + + if f.Limit != nil { + d = append(d, bson.E{"limit", f.Limit}) + } + return bson.D{{"$filter", d}} +} + +func (f Filter) MarshalBSON() ([]byte, error) { + return bson.Marshal(f.D()) +} diff --git a/agg/helpers/string.go b/agg/helpers/string.go new file mode 100644 index 00000000..99502190 --- /dev/null +++ b/agg/helpers/string.go @@ -0,0 +1,35 @@ +package helpers + +import ( + "go.mongodb.org/mongo-driver/v2/bson" +) + +type StringHasPrefix struct { + FieldRef any + Prefix string +} + +func (sp StringHasPrefix) MarshalBSON() ([]byte, error) { + return bson.Marshal(bson.D{ + {"$eq", bson.A{ + 0, + bson.D{{"$indexOfCP", bson.A{ + sp.FieldRef, + sp.Prefix, + 0, + 1, + }}}, + }}, + }) + + /* + return bson.Marshal(agg.Eq( + sp.Prefix, + agg.SubstrBytes{ + sp.FieldRef, + 0, + len(sp.Prefix), + }, + )) + */ +} diff --git a/internal/retry/retry.go b/internal/retry/retry.go index 62f74e8d..8f7b260d 100644 --- a/internal/retry/retry.go +++ b/internal/retry/retry.go @@ -180,6 +180,10 @@ func (r *Retryer) runRetryLoop( // Not a transient error? Fail immediately. if !r.shouldRetryWithSleep(logger, sleepTime, descriptions, cbErr) { + if descr, has := r.description.Get(); has { + cbErr = errors.Wrap(cbErr, descr) + } + return cbErr } @@ -187,11 +191,17 @@ func (r *Retryer) runRetryLoop( // then fail. 
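+	// As in the non-transient case above, the duration-limit error below is
+	// wrapped with the retryer's description (when one is set) so callers can
+	// tell which retried operation hit the limit.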
if failedFuncInfo.GetDurationSoFar() > li.durationLimit { - return RetryDurationLimitExceededErr{ + var err error = RetryDurationLimitExceededErr{ attempts: li.attemptsSoFar, duration: failedFuncInfo.GetDurationSoFar(), lastErr: groupErr.errFromCallback, } + + if descr, has := r.description.Get(); has { + err = errors.Wrap(err, descr) + } + + return err } // Sleep and increase the sleep time for the next retry, diff --git a/internal/verifier/change_reader.go b/internal/verifier/change_reader.go index 143f5f04..f611e716 100644 --- a/internal/verifier/change_reader.go +++ b/internal/verifier/change_reader.go @@ -6,12 +6,14 @@ import ( "github.com/10gen/migration-verifier/history" "github.com/10gen/migration-verifier/internal/logger" + "github.com/10gen/migration-verifier/internal/retry" "github.com/10gen/migration-verifier/internal/util" "github.com/10gen/migration-verifier/msync" "github.com/10gen/migration-verifier/option" "github.com/pkg/errors" "github.com/rs/zerolog" "github.com/samber/lo" + "github.com/samber/mo" "go.mongodb.org/mongo-driver/v2/bson" "go.mongodb.org/mongo-driver/v2/mongo" "go.mongodb.org/mongo-driver/v2/mongo/options" @@ -21,7 +23,7 @@ import ( type ddlEventHandling string const ( - fauxDocSizeForDeleteEvents = 1024 + defaultUserDocumentSize = 1024 // The number of batches we’ll hold in memory at once. batchChanBufferSize = 100 @@ -35,6 +37,7 @@ type changeReader interface { getWhichCluster() whichCluster getReadChannel() <-chan changeEventBatch getStartTimestamp() option.Option[bson.Timestamp] + getLatestTimestamp() option.Option[bson.Timestamp] getEventsPerSecond() option.Option[float64] getLag() option.Option[time.Duration] getBufferSaturation() float64 @@ -48,9 +51,8 @@ type changeReader interface { type ChangeReaderCommon struct { readerType whichCluster - lastChangeEventTime *bson.Timestamp - logger *logger.Logger - namespaces []string + logger *logger.Logger + namespaces []string metaDB *mongo.Database watcherClient *mongo.Client @@ -62,14 +64,35 @@ type ChangeReaderCommon struct { changeEventBatchChan chan changeEventBatch writesOffTs *util.Eventual[bson.Timestamp] + lastChangeEventTime *msync.TypedAtomic[option.Option[bson.Timestamp]] + startAtTs *bson.Timestamp lag *msync.TypedAtomic[option.Option[time.Duration]] batchSizeHistory *history.History[int] + createIteratorCb func(context.Context, *mongo.Session) (bson.Timestamp, error) + iterateCb func(context.Context, *retry.FuncInfo, *mongo.Session) error + onDDLEvent ddlEventHandling } +func newChangeReaderCommon(clusterName whichCluster) ChangeReaderCommon { + return ChangeReaderCommon{ + readerType: clusterName, + changeEventBatchChan: make(chan changeEventBatch, batchChanBufferSize), + writesOffTs: util.NewEventual[bson.Timestamp](), + lag: msync.NewTypedAtomic(option.None[time.Duration]()), + lastChangeEventTime: msync.NewTypedAtomic(option.None[bson.Timestamp]()), + batchSizeHistory: history.New[int](time.Minute), + onDDLEvent: lo.Ternary( + clusterName == dst, + onDDLEventAllow, + "", + ), + } +} + func (rc *ChangeReaderCommon) getWhichCluster() whichCluster { return rc.readerType } @@ -90,6 +113,10 @@ func (rc *ChangeReaderCommon) getReadChannel() <-chan changeEventBatch { return rc.changeEventBatchChan } +func (rc *ChangeReaderCommon) getLatestTimestamp() option.Option[bson.Timestamp] { + return rc.lastChangeEventTime.Load() +} + // getBufferSaturation returns the reader’s internal buffer’s saturation level // as a fraction. 
If saturation rises, that means we’re reading events faster // than we can persist them. @@ -129,43 +156,134 @@ func (rc *ChangeReaderCommon) getEventsPerSecond() option.Option[float64] { return option.None[float64]() } +// start starts the change reader +func (rc *ChangeReaderCommon) start( + ctx context.Context, + eg *errgroup.Group, +) error { + // This channel holds the first change stream creation's result, whether + // success or failure. Rather than using a Result we could make separate + // Timestamp and error channels, but the single channel is cleaner since + // there's no chance of "nonsense" like both channels returning a payload. + initialCreateResultChan := make(chan mo.Result[bson.Timestamp]) + + eg.Go( + func() error { + // Closing changeEventBatchChan at the end of change stream goroutine + // notifies the verifier's change event handler to exit. + defer func() { + rc.logger.Debug(). + Str("reader", string(rc.readerType)). + Msg("Finished.") + + close(rc.changeEventBatchChan) + }() + + retryer := retry.New().WithErrorCodes(util.CursorKilledErrCode) + + parentThreadWaiting := true + + err := retryer.WithCallback( + func(ctx context.Context, ri *retry.FuncInfo) error { + sess, err := rc.watcherClient.StartSession() + if err != nil { + return errors.Wrap(err, "failed to start session") + } + + startTs, err := rc.createIteratorCb(ctx, sess) + if err != nil { + logEvent := rc.logger.Debug(). + Err(err). + Str("reader", string(rc.readerType)) + + if parentThreadWaiting { + logEvent.Msg("First change stream open failed.") + + initialCreateResultChan <- mo.Err[bson.Timestamp](err) + return nil + } + + logEvent.Msg("Retried change stream open failed.") + + return err + } + + logEvent := rc.logger.Debug(). + Str("reader", string(rc.readerType)). + Any("startTimestamp", startTs) + + if parentThreadWaiting { + logEvent.Msg("First change stream open succeeded.") + + initialCreateResultChan <- mo.Ok(startTs) + close(initialCreateResultChan) + parentThreadWaiting = false + } else { + logEvent.Msg("Retried change stream open succeeded.") + } + + return rc.iterateCb(ctx, ri, sess) + }, + "reading %s’s changes", rc.readerType, + ).Run(ctx, rc.logger) + + return err + }, + ) + + result := <-initialCreateResultChan + + startTs, err := result.Get() + if err != nil { + return errors.Wrapf(err, "creating change stream") + } + + rc.startAtTs = &startTs + + rc.running = true + + return nil +} + func (rc *ChangeReaderCommon) persistResumeToken(ctx context.Context, token bson.Raw) error { + ts, err := rc.resumeTokenTSExtractor(token) + if err != nil { + return errors.Wrapf(err, "parsing resume token %#q", token) + } + + if ts.IsZero() { + panic("empty ts in resume token is invalid!") + } + coll := rc.metaDB.Collection(changeReaderCollectionName) - _, err := coll.ReplaceOne( + _, err = coll.ReplaceOne( ctx, - bson.D{{"_id", rc.resumeTokenDocID()}}, + bson.D{{"_id", resumeTokenDocID(rc.getWhichCluster())}}, token, options.Replace().SetUpsert(true), ) - if err == nil { - ts, err := rc.resumeTokenTSExtractor(token) - - logEvent := rc.logger.Debug() + if err != nil { + return errors.Wrapf(err, "persisting %s resume token (%v)", rc.readerType, token) + } - if err == nil { - logEvent = addTimestampToLogEvent(ts, logEvent) - } else { - rc.logger.Warn().Err(err). 
- Msg("failed to extract resume token timestamp") - } + logEvent := rc.logger.Debug() - logEvent.Msgf("Persisted %s's resume token.", rc.readerType) + logEvent = addTimestampToLogEvent(ts, logEvent) - return nil - } + logEvent.Msgf("Persisted %s’s resume token.", rc.readerType) - return errors.Wrapf(err, "failed to persist %s resume token (%v)", rc.readerType, token) + return nil } -func (rc *ChangeReaderCommon) resumeTokenDocID() string { - switch rc.readerType { +func resumeTokenDocID(clusterType whichCluster) string { + switch clusterType { case src: return "srcResumeToken" case dst: return "dstResumeToken" default: - panic("unknown readerType: " + rc.readerType) + panic("unknown readerType: " + clusterType) } } @@ -178,7 +296,7 @@ func (rc *ChangeReaderCommon) loadResumeToken(ctx context.Context) (option.Optio token, err := coll.FindOne( ctx, - bson.D{{"_id", rc.resumeTokenDocID()}}, + bson.D{{"_id", resumeTokenDocID(rc.getWhichCluster())}}, ).Raw() if errors.Is(err, mongo.ErrNoDocuments) { diff --git a/internal/verifier/change_stream.go b/internal/verifier/change_stream.go index 4a91a3c3..6bfa3c64 100644 --- a/internal/verifier/change_stream.go +++ b/internal/verifier/change_stream.go @@ -13,12 +13,10 @@ import ( mapset "github.com/deckarep/golang-set/v2" clone "github.com/huandu/go-clone/generic" "github.com/pkg/errors" - "github.com/samber/mo" "go.mongodb.org/mongo-driver/v2/bson" "go.mongodb.org/mongo-driver/v2/mongo" "go.mongodb.org/mongo-driver/v2/mongo/options" "golang.org/x/exp/slices" - "golang.org/x/sync/errgroup" ) var supportedEventOpTypes = mapset.NewSet( @@ -29,8 +27,9 @@ var supportedEventOpTypes = mapset.NewSet( ) const ( - minChangeStreamPersistInterval = time.Second * 10 - maxChangeStreamAwaitTime = time.Second + maxChangeStreamAwaitTime = time.Second + + ChangeReaderOptChangeStream = "changeStream" ) type UnknownEventError struct { @@ -42,11 +41,37 @@ func (uee UnknownEventError) Error() string { } type ChangeStreamReader struct { - ChangeReaderCommon + changeStream *mongo.ChangeStream + *ChangeReaderCommon } var _ changeReader = &ChangeStreamReader{} +func (v *Verifier) newChangeStreamReader( + namespaces []string, + cluster whichCluster, + client *mongo.Client, + clusterInfo util.ClusterInfo, +) *ChangeStreamReader { + common := newChangeReaderCommon(cluster) + common.namespaces = namespaces + common.readerType = cluster + common.watcherClient = client + common.clusterInfo = clusterInfo + + common.logger = v.logger + common.metaDB = v.metaClient.Database(v.metaDBName) + + common.resumeTokenTSExtractor = extractTSFromChangeStreamResumeToken + + csr := &ChangeStreamReader{ChangeReaderCommon: &common} + + common.createIteratorCb = csr.createChangeStream + common.iterateCb = csr.iterateChangeStream + + return csr +} + // GetChangeStreamFilter returns an aggregation pipeline that filters // namespaces as per configuration. // @@ -123,10 +148,9 @@ func (csr *ChangeStreamReader) GetChangeStreamFilter() (pipeline mongo.Pipeline) // is unideal but shouldn’t impede correctness since post-writesOff events // shouldn’t really happen anyway by definition. 
func (csr *ChangeStreamReader) readAndHandleOneChangeEventBatch( - ctx context.Context, + sctx context.Context, ri *retry.FuncInfo, cs *mongo.ChangeStream, - sess *mongo.Session, ) error { eventsRead := 0 var changeEvents []ParsedEvent @@ -135,7 +159,7 @@ func (csr *ChangeStreamReader) readAndHandleOneChangeEventBatch( var batchTotalBytes int for hasEventInBatch := true; hasEventInBatch; hasEventInBatch = cs.RemainingBatchLength() > 0 { - gotEvent := cs.TryNext(ctx) + gotEvent := cs.TryNext(sctx) if cs.Err() != nil { return errors.Wrap(cs.Err(), "change stream iteration failed") @@ -193,17 +217,17 @@ func (csr *ChangeStreamReader) readAndHandleOneChangeEventBatch( return errors.Errorf("Change event lacks a namespace: %+v", changeEvents[eventsRead]) } - if changeEvents[eventsRead].ClusterTime != nil && - (csr.lastChangeEventTime == nil || - csr.lastChangeEventTime.Before(*changeEvents[eventsRead].ClusterTime)) { - - csr.lastChangeEventTime = changeEvents[eventsRead].ClusterTime + eventTime := changeEvents[eventsRead].ClusterTime + if eventTime != nil && csr.lastChangeEventTime.Load().OrZero().Before(*eventTime) { + csr.lastChangeEventTime.Store(option.Some(*eventTime)) latestEvent = option.Some(changeEvents[eventsRead]) } eventsRead++ } + sess := mongo.SessionFromContext(sctx) + csr.updateLag(sess, cs.ResumeToken()) if eventsRead == 0 { @@ -224,15 +248,11 @@ func (csr *ChangeStreamReader) readAndHandleOneChangeEventBatch( ri.NoteSuccess("parsed %d-event batch", len(changeEvents)) select { - case <-ctx.Done(): - return util.WrapCtxErrWithCause(ctx) + case <-sctx.Done(): + return util.WrapCtxErrWithCause(sctx) case csr.changeEventBatchChan <- changeEventBatch{ - events: changeEvents, - + events: changeEvents, resumeToken: cs.ResumeToken(), - - // NB: We know by now that OperationTime is non-nil. - clusterTime: *sess.OperationTime(), }: } @@ -244,9 +264,14 @@ func (csr *ChangeStreamReader) readAndHandleOneChangeEventBatch( func (csr *ChangeStreamReader) iterateChangeStream( ctx context.Context, ri *retry.FuncInfo, - cs *mongo.ChangeStream, sess *mongo.Session, ) error { + sctx := mongo.NewSessionContext(ctx, sess) + + cs := csr.changeStream + + defer cs.Close(sctx) + for { var err error var gotwritesOffTimestamp bool @@ -297,7 +322,7 @@ func (csr *ChangeStreamReader) iterateChangeStream( break } - err = csr.readAndHandleOneChangeEventBatch(ctx, ri, cs, sess) + err = csr.readAndHandleOneChangeEventBatch(sctx, ri, cs) if err != nil { return err @@ -305,7 +330,7 @@ func (csr *ChangeStreamReader) iterateChangeStream( } default: - err = csr.readAndHandleOneChangeEventBatch(ctx, ri, cs, sess) + err = csr.readAndHandleOneChangeEventBatch(sctx, ri, cs) if err != nil { return err @@ -314,8 +339,8 @@ func (csr *ChangeStreamReader) iterateChangeStream( if gotwritesOffTimestamp { csr.running = false - if csr.lastChangeEventTime != nil { - csr.startAtTs = csr.lastChangeEventTime + if ts, has := csr.lastChangeEventTime.Load().Get(); has { + csr.startAtTs = &ts } break @@ -338,7 +363,8 @@ func (csr *ChangeStreamReader) iterateChangeStream( func (csr *ChangeStreamReader) createChangeStream( ctx context.Context, -) (*mongo.ChangeStream, *mongo.Session, bson.Timestamp, error) { + sess *mongo.Session, +) (bson.Timestamp, error) { pipeline := csr.GetChangeStreamFilter() opts := options.ChangeStream(). 
SetMaxAwaitTime(maxChangeStreamAwaitTime) @@ -354,14 +380,14 @@ func (csr *ChangeStreamReader) createChangeStream( savedResumeToken, err := csr.loadResumeToken(ctx) if err != nil { - return nil, nil, bson.Timestamp{}, errors.Wrap(err, "failed to load persisted change stream resume token") + return bson.Timestamp{}, errors.Wrap(err, "failed to load persisted change stream resume token") } csStartLogEvent := csr.logger.Info() if token, hasToken := savedResumeToken.Get(); hasToken { logEvent := csStartLogEvent. - Stringer(csr.resumeTokenDocID(), token) + Stringer(resumeTokenDocID(csr.readerType), token) ts, err := csr.resumeTokenTSExtractor(token) if err == nil { @@ -379,24 +405,25 @@ func (csr *ChangeStreamReader) createChangeStream( csStartLogEvent.Msgf("Starting change stream from current %s cluster time.", csr.readerType) } - sess, err := csr.watcherClient.StartSession() - if err != nil { - return nil, nil, bson.Timestamp{}, errors.Wrap(err, "failed to start session") - } sctx := mongo.NewSessionContext(ctx, sess) + changeStream, err := csr.watcherClient.Watch(sctx, pipeline, opts) if err != nil { - return nil, nil, bson.Timestamp{}, errors.Wrap(err, "failed to open change stream") + return bson.Timestamp{}, errors.Wrap(err, "failed to open change stream") } - err = csr.persistResumeToken(ctx, changeStream.ResumeToken()) + resumeToken := changeStream.ResumeToken() + + err = csr.persistResumeToken(ctx, resumeToken) if err != nil { - return nil, nil, bson.Timestamp{}, err + changeStream.Close(sctx) + return bson.Timestamp{}, err } - startTs, err := csr.resumeTokenTSExtractor(changeStream.ResumeToken()) + startTs, err := csr.resumeTokenTSExtractor(resumeToken) if err != nil { - return nil, nil, bson.Timestamp{}, errors.Wrap(err, "failed to extract timestamp from change stream's resume token") + changeStream.Close(sctx) + return bson.Timestamp{}, errors.Wrap(err, "failed to extract timestamp from change stream's resume token") } // With sharded clusters the resume token might lead the cluster time @@ -404,7 +431,8 @@ func (csr *ChangeStreamReader) createChangeStream( // otherwise we will get errors. clusterTime, err := util.GetClusterTimeFromSession(sess) if err != nil { - return nil, nil, bson.Timestamp{}, errors.Wrap(err, "failed to read cluster time from session") + changeStream.Close(sctx) + return bson.Timestamp{}, errors.Wrap(err, "failed to read cluster time from session") } csr.logger.Debug(). @@ -417,93 +445,9 @@ func (csr *ChangeStreamReader) createChangeStream( startTs = clusterTime } - return changeStream, sess, startTs, nil -} - -// StartChangeStream starts the change stream. -func (csr *ChangeStreamReader) start( - ctx context.Context, - eg *errgroup.Group, -) error { - // This channel holds the first change stream creation's result, whether - // success or failure. Rather than using a Result we could make separate - // Timestamp and error channels, but the single channel is cleaner since - // there's no chance of "nonsense" like both channels returning a payload. - initialCreateResultChan := make(chan mo.Result[bson.Timestamp]) - - eg.Go( - func() error { - // Closing changeEventBatchChan at the end of change stream goroutine - // notifies the verifier's change event handler to exit. - defer func() { - csr.logger.Debug(). - Stringer("changeStreamReader", csr). 
- Msg("Closing change event batch channel.") - - close(csr.changeEventBatchChan) - }() - - retryer := retry.New().WithErrorCodes(util.CursorKilledErrCode) - - parentThreadWaiting := true - - err := retryer.WithCallback( - func(ctx context.Context, ri *retry.FuncInfo) error { - changeStream, sess, startTs, err := csr.createChangeStream(ctx) - if err != nil { - logEvent := csr.logger.Debug(). - Err(err). - Stringer("changeStreamReader", csr) - - if parentThreadWaiting { - logEvent.Msg("First change stream open failed.") - - initialCreateResultChan <- mo.Err[bson.Timestamp](err) - return nil - } - - logEvent.Msg("Retried change stream open failed.") - - return err - } - - defer changeStream.Close(ctx) - - logEvent := csr.logger.Debug(). - Stringer("changeStreamReader", csr). - Any("startTimestamp", startTs) - - if parentThreadWaiting { - logEvent.Msg("First change stream open succeeded.") - - initialCreateResultChan <- mo.Ok(startTs) - close(initialCreateResultChan) - parentThreadWaiting = false - } else { - logEvent.Msg("Retried change stream open succeeded.") - } - - return csr.iterateChangeStream(ctx, ri, changeStream, sess) - }, - "running %s", csr, - ).Run(ctx, csr.logger) - - return err - }, - ) - - result := <-initialCreateResultChan - - startTs, err := result.Get() - if err != nil { - return errors.Wrapf(err, "creating change stream") - } - - csr.startAtTs = &startTs - - csr.running = true + csr.changeStream = changeStream - return nil + return startTs, nil } func (csr *ChangeStreamReader) String() string { diff --git a/internal/verifier/change_stream_test.go b/internal/verifier/change_stream_test.go index 5f746f14..05b1d90e 100644 --- a/internal/verifier/change_stream_test.go +++ b/internal/verifier/change_stream_test.go @@ -345,15 +345,20 @@ func (suite *IntegrationTestSuite) TestChangeStream_Resume_NoSkip() { assert.Eventually( suite.T(), func() bool { - rt, err := changeStreamMetaColl.FindOne(ctx, bson.D{}).Raw() + rt, err := changeStreamMetaColl.FindOne( + ctx, + bson.D{ + {"_id", resumeTokenDocID(src)}, + }, + ).Raw() require.NoError(suite.T(), err) - suite.T().Logf("found rt: %v\n", rt) + suite.T().Logf("found rt: %v", rt) return !bytes.Equal(rt, originalResumeToken) }, time.Minute, - 50*time.Millisecond, + 500*time.Millisecond, "should see a new change stream resume token persisted", ) @@ -700,10 +705,9 @@ func (suite *IntegrationTestSuite) TestStartAtTimeWithChanges() { startAtTs, hasStartAtTs = verifier.srcChangeReader.getStartTimestamp().Get() suite.Require().True(hasStartAtTs, "startAtTs should be set") - suite.Assert().Equal( - *postEventsSessionTime, - startAtTs, - "verifier.srcStartAtTs should now be our session timestamp", + suite.Assert().False( + startAtTs.Before(*postEventsSessionTime), + "verifier.srcStartAtTs should now be at least at the session timestamp", ) } @@ -955,7 +959,7 @@ func (suite *IntegrationTestSuite) TestCreateForbidden() { eventErr := UnknownEventError{} suite.Require().ErrorAs(err, &eventErr) - suite.Assert().Equal("create", eventErr.Event.Lookup("operationType").StringValue()) + suite.Assert().Contains(string(eventErr.Event), "create") } func (suite *IntegrationTestSuite) TestTolerateDestinationCollMod() { diff --git a/internal/verifier/check.go b/internal/verifier/check.go index 529c3e3f..ba73ff71 100644 --- a/internal/verifier/check.go +++ b/internal/verifier/check.go @@ -6,13 +6,9 @@ import ( "time" "github.com/10gen/migration-verifier/contextplus" - "github.com/10gen/migration-verifier/history" 
"github.com/10gen/migration-verifier/internal/logger" "github.com/10gen/migration-verifier/internal/retry" - "github.com/10gen/migration-verifier/internal/util" "github.com/10gen/migration-verifier/mslices" - "github.com/10gen/migration-verifier/msync" - "github.com/10gen/migration-verifier/option" mapset "github.com/deckarep/golang-set/v2" "github.com/goaux/timer" "github.com/pkg/errors" @@ -35,6 +31,11 @@ var ( verificationTaskFailed, verificationTaskMetadataMismatch, ) + + ChangeReaderOpts = mslices.Of( + ChangeReaderOptChangeStream, + ChangeReaderOptOplog, + ) ) // Check is the asynchronous entry point to Check, should only be called by the web server. Use @@ -219,9 +220,13 @@ func (verifier *Verifier) CheckDriver(ctx context.Context, filter bson.D, testCh // Now that we’ve initialized verifier.generation we can // start the change readers. - verifier.initializeChangeReaders() + err = verifier.initializeChangeReaders() verifier.mux.Unlock() + if err != nil { + return err + } + err = retry.New().WithCallback( func(ctx context.Context, _ *retry.FuncInfo) error { err = verifier.AddMetaIndexes(ctx) @@ -267,6 +272,7 @@ func (verifier *Verifier) CheckDriver(ctx context.Context, filter bson.D, testCh return errors.Wrapf(err, "failed to start %s", changeReader) } changeReaderGroup.Go(func() error { + defer fmt.Printf("----- %s persistor finished\n", changeReader.String()) return verifier.RunChangeEventPersistor(groupCtx, changeReader) }) } @@ -355,7 +361,7 @@ func (verifier *Verifier) CheckDriver(ctx context.Context, filter bson.D, testCh // caught again on the next iteration. if verifier.writesOff { verifier.logger.Debug(). - Msg("Waiting for change readers to end.") + Msg("Waiting for change handling to finish.") // It's necessary to wait for the change reader to finish before incrementing the // generation number, or the last changes will not be checked. @@ -600,36 +606,54 @@ func (verifier *Verifier) work(ctx context.Context, workerNum int) error { } } -func (verifier *Verifier) initializeChangeReaders() { - srcReader := &ChangeStreamReader{ - ChangeReaderCommon: ChangeReaderCommon{ - readerType: src, - namespaces: verifier.srcNamespaces, - watcherClient: verifier.srcClient, - clusterInfo: *verifier.srcClusterInfo, - }, +func (v *Verifier) initializeChangeReaders() error { + warnAboutOplog := func(cluster whichCluster) { + v.logger.Warn(). + Str("cluster", string(cluster)). + Msg("Reading writes via oplog tailing. 
This feature is experimental.") } - verifier.srcChangeReader = srcReader - - dstReader := &ChangeStreamReader{ - ChangeReaderCommon: ChangeReaderCommon{ - readerType: dst, - namespaces: verifier.dstNamespaces, - watcherClient: verifier.dstClient, - clusterInfo: *verifier.dstClusterInfo, - onDDLEvent: onDDLEventAllow, - }, + + switch v.srcChangeReaderMethod { + case ChangeReaderOptOplog: + warnAboutOplog(src) + + v.srcChangeReader = v.newOplogReader( + v.srcNamespaces, + src, + v.srcClient, + *v.srcClusterInfo, + ) + case ChangeReaderOptChangeStream: + v.srcChangeReader = v.newChangeStreamReader( + v.srcNamespaces, + src, + v.srcClient, + *v.srcClusterInfo, + ) + default: + return fmt.Errorf("bad source change reader: %#q", v.srcChangeReaderMethod) } - verifier.dstChangeReader = dstReader - - // Common elements in both readers: - for _, csr := range mslices.Of(srcReader, dstReader) { - csr.logger = verifier.logger - csr.metaDB = verifier.metaClient.Database(verifier.metaDBName) - csr.changeEventBatchChan = make(chan changeEventBatch, batchChanBufferSize) - csr.writesOffTs = util.NewEventual[bson.Timestamp]() - csr.lag = msync.NewTypedAtomic(option.None[time.Duration]()) - csr.batchSizeHistory = history.New[int](time.Minute) - csr.resumeTokenTSExtractor = extractTSFromChangeStreamResumeToken + + switch v.dstChangeReaderMethod { + case ChangeReaderOptOplog: + warnAboutOplog(dst) + + v.dstChangeReader = v.newOplogReader( + v.dstNamespaces, + dst, + v.dstClient, + *v.dstClusterInfo, + ) + case ChangeReaderOptChangeStream: + v.dstChangeReader = v.newChangeStreamReader( + v.dstNamespaces, + dst, + v.dstClient, + *v.dstClusterInfo, + ) + default: + return fmt.Errorf("bad destination change reader: %#q", v.srcChangeReaderMethod) } + + return nil } diff --git a/internal/verifier/compare.go b/internal/verifier/compare.go index 85fbfca0..f31b34f7 100644 --- a/internal/verifier/compare.go +++ b/internal/verifier/compare.go @@ -18,7 +18,6 @@ import ( "go.mongodb.org/mongo-driver/v2/bson" "go.mongodb.org/mongo-driver/v2/mongo" "go.mongodb.org/mongo-driver/v2/mongo/options" - "go.mongodb.org/mongo-driver/v2/mongo/readpref" "golang.org/x/exp/slices" ) @@ -467,7 +466,7 @@ func (verifier *Verifier) getFetcherChannelsAndCallbacks( sctx, verifier.srcClientCollection(task), verifier.srcClusterInfo, - verifier.srcChangeReader.getStartTimestamp().ToPointer(), + verifier.srcChangeReader.getLatestTimestamp().ToPointer(), task, ) @@ -500,7 +499,7 @@ func (verifier *Verifier) getFetcherChannelsAndCallbacks( sctx, verifier.dstClientCollection(task), verifier.dstClusterInfo, - verifier.dstChangeReader.getStartTimestamp().ToPointer(), + verifier.dstChangeReader.getLatestTimestamp().ToPointer(), task, ) @@ -532,25 +531,29 @@ func iterateCursorToChannel( ) error { defer close(writer) - sess := mongo.SessionFromContext(sctx) + //sess := mongo.SessionFromContext(sctx) for cursor.Next(sctx) { state.NoteSuccess("received a document") - clusterTime, err := util.GetClusterTimeFromSession(sess) - if err != nil { - return errors.Wrap(err, "reading cluster time from session") - } + fmt.Printf("----- received a document: %+v\n\n", cursor.Current) + + /* + clusterTime, err := util.GetClusterTimeFromSession(sess) + if err != nil { + return errors.Wrap(err, "reading cluster time from session") + } + */ buf := pool.Get(len(cursor.Current)) copy(buf, cursor.Current) - err = chanutil.WriteWithDoneCheck( + err := chanutil.WriteWithDoneCheck( sctx, writer, docWithTs{ doc: buf, - ts: clusterTime, + //ts: clusterTime, }, ) @@ -573,7 +576,7 
@@ func getMapKey(docKeyValues []bson.RawValue) string { return keyBuffer.String() } -func (verifier *Verifier) getDocumentsCursor(ctx context.Context, collection *mongo.Collection, clusterInfo *util.ClusterInfo, +func (verifier *Verifier) getDocumentsCursor(sctx context.Context, collection *mongo.Collection, clusterInfo *util.ClusterInfo, startAtTs *bson.Timestamp, task *VerificationTask) (*mongo.Cursor, error) { var findOptions bson.D runCommandOptions := options.RunCmd() @@ -590,6 +593,7 @@ func (verifier *Verifier) getDocumentsCursor(ctx context.Context, collection *mo case DocQueryFunctionFind: findOptions = bson.D{ bson.E{"filter", filter}, + //bson.E{"readConcern", readconcern.Majority()}, } case DocQueryFunctionAggregate: aggOptions = bson.D{ @@ -656,42 +660,68 @@ func (verifier *Verifier) getDocumentsCursor(ctx context.Context, collection *mo ) } - if verifier.readPreference.Mode() != readpref.PrimaryMode { - runCommandOptions = runCommandOptions.SetReadPreference(verifier.readPreference) - if startAtTs != nil { - readConcern := bson.D{ - {"afterClusterTime", *startAtTs}, - } + runCommandOptions = runCommandOptions.SetReadPreference(verifier.readPreference) - // We never want to read before the change stream start time, - // or for the last generation, the change stream end time. - cmd = append( - cmd, - bson.E{"readConcern", readConcern}, - ) + if startAtTs != nil { + readConcern := bson.D{ + {"level", "snapshot"}, + {"afterClusterTime", *startAtTs}, } + + // We never want to read before the change stream start time, + // or for the last generation, the change stream end time. + cmd = append( + cmd, + bson.E{"readConcern", readConcern}, + ) } // Suppress this log for recheck tasks because the list of IDs can be // quite long. - if !task.IsRecheck() { - if verifier.logger.Trace().Enabled() { - evt := verifier.logger.Trace(). - Any("task", task.PrimaryKey) + /* + if !task.IsRecheck() { + if verifier.logger.Trace().Enabled() { + */ + evt := verifier.logger.Debug(). + Any("task", task.PrimaryKey) + + cmdStr, err := bson.MarshalExtJSON(cmd, true, false) + if err != nil { + cmdStr = fmt.Appendf(nil, "%s", cmd) + } - cmdStr, err := bson.MarshalExtJSON(cmd, true, false) - if err != nil { - cmdStr = fmt.Appendf(nil, "%s", cmd) + evt. + Str("cmd", string(cmdStr)). + Str("options", fmt.Sprintf("%v", *runCommandOptions)). + Msg("getDocuments command.") + /* + } } + */ - evt. - Str("cmd", string(cmdStr)). - Str("options", fmt.Sprintf("%v", *runCommandOptions)). 
- Msg("getDocuments command.") - } - } + /* + sess := lo.Must(collection.Database().Client().StartSession()) + defer sess.EndSession(ctx) + + sess.AdvanceOperationTime(startAtTs) + */ - return collection.Database().RunCommandCursor(ctx, cmd, runCommandOptions) + /* + lo.Must0(mongo.SessionFromContext(sctx).AdvanceOperationTime(startAtTs)) + lo.Must0(mongo.SessionFromContext(sctx).AdvanceClusterTime(lo.Must(bson.Marshal( + bson.D{ + {"$clusterTime", bson.D{ + {"clusterTime", *startAtTs}, + }}, + }, + )))) + */ + + return collection.Database().RunCommandCursor( + mongo.NewSessionContext(sctx, nil), + cmd, + runCommandOptions, + ) } func transformPipelineForToHashedIndexKey( diff --git a/internal/verifier/integration_test_suite.go b/internal/verifier/integration_test_suite.go index 19aff0ab..19291208 100644 --- a/internal/verifier/integration_test_suite.go +++ b/internal/verifier/integration_test_suite.go @@ -1,6 +1,7 @@ package verifier import ( + "cmp" "context" "os" "strings" @@ -191,11 +192,26 @@ func (suite *IntegrationTestSuite) BuildVerifier() *Verifier { "should set metadata connection string", ) verifier.SetMetaDBName(metaDBName) - verifier.initializeChangeReaders() + + envSrcChangeReader := cmp.Or( + os.Getenv("MVTEST_SRC_CHANGE_READER"), + ChangeReaderOptChangeStream, + ) + suite.Require().NoError(verifier.SetSrcChangeReader(envSrcChangeReader)) + + envDstChangeReader := cmp.Or( + os.Getenv("MVTEST_DST_CHANGE_READER"), + ChangeReaderOptChangeStream, + ) + + suite.Require().NoError(verifier.SetDstChangeReader(envDstChangeReader)) + + suite.Require().NoError(verifier.initializeChangeReaders()) suite.Require().NoError(verifier.srcClientCollection(&task).Drop(ctx)) suite.Require().NoError(verifier.dstClientCollection(&task).Drop(ctx)) suite.Require().NoError(verifier.AddMetaIndexes(ctx)) + return verifier } diff --git a/internal/verifier/migration_verifier.go b/internal/verifier/migration_verifier.go index e23445cb..b26baa5b 100644 --- a/internal/verifier/migration_verifier.go +++ b/internal/verifier/migration_verifier.go @@ -103,6 +103,9 @@ type Verifier struct { srcEventRecorder *EventRecorder dstEventRecorder *EventRecorder + srcChangeReaderMethod string + dstChangeReaderMethod string + changeHandlingErr *util.Eventual[error] // Used only with generation 0 to defer the first @@ -245,7 +248,7 @@ func (verifier *Verifier) WritesOff(ctx context.Context) error { } verifier.writesOff = true - verifier.logger.Debug().Msg("Signalling that writes are done.") + verifier.logger.Debug().Msg("Signaling that writes are done.") srcFinalTs, err = GetNewClusterTime( ctx, @@ -378,6 +381,50 @@ func (verifier *Verifier) SetDocCompareMethod(method DocCompareMethod) { verifier.docCompareMethod = method } +func (verifier *Verifier) SetSrcChangeReader(method string) error { + err := validateChangeReaderOpt(method, *verifier.srcClusterInfo) + if err != nil { + return errors.Wrap(err, "setting source change reader method") + } + + verifier.srcChangeReaderMethod = method + + return nil +} + +func (verifier *Verifier) SetDstChangeReader(method string) error { + err := validateChangeReaderOpt(method, *verifier.dstClusterInfo) + if err != nil { + return errors.Wrap(err, "setting source change reader method") + } + + verifier.dstChangeReaderMethod = method + + return nil +} + +func validateChangeReaderOpt( + method string, + clusterInfo util.ClusterInfo, +) error { + if method != ChangeReaderOptOplog { + return nil + } + + var whyNoOplog string + + switch { + case clusterInfo.Topology == util.TopologySharded: + 
whyNoOplog = "sharded" + } + + if whyNoOplog != "" { + return fmt.Errorf("cannot read oplog (%s)", whyNoOplog) + } + + return nil +} + func (verifier *Verifier) SetVerifyAll(arg bool) { verifier.verifyAll = arg } diff --git a/internal/verifier/migration_verifier_test.go b/internal/verifier/migration_verifier_test.go index 396077bf..a933a722 100644 --- a/internal/verifier/migration_verifier_test.go +++ b/internal/verifier/migration_verifier_test.go @@ -1686,19 +1686,21 @@ func (suite *IntegrationTestSuite) TestVerifierCompareIndexes() { func (suite *IntegrationTestSuite) TestVerifierDocMismatches() { ctx := suite.Context() + dbName := suite.DBNameForTest() + suite.Require().NoError( suite.srcMongoClient. - Database("test"). + Database(dbName). Collection("coll").Drop(ctx), ) suite.Require().NoError( suite.dstMongoClient. - Database("test"). + Database(dbName). Collection("coll").Drop(ctx), ) _, err := suite.srcMongoClient. - Database("test"). + Database(dbName). Collection("coll"). InsertMany( ctx, @@ -1717,7 +1719,7 @@ func (suite *IntegrationTestSuite) TestVerifierDocMismatches() { // The first has a mismatched `foo` value, // and the 2nd lacks `foo` entirely. _, err = suite.dstMongoClient. - Database("test"). + Database(dbName). Collection("coll"). InsertMany(ctx, lo.ToAnySlice([]bson.D{ {{"_id", 100000}, {"foo", 1}}, @@ -1728,7 +1730,7 @@ func (suite *IntegrationTestSuite) TestVerifierDocMismatches() { verifier := suite.BuildVerifier() verifier.failureDisplaySize = 10 - ns := "test.coll" + ns := dbName + ".coll" verifier.SetSrcNamespaces([]string{ns}) verifier.SetDstNamespaces([]string{ns}) verifier.SetNamespaceMap() @@ -2318,15 +2320,23 @@ func (suite *IntegrationTestSuite) TestVerifierWithFilter() { _, err = srcColl.InsertOne(ctx, bson.M{"_id": 201, "x": 201, "inFilter": true}) suite.Require().NoError(err) - // Tell check to start the next generation. - checkContinueChan <- struct{}{} + suite.Require().Eventually( + func() bool { + suite.T().Log("checking to see if a failure was found yet") - // Wait for one generation to finish. - <-checkDoneChan - status = waitForTasks() + // Tell check to start the next generation. + checkContinueChan <- struct{}{} - // There should be a failure from the src insert of a document in the filter. - suite.Require().Equal(VerificationStatus{TotalTasks: 1, FailedTasks: 1}, *status) + // Wait for one generation to finish. + <-checkDoneChan + status = waitForTasks() + + return *status == VerificationStatus{TotalTasks: 1, FailedTasks: 1} + }, + time.Minute, + time.Second, + "we should see a failure from the src insert of a document in the filter.", + ) // Now patch up the destination. _, err = dstColl.InsertOne(ctx, bson.M{"_id": 201, "x": 201, "inFilter": true}) @@ -2342,6 +2352,8 @@ func (suite *IntegrationTestSuite) TestVerifierWithFilter() { // There should be no failures now, since they are equivalent at this point in time. suite.Require().Equal(VerificationStatus{TotalTasks: 1, CompletedTasks: 1}, *status) + suite.T().Log("Finalizing test") + // Turn writes off. 
suite.Require().NoError(verifier.WritesOff(ctx)) diff --git a/internal/verifier/namespaces/meta.go b/internal/verifier/namespaces/meta.go new file mode 100644 index 00000000..97cf09af --- /dev/null +++ b/internal/verifier/namespaces/meta.go @@ -0,0 +1,10 @@ +package namespaces + +import "github.com/10gen/migration-verifier/mslices" + +var ( + MongosyncMetaDBPrefixes = mslices.Of( + "mongosync_internal_", + "mongosync_reserved_", + ) +) diff --git a/internal/verifier/oplog/oplog.go b/internal/verifier/oplog/oplog.go new file mode 100644 index 00000000..91d31a8a --- /dev/null +++ b/internal/verifier/oplog/oplog.go @@ -0,0 +1,125 @@ +package oplog + +import ( + "encoding/binary" + "fmt" + "slices" + + "github.com/10gen/migration-verifier/mbson" + "github.com/pkg/errors" + "go.mongodb.org/mongo-driver/v2/bson" + "go.mongodb.org/mongo-driver/v2/x/bsonx/bsoncore" +) + +const ( + rtBSONLength = 4 + 1 + 2 + 1 + 8 + 1 +) + +type Op struct { + Op string + TS bson.Timestamp + Ns string + CmdName string + DocLen int32 + DocID bson.RawValue + Ops []Op +} + +type ResumeToken struct { + TS bson.Timestamp +} + +func GetRawResumeTokenTimestamp(token bson.Raw) (bson.Timestamp, error) { + rv, err := token.LookupErr("ts") + if err != nil { + return bson.Timestamp{}, errors.Wrap(err, "getting ts") + } + + return mbson.CastRawValue[bson.Timestamp](rv) +} + +func (rt ResumeToken) MarshalToBSON() []byte { + buf := make([]byte, 4, rtBSONLength) + + binary.LittleEndian.PutUint32(buf, uint32(cap(buf))) + + buf = bsoncore.AppendTimestampElement(buf, "ts", rt.TS.T, rt.TS.I) + + buf = append(buf, 0) + + if len(buf) != rtBSONLength { + panic(fmt.Sprintf("bad resume token BSON length: %d", len(buf))) + } + + return buf +} + +func (o *Op) UnmarshalFromBSON(in []byte) error { + //fmt.Printf("---- unmarshaling: %+v\n\n", bson.Raw(in)) + + for el, err := range mbson.RawElements(bson.Raw(in)) { + if err != nil { + return errors.Wrap(err, "iterating BSON document") + } + + key, err := el.KeyErr() + if err != nil { + return errors.Wrap(err, "reading BSON field name") + } + + switch key { + case "op": + err = mbson.UnmarshalElementValue(el, &o.Op) + case "ts": + err = mbson.UnmarshalElementValue(el, &o.TS) + case "ns": + err = mbson.UnmarshalElementValue(el, &o.Ns) + case "cmdName": + err = mbson.UnmarshalElementValue(el, &o.CmdName) + case "docLen": + err = mbson.UnmarshalElementValue(el, &o.DocLen) + case "docID": + o.DocID, err = el.ValueErr() + if err != nil { + err = errors.Wrapf(err, "parsing %#q value", key) + } + o.DocID.Value = slices.Clone(o.DocID.Value) + case "ops": + var arr bson.RawArray + err = errors.Wrapf( + mbson.UnmarshalElementValue(el, &arr), + "parsing ops", + ) + + if err == nil { + vals, err := arr.Values() + if err != nil { + return errors.Wrap(err, "parsing applyOps") + } + + o.Ops = make([]Op, len(vals)) + + for i, val := range vals { + + var opRaw bson.Raw + err := mbson.UnmarshalRawValue(val, &opRaw) + if err != nil { + return errors.Wrapf(err, "parsing applyOps field") + } + + if err := (&o.Ops[i]).UnmarshalFromBSON(opRaw); err != nil { + return errors.Wrapf(err, "parsing applyOps value") + } + } + } + default: + err = errors.Wrapf(err, "unexpected field %#q", key) + } + + if err != nil { + return err + } + } + + return nil +} diff --git a/internal/verifier/oplog/start_time.go b/internal/verifier/oplog/start_time.go new file mode 100644 index 00000000..494ac8a8 --- /dev/null +++ b/internal/verifier/oplog/start_time.go @@ -0,0 +1,161 @@ +package oplog + +import ( + "context" + "fmt" + + 
"github.com/10gen/migration-verifier/option" + "github.com/pkg/errors" + "go.mongodb.org/mongo-driver/v2/bson" + "go.mongodb.org/mongo-driver/v2/mongo" + "go.mongodb.org/mongo-driver/v2/mongo/options" + "go.mongodb.org/mongo-driver/v2/mongo/readconcern" +) + +func GetTailingStartTimes( + ctx context.Context, + client *mongo.Client, +) (OpTime, OpTime, error) { + oldestTxn, err := getOldestTransactionTime(ctx, client) + if err != nil { + return OpTime{}, OpTime{}, errors.Wrapf(err, "finding oldest txn") + } + + latestTime, err := getLatestVisibleOplogOpTime(ctx, client) + if err != nil { + return OpTime{}, OpTime{}, errors.Wrapf(err, "finding latest optime") + } + + if oldestTime, has := oldestTxn.Get(); has { + return oldestTime, latestTime, nil + } + + return latestTime, latestTime, nil +} + +type OpTime struct { + TS bson.Timestamp + T int64 + H option.Option[int64] +} + +func (ot OpTime) Equals(ot2 OpTime) bool { + if !ot.TS.Equal(ot2.TS) { + return false + } + + if ot.T != ot2.T { + return false + } + + return ot.H.OrZero() == ot2.H.OrZero() +} + +// GetLatestOplogOpTime returns the optime of the most recent oplog +// record satisfying the given `query` or a zero-value db.OpTime{} if +// no oplog record matches. This method does not ensure that all prior oplog +// entries are visible (i.e. have been storage-committed). +func getLatestOplogOpTime( + ctx context.Context, + client *mongo.Client, +) (OpTime, error) { + var optime OpTime + + opts := options.FindOne(). + SetProjection(bson.M{"ts": 1, "t": 1, "h": 1}). + SetSort(bson.D{{"$natural", -1}}) + + coll := client.Database("local").Collection("oplog.rs") + + res := coll.FindOne(ctx, bson.D{}, opts) + if err := res.Err(); err != nil { + return OpTime{}, err + } + + if err := res.Decode(&optime); err != nil { + return OpTime{}, err + } + return optime, nil +} + +func getLatestVisibleOplogOpTime( + ctx context.Context, + client *mongo.Client, +) (OpTime, error) { + + latestOpTime, err := getLatestOplogOpTime(ctx, client) + if err != nil { + return OpTime{}, err + } + + coll := client.Database("local").Collection("oplog.rs") + + // Do a forward scan starting at the last op fetched to ensure that + // all operations with earlier oplog times have been storage-committed. + result, err := coll.FindOne(ctx, + bson.M{"ts": bson.M{"$gte": latestOpTime.TS}}, + + //nolint SA1019 + options.FindOne().SetOplogReplay(true), + ).Raw() + if err != nil { + if errors.Is(err, mongo.ErrNoDocuments) { + return OpTime{}, fmt.Errorf( + "last op was not confirmed. last optime: %+v. confirmation time was not found", + latestOpTime, + ) + } + return OpTime{}, err + } + + var optime OpTime + + if err := bson.Unmarshal(result, &optime); err != nil { + return OpTime{}, errors.Wrap(err, "local.oplog.rs error") + } + + if !optime.Equals(latestOpTime) { + return OpTime{}, fmt.Errorf( + "last op was not confirmed. last optime: %+v. confirmation time: %+v", + latestOpTime, + optime, + ) + } + + return latestOpTime, nil +} + +func getOldestTransactionTime( + ctx context.Context, + client *mongo.Client, +) (option.Option[OpTime], error) { + coll := client.Database("config"). 
+ Collection( + "transactions", + options.Collection().SetReadConcern(readconcern.Local()), + ) + + decoded := struct { + StartOpTime OpTime + }{} + + err := coll.FindOne( + ctx, + bson.D{ + {"state", bson.D{ + {"$in", bson.A{"prepared", "inProgress"}}, + }}, + }, + options.FindOne().SetSort(bson.D{{"startOpTime", 1}}), + ).Decode(&decoded) + + if errors.Is(err, mongo.ErrNoDocuments) { + return option.None[OpTime](), nil + } + + if err != nil { + return option.None[OpTime](), errors.Wrap(err, "config.transactions.findOne") + } + + return option.Some(decoded.StartOpTime), nil +} diff --git a/internal/verifier/oplog_reader.go b/internal/verifier/oplog_reader.go new file mode 100644 index 00000000..cff4aaaa --- /dev/null +++ b/internal/verifier/oplog_reader.go @@ -0,0 +1,682 @@ +package verifier + +import ( + "context" + "fmt" + + "github.com/10gen/migration-verifier/agg" + "github.com/10gen/migration-verifier/agg/helpers" + "github.com/10gen/migration-verifier/internal/retry" + "github.com/10gen/migration-verifier/internal/types" + "github.com/10gen/migration-verifier/internal/util" + "github.com/10gen/migration-verifier/internal/verifier/namespaces" + "github.com/10gen/migration-verifier/internal/verifier/oplog" + "github.com/10gen/migration-verifier/mbson" + "github.com/10gen/migration-verifier/mmongo" + "github.com/10gen/migration-verifier/option" + "github.com/pkg/errors" + "github.com/samber/lo" + "go.mongodb.org/mongo-driver/v2/bson" + "go.mongodb.org/mongo-driver/v2/mongo" + "go.mongodb.org/mongo-driver/v2/mongo/options" + "go.mongodb.org/mongo-driver/v2/mongo/readconcern" + "go.mongodb.org/mongo-driver/v2/x/bsonx/bsoncore" + "golang.org/x/exp/slices" +) + +const ( + ChangeReaderOptOplog = "tailOplog" +) + +// OplogReader reads change events via oplog tailing instead of a change stream. +// This significantly lightens server load and allows verification of heavier +// workloads than change streams allow. It only works with replica sets. 
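+// (It tails local.oplog.rs with a tailable-await cursor, filters entries
+// server-side, and emits the same ParsedEvent batches that the change-stream
+// reader does.)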
+type OplogReader struct { + *ChangeReaderCommon + + curDocs []bson.Raw + scratch []byte + cursor *mongo.Cursor + allowDDLBeforeTS bson.Timestamp +} + +var _ changeReader = &OplogReader{} + +func (v *Verifier) newOplogReader( + namespaces []string, + cluster whichCluster, + client *mongo.Client, + clusterInfo util.ClusterInfo, +) *OplogReader { + common := newChangeReaderCommon(cluster) + common.namespaces = namespaces + common.watcherClient = client + common.clusterInfo = clusterInfo + + common.logger = v.logger + common.metaDB = v.metaClient.Database(v.metaDBName) + + common.resumeTokenTSExtractor = oplog.GetRawResumeTokenTimestamp + + o := &OplogReader{ChangeReaderCommon: &common} + + common.createIteratorCb = o.createCursor + common.iterateCb = o.iterateCursor + + return o +} + +func (o *OplogReader) createCursor( + ctx context.Context, + sess *mongo.Session, +) (bson.Timestamp, error) { + savedResumeToken, err := o.loadResumeToken(ctx) + if err != nil { + return bson.Timestamp{}, errors.Wrap(err, "loading persisted resume token") + } + + var allowDDLBeforeTS bson.Timestamp + + var startTS bson.Timestamp + + if token, has := savedResumeToken.Get(); has { + var rt oplog.ResumeToken + if err := bson.Unmarshal(token, &rt); err != nil { + return bson.Timestamp{}, errors.Wrap(err, "parsing persisted resume token") + } + + ddlAllowanceResult := o.getMetadataCollection().FindOne( + ctx, + bson.D{ + {"_id", o.ddlAllowanceDocID()}, + }, + ) + + allowanceRaw, err := ddlAllowanceResult.Raw() + if err != nil { + return bson.Timestamp{}, errors.Wrap(err, "fetching DDL allowance timestamp") + } + + allowDDLBeforeTS, err = mbson.Lookup[bson.Timestamp](allowanceRaw, "ts") + if err != nil { + return bson.Timestamp{}, errors.Wrap(err, "parsing DDL allowance timestamp doc") + } + + startTS = rt.TS + } else { + startOpTime, latestOpTime, err := oplog.GetTailingStartTimes(ctx, o.watcherClient) + if err != nil { + return bson.Timestamp{}, errors.Wrapf(err, "getting start optime from %s", o.readerType) + } + + allowDDLBeforeTS = latestOpTime.TS + + _, err = o.getMetadataCollection().ReplaceOne( + ctx, + bson.D{ + {"_id", o.ddlAllowanceDocID()}, + }, + bson.D{ + {"ts", allowDDLBeforeTS}, + }, + options.Replace().SetUpsert(true), + ) + if err != nil { + return bson.Timestamp{}, errors.Wrapf(err, "persisting DDL-allowance timestamp") + } + + startTS = startOpTime.TS + + err = o.persistResumeToken(ctx, oplog.ResumeToken{startTS}.MarshalToBSON()) + if err != nil { + return bson.Timestamp{}, errors.Wrap(err, "persisting resume token") + } + } + + o.logger.Info(). + Any("startReadTs", startTS). + Any("currentOplogTs", allowDDLBeforeTS). + Msg("Tailing oplog.") + + sctx := mongo.NewSessionContext(ctx, sess) + + findOpts := options.Find(). + SetCursorType(options.TailableAwait) + + if util.ClusterHasBSONSize([2]int(o.clusterInfo.VersionArray)) { + findOpts.SetProjection(o.getExprProjection()) + } + + oplogFilter := bson.D{{"$and", []any{ + bson.D{{"ts", bson.D{{"$gte", startTS}}}}, + + bson.D{{"$expr", agg.Or{ + // plain ops: one write per op + append( + agg.And{agg.In("$op", "d", "i", "u")}, + o.getNSFilter("$$ROOT")..., + ), + + // op=n is for no-ops, so we stay up-to-date. + agg.Eq("$op", "n"), + + // op=c is for applyOps, and also to detect forbidden DDL. + agg.And{ + agg.Eq("$op", "c"), + agg.Not{helpers.StringHasPrefix{ + FieldRef: "$ns", + Prefix: "config.", + }}, + }, + }}}, + }}} + + fmt.Printf("------ oplogFilter: %v\n\n", oplogFilter) + + cursor, err := o.watcherClient. + Database("local"). 
+ Collection( + "oplog.rs", + options.Collection().SetReadConcern(readconcern.Majority()), + ). + Find( + sctx, + oplogFilter, + findOpts, + ) + + if err != nil { + return bson.Timestamp{}, errors.Wrapf(err, "opening cursor to tail %s’s oplog", o.readerType) + } + + o.cursor = cursor + o.allowDDLBeforeTS = allowDDLBeforeTS + + return startTS, nil +} + +func (o *OplogReader) getExprProjection() bson.D { + return bson.D{ + {"ts", 1}, + {"op", 1}, + {"ns", 1}, + + {"docLen", getOplogDocLenExpr("$$ROOT")}, + + {"docID", getOplogDocIDExpr("$$ROOT")}, + + {"cmdName", agg.Cond{ + If: agg.Eq("$op", "c"), + Then: agg.ArrayElemAt{ + Array: agg.Map{ + Input: bson.D{ + {"$objectToArray", "$o"}, + }, + As: "field", + In: "$$field.k", + }, + Index: 0, + }, + Else: "$$REMOVE", + }}, + + {"o", agg.Cond{ + If: agg.And{ + agg.Eq("$op", "c"), + agg.Eq("missing", agg.Type("$o.applyOps")), + }, + Then: "$o", + Else: "$$REMOVE", + }}, + + {"ops", agg.Cond{ + If: agg.And{ + agg.Eq("$op", "c"), + agg.Eq(agg.Type("$o.applyOps"), "array"), + }, + Then: agg.Map{ + Input: agg.Filter{ + Input: "$o.applyOps", + As: "opEntry", + Cond: o.getNSFilter("$$opEntry"), + }, + As: "opEntry", + In: bson.D{ + {"op", "$$opEntry.op"}, + {"ns", "$$opEntry.ns"}, + {"docID", getOplogDocIDExpr("$$opEntry")}, + {"docLen", getOplogDocLenExpr("$$opEntry")}, + }, + }, + Else: "$$REMOVE", + }}, + } +} + +func (o *OplogReader) ddlAllowanceDocID() string { + return string(o.readerType) + "-ddlAllowanceTS" +} + +func (o *OplogReader) iterateCursor( + ctx context.Context, + _ *retry.FuncInfo, + sess *mongo.Session, +) error { + sctx := mongo.NewSessionContext(ctx, sess) + cursor := o.cursor + allowDDLBeforeTS := o.allowDDLBeforeTS + +CursorLoop: + for { + var err error + + select { + case <-sctx.Done(): + return sctx.Err() + case <-o.writesOffTs.Ready(): + o.logger.Debug(). + Stringer("reader", o). + Any("timestamp", o.writesOffTs.Get()). + Msg("Received writes-off timestamp.") + + break CursorLoop + default: + err = o.readAndHandleOneBatch(sctx, cursor, allowDDLBeforeTS) + if err != nil { + return err + } + } + } + + writesOffTS := o.writesOffTs.Get() + + for { + if !o.lastChangeEventTime.Load().OrZero().Before(writesOffTS) { + fmt.Printf("----------- %s reached writes off ts %v\n", o, writesOffTS) + break + } + + err := o.readAndHandleOneBatch(sctx, cursor, allowDDLBeforeTS) + if err != nil { + return err + } + } + + // TODO: deduplicate + o.running = false + + infoLog := o.logger.Info() + if ts, has := o.lastChangeEventTime.Load().Get(); has { + infoLog = infoLog.Any("lastEventTime", ts) + o.startAtTs = lo.ToPtr(ts) + } else { + infoLog = infoLog.Str("lastEventTime", "none") + } + + infoLog. + Stringer("reader", o). + Msg("Oplog reader is done.") + + return nil +} + +var oplogOpToOperationType = map[string]string{ + "i": "insert", + "u": "update", // don’t need to distinguish from replace + "d": "delete", +} + +func (o *OplogReader) readAndHandleOneBatch( + sctx context.Context, + cursor *mongo.Cursor, + allowDDLBeforeTS bson.Timestamp, +) error { + var err error + + o.curDocs = o.curDocs[:0] + o.scratch = o.scratch[:0] + + o.curDocs, o.scratch, err = mmongo.GetBatch(sctx, cursor, o.curDocs, o.scratch) + if err != nil { + return errors.Wrap(err, "reading cursor") + } + + if len(o.curDocs) == 0 { + // If there were no oplog events, then there’s nothing for us to do. 
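+		// (With a tailable-await cursor, an empty batch just means the
+		// server’s await period expired with no new entries; returning nil
+		// lets iterateCursor poll again.)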
+ return nil + } + + var latestTS bson.Timestamp + + events := make([]ParsedEvent, 0, len(o.curDocs)) + + if util.ClusterHasBSONSize([2]int(o.clusterInfo.VersionArray)) { + events, latestTS, err = o.parseExprProjectedOps(events, allowDDLBeforeTS) + } else { + events, latestTS, err = o.parseRawOps(events, allowDDLBeforeTS) + } + + if err != nil { + return err + } + + sess := mongo.SessionFromContext(sctx) + resumeToken := oplog.ResumeToken{latestTS}.MarshalToBSON() + + o.updateLag(sess, resumeToken) + + // NB: events can legitimately be empty here because we might only have + // gotten op=n oplog entries, which we just use to advance the reader. + // (Similar to a change stream’s post-batch resume token.) + if len(events) > 0 { + o.batchSizeHistory.Add(len(events)) + } + + select { + case <-sctx.Done(): + return err + case o.changeEventBatchChan <- changeEventBatch{ + events: events, + resumeToken: resumeToken, + }: + } + + o.lastChangeEventTime.Store(option.Some(latestTS)) + + return nil +} + +func (o *OplogReader) parseRawOps(events []ParsedEvent, allowDDLBeforeTS bson.Timestamp) ([]ParsedEvent, bson.Timestamp, error) { + fmt.Printf("--------------- parseRawOps\n\n\n") + var latestTS bson.Timestamp + + parseOneDocumentOp := func(opName string, ts bson.Timestamp, rawDoc bson.Raw) error { + //fmt.Printf("---- got op: %+v\n\n", rawDoc) + + nsStr, err := mbson.Lookup[string](rawDoc, "ns") + if err != nil { + return err + } + + var docID bson.RawValue + var docLength types.ByteCount + var docField string + + switch opName { + case "i": + docField = "o" + case "d": + docID, err = rawDoc.LookupErr("o", "_id") + if err != nil { + return errors.Wrap(err, "extracting o._id from delete") + } + case "u": + _, err := rawDoc.LookupErr("o", "_id") + if err == nil { + // replace, so we have the full doc + docField = "o" + } else if errors.Is(err, bsoncore.ErrElementNotFound) { + docID, err = rawDoc.LookupErr("o2", "_id") + if err != nil { + return errors.Wrap(err, "extracting o2._id from update") + } + } else { + return errors.Wrap(err, "extracting o._id from update") + } + default: + panic(fmt.Sprintf("op=%#q unexpected (%v)", opName, rawDoc)) + } + + if docField != "" { + doc, err := mbson.Lookup[bson.Raw](rawDoc, docField) + if err != nil { + return errors.Wrap(err, "extracting doc from op") + } + + docLength = types.ByteCount(len(doc)) + docID, err = doc.LookupErr("_id") + if err != nil { + return errors.Wrap(err, "extracting doc ID from op") + } + } else { + if docID.IsZero() { + panic("zero doc ID!") + } + + docLength = defaultUserDocumentSize + } + + docID.Value = slices.Clone(docID.Value) + + events = append( + events, + ParsedEvent{ + OpType: oplogOpToOperationType[opName], + Ns: NewNamespace(SplitNamespace(nsStr)), + DocID: docID, + FullDocLen: option.Some(docLength), + ClusterTime: lo.ToPtr(ts), + }, + ) + + return nil + } + + for _, rawDoc := range o.curDocs { + opName, err := mbson.Lookup[string](rawDoc, "op") + if err != nil { + return nil, bson.Timestamp{}, err + } + + err = mbson.LookupTo(rawDoc, &latestTS, "ts") + if err != nil { + return nil, bson.Timestamp{}, err + } + + switch opName { + case "n": + case "c": + oDoc, err := mbson.Lookup[bson.Raw](rawDoc, "o") + if err != nil { + return nil, bson.Timestamp{}, err + } + + el, err := oDoc.IndexErr(0) + if err != nil { + return nil, bson.Timestamp{}, errors.Wrap(err, "getting first el of o doc") + } + + cmdName, err := el.KeyErr() + if err != nil { + return nil, bson.Timestamp{}, errors.Wrap(err, "getting first field name of o doc") + } + 
+ if cmdName != "applyOps" { + if o.onDDLEvent == onDDLEventAllow { + o.logIgnoredDDL(rawDoc) + continue + } + + if !latestTS.After(allowDDLBeforeTS) { + o.logger.Info(). + Stringer("event", rawDoc). + Msg("Ignoring unrecognized write from the past.") + + continue + } + + return nil, bson.Timestamp{}, UnknownEventError{rawDoc} + } + + var opsArray bson.Raw + err = mbson.UnmarshalElementValue(el, &opsArray) + if err != nil { + return nil, bson.Timestamp{}, errors.Wrap(err, "parsing applyOps") + } + + arrayVals, err := opsArray.Values() + if err != nil { + return nil, bson.Timestamp{}, errors.Wrap(err, "getting applyOps values") + } + + // Might as well ... + events = slices.Grow(events, len(arrayVals)) + + for i, opRV := range arrayVals { + opRaw, err := mbson.CastRawValue[bson.Raw](opRV) + if err != nil { + return nil, bson.Timestamp{}, errors.Wrapf(err, "extracting applyOps[%d]", i) + } + + opName, err := mbson.Lookup[string](opRaw, "op") + if err != nil { + return nil, bson.Timestamp{}, errors.Wrapf(err, "extracting applyOps[%d].op", i) + } + + err = parseOneDocumentOp(opName, latestTS, opRaw) + if err != nil { + return nil, bson.Timestamp{}, errors.Wrapf(err, "processing applyOps[%d]", i) + } + } + default: + err := parseOneDocumentOp(opName, latestTS, rawDoc) + if err != nil { + return nil, bson.Timestamp{}, err + } + } + } + + return events, latestTS, nil +} + +func (o *OplogReader) parseExprProjectedOps(events []ParsedEvent, allowDDLBeforeTS bson.Timestamp) ([]ParsedEvent, bson.Timestamp, error) { + + var latestTS bson.Timestamp + + for _, rawDoc := range o.curDocs { + //fmt.Printf("----- %s got op: %+v\n\n", o, rawDoc) + var op oplog.Op + + if err := (&op).UnmarshalFromBSON(rawDoc); err != nil { + return nil, bson.Timestamp{}, errors.Wrapf(err, "reading oplog entry") + } + + latestTS = op.TS + + switch op.Op { + case "n": + case "c": + if op.CmdName != "applyOps" { + if o.onDDLEvent == onDDLEventAllow { + o.logIgnoredDDL(rawDoc) + continue + } + + if !op.TS.After(allowDDLBeforeTS) { + o.logger.Info(). + Stringer("event", rawDoc). 
+						Msg("Ignoring unrecognized write from the past.")
+
+					continue
+				}
+
+				return nil, bson.Timestamp{}, UnknownEventError{rawDoc}
+			}
+
+			events = append(
+				events,
+				lo.Map(
+					op.Ops,
+					func(subOp oplog.Op, _ int) ParsedEvent {
+						return ParsedEvent{
+							OpType:      oplogOpToOperationType[subOp.Op],
+							Ns:          NewNamespace(SplitNamespace(subOp.Ns)),
+							DocID:       subOp.DocID,
+							FullDocLen:  option.Some(types.ByteCount(subOp.DocLen)),
+							ClusterTime: &op.TS,
+						}
+					},
+				)...,
+			)
+		default:
+			events = append(
+				events,
+				ParsedEvent{
+					OpType:      oplogOpToOperationType[op.Op],
+					Ns:          NewNamespace(SplitNamespace(op.Ns)),
+					DocID:       op.DocID,
+					FullDocLen:  option.Some(types.ByteCount(op.DocLen)),
+					ClusterTime: &op.TS,
+				},
+			)
+		}
+	}
+
+	return events, latestTS, nil
+}
+
+func (o *OplogReader) getNSFilter(docroot string) agg.And {
+	prefixes := append(
+		slices.Clone(namespaces.MongosyncMetaDBPrefixes),
+		o.metaDB.Name()+".",
+		"config.",
+		"admin.",
+	)
+
+	filter := agg.And(lo.Map(
+		prefixes,
+		func(prefix string, _ int) any {
+			return agg.Not{helpers.StringHasPrefix{
+				FieldRef: docroot + ".ns",
+				Prefix:   prefix,
+			}}
+		},
+	))
+
+	if len(o.namespaces) > 0 {
+		filter = append(
+			filter,
+			agg.In(docroot+".ns", o.namespaces...),
+		)
+	}
+
+	return filter
+}
+
+func getOplogDocLenExpr(docroot string) any {
+	return agg.Cond{
+		If: agg.Or{
+			agg.Eq(docroot+".op", "i"),
+			agg.And{
+				agg.Eq(docroot+".op", "u"),
+				agg.Not{agg.Eq("missing", docroot+".o._id")},
+			},
+		},
+		Then: agg.BSONSize(docroot + ".o"),
+		Else: "$$REMOVE",
+	}
+}
+
+func getOplogDocIDExpr(docroot string) any {
+	// Command ops ("c") carry no single document ID; document ops store it
+	// in o._id (inserts/deletes) or o2._id (updates).
+	return agg.Switch{
+		Branches: []agg.SwitchCase{
+			{
+				Case: agg.Eq(docroot+".op", "c"),
+				Then: "$$REMOVE",
+			},
+			{
+				Case: agg.In(docroot+".op", "i", "d"),
+				Then: docroot + ".o._id",
+			},
+			{
+				Case: agg.In(docroot+".op", "u"),
+				Then: docroot + ".o2._id",
+			},
+		},
+	}
+}
+
+func (o *OplogReader) String() string {
+	return fmt.Sprintf("%s oplog reader", o.readerType)
+}
diff --git a/internal/verifier/recheck.go b/internal/verifier/recheck.go
index 70cbe09b..ebf7620f 100644
--- a/internal/verifier/recheck.go
+++ b/internal/verifier/recheck.go
@@ -86,6 +86,7 @@ func (verifier *Verifier) insertRecheckDocs(
 	insertThreads := 0

 	sendRechecks := func(rechecks []bson.Raw) {
+		fmt.Printf("----- inserting rechecks: %+v\n\n", rechecks)
 		insertThreads++

 		eg.Go(func() error {
diff --git a/internal/verifier/recheck_persist.go b/internal/verifier/recheck_persist.go
index 564e33a2..27b0d395 100644
--- a/internal/verifier/recheck_persist.go
+++ b/internal/verifier/recheck_persist.go
@@ -13,9 +13,12 @@ import (
 type changeEventBatch struct {
 	events      []ParsedEvent
 	resumeToken bson.Raw
-	clusterTime bson.Timestamp
 }

+const (
+	minResumeTokenPersistInterval = 10 * time.Second
+)
+
 // RunChangeEventPersistor persists rechecks from change event batches.
 // It needs to be started after the reader starts and should run in its own
 // goroutine.
@@ -31,7 +34,7 @@ func (verifier *Verifier) RunChangeEventPersistor(
 	var lastPersistedTime time.Time

 	persistResumeTokenIfNeeded := func(ctx context.Context, token bson.Raw) {
-		if time.Since(lastPersistedTime) >= minChangeStreamPersistInterval {
+		if time.Since(lastPersistedTime) >= minResumeTokenPersistInterval {
 			persistErr := persistCallback(ctx, token)
 			if persistErr != nil {
 				verifier.logger.Warn().
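Aside: the hunk above rate-limits resume-token persistence — RunChangeEventPersistor skips the write unless at least minResumeTokenPersistInterval (10 seconds, per the new constant) has passed since the last one. Below is a minimal, self-contained sketch of that pattern; the names throttledPersistor and maybePersist are illustrative and are not part of this change.

package main

import (
	"context"
	"fmt"
	"time"
)

// throttledPersistor runs a persist callback at most once per interval,
// mirroring the throttle in RunChangeEventPersistor. (Sketch only; not the
// verifier's implementation.)
type throttledPersistor struct {
	interval time.Duration
	last     time.Time
	persist  func(ctx context.Context, token []byte) error
}

func (p *throttledPersistor) maybePersist(ctx context.Context, token []byte) error {
	if time.Since(p.last) < p.interval {
		return nil // too soon since the last persisted token; skip this one
	}

	if err := p.persist(ctx, token); err != nil {
		return err
	}

	p.last = time.Now()
	return nil
}

func main() {
	p := &throttledPersistor{
		interval: 10 * time.Second,
		persist: func(_ context.Context, token []byte) error {
			fmt.Printf("persisting token: %q\n", token)
			return nil
		},
	}

	// Only the first call persists; the second arrives within the interval
	// and is skipped.
	_ = p.maybePersist(context.Background(), []byte("token-1"))
	_ = p.maybePersist(context.Background(), []byte("token-2"))
}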
@@ -71,7 +74,7 @@ HandlerLoop:

 			err = errors.Wrap(
 				verifier.PersistChangeEvents(ctx, batch, clusterName),
-				"failed to handle change stream events",
+				"persisting rechecks for change events",
 			)

 			if err == nil && batch.resumeToken != nil {
@@ -89,14 +92,14 @@ func (verifier *Verifier) PersistChangeEvents(ctx context.Context, batch changeE
 		return nil
 	}

-	dbNames := make([]string, len(batch.events))
-	collNames := make([]string, len(batch.events))
-	docIDs := make([]bson.RawValue, len(batch.events))
-	dataSizes := make([]int32, len(batch.events))
+	dbNames := make([]string, 0, len(batch.events))
+	collNames := make([]string, 0, len(batch.events))
+	docIDs := make([]bson.RawValue, 0, len(batch.events))
+	dataSizes := make([]int32, 0, len(batch.events))

 	latestTimestamp := bson.Timestamp{}

-	for i, changeEvent := range batch.events {
+	for _, changeEvent := range batch.events {
 		if !supportedEventOpTypes.Contains(changeEvent.OpType) {
 			panic(fmt.Sprintf("Unsupported optype in event; should have failed already! event=%+v", changeEvent))
 		}
@@ -124,10 +127,14 @@ func (verifier *Verifier) PersistChangeEvents(ctx context.Context, batch changeE
 			srcDBName = changeEvent.Ns.DB
 			srcCollName = changeEvent.Ns.Coll
 		} else {
+			if changeEvent.Ns.DB == "VERIFIER_TEST_META" {
+				continue
+			}
+
 			dstNs := fmt.Sprintf("%s.%s", changeEvent.Ns.DB, changeEvent.Ns.Coll)
 			srcNs, exist := verifier.nsMap.GetSrcNamespace(dstNs)
 			if !exist {
-				return errors.Errorf("no source namespace corresponding to the destination namepsace %s", dstNs)
+				return errors.Errorf("no source namespace matches the destination namespace %#q", dstNs)
 			}
 			srcDBName, srcCollName = SplitNamespace(srcNs)
 		}
@@ -140,21 +147,24 @@ func (verifier *Verifier) PersistChangeEvents(ctx context.Context, batch changeE
 			panic(fmt.Sprintf("unknown event origin: %s", eventOrigin))
 		}

-		dbNames[i] = srcDBName
-		collNames[i] = srcCollName
-		docIDs[i] = changeEvent.DocID
+		dbNames = append(dbNames, srcDBName)
+		collNames = append(collNames, srcCollName)
+		docIDs = append(docIDs, changeEvent.DocID)

+		var dataSize int32
 		if changeEvent.FullDocLen.OrZero() > 0 {
-			dataSizes[i] = int32(changeEvent.FullDocLen.OrZero())
+			dataSize = int32(changeEvent.FullDocLen.OrZero())
 		} else if changeEvent.FullDocument == nil {
 			// This happens for deletes and for some updates.
 			// The document is probably, but not necessarily, deleted.
-			dataSizes[i] = fauxDocSizeForDeleteEvents
+			dataSize = defaultUserDocumentSize
 		} else {
 			// This happens for inserts, replaces, and most updates.
-			dataSizes[i] = int32(len(changeEvent.FullDocument))
+			dataSize = int32(len(changeEvent.FullDocument))
 		}

+		dataSizes = append(dataSizes, dataSize)
+
 		if err := eventRecorder.AddEvent(&changeEvent); err != nil {
 			return errors.Wrapf(
 				err,
@@ -166,14 +176,12 @@ func (verifier *Verifier) PersistChangeEvents(ctx context.Context, batch changeE
 	}

 	latestTimestampTime := time.Unix(int64(latestTimestamp.T), 0)
-	lag := time.Unix(int64(batch.clusterTime.T), 0).Sub(latestTimestampTime)

 	verifier.logger.Trace().
 		Str("origin", string(eventOrigin)).
 		Int("count", len(docIDs)).
 		Any("latestTimestamp", latestTimestamp).
 		Time("latestTimestampTime", latestTimestampTime).
-		Stringer("lag", lag).
 		Msg("Persisting rechecks for change events.")

 	return verifier.insertRecheckDocs(ctx, dbNames, collNames, docIDs, dataSizes)
diff --git a/main/migration_verifier.go b/main/migration_verifier.go
index aa1c8a65..f203c96e 100644
--- a/main/migration_verifier.go
+++ b/main/migration_verifier.go
@@ -33,6 +33,8 @@ const (
 	logPath          = "logPath"
 	srcNamespace     = "srcNamespace"
 	dstNamespace     = "dstNamespace"
+	srcChangeReader  = "srcChangeReader"
+	dstChangeReader  = "dstChangeReader"
 	metaDBName       = "metaDBName"
 	docCompareMethod = "docCompareMethod"
 	verifyAll        = "verifyAll"
@@ -126,6 +128,22 @@ func main() {
 			Name:  dstNamespace,
 			Usage: "destination `namespaces` to check",
 		}),
+		altsrc.NewStringFlag(cli.StringFlag{
+			Name:  srcChangeReader,
+			Value: verifier.ChangeReaderOptChangeStream,
+			Usage: "How to read changes from the source. One of: " + strings.Join(
+				verifier.ChangeReaderOpts,
+				", ",
+			),
+		}),
+		altsrc.NewStringFlag(cli.StringFlag{
+			Name:  dstChangeReader,
+			Value: verifier.ChangeReaderOptChangeStream,
+			Usage: "How to read changes from the destination. One of: " + strings.Join(
+				verifier.ChangeReaderOpts,
+				", ",
+			),
+		}),
 		altsrc.NewStringFlag(cli.StringFlag{
 			Name:  metaDBName,
 			Value: "migration_verification_metadata",
@@ -344,9 +362,27 @@ func handleArgs(ctx context.Context, cCtx *cli.Context) (*verifier.Verifier, err
 	}
 	v.SetMetaDBName(cCtx.String(metaDBName))

+	srcChangeReaderVal := cCtx.String(srcChangeReader)
+	if !slices.Contains(verifier.ChangeReaderOpts, srcChangeReaderVal) {
+		return nil, errors.Errorf("invalid %#q (%s); valid values are: %#q", srcChangeReader, srcChangeReaderVal, verifier.ChangeReaderOpts)
+	}
+	err = v.SetSrcChangeReader(srcChangeReaderVal)
+	if err != nil {
+		return nil, err
+	}
+
+	dstChangeReaderVal := cCtx.String(dstChangeReader)
+	if !slices.Contains(verifier.ChangeReaderOpts, dstChangeReaderVal) {
+		return nil, errors.Errorf("invalid %#q (%s); valid values are: %#q", dstChangeReader, dstChangeReaderVal, verifier.ChangeReaderOpts)
+	}
+	err = v.SetDstChangeReader(dstChangeReaderVal)
+	if err != nil {
+		return nil, err
+	}
+
 	docCompareMethod := verifier.DocCompareMethod(cCtx.String(docCompareMethod))
 	if !slices.Contains(verifier.DocCompareMethods, docCompareMethod) {
-		return nil, errors.Errorf("invalid doc compare method (%s); valid value are: %v", docCompareMethod, verifier.DocCompareMethods)
+		return nil, errors.Errorf("invalid doc compare method (%s); valid values are: %#q", docCompareMethod, verifier.DocCompareMethods)
 	}
 	v.SetDocCompareMethod(docCompareMethod)

diff --git a/mbson/raw_value.go b/mbson/raw_value.go
index b0b1c96c..7674c56e 100644
--- a/mbson/raw_value.go
+++ b/mbson/raw_value.go
@@ -9,7 +9,7 @@ import (
 )

 type bsonCastRecipient interface {
-	bson.Raw | bson.Timestamp | bson.ObjectID | string | int32
+	bson.Raw | bson.RawArray | bson.Timestamp | bson.ObjectID | string | int32
 }

 type bsonSourceTypes interface {
@@ -36,6 +36,10 @@ func CastRawValue[T bsonCastRecipient](in bson.RawValue) (T, error) {
 		if doc, isDoc := in.DocumentOK(); isDoc {
 			return any(doc).(T), nil
 		}
+	case bson.RawArray:
+		if arr, ok := in.ArrayOK(); ok {
+			return any(arr).(T), nil
+		}
 	case bson.Timestamp:
 		if t, i, ok := in.TimestampOK(); ok {
 			return any(bson.Timestamp{t, i}).(T), nil
diff --git a/mmongo/cursor.go b/mmongo/cursor.go
new file mode 100644
index 00000000..c76b8a18
--- /dev/null
+++ b/mmongo/cursor.go
@@ -0,0 +1,55 @@
+package mmongo
+
+import (
+	"context"
+
+	"github.com/pkg/errors"
+	"go.mongodb.org/mongo-driver/v2/bson"
+	"go.mongodb.org/mongo-driver/v2/mongo"
+)
+
+// GetBatch returns a batch of documents from a cursor. It appends to the
+// passed-in slices, which lets callers reuse allocations across batches.
+func GetBatch(
+	ctx context.Context,
+	cursor *mongo.Cursor,
+	docs []bson.Raw,
+	buffer []byte,
+) ([]bson.Raw, []byte, error) {
+	for hasDocs := true; hasDocs; hasDocs = cursor.RemainingBatchLength() > 0 {
+		got := cursor.TryNext(ctx)
+
+		if cursor.Err() != nil {
+			return nil, nil, errors.Wrap(cursor.Err(), "cursor iteration failed")
+		}
+
+		if !got {
+			break
+		}
+
+		docPos := len(buffer)
+		buffer = append(buffer, cursor.Current...)
+		docs = append(docs, buffer[docPos:])
+	}
+
+	return docs, buffer, nil
+}
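Since GetBatch is new in this change, here is a usage sketch of the intended call pattern; drainCursor and handle are hypothetical names, not part of the PR. The caller truncates and reuses the same backing slices on every call (as OplogReader does with curDocs and scratch), and a batch's documents point into the shared buffer, so they must be consumed before the next call.

package mmongo

import (
	"context"

	"go.mongodb.org/mongo-driver/v2/bson"
	"go.mongodb.org/mongo-driver/v2/mongo"
)

// drainCursor is a hypothetical helper showing GetBatch's intended call
// pattern: reuse the same backing slices across calls so steady-state
// iteration allocates little.
func drainCursor(ctx context.Context, cursor *mongo.Cursor, handle func(bson.Raw) error) error {
	var (
		docs    []bson.Raw
		scratch []byte
	)

	for {
		var err error

		// Truncate (don't reallocate) before each call; GetBatch appends.
		docs, scratch, err = GetBatch(ctx, cursor, docs[:0], scratch[:0])
		if err != nil {
			return err
		}

		// For a non-tailable cursor, an empty batch means it's exhausted.
		// (A tailable cursor can legitimately return empty batches, which
		// is why OplogReader keeps looping instead.)
		if len(docs) == 0 {
			return nil
		}

		// The documents share the scratch buffer's memory, so finish with
		// them before the slices are truncated and reused.
		for _, doc := range docs {
			if err := handle(doc); err != nil {
				return err
			}
		}
	}
}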
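For readers who don't want to decode the $expr aggregation in the oplog reader, the namespace filter that getNSFilter builds behaves like the plain-Go predicate below. This is purely illustrative: the real filtering runs server-side, and excludedPrefixes stands in for MongosyncMetaDBPrefixes plus the metadata database name, "config.", and "admin.".

package main

import (
	"fmt"
	"slices"
	"strings"
)

// nsPassesFilter restates getNSFilter's aggregation expression as ordinary
// Go: an oplog entry is kept only if its namespace matches no excluded
// prefix and, when a namespace list is configured, appears in that list.
func nsPassesFilter(ns string, excludedPrefixes, wantedNamespaces []string) bool {
	for _, prefix := range excludedPrefixes {
		if strings.HasPrefix(ns, prefix) {
			return false
		}
	}

	// An empty list means "watch every namespace".
	if len(wantedNamespaces) == 0 {
		return true
	}

	return slices.Contains(wantedNamespaces, ns)
}

func main() {
	excluded := []string{"config.", "admin.", "migration_verification_metadata."}

	fmt.Println(nsPassesFilter("testDB.coll1", excluded, nil))                   // true
	fmt.Println(nsPassesFilter("config.transactions", excluded, nil))            // false
	fmt.Println(nsPassesFilter("testDB.coll1", excluded, []string{"db2.other"})) // false
}

The same predicate applies both to top-level ops and to each entry inside an applyOps batch, which is why getNSFilter takes the document root ("$$ROOT" or "$$opEntry") as a parameter.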