diff --git a/internal/util/clusterinfo.go b/internal/util/clusterinfo.go index 2364db93..b2354e96 100644 --- a/internal/util/clusterinfo.go +++ b/internal/util/clusterinfo.go @@ -19,6 +19,18 @@ type ClusterInfo struct { Topology ClusterTopology } +// ClusterHasBSONSize indicates whether a cluster with the given +// major & minor version numbers supports the $bsonSize aggregation operator. +func ClusterHasBSONSize(va [2]int) bool { + major := va[0] + + if major == 4 { + return va[1] >= 4 + } + + return major > 4 +} + const ( TopologySharded ClusterTopology = "sharded" TopologyReplset ClusterTopology = "replset" diff --git a/internal/verifier/change_reader.go b/internal/verifier/change_reader.go new file mode 100644 index 00000000..13ea1e62 --- /dev/null +++ b/internal/verifier/change_reader.go @@ -0,0 +1,226 @@ +package verifier + +import ( + "context" + "time" + + "github.com/10gen/migration-verifier/history" + "github.com/10gen/migration-verifier/internal/logger" + "github.com/10gen/migration-verifier/internal/util" + "github.com/10gen/migration-verifier/msync" + "github.com/10gen/migration-verifier/option" + "github.com/pkg/errors" + "github.com/rs/zerolog" + "github.com/samber/lo" + "go.mongodb.org/mongo-driver/v2/bson" + "go.mongodb.org/mongo-driver/v2/mongo" + "go.mongodb.org/mongo-driver/v2/mongo/options" + "golang.org/x/sync/errgroup" +) + +type ddlEventHandling string + +const ( + fauxDocSizeForDeleteEvents = 1024 + + // The number of batches we’ll hold in memory at once. + batchChanBufferSize = 100 + + onDDLEventAllow ddlEventHandling = "allow" + + changeReaderCollectionName = "changeReader" +) + +type changeReader interface { + getWhichCluster() whichCluster + getReadChannel() <-chan changeEventBatch + getError() *util.Eventual[error] + getStartTimestamp() option.Option[bson.Timestamp] + getEventsPerSecond() option.Option[float64] + getLag() option.Option[time.Duration] + getBufferSaturation() float64 + setWritesOff(bson.Timestamp) + start(context.Context, *errgroup.Group) error + done() <-chan struct{} + persistResumeToken(context.Context, bson.Raw) error + isRunning() bool + String() string +} + +type ChangeReaderCommon struct { + readerType whichCluster + + lastChangeEventTime *bson.Timestamp + logger *logger.Logger + namespaces []string + + metaDB *mongo.Database + watcherClient *mongo.Client + clusterInfo util.ClusterInfo + + resumeTokenTSExtractor func(bson.Raw) (bson.Timestamp, error) + + running bool + changeEventBatchChan chan changeEventBatch + writesOffTs *util.Eventual[bson.Timestamp] + readerError *util.Eventual[error] + doneChan chan struct{} + + startAtTs *bson.Timestamp + + lag *msync.TypedAtomic[option.Option[time.Duration]] + batchSizeHistory *history.History[int] + + onDDLEvent ddlEventHandling +} + +func (rc *ChangeReaderCommon) getWhichCluster() whichCluster { + return rc.readerType +} + +func (rc *ChangeReaderCommon) getError() *util.Eventual[error] { + return rc.readerError +} + +func (rc *ChangeReaderCommon) getStartTimestamp() option.Option[bson.Timestamp] { + return option.FromPointer(rc.startAtTs) +} + +func (rc *ChangeReaderCommon) setWritesOff(ts bson.Timestamp) { + rc.writesOffTs.Set(ts) +} + +func (rc *ChangeReaderCommon) isRunning() bool { + return rc.running +} + +func (rc *ChangeReaderCommon) getReadChannel() <-chan changeEventBatch { + return rc.changeEventBatchChan +} + +func (rc *ChangeReaderCommon) done() <-chan struct{} { + return rc.doneChan +} + +// getBufferSaturation returns the reader’s internal buffer’s saturation level +// as a 
fraction. If saturation rises, that means we’re reading events faster +// than we can persist them. +func (rc *ChangeReaderCommon) getBufferSaturation() float64 { + return util.DivideToF64(len(rc.changeEventBatchChan), cap(rc.changeEventBatchChan)) +} + +// getLag returns the observed change stream lag (i.e., the delta between +// cluster time and the most-recently-seen change event). +func (rc *ChangeReaderCommon) getLag() option.Option[time.Duration] { + return rc.lag.Load() +} + +// getEventsPerSecond returns the number of change events per second we’ve been +// seeing “recently”. (See implementation for the actual period over which we +// compile this metric.) +func (rc *ChangeReaderCommon) getEventsPerSecond() option.Option[float64] { + logs := rc.batchSizeHistory.Get() + lastLog, hasLogs := lo.Last(logs) + + if hasLogs && lastLog.At != logs[0].At { + span := lastLog.At.Sub(logs[0].At) + + // Each log contains a time and a # of events that happened since + // the prior log. Thus, each log’s Datum is a count of events that + // happened before the timestamp. Since we want the # of events that + // happened between the first & last times, we only want events *after* + // the first time. Thus, we skip the first log entry here. + totalEvents := 0 + for _, log := range logs[1:] { + totalEvents += log.Datum + } + + return option.Some(util.DivideToF64(totalEvents, span.Seconds())) + } + + return option.None[float64]() +} + +func (rc *ChangeReaderCommon) persistResumeToken(ctx context.Context, token bson.Raw) error { + coll := rc.metaDB.Collection(changeReaderCollectionName) + _, err := coll.ReplaceOne( + ctx, + bson.D{{"_id", rc.resumeTokenDocID()}}, + token, + options.Replace().SetUpsert(true), + ) + + if err == nil { + ts, err := rc.resumeTokenTSExtractor(token) + + logEvent := rc.logger.Debug() + + if err == nil { + logEvent = addTimestampToLogEvent(ts, logEvent) + } else { + rc.logger.Warn().Err(err). + Msg("failed to extract resume token timestamp") + } + + logEvent.Msgf("Persisted %s's resume token.", rc.readerType) + + return nil + } + + return errors.Wrapf(err, "failed to persist %s resume token (%v)", rc.readerType, token) +} + +func (rc *ChangeReaderCommon) resumeTokenDocID() string { + switch rc.readerType { + case src: + return "srcResumeToken" + case dst: + return "dstResumeToken" + default: + panic("unknown readerType: " + rc.readerType) + } +} + +func (rc *ChangeReaderCommon) getMetadataCollection() *mongo.Collection { + return rc.metaDB.Collection(changeReaderCollectionName) +} + +func (rc *ChangeReaderCommon) loadResumeToken(ctx context.Context) (option.Option[bson.Raw], error) { + coll := rc.getMetadataCollection() + + token, err := coll.FindOne( + ctx, + bson.D{{"_id", rc.resumeTokenDocID()}}, + ).Raw() + + if errors.Is(err, mongo.ErrNoDocuments) { + return option.None[bson.Raw](), nil + } + + return option.Some(token), err +} + +func (rc *ChangeReaderCommon) updateLag(sess *mongo.Session, token bson.Raw) { + tokenTs, err := rc.resumeTokenTSExtractor(token) + if err == nil { + lagSecs := int64(sess.OperationTime().T) - int64(tokenTs.T) + rc.lag.Store(option.Some(time.Second * time.Duration(lagSecs))) + } else { + rc.logger.Warn(). + Err(err). + Msgf("Failed to extract timestamp from %s's resume token to compute lag.", rc.readerType) + } +} + +func (rc *ChangeReaderCommon) logIgnoredDDL(rawEvent bson.Raw) { + rc.logger.Info(). + Str("reader", string(rc.readerType)). + Stringer("event", rawEvent). + Msg("Ignoring event with unrecognized type on destination. 
(It’s assumedly internal to the migration.)") +} + +func addTimestampToLogEvent(ts bson.Timestamp, event *zerolog.Event) *zerolog.Event { + return event. + Any("timestamp", ts). + Time("time", time.Unix(int64(ts.T), int64(0))) +} diff --git a/internal/verifier/change_stream.go b/internal/verifier/change_stream.go index 06aacca9..ae477b14 100644 --- a/internal/verifier/change_stream.go +++ b/internal/verifier/change_stream.go @@ -7,7 +7,6 @@ import ( "github.com/10gen/migration-verifier/history" "github.com/10gen/migration-verifier/internal/keystring" - "github.com/10gen/migration-verifier/internal/logger" "github.com/10gen/migration-verifier/internal/retry" "github.com/10gen/migration-verifier/internal/util" "github.com/10gen/migration-verifier/mbson" @@ -17,24 +16,12 @@ import ( mapset "github.com/deckarep/golang-set/v2" clone "github.com/huandu/go-clone/generic" "github.com/pkg/errors" - "github.com/rs/zerolog" - "github.com/samber/lo" "github.com/samber/mo" "go.mongodb.org/mongo-driver/v2/bson" "go.mongodb.org/mongo-driver/v2/mongo" "go.mongodb.org/mongo-driver/v2/mongo/options" "golang.org/x/exp/slices" -) - -type ddlEventHandling string - -const ( - fauxDocSizeForDeleteEvents = 1024 - - // The number of batches we’ll hold in memory at once. - batchChanBufferSize = 100 - - onDDLEventAllow ddlEventHandling = "allow" + "golang.org/x/sync/errgroup" ) var supportedEventOpTypes = mapset.NewSet( @@ -45,9 +32,8 @@ var supportedEventOpTypes = mapset.NewSet( ) const ( - minChangeStreamPersistInterval = time.Second * 10 - maxChangeStreamAwaitTime = time.Second - metadataChangeStreamCollectionName = "changeStream" + minChangeStreamPersistInterval = time.Second * 10 + maxChangeStreamAwaitTime = time.Second ) type UnknownEventError struct { @@ -58,55 +44,33 @@ func (uee UnknownEventError) Error() string { return fmt.Sprintf("received event with unknown optype: %+v", uee.Event) } -type changeEventBatch struct { - events []ParsedEvent - resumeToken bson.Raw - clusterTime bson.Timestamp -} - type ChangeStreamReader struct { - readerType whichCluster - - lastChangeEventTime *bson.Timestamp - logger *logger.Logger - namespaces []string - - metaDB *mongo.Database - watcherClient *mongo.Client - clusterInfo util.ClusterInfo - - changeStreamRunning bool - changeEventBatchChan chan changeEventBatch - writesOffTs *util.Eventual[bson.Timestamp] - readerError *util.Eventual[error] - handlerError *util.Eventual[error] - doneChan chan struct{} - - startAtTs *bson.Timestamp - - lag *msync.TypedAtomic[option.Option[time.Duration]] - batchSizeHistory *history.History[int] - - onDDLEvent ddlEventHandling + ChangeReaderCommon } -func (verifier *Verifier) initializeChangeStreamReaders() { +var _ changeReader = &ChangeStreamReader{} + +func (verifier *Verifier) initializeChangeReaders() { srcReader := &ChangeStreamReader{ - readerType: src, - namespaces: verifier.srcNamespaces, - watcherClient: verifier.srcClient, - clusterInfo: *verifier.srcClusterInfo, + ChangeReaderCommon: ChangeReaderCommon{ + readerType: src, + namespaces: verifier.srcNamespaces, + watcherClient: verifier.srcClient, + clusterInfo: *verifier.srcClusterInfo, + }, } - verifier.srcChangeStreamReader = srcReader + verifier.srcChangeReader = srcReader dstReader := &ChangeStreamReader{ - readerType: dst, - namespaces: verifier.dstNamespaces, - watcherClient: verifier.dstClient, - clusterInfo: *verifier.dstClusterInfo, - onDDLEvent: onDDLEventAllow, + ChangeReaderCommon: ChangeReaderCommon{ + readerType: dst, + namespaces: verifier.dstNamespaces, + 
watcherClient: verifier.dstClient, + clusterInfo: *verifier.dstClusterInfo, + onDDLEvent: onDDLEventAllow, + }, } - verifier.dstChangeStreamReader = dstReader + verifier.dstChangeReader = dstReader // Common elements in both readers: for _, csr := range mslices.Of(srcReader, dstReader) { @@ -115,175 +79,13 @@ func (verifier *Verifier) initializeChangeStreamReaders() { csr.changeEventBatchChan = make(chan changeEventBatch, batchChanBufferSize) csr.writesOffTs = util.NewEventual[bson.Timestamp]() csr.readerError = util.NewEventual[error]() - csr.handlerError = util.NewEventual[error]() csr.doneChan = make(chan struct{}) csr.lag = msync.NewTypedAtomic(option.None[time.Duration]()) csr.batchSizeHistory = history.New[int](time.Minute) + csr.resumeTokenTSExtractor = extractTSFromChangeStreamResumeToken } } -// RunChangeEventHandler handles change event batches from the reader. -// It needs to be started after the reader starts and should run in its own -// goroutine. -func (verifier *Verifier) RunChangeEventHandler(ctx context.Context, reader *ChangeStreamReader) error { - var err error - - var lastPersistedTime time.Time - persistResumeTokenIfNeeded := func(ctx context.Context, token bson.Raw) { - if time.Since(lastPersistedTime) >= minChangeStreamPersistInterval { - persistErr := reader.persistChangeStreamResumeToken(ctx, token) - if persistErr != nil { - verifier.logger.Warn(). - Stringer("changeReader", reader). - Err(persistErr). - Msg("Failed to persist resume token. Because of this, if the verifier restarts, it will have to re-process already-handled change events. This error may be transient, but if it recurs, investigate.") - } else { - lastPersistedTime = time.Now() - } - } - } - -HandlerLoop: - for err == nil { - select { - case <-ctx.Done(): - err = util.WrapCtxErrWithCause(ctx) - - verifier.logger.Debug(). - Err(err). - Stringer("changeStreamReader", reader). - Msg("Change event handler failed.") - case batch, more := <-reader.changeEventBatchChan: - if !more { - verifier.logger.Debug(). - Stringer("changeStreamReader", reader). - Msg("Change event batch channel has been closed.") - - break HandlerLoop - } - - verifier.logger.Trace(). - Stringer("changeStreamReader", reader). - Int("batchSize", len(batch.events)). - Any("batch", batch). - Msg("Handling change event batch.") - - err = errors.Wrap( - verifier.HandleChangeStreamEvents(ctx, batch, reader.readerType), - "failed to handle change stream events", - ) - - if err == nil && batch.resumeToken != nil { - persistResumeTokenIfNeeded(ctx, batch.resumeToken) - } - } - } - - // This will prevent the reader from hanging because the reader checks - // this along with checks for context expiry. - if err != nil { - reader.handlerError.Set(err) - } - - return err -} - -// HandleChangeStreamEvents performs the necessary work for change stream events after receiving a batch. -func (verifier *Verifier) HandleChangeStreamEvents(ctx context.Context, batch changeEventBatch, eventOrigin whichCluster) error { - if len(batch.events) == 0 { - return nil - } - - dbNames := make([]string, len(batch.events)) - collNames := make([]string, len(batch.events)) - docIDs := make([]bson.RawValue, len(batch.events)) - dataSizes := make([]int32, len(batch.events)) - - latestTimestamp := bson.Timestamp{} - - for i, changeEvent := range batch.events { - if !supportedEventOpTypes.Contains(changeEvent.OpType) { - panic(fmt.Sprintf("Unsupported optype in event; should have failed already! 
event=%+v", changeEvent)) - } - - if changeEvent.ClusterTime == nil { - verifier.logger.Warn(). - Any("event", changeEvent). - Msg("Change event unexpectedly lacks a clusterTime?!?") - } else if changeEvent.ClusterTime.After(latestTimestamp) { - latestTimestamp = *changeEvent.ClusterTime - } - - var srcDBName, srcCollName string - - var eventRecorder EventRecorder - - // Recheck Docs are keyed by source namespaces. - // We need to retrieve the source namespaces if change events are from the destination. - switch eventOrigin { - case dst: - eventRecorder = *verifier.dstEventRecorder - - if verifier.nsMap.Len() == 0 { - // Namespace is not remapped. Source namespace is the same as the destination. - srcDBName = changeEvent.Ns.DB - srcCollName = changeEvent.Ns.Coll - } else { - dstNs := fmt.Sprintf("%s.%s", changeEvent.Ns.DB, changeEvent.Ns.Coll) - srcNs, exist := verifier.nsMap.GetSrcNamespace(dstNs) - if !exist { - return errors.Errorf("no source namespace corresponding to the destination namepsace %s", dstNs) - } - srcDBName, srcCollName = SplitNamespace(srcNs) - } - case src: - eventRecorder = *verifier.srcEventRecorder - - srcDBName = changeEvent.Ns.DB - srcCollName = changeEvent.Ns.Coll - default: - panic(fmt.Sprintf("unknown event origin: %s", eventOrigin)) - } - - dbNames[i] = srcDBName - collNames[i] = srcCollName - docIDs[i] = changeEvent.DocID - - if changeEvent.FullDocLen.OrZero() > 0 { - dataSizes[i] = int32(changeEvent.FullDocLen.OrZero()) - } else if changeEvent.FullDocument == nil { - // This happens for deletes and for some updates. - // The document is probably, but not necessarily, deleted. - dataSizes[i] = fauxDocSizeForDeleteEvents - } else { - // This happens for inserts, replaces, and most updates. - dataSizes[i] = int32(len(changeEvent.FullDocument)) - } - - if err := eventRecorder.AddEvent(&changeEvent); err != nil { - return errors.Wrapf( - err, - "failed to augment stats with %s change event (%+v)", - eventOrigin, - changeEvent, - ) - } - } - - latestTimestampTime := time.Unix(int64(latestTimestamp.T), 0) - lag := time.Unix(int64(batch.clusterTime.T), 0).Sub(latestTimestampTime) - - verifier.logger.Trace(). - Str("origin", string(eventOrigin)). - Int("count", len(docIDs)). - Any("latestTimestamp", latestTimestamp). - Time("latestTimestampTime", latestTimestampTime). - Stringer("lag", lag). - Msg("Persisting rechecks for change events.") - - return verifier.insertRecheckDocs(ctx, dbNames, collNames, docIDs, dataSizes) -} - // GetChangeStreamFilter returns an aggregation pipeline that filters // namespaces as per configuration. // @@ -336,7 +138,7 @@ func (csr *ChangeStreamReader) GetChangeStreamFilter() (pipeline mongo.Pipeline) }, ) - if csr.hasBsonSize() { + if util.ClusterHasBSONSize([2]int(csr.clusterInfo.VersionArray)) { pipeline = append( pipeline, bson.D{ @@ -351,16 +153,6 @@ func (csr *ChangeStreamReader) GetChangeStreamFilter() (pipeline mongo.Pipeline) return pipeline } -func (csr *ChangeStreamReader) hasBsonSize() bool { - major := csr.clusterInfo.VersionArray[0] - - if major == 4 { - return csr.clusterInfo.VersionArray[1] >= 4 - } - - return major > 4 -} - // This function reads a single `getMore` response into a slice. // // Note that this doesn’t care about the writesOff timestamp. Thus, @@ -424,10 +216,7 @@ func (csr *ChangeStreamReader) readAndHandleOneChangeEventBatch( // indexes are created after initial sync. if csr.onDDLEvent == onDDLEventAllow { - csr.logger.Info(). - Stringer("changeStream", csr). - Stringer("event", cs.Current). 
- Msg("Ignoring event with unrecognized type on destination. (It’s assumedly internal to the migration.)") + csr.logIgnoredDDL(cs.Current) // Discard this event, then keep reading. changeEvents = changeEvents[:len(changeEvents)-1] @@ -454,16 +243,7 @@ func (csr *ChangeStreamReader) readAndHandleOneChangeEventBatch( eventsRead++ } - var tokenTs bson.Timestamp - tokenTs, err := extractTimestampFromResumeToken(cs.ResumeToken()) - if err == nil { - lagSecs := int64(sess.OperationTime().T) - int64(tokenTs.T) - csr.lag.Store(option.Some(time.Second * time.Duration(lagSecs))) - } else { - csr.logger.Warn(). - Err(err). - Msgf("Failed to extract timestamp from %s's resume token to compute change stream lag.", csr) - } + csr.updateLag(sess, cs.ResumeToken()) if eventsRead == 0 { ri.NoteSuccess("received an empty change stream response") @@ -485,8 +265,6 @@ func (csr *ChangeStreamReader) readAndHandleOneChangeEventBatch( select { case <-ctx.Done(): return util.WrapCtxErrWithCause(ctx) - case <-csr.handlerError.Ready(): - return csr.wrapHandlerErrorForReader() case csr.changeEventBatchChan <- changeEventBatch{ events: changeEvents, @@ -502,13 +280,6 @@ func (csr *ChangeStreamReader) readAndHandleOneChangeEventBatch( return nil } -func (csr *ChangeStreamReader) wrapHandlerErrorForReader() error { - return errors.Wrap( - csr.handlerError.Get(), - "event handler failed, so no more events can be processed", - ) -} - func (csr *ChangeStreamReader) iterateChangeStream( ctx context.Context, ri *retry.FuncInfo, @@ -532,9 +303,6 @@ func (csr *ChangeStreamReader) iterateChangeStream( return err - case <-csr.handlerError.Ready(): - return csr.wrapHandlerErrorForReader() - // If the ChangeStreamEnderChan has a message, the user has indicated that // source writes are ended and the migration tool is finished / committed. // This means we should exit rather than continue reading the change stream @@ -552,7 +320,7 @@ func (csr *ChangeStreamReader) iterateChangeStream( // (i.e., the `getMore` call returns empty) for { var curTs bson.Timestamp - curTs, err = extractTimestampFromResumeToken(cs.ResumeToken()) + curTs, err = csr.resumeTokenTSExtractor(cs.ResumeToken()) if err != nil { return errors.Wrap(err, "failed to extract timestamp from change stream's resume token") } @@ -584,7 +352,7 @@ func (csr *ChangeStreamReader) iterateChangeStream( } if gotwritesOffTimestamp { - csr.changeStreamRunning = false + csr.running = false if csr.lastChangeEventTime != nil { csr.startAtTs = csr.lastChangeEventTime } @@ -625,18 +393,18 @@ func (csr *ChangeStreamReader) createChangeStream( ) } - savedResumeToken, err := csr.loadChangeStreamResumeToken(ctx) + savedResumeToken, err := csr.loadResumeToken(ctx) if err != nil { return nil, nil, bson.Timestamp{}, errors.Wrap(err, "failed to load persisted change stream resume token") } csStartLogEvent := csr.logger.Info() - if savedResumeToken != nil { + if token, hasToken := savedResumeToken.Get(); hasToken { logEvent := csStartLogEvent. 
- Stringer(csr.resumeTokenDocID(), savedResumeToken) + Stringer(csr.resumeTokenDocID(), token) - ts, err := extractTimestampFromResumeToken(savedResumeToken) + ts, err := csr.resumeTokenTSExtractor(token) if err == nil { logEvent = addTimestampToLogEvent(ts, logEvent) } else { @@ -647,7 +415,7 @@ func (csr *ChangeStreamReader) createChangeStream( logEvent.Msg("Starting change stream from persisted resume token.") - opts = opts.SetStartAfter(savedResumeToken) + opts = opts.SetStartAfter(token) } else { csStartLogEvent.Msgf("Starting change stream from current %s cluster time.", csr.readerType) } @@ -662,12 +430,12 @@ func (csr *ChangeStreamReader) createChangeStream( return nil, nil, bson.Timestamp{}, errors.Wrap(err, "failed to open change stream") } - err = csr.persistChangeStreamResumeToken(ctx, changeStream.ResumeToken()) + err = csr.persistResumeToken(ctx, changeStream.ResumeToken()) if err != nil { return nil, nil, bson.Timestamp{}, err } - startTs, err := extractTimestampFromResumeToken(changeStream.ResumeToken()) + startTs, err := csr.resumeTokenTSExtractor(changeStream.ResumeToken()) if err != nil { return nil, nil, bson.Timestamp{}, errors.Wrap(err, "failed to extract timestamp from change stream's resume token") } @@ -694,197 +462,94 @@ func (csr *ChangeStreamReader) createChangeStream( } // StartChangeStream starts the change stream. -func (csr *ChangeStreamReader) StartChangeStream(ctx context.Context) error { +func (csr *ChangeStreamReader) start( + ctx context.Context, + eg *errgroup.Group, +) error { // This channel holds the first change stream creation's result, whether // success or failure. Rather than using a Result we could make separate // Timestamp and error channels, but the single channel is cleaner since // there's no chance of "nonsense" like both channels returning a payload. initialCreateResultChan := make(chan mo.Result[bson.Timestamp]) - go func() { - // Closing changeEventBatchChan at the end of change stream goroutine - // notifies the verifier's change event handler to exit. - defer func() { - csr.logger.Debug(). - Stringer("changeStreamReader", csr). - Msg("Closing change event batch channel.") + eg.Go( + func() error { + // Closing changeEventBatchChan at the end of change stream goroutine + // notifies the verifier's change event handler to exit. + defer func() { + csr.logger.Debug(). + Stringer("changeStreamReader", csr). + Msg("Closing change event batch channel.") - close(csr.changeEventBatchChan) - }() + close(csr.changeEventBatchChan) + }() - retryer := retry.New().WithErrorCodes(util.CursorKilledErrCode) + retryer := retry.New().WithErrorCodes(util.CursorKilledErrCode) - parentThreadWaiting := true + parentThreadWaiting := true - err := retryer.WithCallback( - func(ctx context.Context, ri *retry.FuncInfo) error { - changeStream, sess, startTs, err := csr.createChangeStream(ctx) - if err != nil { - logEvent := csr.logger.Debug(). - Err(err). - Stringer("changeStreamReader", csr) + return retryer.WithCallback( + func(ctx context.Context, ri *retry.FuncInfo) error { + changeStream, sess, startTs, err := csr.createChangeStream(ctx) + if err != nil { + logEvent := csr.logger.Debug(). + Err(err). 
+ Stringer("changeStreamReader", csr) - if parentThreadWaiting { - logEvent.Msg("First change stream open failed.") + if parentThreadWaiting { + logEvent.Msg("First change stream open failed.") - initialCreateResultChan <- mo.Err[bson.Timestamp](err) - return nil - } + initialCreateResultChan <- mo.Err[bson.Timestamp](err) + return nil + } - logEvent.Msg("Retried change stream open failed.") + logEvent.Msg("Retried change stream open failed.") - return err - } + return err + } - defer changeStream.Close(ctx) + defer changeStream.Close(ctx) - logEvent := csr.logger.Debug(). - Stringer("changeStreamReader", csr). - Any("startTimestamp", startTs) - - if parentThreadWaiting { - logEvent.Msg("First change stream open succeeded.") + logEvent := csr.logger.Debug(). + Stringer("changeStreamReader", csr). + Any("startTimestamp", startTs) - initialCreateResultChan <- mo.Ok(startTs) - close(initialCreateResultChan) - parentThreadWaiting = false - } else { - logEvent.Msg("Retried change stream open succeeded.") - } + if parentThreadWaiting { + logEvent.Msg("First change stream open succeeded.") - return csr.iterateChangeStream(ctx, ri, changeStream, sess) - }, - "running %s", csr, - ).Run(ctx, csr.logger) + initialCreateResultChan <- mo.Ok(startTs) + close(initialCreateResultChan) + parentThreadWaiting = false + } else { + logEvent.Msg("Retried change stream open succeeded.") + } - if err != nil { - csr.readerError.Set(err) - } - }() + return csr.iterateChangeStream(ctx, ri, changeStream, sess) + }, + "running %s", csr, + ).Run(ctx, csr.logger) + }, + ) result := <-initialCreateResultChan startTs, err := result.Get() if err != nil { - return err + return errors.Wrapf(err, "creating change stream") } csr.startAtTs = &startTs - csr.changeStreamRunning = true + csr.running = true return nil } -// GetLag returns the observed change stream lag (i.e., the delta between -// cluster time and the most-recently-seen change event). -func (csr *ChangeStreamReader) GetLag() option.Option[time.Duration] { - return csr.lag.Load() -} - -// GetSaturation returns the reader’s internal buffer’s saturation level as -// a fraction. If saturation rises, that means we’re reading events faster than -// we can persist them. -func (csr *ChangeStreamReader) GetSaturation() float64 { - return util.DivideToF64(len(csr.changeEventBatchChan), cap(csr.changeEventBatchChan)) -} - -// GetEventsPerSecond returns the number of change events per second we’ve been -// seeing “recently”. (See implementation for the actual period over which we -// compile this metric.) -func (csr *ChangeStreamReader) GetEventsPerSecond() option.Option[float64] { - logs := csr.batchSizeHistory.Get() - lastLog, hasLogs := lo.Last(logs) - - if hasLogs && lastLog.At != logs[0].At { - span := lastLog.At.Sub(logs[0].At) - - // Each log contains a time and a # of events that happened since - // the prior log. Thus, each log’s Datum is a count of events that - // happened before the timestamp. Since we want the # of events that - // happened between the first & last times, we only want events *after* - // the first time. Thus, we skip the first log entry here. - totalEvents := 0 - for _, log := range logs[1:] { - totalEvents += log.Datum - } - - return option.Some(util.DivideToF64(totalEvents, span.Seconds())) - } - - return option.None[float64]() -} - -func addTimestampToLogEvent(ts bson.Timestamp, event *zerolog.Event) *zerolog.Event { - return event. - Any("timestamp", ts). 
- Time("time", time.Unix(int64(ts.T), int64(0))) -} - -func (csr *ChangeStreamReader) getChangeStreamMetadataCollection() *mongo.Collection { - return csr.metaDB.Collection(metadataChangeStreamCollectionName) -} - -func (csr *ChangeStreamReader) loadChangeStreamResumeToken(ctx context.Context) (bson.Raw, error) { - coll := csr.getChangeStreamMetadataCollection() - - token, err := coll.FindOne( - ctx, - bson.D{{"_id", csr.resumeTokenDocID()}}, - ).Raw() - - if errors.Is(err, mongo.ErrNoDocuments) { - return nil, nil - } - - return token, err -} - func (csr *ChangeStreamReader) String() string { return fmt.Sprintf("%s change stream reader", csr.readerType) } -func (csr *ChangeStreamReader) resumeTokenDocID() string { - switch csr.readerType { - case src: - return "srcResumeToken" - case dst: - return "dstResumeToken" - default: - panic("unknown readerType: " + csr.readerType) - } -} - -func (csr *ChangeStreamReader) persistChangeStreamResumeToken(ctx context.Context, token bson.Raw) error { - coll := csr.getChangeStreamMetadataCollection() - _, err := coll.ReplaceOne( - ctx, - bson.D{{"_id", csr.resumeTokenDocID()}}, - token, - options.Replace().SetUpsert(true), - ) - - if err == nil { - ts, err := extractTimestampFromResumeToken(token) - - logEvent := csr.logger.Debug() - - if err == nil { - logEvent = addTimestampToLogEvent(ts, logEvent) - } else { - csr.logger.Warn().Err(err). - Msg("failed to extract resume token timestamp") - } - - logEvent.Msgf("Persisted %s's resume token.", csr) - - return nil - } - - return errors.Wrapf(err, "failed to persist change stream resume token (%v)", token) -} - -func extractTimestampFromResumeToken(resumeToken bson.Raw) (bson.Timestamp, error) { +func extractTSFromChangeStreamResumeToken(resumeToken bson.Raw) (bson.Timestamp, error) { // Change stream token is always a V1 keystring in the _data field tokenDataRV, err := resumeToken.LookupErr("_data") diff --git a/internal/verifier/change_stream_test.go b/internal/verifier/change_stream_test.go index cbe3928e..d3fb166c 100644 --- a/internal/verifier/change_stream_test.go +++ b/internal/verifier/change_stream_test.go @@ -32,7 +32,12 @@ func (suite *IntegrationTestSuite) TestChangeStreamFilter_NoNamespaces() { verifier := suite.BuildVerifier() - filter := verifier.srcChangeStreamReader.GetChangeStreamFilter() + changeStreamReader, ok := verifier.srcChangeReader.(*ChangeStreamReader) + if !ok { + suite.T().Skipf("source change reader is a %T; this test needs a %T", verifier.srcChangeReader, changeStreamReader) + } + + filter := changeStreamReader.GetChangeStreamFilter() _, err := suite.srcMongoClient. Database("realUserDatabase"). 
@@ -96,18 +101,23 @@ func (suite *IntegrationTestSuite) TestChangeStreamFilter_BsonSize() { ctx := suite.Context() verifier := suite.BuildVerifier() - if !verifier.srcChangeStreamReader.hasBsonSize() { + if !util.ClusterHasBSONSize([2]int(verifier.srcClusterInfo.VersionArray)) { suite.T().Skip("Need a source version that has $bsonSize") } + changeStreamReader, ok := verifier.srcChangeReader.(*ChangeStreamReader) + if !ok { + suite.T().Skipf("source change reader is a %T; this test needs a %T", verifier.srcChangeReader, changeStreamReader) + } + srcColl := verifier.srcClient.Database(suite.DBNameForTest()).Collection("coll") _, err := srcColl.InsertOne(ctx, bson.D{{"_id", 123}}) suite.Require().NoError(err) - verifier.srcChangeStreamReader.namespaces = mslices.Of(FullName(srcColl)) + changeStreamReader.namespaces = mslices.Of(FullName(srcColl)) - filter := verifier.srcChangeStreamReader.GetChangeStreamFilter() + filter := changeStreamReader.GetChangeStreamFilter() cs, err := suite.srcMongoClient.Watch( ctx, @@ -157,14 +167,20 @@ func (suite *IntegrationTestSuite) TestChangeStreamFilter_WithNamespaces() { ctx := suite.Context() verifier := suite.BuildVerifier() - verifier.srcChangeStreamReader.namespaces = []string{ + + changeStreamReader, ok := verifier.srcChangeReader.(*ChangeStreamReader) + if !ok { + suite.T().Skipf("source change reader is a %T; this test needs a %T", verifier.srcChangeReader, changeStreamReader) + } + + changeStreamReader.namespaces = []string{ "foo.bar", "foo.baz", "test.car", "test.chaz", } - filter := verifier.srcChangeStreamReader.GetChangeStreamFilter() + filter := changeStreamReader.GetChangeStreamFilter() cs, err := suite.srcMongoClient.Watch(ctx, filter) suite.Require().NoError(err) @@ -188,7 +204,7 @@ func (suite *IntegrationTestSuite) TestChangeStreamFilter_WithNamespaces() { suite.Require().NoError(err) sctx := mongo.NewSessionContext(ctx, sess) - for _, ns := range verifier.srcChangeStreamReader.namespaces { + for _, ns := range changeStreamReader.namespaces { dbAndColl := strings.Split(ns, ".") _, err := suite.srcMongoClient. 
@@ -207,7 +223,7 @@ func (suite *IntegrationTestSuite) TestChangeStreamFilter_WithNamespaces() { for { gotEvent := cs.TryNext(ctx) suite.Require().NoError(cs.Err()) - csOpTime, err := extractTimestampFromResumeToken(cs.ResumeToken()) + csOpTime, err := extractTSFromChangeStreamResumeToken(cs.ResumeToken()) suite.Require().NoError(err, "should get timestamp from resume token") if gotEvent { @@ -223,7 +239,7 @@ func (suite *IntegrationTestSuite) TestChangeStreamFilter_WithNamespaces() { suite.Assert().Len( events, - len(verifier.srcChangeStreamReader.namespaces), + len(changeStreamReader.namespaces), "should have 1 event per in-filter namespace", ) suite.Assert().True( @@ -238,10 +254,12 @@ func (suite *IntegrationTestSuite) TestChangeStreamFilter_WithNamespaces() { } func (suite *IntegrationTestSuite) startSrcChangeStreamReaderAndHandler(ctx context.Context, verifier *Verifier) { - err := verifier.srcChangeStreamReader.StartChangeStream(ctx) + eg, egCtx := contextplus.ErrGroup(ctx) + + err := verifier.srcChangeReader.start(egCtx, eg) suite.Require().NoError(err) go func() { - err := verifier.RunChangeEventHandler(ctx, verifier.srcChangeStreamReader) + err := verifier.RunChangeEventPersistor(ctx, verifier.srcChangeReader) if errors.Is(err, context.Canceled) { return } @@ -266,7 +284,9 @@ func (suite *IntegrationTestSuite) TestChangeStream_Resume_NoSkip() { defer v1Cancel(ctx.Err()) suite.startSrcChangeStreamReaderAndHandler(v1Ctx, verifier1) - changeStreamMetaColl := verifier1.srcChangeStreamReader.getChangeStreamMetadataCollection() + changeStreamMetaColl := verifier1.metaClient. + Database(verifier1.metaDBName). + Collection(changeReaderCollectionName) var originalResumeToken bson.Raw @@ -417,10 +437,12 @@ func (suite *IntegrationTestSuite) TestChangeStreamResumability() { suite.startSrcChangeStreamReaderAndHandler(ctx, verifier2) - suite.Require().NotNil(verifier2.srcChangeStreamReader.startAtTs) + startAtTs, hasStartAtTs := verifier2.srcChangeReader.getStartTimestamp().Get() + + suite.Require().True(hasStartAtTs) suite.Assert().False( - verifier2.srcChangeStreamReader.startAtTs.After(newTime), + startAtTs.After(newTime), "verifier2's change stream should be no later than this new session", ) @@ -569,7 +591,7 @@ func (suite *IntegrationTestSuite) TestChangeStreamLag() { verifierRunner.AwaitGenerationEnd(), ) - return verifier.srcChangeStreamReader.GetLag().IsSome() + return verifier.srcChangeReader.getLag().IsSome() }, time.Minute, 100*time.Millisecond, @@ -578,7 +600,7 @@ func (suite *IntegrationTestSuite) TestChangeStreamLag() { // NB: The lag will include whatever time elapsed above before // verifier read the event, so it can be several seconds. 
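The lag that the assertion just below bounds comes from `ChangeReaderCommon.updateLag` (added earlier in this diff): it subtracts the seconds component of the resume token's timestamp from the seconds component of the session's operation time. A minimal, runnable sketch of that arithmetic, using hypothetical timestamp values:

```go
package main

import (
	"fmt"
	"time"

	"go.mongodb.org/mongo-driver/v2/bson"
)

// lagBetween mirrors the arithmetic in ChangeReaderCommon.updateLag: only the
// seconds component (T) of each timestamp participates; the increment (I) is
// ignored. The timestamp values below are hypothetical.
func lagBetween(opTime, tokenTS bson.Timestamp) time.Duration {
	return time.Duration(int64(opTime.T)-int64(tokenTS.T)) * time.Second
}

func main() {
	opTime := bson.Timestamp{T: 1700000060, I: 7}  // cluster time per the session
	tokenTS := bson.Timestamp{T: 1700000050, I: 3} // time encoded in the resume token

	fmt.Println(lagBetween(opTime, tokenTS)) // prints "10s"
}
```

In the real code the result is stored in an atomic option, which `getLag()` surfaces for this test.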
suite.Assert().Less( - verifier.srcChangeStreamReader.GetLag().MustGet(), + verifier.srcChangeReader.getLag().MustGet(), 10*time.Minute, "verifier lag is as expected", ) @@ -605,18 +627,20 @@ func (suite *IntegrationTestSuite) TestStartAtTimeNoChanges() { suite.startSrcChangeStreamReaderAndHandler(ctx, verifier) - startAtTs := verifier.srcChangeStreamReader.startAtTs - suite.Require().NotNil(startAtTs, "startAtTs should be set") + startAtTs, hasStartAtTs := verifier.srcChangeReader.getStartTimestamp().Get() + suite.Require().True(hasStartAtTs, "startAtTs should be set") + + verifier.srcChangeReader.setWritesOff(insertTs) - verifier.srcChangeStreamReader.writesOffTs.Set(insertTs) + <-verifier.srcChangeReader.done() - <-verifier.srcChangeStreamReader.doneChan + startAtTs2 := verifier.srcChangeReader.getStartTimestamp().MustGet() suite.Require().False( - verifier.srcChangeStreamReader.startAtTs.Before(*startAtTs), + startAtTs2.Before(startAtTs), "new startAtTs (%+v) should be no earlier than last one (%+v)", - verifier.srcChangeStreamReader.startAtTs, - *startAtTs, + startAtTs2, + startAtTs, ) } } @@ -635,10 +659,13 @@ func (suite *IntegrationTestSuite) TestStartAtTimeWithChanges() { suite.Require().NotNil(origSessionTime) suite.startSrcChangeStreamReaderAndHandler(ctx, verifier) + startAtTs, hasStartAtTs := verifier.srcChangeReader.getStartTimestamp().Get() + suite.Require().True(hasStartAtTs, "startAtTs should be set") + // srcStartAtTs derives from the change stream’s resume token, which can // postdate our session time but should not precede it. suite.Require().False( - verifier.srcChangeStreamReader.startAtTs.Before(*origSessionTime), + startAtTs.Before(*origSessionTime), "srcStartAtTs should be >= the insert’s optime", ) @@ -662,12 +689,15 @@ func (suite *IntegrationTestSuite) TestStartAtTimeWithChanges() { "session time after events should exceed the original", ) - verifier.srcChangeStreamReader.writesOffTs.Set(*postEventsSessionTime) - <-verifier.srcChangeStreamReader.doneChan + verifier.srcChangeReader.setWritesOff(*postEventsSessionTime) + <-verifier.srcChangeReader.done() + + startAtTs, hasStartAtTs = verifier.srcChangeReader.getStartTimestamp().Get() + suite.Require().True(hasStartAtTs, "startAtTs should be set") suite.Assert().Equal( *postEventsSessionTime, - *verifier.srcChangeStreamReader.startAtTs, + startAtTs, "verifier.srcStartAtTs should now be our session timestamp", ) } @@ -684,8 +714,12 @@ func (suite *IntegrationTestSuite) TestNoStartAtTime() { origStartTs := sess.OperationTime() suite.Require().NotNil(origStartTs) suite.startSrcChangeStreamReaderAndHandler(ctx, verifier) - suite.Require().NotNil(verifier.srcChangeStreamReader.startAtTs) - suite.Require().LessOrEqual(origStartTs.Compare(*verifier.srcChangeStreamReader.startAtTs), 0) + + startAtTs, hasStartAtTs := verifier.srcChangeReader.getStartTimestamp().Get() + suite.Require().True(hasStartAtTs, "startAtTs should be set") + + suite.Require().NotNil(startAtTs) + suite.Require().LessOrEqual(origStartTs.Compare(startAtTs), 0) } func (suite *IntegrationTestSuite) TestWithChangeEventsBatching() { @@ -1031,9 +1065,11 @@ func (suite *IntegrationTestSuite) TestRecheckDocsWithDstChangeEvents() { verifier.SetDstNamespaces([]string{dstDBName + ".dstColl1", dstDBName + ".dstColl2"}) verifier.SetNamespaceMap() - suite.Require().NoError(verifier.dstChangeStreamReader.StartChangeStream(ctx)) + eg, egCtx := contextplus.ErrGroup(ctx) + + suite.Require().NoError(verifier.dstChangeReader.start(egCtx, eg)) go func() { - err := 
verifier.RunChangeEventHandler(ctx, verifier.dstChangeStreamReader) + err := verifier.RunChangeEventPersistor(ctx, verifier.dstChangeReader) if errors.Is(err, context.Canceled) { return } diff --git a/internal/verifier/check.go b/internal/verifier/check.go index e8269ecd..66c36993 100644 --- a/internal/verifier/check.go +++ b/internal/verifier/check.go @@ -50,16 +50,16 @@ func (verifier *Verifier) Check(ctx context.Context, filter bson.D) { verifier.MaybeStartPeriodicHeapProfileCollection(ctx) } -func (verifier *Verifier) waitForChangeStream(ctx context.Context, csr *ChangeStreamReader) error { +func (verifier *Verifier) waitForChangeReader(ctx context.Context, csr changeReader) error { select { case <-ctx.Done(): return util.WrapCtxErrWithCause(ctx) - case <-csr.readerError.Ready(): - err := csr.readerError.Get() + case <-csr.getError().Ready(): + err := csr.getError().Get() verifier.logger.Warn().Err(err). Msgf("Received error from %s.", csr) return err - case <-csr.doneChan: + case <-csr.done(): verifier.logger.Debug(). Msgf("Received completion signal from %s.", csr) break @@ -90,15 +90,15 @@ func (verifier *Verifier) CheckWorker(ctxIn context.Context) error { cancelableCtx, canceler := contextplus.WithCancelCause(ctxIn) eg, ctx := contextplus.ErrGroup(cancelableCtx) - // If the change stream fails, everything should stop. + // If the change reader fails, everything should stop. eg.Go(func() error { select { - case <-verifier.srcChangeStreamReader.readerError.Ready(): - err := verifier.srcChangeStreamReader.readerError.Get() - return errors.Wrapf(err, "%s failed", verifier.srcChangeStreamReader) - case <-verifier.dstChangeStreamReader.readerError.Ready(): - err := verifier.dstChangeStreamReader.readerError.Get() - return errors.Wrapf(err, "%s failed", verifier.dstChangeStreamReader) + case <-verifier.srcChangeReader.getError().Ready(): + err := verifier.srcChangeReader.getError().Get() + return errors.Wrapf(err, "%s failed", verifier.srcChangeReader) + case <-verifier.dstChangeReader.getError().Ready(): + err := verifier.dstChangeReader.getError().Get() + return errors.Wrapf(err, "%s failed", verifier.dstChangeReader) case <-ctx.Done(): return nil } @@ -229,11 +229,11 @@ func (verifier *Verifier) CheckDriver(ctx context.Context, filter bson.D, testCh } } - verifier.logger.Info().Msg("Starting change streams.") + verifier.logger.Info().Msg("Starting change readers.") // Now that we’ve initialized verifier.generation we can - // start the change stream readers. - verifier.initializeChangeStreamReaders() + // start the change readers. 
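CheckWorker, in the hunk above, watches both readers' `getError()` eventuals and aborts the whole check as soon as either one fires. `util.Eventual` itself is not part of this diff, so the sketch below uses a toy stand-in with the same Set/Ready/Get surface as used here, purely to illustrate that watchdog select; it is not the verifier's code.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"time"
)

// toyEventual is a minimal stand-in for util.Eventual[T] as it is used in this
// diff: Set stores a value once, Ready returns a channel that closes after Set,
// and Get returns the stored value. (The real type may differ in detail.)
type toyEventual[T any] struct {
	once  sync.Once
	ready chan struct{}
	val   T
}

func newToyEventual[T any]() *toyEventual[T] {
	return &toyEventual[T]{ready: make(chan struct{})}
}

func (e *toyEventual[T]) Set(v T) {
	e.once.Do(func() {
		e.val = v
		close(e.ready)
	})
}

func (e *toyEventual[T]) Ready() <-chan struct{} { return e.ready }
func (e *toyEventual[T]) Get() T                 { return e.val }

func main() {
	srcErr := newToyEventual[error]()
	dstErr := newToyEventual[error]()

	// Simulate the destination reader failing a little later.
	go func() {
		time.Sleep(50 * time.Millisecond)
		dstErr.Set(errors.New("cursor killed"))
	}()

	// Watchdog select, shaped like the one in CheckWorker: block until either
	// reader reports an error, then surface whichever fired.
	select {
	case <-srcErr.Ready():
		fmt.Println("source change reader failed:", srcErr.Get())
	case <-dstErr.Ready():
		fmt.Println("destination change reader failed:", dstErr.Get())
	}
}
```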
+ verifier.initializeChangeReaders() verifier.mux.Unlock() err = retry.New().WithCallback( @@ -270,18 +270,18 @@ func (verifier *Verifier) CheckDriver(ctx context.Context, filter bson.D, testCh }() ceHandlerGroup, groupCtx := contextplus.ErrGroup(ctx) - for _, csReader := range []*ChangeStreamReader{verifier.srcChangeStreamReader, verifier.dstChangeStreamReader} { - if csReader.changeStreamRunning { - verifier.logger.Debug().Msgf("Check: %s already running.", csReader) + for _, changeReader := range mslices.Of(verifier.srcChangeReader, verifier.dstChangeReader) { + if changeReader.isRunning() { + verifier.logger.Debug().Msgf("Check: %s already running.", changeReader) } else { - verifier.logger.Debug().Msgf("%s not running; starting change stream", csReader) + verifier.logger.Debug().Msgf("%s not running; starting change reader", changeReader) - err = csReader.StartChangeStream(ctx) + err = changeReader.start(groupCtx, ceHandlerGroup) if err != nil { - return errors.Wrapf(err, "failed to start %s", csReader) + return errors.Wrapf(err, "failed to start %s", changeReader) } ceHandlerGroup.Go(func() error { - return verifier.RunChangeEventHandler(groupCtx, csReader) + return verifier.RunChangeEventPersistor(groupCtx, changeReader) }) } } @@ -364,14 +364,14 @@ func (verifier *Verifier) CheckDriver(ctx context.Context, filter bson.D, testCh // caught again on the next iteration. if verifier.writesOff { verifier.logger.Debug(). - Msg("Waiting for change streams to end.") + Msg("Waiting for change readers to end.") - // It's necessary to wait for the change stream to finish before incrementing the + // It's necessary to wait for the change reader to finish before incrementing the // generation number, or the last changes will not be checked. verifier.mux.Unlock() - for _, csr := range mslices.Of(verifier.srcChangeStreamReader, verifier.dstChangeStreamReader) { - if err = verifier.waitForChangeStream(ctx, csr); err != nil { + for _, csr := range mslices.Of(verifier.srcChangeReader, verifier.dstChangeReader) { + if err = verifier.waitForChangeReader(ctx, csr); err != nil { return errors.Wrapf( err, "an error interrupted the wait for closure of %s", @@ -380,8 +380,8 @@ func (verifier *Verifier) CheckDriver(ctx context.Context, filter bson.D, testCh } verifier.logger.Debug(). - Stringer("changeStreamReader", csr). - Msg("Change stream reader finished.") + Stringer("changeReader", csr). + Msg("Change reader finished.") } if err = ceHandlerGroup.Wait(); err != nil { @@ -391,9 +391,9 @@ func (verifier *Verifier) CheckDriver(ctx context.Context, filter bson.D, testCh verifier.lastGeneration = true } - // Increment the in-memory generation so that the change streams will + // Increment the in-memory generation so that the change readers will // mark rechecks for the next generation. For example, if we just - // finished generation 2, the change streams need to mark generation 3 + // finished generation 2, the change readers need to mark generation 3 // on enqueued rechecks. Meanwhile, generaiton 3’s recheck tasks will // derive from rechecks enqueued during generation 2. 
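That ordering — signal writes-off, wait for each reader's `done()`, drain the persistor group, then bump the generation — is easy to lose in the diff noise. Below is a hedged, self-contained sketch of the handshake with a toy reader; it stands in for the `setWritesOff`/`done` slice of the changeReader interface and is not the verifier's implementation.

```go
package main

import (
	"fmt"
	"time"

	"go.mongodb.org/mongo-driver/v2/bson"
)

// toyReader models only the shutdown handshake: after setWritesOff, the reader
// finishes draining events up to that timestamp and then closes done().
type toyReader struct {
	name        string
	writesOffTs chan bson.Timestamp
	doneCh      chan struct{}
}

func newToyReader(name string) *toyReader {
	r := &toyReader{
		name:        name,
		writesOffTs: make(chan bson.Timestamp, 1),
		doneCh:      make(chan struct{}),
	}
	go func() {
		ts := <-r.writesOffTs
		// A real reader keeps reading change events until it passes ts.
		fmt.Printf("%s: drained events up to %v\n", r.name, ts)
		close(r.doneCh)
	}()
	return r
}

func (r *toyReader) setWritesOff(ts bson.Timestamp) { r.writesOffTs <- ts }
func (r *toyReader) done() <-chan struct{}          { return r.doneCh }

func main() {
	readers := []*toyReader{newToyReader("src"), newToyReader("dst")}
	finalTs := bson.Timestamp{T: uint32(time.Now().Unix())}

	generation := 2 // illustrative: the generation being finalized

	for _, r := range readers {
		r.setWritesOff(finalTs) // 1. writes have stopped; readers may finish
	}
	for _, r := range readers {
		<-r.done() // 2. wait for each reader to drain and close up
	}

	// 3. Only now advance the generation: the rechecks enqueued while
	// generation 2 ran above are what generation 3 will verify.
	generation++
	fmt.Println("next generation to verify:", generation)
}
```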
verifier.generation++ diff --git a/internal/verifier/compare.go b/internal/verifier/compare.go index fc49aaa4..3172c0d4 100644 --- a/internal/verifier/compare.go +++ b/internal/verifier/compare.go @@ -467,7 +467,7 @@ func (verifier *Verifier) getFetcherChannelsAndCallbacks( sctx, verifier.srcClientCollection(task), verifier.srcClusterInfo, - verifier.srcChangeStreamReader.startAtTs, + verifier.srcChangeReader.getStartTimestamp().ToPointer(), task, ) @@ -500,7 +500,7 @@ func (verifier *Verifier) getFetcherChannelsAndCallbacks( sctx, verifier.dstClientCollection(task), verifier.dstClusterInfo, - verifier.dstChangeStreamReader.startAtTs, + verifier.dstChangeReader.getStartTimestamp().ToPointer(), task, ) diff --git a/internal/verifier/integration_test_suite.go b/internal/verifier/integration_test_suite.go index 897678da..19aff0ab 100644 --- a/internal/verifier/integration_test_suite.go +++ b/internal/verifier/integration_test_suite.go @@ -191,7 +191,7 @@ func (suite *IntegrationTestSuite) BuildVerifier() *Verifier { "should set metadata connection string", ) verifier.SetMetaDBName(metaDBName) - verifier.initializeChangeStreamReaders() + verifier.initializeChangeReaders() suite.Require().NoError(verifier.srcClientCollection(&task).Drop(ctx)) suite.Require().NoError(verifier.dstClientCollection(&task).Drop(ctx)) diff --git a/internal/verifier/metadata.go b/internal/verifier/metadata.go index 7c317a2c..906117c9 100644 --- a/internal/verifier/metadata.go +++ b/internal/verifier/metadata.go @@ -5,5 +5,6 @@ package verifier // 2: Split failed-task discrepancies into separate collection. // 3: Enqueued rechecks now reference the generation in which they’ll be // rechecked rather than the generation during which they were enqueued. +// 4: Use “changeReader” instead of “changeStream” collection name. -const verifierMetadataVersion = 3 +const verifierMetadataVersion = 4 diff --git a/internal/verifier/migration_verifier.go b/internal/verifier/migration_verifier.go index c7c1102b..9e75bf18 100644 --- a/internal/verifier/migration_verifier.go +++ b/internal/verifier/migration_verifier.go @@ -132,8 +132,8 @@ type Verifier struct { mux sync.RWMutex - srcChangeStreamReader *ChangeStreamReader - dstChangeStreamReader *ChangeStreamReader + srcChangeReader changeReader + dstChangeReader changeReader readConcernSetting ReadConcernSetting @@ -188,7 +188,7 @@ func NewVerifier(settings VerifierSettings, logPath string) *Verifier { readConcernSetting: readConcern, // This will get recreated once gen0 starts, but we want it - // here in case the change streams gets an event before then. + // here in case the change readers get an event before then. srcEventRecorder: NewEventRecorder(), dstEventRecorder: NewEventRecorder(), @@ -269,23 +269,23 @@ func (verifier *Verifier) WritesOff(ctx context.Context) error { return err } - // This has to happen outside the lock because the change streams + // This has to happen outside the lock because the change readers // might be inserting docs into the recheck queue, which happens // under the lock. 
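The select that follows pairs the reader's error signal with a default branch: WritesOff should fail fast if a reader has already died, but must not block when everything is healthy. A small self-contained sketch of that non-blocking probe, with plain channels and callbacks standing in for `util.Eventual` and the reader:

```go
package main

import (
	"errors"
	"fmt"

	"go.mongodb.org/mongo-driver/v2/bson"
)

// sendWritesOffUnlessFailed mirrors the shape of the select below: if the
// reader has already reported an error (its Ready channel is closed), return
// that error instead of handing over the writes-off timestamp; otherwise fall
// through to the default branch and deliver the timestamp without blocking.
func sendWritesOffUnlessFailed(
	readerErrReady <-chan struct{},
	getErr func() error,
	setWritesOff func(bson.Timestamp),
	ts bson.Timestamp,
) error {
	select {
	case <-readerErrReady:
		return fmt.Errorf("change reader already failed: %w", getErr())
	default:
		setWritesOff(ts)
		return nil
	}
}

func main() {
	healthy := make(chan struct{}) // never closed: reader is fine
	failed := make(chan struct{})
	close(failed) // closed: reader already reported an error

	ts := bson.Timestamp{T: 1700000000}

	err := sendWritesOffUnlessFailed(healthy, func() error { return nil },
		func(t bson.Timestamp) { fmt.Println("writes-off sent:", t) }, ts)
	fmt.Println("healthy reader:", err)

	err = sendWritesOffUnlessFailed(failed,
		func() error { return errors.New("cursor killed") },
		func(bson.Timestamp) {}, ts)
	fmt.Println("failed reader:", err)
}
```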
select { - case <-verifier.srcChangeStreamReader.readerError.Ready(): - err := verifier.srcChangeStreamReader.readerError.Get() - return errors.Wrapf(err, "tried to send writes-off timestamp to %s, but change stream already failed", verifier.srcChangeStreamReader) + case <-verifier.srcChangeReader.getError().Ready(): + err := verifier.srcChangeReader.getError().Get() + return errors.Wrapf(err, "tried to send writes-off timestamp to %s, but change reader already failed", verifier.srcChangeReader) default: - verifier.srcChangeStreamReader.writesOffTs.Set(srcFinalTs) + verifier.srcChangeReader.setWritesOff(srcFinalTs) } select { - case <-verifier.dstChangeStreamReader.readerError.Ready(): - err := verifier.dstChangeStreamReader.readerError.Get() - return errors.Wrapf(err, "tried to send writes-off timestamp to %s, but change stream already failed", verifier.dstChangeStreamReader) + case <-verifier.dstChangeReader.getError().Ready(): + err := verifier.dstChangeReader.getError().Get() + return errors.Wrapf(err, "tried to send writes-off timestamp to %s, but change reader already failed", verifier.dstChangeReader) default: - verifier.dstChangeStreamReader.writesOffTs.Set(dstFinalTs) + verifier.dstChangeReader.setWritesOff(dstFinalTs) } return nil diff --git a/internal/verifier/migration_verifier_test.go b/internal/verifier/migration_verifier_test.go index caa2b714..8159c19c 100644 --- a/internal/verifier/migration_verifier_test.go +++ b/internal/verifier/migration_verifier_test.go @@ -681,7 +681,7 @@ func (suite *IntegrationTestSuite) TestGetPersistedNamespaceStatistics_Recheck() ctx := suite.Context() verifier := suite.BuildVerifier() - err := verifier.HandleChangeStreamEvents( + err := verifier.PersistChangeEvents( ctx, changeEventBatch{ events: []ParsedEvent{{ @@ -697,7 +697,7 @@ func (suite *IntegrationTestSuite) TestGetPersistedNamespaceStatistics_Recheck() ) suite.Require().NoError(err) - err = verifier.HandleChangeStreamEvents( + err = verifier.PersistChangeEvents( ctx, changeEventBatch{ events: []ParsedEvent{{ @@ -972,25 +972,25 @@ func (suite *IntegrationTestSuite) TestFailedVerificationTaskInsertions() { events: mslices.Of(event), } - err = verifier.HandleChangeStreamEvents(ctx, batch, src) + err = verifier.PersistChangeEvents(ctx, batch, src) suite.Require().NoError(err) event.OpType = "insert" - err = verifier.HandleChangeStreamEvents(ctx, batch, src) + err = verifier.PersistChangeEvents(ctx, batch, src) suite.Require().NoError(err) event.OpType = "replace" - err = verifier.HandleChangeStreamEvents(ctx, batch, src) + err = verifier.PersistChangeEvents(ctx, batch, src) suite.Require().NoError(err) event.OpType = "update" - err = verifier.HandleChangeStreamEvents(ctx, batch, src) + err = verifier.PersistChangeEvents(ctx, batch, src) suite.Require().NoError(err) batch.events[0].OpType = "flibbity" suite.Assert().Panics( func() { - _ = verifier.HandleChangeStreamEvents(ctx, batch, src) + _ = verifier.PersistChangeEvents(ctx, batch, src) }, - "HandleChangeStreamEvents should panic if it gets an unknown optype", + "PersistChangeEvents should panic if it gets an unknown optype", ) verifier.generation++ diff --git a/internal/verifier/recheck_persist.go b/internal/verifier/recheck_persist.go new file mode 100644 index 00000000..564e33a2 --- /dev/null +++ b/internal/verifier/recheck_persist.go @@ -0,0 +1,180 @@ +package verifier + +import ( + "context" + "fmt" + "time" + + "github.com/10gen/migration-verifier/internal/util" + "github.com/pkg/errors" + "go.mongodb.org/mongo-driver/v2/bson" +) + 
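The persistor defined in the rest of this file consumes batches from the reader's channel until the reader closes it, and throttles resume-token persistence to at most once per `minChangeStreamPersistInterval`. Before the real types below, here is a self-contained sketch of that consumption pattern using toy stand-ins (not the verifier's types):

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// Toy stand-ins for the real types: in the verifier the batch is a
// changeEventBatch and the token is a bson.Raw resume token.
type toyBatch struct {
	events      []string
	resumeToken string
}

const persistInterval = 200 * time.Millisecond // plays the role of minChangeStreamPersistInterval

func main() {
	ctx := context.Background()
	in := make(chan toyBatch, 4)

	// Producer: plays the role of the change reader, which closes the channel
	// when it finishes so the consumer loop below can exit.
	go func() {
		defer close(in)
		for i := 0; i < 5; i++ {
			in <- toyBatch{
				events:      []string{fmt.Sprintf("event-%d", i)},
				resumeToken: fmt.Sprintf("token-%d", i),
			}
			time.Sleep(60 * time.Millisecond)
		}
	}()

	var lastPersisted time.Time
	persistIfNeeded := func(token string) {
		// Throttle persistence, as persistResumeTokenIfNeeded does below.
		if time.Since(lastPersisted) >= persistInterval {
			fmt.Println("persisting resume token:", token)
			lastPersisted = time.Now()
		}
	}

	// Consumer loop, shaped like RunChangeEventPersistor: handle batches until
	// the channel closes (the "comma ok" receive), or stop on context expiry.
	for {
		select {
		case <-ctx.Done():
			fmt.Println("context done:", ctx.Err())
			return
		case batch, more := <-in:
			if !more {
				fmt.Println("batch channel closed; persistor exiting")
				return
			}
			fmt.Println("handling batch of", len(batch.events), "event(s)")
			persistIfNeeded(batch.resumeToken)
		}
	}
}
```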
+type changeEventBatch struct { + events []ParsedEvent + resumeToken bson.Raw + clusterTime bson.Timestamp +} + +// RunChangeEventPersistor persists rechecks from change event batches. +// It needs to be started after the reader starts and should run in its own +// goroutine. +func (verifier *Verifier) RunChangeEventPersistor( + ctx context.Context, + reader changeReader, +) error { + clusterName := reader.getWhichCluster() + persistCallback := reader.persistResumeToken + in := reader.getReadChannel() + + var err error + + var lastPersistedTime time.Time + persistResumeTokenIfNeeded := func(ctx context.Context, token bson.Raw) { + if time.Since(lastPersistedTime) >= minChangeStreamPersistInterval { + persistErr := persistCallback(ctx, token) + if persistErr != nil { + verifier.logger.Warn(). + Str("changeReader", string(clusterName)). + Err(persistErr). + Msg("Failed to persist resume token. Because of this, if the verifier restarts, it will have to re-process already-handled change events. This error may be transient, but if it recurs, investigate.") + } else { + lastPersistedTime = time.Now() + } + } + } + +HandlerLoop: + for err == nil { + select { + case <-ctx.Done(): + err = util.WrapCtxErrWithCause(ctx) + + verifier.logger.Debug(). + Err(err). + Str("changeReader", string(clusterName)). + Msg("Change event handler failed.") + case batch, more := <-in: + if !more { + verifier.logger.Debug(). + Str("changeReader", string(clusterName)). + Msg("Change event batch channel has been closed.") + + break HandlerLoop + } + + verifier.logger.Trace(). + Str("changeReader", string(clusterName)). + Int("batchSize", len(batch.events)). + Any("batch", batch). + Msg("Handling change event batch.") + + err = errors.Wrap( + verifier.PersistChangeEvents(ctx, batch, clusterName), + "failed to handle change stream events", + ) + + if err == nil && batch.resumeToken != nil { + persistResumeTokenIfNeeded(ctx, batch.resumeToken) + } + } + } + + return err +} + +// PersistChangeEvents performs the necessary work for change events after receiving a batch. +func (verifier *Verifier) PersistChangeEvents(ctx context.Context, batch changeEventBatch, eventOrigin whichCluster) error { + if len(batch.events) == 0 { + return nil + } + + dbNames := make([]string, len(batch.events)) + collNames := make([]string, len(batch.events)) + docIDs := make([]bson.RawValue, len(batch.events)) + dataSizes := make([]int32, len(batch.events)) + + latestTimestamp := bson.Timestamp{} + + for i, changeEvent := range batch.events { + if !supportedEventOpTypes.Contains(changeEvent.OpType) { + panic(fmt.Sprintf("Unsupported optype in event; should have failed already! event=%+v", changeEvent)) + } + + if changeEvent.ClusterTime == nil { + verifier.logger.Warn(). + Any("event", changeEvent). + Msg("Change event unexpectedly lacks a clusterTime?!?") + } else if changeEvent.ClusterTime.After(latestTimestamp) { + latestTimestamp = *changeEvent.ClusterTime + } + + var srcDBName, srcCollName string + + var eventRecorder EventRecorder + + // Recheck Docs are keyed by source namespaces. + // We need to retrieve the source namespaces if change events are from the destination. + switch eventOrigin { + case dst: + eventRecorder = *verifier.dstEventRecorder + + if verifier.nsMap.Len() == 0 { + // Namespace is not remapped. Source namespace is the same as the destination. 
+ srcDBName = changeEvent.Ns.DB + srcCollName = changeEvent.Ns.Coll + } else { + dstNs := fmt.Sprintf("%s.%s", changeEvent.Ns.DB, changeEvent.Ns.Coll) + srcNs, exist := verifier.nsMap.GetSrcNamespace(dstNs) + if !exist { + return errors.Errorf("no source namespace corresponding to the destination namepsace %s", dstNs) + } + srcDBName, srcCollName = SplitNamespace(srcNs) + } + case src: + eventRecorder = *verifier.srcEventRecorder + + srcDBName = changeEvent.Ns.DB + srcCollName = changeEvent.Ns.Coll + default: + panic(fmt.Sprintf("unknown event origin: %s", eventOrigin)) + } + + dbNames[i] = srcDBName + collNames[i] = srcCollName + docIDs[i] = changeEvent.DocID + + if changeEvent.FullDocLen.OrZero() > 0 { + dataSizes[i] = int32(changeEvent.FullDocLen.OrZero()) + } else if changeEvent.FullDocument == nil { + // This happens for deletes and for some updates. + // The document is probably, but not necessarily, deleted. + dataSizes[i] = fauxDocSizeForDeleteEvents + } else { + // This happens for inserts, replaces, and most updates. + dataSizes[i] = int32(len(changeEvent.FullDocument)) + } + + if err := eventRecorder.AddEvent(&changeEvent); err != nil { + return errors.Wrapf( + err, + "failed to augment stats with %s change event (%+v)", + eventOrigin, + changeEvent, + ) + } + } + + latestTimestampTime := time.Unix(int64(latestTimestamp.T), 0) + lag := time.Unix(int64(batch.clusterTime.T), 0).Sub(latestTimestampTime) + + verifier.logger.Trace(). + Str("origin", string(eventOrigin)). + Int("count", len(docIDs)). + Any("latestTimestamp", latestTimestamp). + Time("latestTimestampTime", latestTimestampTime). + Stringer("lag", lag). + Msg("Persisting rechecks for change events.") + + return verifier.insertRecheckDocs(ctx, dbNames, collNames, docIDs, dataSizes) +} diff --git a/internal/verifier/recheck_test.go b/internal/verifier/recheck_test.go index 1a213dba..0da76289 100644 --- a/internal/verifier/recheck_test.go +++ b/internal/verifier/recheck_test.go @@ -62,7 +62,7 @@ func (suite *IntegrationTestSuite) TestFailedCompareThenReplace() { }, } - err := verifier.HandleChangeStreamEvents( + err := verifier.PersistChangeEvents( ctx, changeEventBatch{events: mslices.Of(event)}, src, diff --git a/internal/verifier/summary.go b/internal/verifier/summary.go index f04607da..d204ed88 100644 --- a/internal/verifier/summary.go +++ b/internal/verifier/summary.go @@ -557,10 +557,10 @@ func (verifier *Verifier) printChangeEventStatistics(builder io.Writer) { for _, cluster := range []struct { title string eventRecorder *EventRecorder - csReader *ChangeStreamReader + csReader changeReader }{ - {"Source", verifier.srcEventRecorder, verifier.srcChangeStreamReader}, - {"Destination", verifier.dstEventRecorder, verifier.dstChangeStreamReader}, + {"Source", verifier.srcEventRecorder, verifier.srcChangeReader}, + {"Destination", verifier.dstEventRecorder, verifier.dstChangeReader}, } { nsStats := cluster.eventRecorder.Read() @@ -584,16 +584,16 @@ func (verifier *Verifier) printChangeEventStatistics(builder io.Writer) { fmt.Fprintf(builder, "%s change events this generation: %s\n", cluster.title, eventsDescr) - if eventsPerSec, has := cluster.csReader.GetEventsPerSecond().Get(); has { + if eventsPerSec, has := cluster.csReader.getEventsPerSecond().Get(); has { var lagNote string - lag, hasLag := cluster.csReader.GetLag().Get() + lag, hasLag := cluster.csReader.getLag().Get() if hasLag { lagNote = fmt.Sprintf("lag: %s; ", reportutils.DurationToHMS(lag)) } - saturation := cluster.csReader.GetSaturation() + saturation := 
cluster.csReader.getBufferSaturation() fmt.Fprintf( builder, @@ -619,13 +619,13 @@ func (verifier *Verifier) printChangeEventStatistics(builder io.Writer) { } } - if cluster.csReader == verifier.srcChangeStreamReader { + if cluster.csReader == verifier.srcChangeReader { fmt.Fprint(builder, "\n") } // We only print event breakdowns for the source because we assume that // events on the destination will largely mirror the source’s. - if totalEvents > 0 && cluster.csReader == verifier.srcChangeStreamReader { + if totalEvents > 0 && cluster.csReader == verifier.srcChangeReader { reverseSortedNamespaces := maps.Keys(nsTotals) sort.Slice( reverseSortedNamespaces,