From 90f7502069ab51126d35c35ad2697a30c5d5cb65 Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Wed, 4 Dec 2024 10:55:21 -0500 Subject: [PATCH 01/28] recreate --- internal/verifier/change_stream.go | 47 ++++++++++++++----- internal/verifier/summary.go | 74 ++++++++++++++++++------------ msync/typed_atomic.go | 50 ++++++++++++++++++++ msync/typed_atomic_test.go | 59 ++++++++++++++++++++++++ 4 files changed, 189 insertions(+), 41 deletions(-) create mode 100644 msync/typed_atomic.go create mode 100644 msync/typed_atomic_test.go diff --git a/internal/verifier/change_stream.go b/internal/verifier/change_stream.go index e48a22d5..1b57f219 100644 --- a/internal/verifier/change_stream.go +++ b/internal/verifier/change_stream.go @@ -9,6 +9,8 @@ import ( "github.com/10gen/migration-verifier/internal/logger" "github.com/10gen/migration-verifier/internal/retry" "github.com/10gen/migration-verifier/internal/util" + "github.com/10gen/migration-verifier/msync" + "github.com/10gen/migration-verifier/option" "github.com/pkg/errors" "github.com/rs/zerolog" "github.com/samber/mo" @@ -71,6 +73,8 @@ type ChangeStreamReader struct { doneChan chan struct{} startAtTs *primitive.Timestamp + + lag *msync.TypedAtomic[option.Option[time.Duration]] } func (verifier *Verifier) initializeChangeStreamReaders() { @@ -86,6 +90,7 @@ func (verifier *Verifier) initializeChangeStreamReaders() { writesOffTsChan: make(chan primitive.Timestamp), errChan: make(chan error), doneChan: make(chan struct{}), + lag: msync.NewTypedAtomic(option.None[time.Duration]()), } verifier.dstChangeStreamReader = &ChangeStreamReader{ readerType: dst, @@ -99,6 +104,7 @@ func (verifier *Verifier) initializeChangeStreamReaders() { writesOffTsChan: make(chan primitive.Timestamp), errChan: make(chan error), doneChan: make(chan struct{}), + lag: msync.NewTypedAtomic(option.None[time.Duration]()), } } @@ -257,6 +263,7 @@ func (csr *ChangeStreamReader) readAndHandleOneChangeEventBatch( ctx context.Context, ri *retry.FuncInfo, cs *mongo.ChangeStream, + sess mongo.Session, ) error { eventsRead := 0 var changeEventBatch []ParsedEvent @@ -298,6 +305,17 @@ func (csr *ChangeStreamReader) readAndHandleOneChangeEventBatch( return nil } + var curTs primitive.Timestamp + curTs, err := extractTimestampFromResumeToken(cs.ResumeToken()) + if err == nil { + lagSecs := curTs.T - sess.OperationTime().T + csr.lag.Store(option.Some(time.Second * time.Duration(lagSecs))) + } else { + csr.logger.Warn(). + Err(err). 
+ Msgf("Failed to extract timestamp from %s's resume token to compute change stream lag.", csr) + } + csr.changeEventBatchChan <- changeEventBatch return nil } @@ -306,6 +324,7 @@ func (csr *ChangeStreamReader) iterateChangeStream( ctx context.Context, ri *retry.FuncInfo, cs *mongo.ChangeStream, + sess mongo.Session, ) error { var lastPersistedTime time.Time @@ -363,7 +382,7 @@ func (csr *ChangeStreamReader) iterateChangeStream( break } - err = csr.readAndHandleOneChangeEventBatch(ctx, ri, cs) + err = csr.readAndHandleOneChangeEventBatch(ctx, ri, cs, sess) if err != nil { return err @@ -371,7 +390,7 @@ func (csr *ChangeStreamReader) iterateChangeStream( } default: - err = csr.readAndHandleOneChangeEventBatch(ctx, ri, cs) + err = csr.readAndHandleOneChangeEventBatch(ctx, ri, cs, sess) if err == nil { err = persistResumeTokenIfNeeded() @@ -408,7 +427,7 @@ func (csr *ChangeStreamReader) iterateChangeStream( func (csr *ChangeStreamReader) createChangeStream( ctx context.Context, -) (*mongo.ChangeStream, primitive.Timestamp, error) { +) (*mongo.ChangeStream, mongo.Session, primitive.Timestamp, error) { pipeline := csr.GetChangeStreamFilter() opts := options.ChangeStream(). SetMaxAwaitTime(1 * time.Second). @@ -420,7 +439,7 @@ func (csr *ChangeStreamReader) createChangeStream( savedResumeToken, err := csr.loadChangeStreamResumeToken(ctx) if err != nil { - return nil, primitive.Timestamp{}, errors.Wrap(err, "failed to load persisted change stream resume token") + return nil, nil, primitive.Timestamp{}, errors.Wrap(err, "failed to load persisted change stream resume token") } csStartLogEvent := csr.logger.Info() @@ -447,22 +466,22 @@ func (csr *ChangeStreamReader) createChangeStream( sess, err := csr.watcherClient.StartSession() if err != nil { - return nil, primitive.Timestamp{}, errors.Wrap(err, "failed to start session") + return nil, nil, primitive.Timestamp{}, errors.Wrap(err, "failed to start session") } sctx := mongo.NewSessionContext(ctx, sess) changeStream, err := csr.watcherClient.Watch(sctx, pipeline, opts) if err != nil { - return nil, primitive.Timestamp{}, errors.Wrap(err, "failed to open change stream") + return nil, nil, primitive.Timestamp{}, errors.Wrap(err, "failed to open change stream") } err = csr.persistChangeStreamResumeToken(ctx, changeStream) if err != nil { - return nil, primitive.Timestamp{}, err + return nil, nil, primitive.Timestamp{}, err } startTs, err := extractTimestampFromResumeToken(changeStream.ResumeToken()) if err != nil { - return nil, primitive.Timestamp{}, errors.Wrap(err, "failed to extract timestamp from change stream's resume token") + return nil, nil, primitive.Timestamp{}, errors.Wrap(err, "failed to extract timestamp from change stream's resume token") } // With sharded clusters the resume token might lead the cluster time @@ -470,14 +489,14 @@ func (csr *ChangeStreamReader) createChangeStream( // otherwise we will get errors. clusterTime, err := getClusterTimeFromSession(sess) if err != nil { - return nil, primitive.Timestamp{}, errors.Wrap(err, "failed to read cluster time from session") + return nil, nil, primitive.Timestamp{}, errors.Wrap(err, "failed to read cluster time from session") } if startTs.After(clusterTime) { startTs = clusterTime } - return changeStream, startTs, nil + return changeStream, sess, startTs, nil } // StartChangeStream starts the change stream. 
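The session returned alongside the change stream here is what lets readAndHandleOneChangeEventBatch (earlier in this patch) compare the resume token's timestamp against the session's operation time and record the difference as the reader's lag. For reference, a caller might surface that measurement through the GetLag accessor added below; a minimal sketch, using only GetLag, the option API, and the logger calls already present in this patch:

    if lag, hasLag := csr.GetLag().Get(); hasLag {
        // lag is the most recent difference between the resume token's
        // timestamp and the session's operation time.
        csr.logger.Info().Stringer("lag", lag).Msg("Current change stream lag.")
    }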
@@ -502,7 +521,7 @@ func (csr *ChangeStreamReader) StartChangeStream(ctx context.Context) error { ctx, csr.logger, func(ctx context.Context, ri *retry.FuncInfo) error { - changeStream, startTs, err := csr.createChangeStream(ctx) + changeStream, sess, startTs, err := csr.createChangeStream(ctx) if err != nil { if parentThreadWaiting { initialCreateResultChan <- mo.Err[primitive.Timestamp](err) @@ -520,7 +539,7 @@ func (csr *ChangeStreamReader) StartChangeStream(ctx context.Context) error { parentThreadWaiting = false } - return csr.iterateChangeStream(ctx, ri, changeStream) + return csr.iterateChangeStream(ctx, ri, changeStream, sess) }, ) @@ -546,6 +565,10 @@ func (csr *ChangeStreamReader) StartChangeStream(ctx context.Context) error { return nil } +func (csr *ChangeStreamReader) GetLag() option.Option[time.Duration] { + return csr.lag.Load() +} + func addTimestampToLogEvent(ts primitive.Timestamp, event *zerolog.Event) *zerolog.Event { return event. Interface("timestamp", ts). diff --git a/internal/verifier/summary.go b/internal/verifier/summary.go index 3d71f5e9..65084c6a 100644 --- a/internal/verifier/summary.go +++ b/internal/verifier/summary.go @@ -399,43 +399,59 @@ func (verifier *Verifier) printChangeEventStatistics(builder *strings.Builder) { builder.WriteString(fmt.Sprintf("\nChange events this generation: %s\n", eventsDescr)) - if totalEvents == 0 { - return - } + if totalEvents > 0 { - reverseSortedNamespaces := maps.Keys(nsTotals) - sort.Slice( - reverseSortedNamespaces, - func(i, j int) bool { - return nsTotals[reverseSortedNamespaces[i]] > nsTotals[reverseSortedNamespaces[j]] - }, - ) + reverseSortedNamespaces := maps.Keys(nsTotals) + sort.Slice( + reverseSortedNamespaces, + func(i, j int) bool { + return nsTotals[reverseSortedNamespaces[i]] > nsTotals[reverseSortedNamespaces[j]] + }, + ) - // Only report the busiest namespaces. - if len(reverseSortedNamespaces) > changeEventsTableMaxSize { - reverseSortedNamespaces = reverseSortedNamespaces[:changeEventsTableMaxSize] - } + // Only report the busiest namespaces. 
+ if len(reverseSortedNamespaces) > changeEventsTableMaxSize { + reverseSortedNamespaces = reverseSortedNamespaces[:changeEventsTableMaxSize] + } - table := tablewriter.NewWriter(builder) - table.SetHeader([]string{"Namespace", "Insert", "Update", "Replace", "Delete", "Total"}) + table := tablewriter.NewWriter(builder) + table.SetHeader([]string{"Namespace", "Insert", "Update", "Replace", "Delete", "Total"}) + + for _, ns := range reverseSortedNamespaces { + curNsStats := nsStats[ns] + + table.Append( + append( + []string{ns}, + strconv.Itoa(curNsStats.Insert), + strconv.Itoa(curNsStats.Update), + strconv.Itoa(curNsStats.Replace), + strconv.Itoa(curNsStats.Delete), + strconv.Itoa(curNsStats.Total()), + ), + ) + } - for _, ns := range reverseSortedNamespaces { - curNsStats := nsStats[ns] + builder.WriteString("\nMost frequently-changing namespaces:\n") + table.Render() + } - table.Append( - append( - []string{ns}, - strconv.Itoa(curNsStats.Insert), - strconv.Itoa(curNsStats.Update), - strconv.Itoa(curNsStats.Replace), - strconv.Itoa(curNsStats.Delete), - strconv.Itoa(curNsStats.Total()), - ), + srcLag, hasSrcLag := verifier.srcChangeStreamReader.GetLag().Get() + if hasSrcLag { + builder.WriteString( + fmt.Sprintf("\nSource change stream lag: %s\n", reportutils.DurationToHMS(srcLag)), ) } - builder.WriteString("\nMost frequently-changing namespaces:\n") - table.Render() + dstLag, hasDstLag := verifier.dstChangeStreamReader.GetLag().Get() + if hasDstLag { + if !hasSrcLag { + builder.WriteString("\n") + } + builder.WriteString( + fmt.Sprintf("Destination change stream lag: %s\n", reportutils.DurationToHMS(dstLag)), + ) + } } func (verifier *Verifier) printWorkerStatus(builder *strings.Builder) { diff --git a/msync/typed_atomic.go b/msync/typed_atomic.go new file mode 100644 index 00000000..219fa34f --- /dev/null +++ b/msync/typed_atomic.go @@ -0,0 +1,50 @@ +package msync + +import "sync/atomic" + +// TypedAtomic is a type-safe wrapper around the standard-library atomic.Value. +// TypedAtomic serves largely the same purpose as atomic.Pointer but stores +// the value itself rather than a pointer to it. This is often more ergonomic +// than an atomic.Pointer: it can be used to store constants directly (where +// taking a pointer is inconvenient), and it defaults to the type's zero value +// rather than a nil pointer. +type TypedAtomic[T any] struct { + v atomic.Value +} + +// NewTypedAtomic returns a new TypedAtomic, initialized to val. +func NewTypedAtomic[T any](val T) *TypedAtomic[T] { + var v atomic.Value + v.Store(val) + return &TypedAtomic[T]{v} +} + +// Load returns the value set by the most recent Store. It returns the zero +// value for the type if there has been no call to Store. +func (ta *TypedAtomic[T]) Load() T { + return orZero[T](ta.v.Load()) +} + +// Store sets the value TypedAtomic to val. Store(nil) panics. +func (ta *TypedAtomic[T]) Store(val T) { + ta.v.Store(val) +} + +// Swap stores newVal into the TypedAtomic and returns the previous value. It +// returns the zero value for the type if the value is empty. +func (ta *TypedAtomic[T]) Swap(newVal T) T { + return orZero[T](ta.v.Swap(newVal)) +} + +// CompareAndSwap executes the compare-and-swap operation for the TypedAtomic. 
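// For illustration, a sketch of typical use together with the constructor
// above (the state strings are arbitrary examples):
//
//	state := NewTypedAtomic("idle")
//	if state.CompareAndSwap("idle", "running") {
//		// Exactly one concurrent caller wins the idle -> running transition.
//	}
//
// As with atomic.Value, the comparison relies on Go's ==, so this is only
// meaningful when T is a comparable type.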
+func (ta *TypedAtomic[T]) CompareAndSwap(oldVal, newVal T) bool { + return ta.v.CompareAndSwap(oldVal, newVal) +} + +func orZero[T any](val any) T { + if val == nil { + return *new(T) + } + + return val.(T) +} diff --git a/msync/typed_atomic_test.go b/msync/typed_atomic_test.go new file mode 100644 index 00000000..d4789137 --- /dev/null +++ b/msync/typed_atomic_test.go @@ -0,0 +1,59 @@ +package msync + +import ( + "sync" +) + +func (s *unitTestSuite) TestTypedAtomic() { + ta := NewTypedAtomic(42) + + s.Require().Equal(42, ta.Load()) + s.Require().False(ta.CompareAndSwap(17, 99)) + s.Require().True(ta.CompareAndSwap(42, 99)) + s.Require().Equal(99, ta.Load()) + s.Require().Equal(99, ta.Swap(42)) + s.Require().Equal(42, ta.Load()) + + ta.Store(17) + s.Require().Equal(17, ta.Load()) + + // This block is for race detection under -race. + var wg sync.WaitGroup + for i := range 100 { + wg.Add(1) + go func() { + defer wg.Done() + ta.Load() + ta.Store(i) + }() + } + wg.Wait() +} + +func (s *unitTestSuite) TestAtomicZeroValues() { + s.Run("string", func() { + var ta TypedAtomic[string] + s.Require().Equal("", ta.Load()) + s.Require().Equal("", ta.Swap("foo")) + s.Require().Equal("foo", ta.Load()) + }) + + s.Run("int", func() { + var ta TypedAtomic[int] + s.Require().Equal(0, ta.Load()) + s.Require().Equal(0, ta.Swap(42)) + s.Require().Equal(42, ta.Load()) + }) + + s.Run("arbitrary data", func() { + type data struct { + I int + S string + } + + var ta TypedAtomic[data] + s.Require().Equal(data{}, ta.Load()) + s.Require().Equal(data{}, ta.Swap(data{76, "trombones"})) + s.Require().Equal(data{76, "trombones"}, ta.Load()) + }) +} From aea6160e1f3cac8b28aaa5d1ff5bdc5e747b6ca9 Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Thu, 5 Dec 2024 10:43:46 -0500 Subject: [PATCH 02/28] revamp retryer --- internal/partitions/partitions.go | 261 ++++++++++++------------ internal/retry/retry.go | 150 +++++++++----- internal/retry/retry_info.go | 17 +- internal/retry/retryer.go | 50 ++++- internal/retry/retryer_test.go | 82 ++++---- internal/uuidutil/get_uuid.go | 15 +- internal/verifier/change_stream.go | 10 +- internal/verifier/check.go | 22 +- internal/verifier/clustertime.go | 16 +- internal/verifier/compare.go | 19 +- internal/verifier/migration_verifier.go | 14 +- internal/verifier/mongos_refresh.go | 22 +- internal/verifier/recheck.go | 18 +- internal/verifier/verification_task.go | 62 +++--- msync/typed_atomic.go | 50 +++++ msync/typed_atomic_test.go | 59 ++++++ 16 files changed, 539 insertions(+), 328 deletions(-) create mode 100644 msync/typed_atomic.go create mode 100644 msync/typed_atomic_test.go diff --git a/internal/partitions/partitions.go b/internal/partitions/partitions.go index 01419808..022a54e4 100644 --- a/internal/partitions/partitions.go +++ b/internal/partitions/partitions.go @@ -119,7 +119,6 @@ const ( func PartitionCollectionWithSize( ctx context.Context, uuidEntry *uuidutil.NamespaceAndUUID, - retryer *retry.Retryer, srcClient *mongo.Client, replicatorList []Replicator, subLogger *logger.Logger, @@ -137,7 +136,6 @@ func PartitionCollectionWithSize( partitions, docCount, byteCount, err := PartitionCollectionWithParameters( ctx, uuidEntry, - retryer, srcClient, replicatorList, defaultSampleRate, @@ -153,7 +151,6 @@ func PartitionCollectionWithSize( return PartitionCollectionWithParameters( ctx, uuidEntry, - retryer, srcClient, replicatorList, defaultSampleRate, @@ -174,7 +171,6 @@ func PartitionCollectionWithSize( func PartitionCollectionWithParameters( ctx context.Context, uuidEntry 
*uuidutil.NamespaceAndUUID, - retryer *retry.Retryer, srcClient *mongo.Client, replicatorList []Replicator, sampleRate float64, @@ -191,13 +187,13 @@ func PartitionCollectionWithParameters( // Get the collection's size in bytes and its document count. It is okay if these return zero since there might still be // items in the collection. Rely on getOuterIDBound to do a majority read to determine if we continue processing the collection. - collSizeInBytes, collDocCount, isCapped, err := GetSizeAndDocumentCount(ctx, subLogger, retryer, srcColl) + collSizeInBytes, collDocCount, isCapped, err := GetSizeAndDocumentCount(ctx, subLogger, srcColl) if err != nil { return nil, 0, 0, err } // The lower bound for the collection. There is no partitioning to do if the bound is nil. - minIDBound, err := getOuterIDBound(ctx, subLogger, retryer, minBound, srcDB, uuidEntry.CollName, globalFilter) + minIDBound, err := getOuterIDBound(ctx, subLogger, minBound, srcDB, uuidEntry.CollName, globalFilter) if err != nil { return nil, 0, 0, err } @@ -210,7 +206,7 @@ func PartitionCollectionWithParameters( } // The upper bound for the collection. There is no partitioning to do if the bound is nil. - maxIDBound, err := getOuterIDBound(ctx, subLogger, retryer, maxBound, srcDB, uuidEntry.CollName, globalFilter) + maxIDBound, err := getOuterIDBound(ctx, subLogger, maxBound, srcDB, uuidEntry.CollName, globalFilter) if err != nil { return nil, 0, 0, err } @@ -232,7 +228,7 @@ func PartitionCollectionWithParameters( // If a filter is used for partitioning, number of partitions is calculated with the ratio of filtered documents. if len(globalFilter) > 0 { - numFilteredDocs, filteredCntErr := GetDocumentCountAfterFiltering(ctx, subLogger, retryer, srcColl, globalFilter) + numFilteredDocs, filteredCntErr := GetDocumentCountAfterFiltering(ctx, subLogger, srcColl, globalFilter) if filteredCntErr == nil { numPartitions = getNumPartitions(collSizeInBytes, partitionSizeInBytes, float64(numFilteredDocs)/float64(collDocCount)) } else { @@ -251,7 +247,6 @@ func PartitionCollectionWithParameters( midIDBounds, collDropped, err := getMidIDBounds( ctx, subLogger, - retryer, srcDB, uuidEntry.CollName, collDocCount, @@ -314,7 +309,7 @@ func PartitionCollectionWithParameters( // capped status, in that order. // // Exported for usage in integration tests. -func GetSizeAndDocumentCount(ctx context.Context, logger *logger.Logger, retryer *retry.Retryer, srcColl *mongo.Collection) (int64, int64, bool, error) { +func GetSizeAndDocumentCount(ctx context.Context, logger *logger.Logger, srcColl *mongo.Collection) (int64, int64, bool, error) { srcDB := srcColl.Database() collName := srcColl.Name() @@ -324,39 +319,43 @@ func GetSizeAndDocumentCount(ctx context.Context, logger *logger.Logger, retryer Capped bool `bson:"capped"` }{} - err := retryer.Run(ctx, logger, func(ctx context.Context, ri *retry.FuncInfo) error { - ri.Log(logger.Logger, "collStats", "source", srcDB.Name(), collName, "Retrieving collection size and document count.") - request := bson.D{ - {"aggregate", collName}, - {"pipeline", mongo.Pipeline{ - bson.D{{"$collStats", bson.D{ - {"storageStats", bson.E{"scale", 1}}, - }}}, - // The "$group" here behaves as a project and rename when there's only one - // document (non-sharded case). When there are multiple documents (one for - // each shard) it correctly sums the counts and sizes from each shard. 
- bson.D{{"$group", bson.D{ - {"_id", "ns"}, - {"count", bson.D{{"$sum", "$storageStats.count"}}}, - {"size", bson.D{{"$sum", "$storageStats.size"}}}, - {"capped", bson.D{{"$first", "$capped"}}}}}}, - }}, - {"cursor", bson.D{}}, - } + err := retry.New().WithCallback( + func(ctx context.Context, ri *retry.FuncInfo) error { + ri.Log(logger.Logger, "collStats", "source", srcDB.Name(), collName, "Retrieving collection size and document count.") + request := bson.D{ + {"aggregate", collName}, + {"pipeline", mongo.Pipeline{ + bson.D{{"$collStats", bson.D{ + {"storageStats", bson.E{"scale", 1}}, + }}}, + // The "$group" here behaves as a project and rename when there's only one + // document (non-sharded case). When there are multiple documents (one for + // each shard) it correctly sums the counts and sizes from each shard. + bson.D{{"$group", bson.D{ + {"_id", "ns"}, + {"count", bson.D{{"$sum", "$storageStats.count"}}}, + {"size", bson.D{{"$sum", "$storageStats.size"}}}, + {"capped", bson.D{{"$first", "$capped"}}}}}}, + }}, + {"cursor", bson.D{}}, + } - cursor, driverErr := srcDB.RunCommandCursor(ctx, request) - if driverErr != nil { - return driverErr - } + cursor, driverErr := srcDB.RunCommandCursor(ctx, request) + if driverErr != nil { + return driverErr + } - defer cursor.Close(ctx) - if cursor.Next(ctx) { - if err := cursor.Decode(&value); err != nil { - return errors.Wrapf(err, "failed to decode $collStats response for source namespace %s.%s", srcDB.Name(), collName) + defer cursor.Close(ctx) + if cursor.Next(ctx) { + if err := cursor.Decode(&value); err != nil { + return errors.Wrapf(err, "failed to decode $collStats response for source namespace %s.%s", srcDB.Name(), collName) + } } - } - return nil - }) + return nil + }, + "retrieving %#q's statistics", + srcDB.Name()+"."+collName, + ).Run(ctx, logger) // TODO (REP-960): remove this check. // If we get NamespaceNotFoundError then return 0,0 since we won't do any partitioning with those returns @@ -380,7 +379,7 @@ func GetSizeAndDocumentCount(ctx context.Context, logger *logger.Logger, retryer // // This function could take a long time, especially if the collection does not have an index // on the filtered fields. 
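// For illustration, a call with the revised signature below might look like
// this (the filter contents are an arbitrary example):
//
//	n, err := GetDocumentCountAfterFiltering(ctx, logger, srcColl,
//		map[string]any{"status": "active"})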
-func GetDocumentCountAfterFiltering(ctx context.Context, logger *logger.Logger, retryer *retry.Retryer, srcColl *mongo.Collection, filter map[string]any) (int64, error) { +func GetDocumentCountAfterFiltering(ctx context.Context, logger *logger.Logger, srcColl *mongo.Collection, filter map[string]any) (int64, error) { srcDB := srcColl.Database() collName := srcColl.Name() @@ -395,27 +394,31 @@ func GetDocumentCountAfterFiltering(ctx context.Context, logger *logger.Logger, } pipeline = append(pipeline, bson.D{{"$count", "numFilteredDocs"}}) - err := retryer.Run(ctx, logger, func(ctx context.Context, ri *retry.FuncInfo) error { - ri.Log(logger.Logger, "count", "source", srcDB.Name(), collName, "Counting filtered documents.") - request := bson.D{ - {"aggregate", collName}, - {"pipeline", pipeline}, - {"cursor", bson.D{}}, - } + err := retry.New().WithCallback( + func(ctx context.Context, ri *retry.FuncInfo) error { + ri.Log(logger.Logger, "count", "source", srcDB.Name(), collName, "Counting filtered documents.") + request := bson.D{ + {"aggregate", collName}, + {"pipeline", pipeline}, + {"cursor", bson.D{}}, + } - cursor, driverErr := srcDB.RunCommandCursor(ctx, request) - if driverErr != nil { - return driverErr - } + cursor, driverErr := srcDB.RunCommandCursor(ctx, request) + if driverErr != nil { + return driverErr + } - defer cursor.Close(ctx) - if cursor.Next(ctx) { - if err := cursor.Decode(&value); err != nil { - return errors.Wrapf(err, "failed to decode $count response (%+v) for source namespace %s.%s after filter (%+v)", cursor.Current, srcDB.Name(), collName, filter) + defer cursor.Close(ctx) + if cursor.Next(ctx) { + if err := cursor.Decode(&value); err != nil { + return errors.Wrapf(err, "failed to decode $count response (%+v) for source namespace %s.%s after filter (%+v)", cursor.Current, srcDB.Name(), collName, filter) + } } - } - return nil - }) + return nil + }, + "counting %#q's filtered documents", + srcDB.Name()+"."+collName, + ).Run(ctx, logger) // TODO (REP-960): remove this check. // If we get NamespaceNotFoundError then return 0 since we won't do any partitioning with those returns @@ -458,7 +461,6 @@ func getNumPartitions(collSizeInBytes, partitionSizeInBytes int64, filteredRatio func getOuterIDBound( ctx context.Context, subLogger *logger.Logger, - retryer *retry.Retryer, minOrMaxBound minOrMaxBound, srcDB *mongo.Database, collName string, @@ -488,30 +490,35 @@ func getOuterIDBound( }...) // Get one document containing only the smallest or largest _id value in the collection. - err := retryer.Run(ctx, subLogger, func(ctx context.Context, ri *retry.FuncInfo) error { - ri.Log(subLogger.Logger, "aggregate", "source", srcDB.Name(), collName, fmt.Sprintf("getting %s _id partition bound", minOrMaxBound)) - cursor, cmdErr := - srcDB.RunCommandCursor(ctx, bson.D{ - {"aggregate", collName}, - {"pipeline", pipeline}, - {"hint", bson.D{{"_id", 1}}}, - {"cursor", bson.D{}}, - }) - - if cmdErr != nil { - return cmdErr - } + err := retry.New().WithCallback( + func(ctx context.Context, ri *retry.FuncInfo) error { + ri.Log(subLogger.Logger, "aggregate", "source", srcDB.Name(), collName, fmt.Sprintf("getting %s _id partition bound", minOrMaxBound)) + cursor, cmdErr := + srcDB.RunCommandCursor(ctx, bson.D{ + {"aggregate", collName}, + {"pipeline", pipeline}, + {"hint", bson.D{{"_id", 1}}}, + {"cursor", bson.D{}}, + }) + + if cmdErr != nil { + return cmdErr + } - // If we don't have at least one document, the collection is either empty or was dropped. 
- defer cursor.Close(ctx) - if !cursor.Next(ctx) { - return nil - } + // If we don't have at least one document, the collection is either empty or was dropped. + defer cursor.Close(ctx) + if !cursor.Next(ctx) { + return nil + } - // Return the _id value from that document. - docID, cmdErr = cursor.Current.LookupErr("_id") - return cmdErr - }) + // Return the _id value from that document. + docID, cmdErr = cursor.Current.LookupErr("_id") + return cmdErr + }, + "finding %#q's %s _id", + srcDB.Name()+"."+collName, + minOrMaxBound, + ).Run(ctx, subLogger) if err != nil { return nil, errors.Wrapf(err, "could not get %s _id bound for source collection '%s.%s'", minOrMaxBound, srcDB.Name(), collName) @@ -528,7 +535,6 @@ func getOuterIDBound( func getMidIDBounds( ctx context.Context, logger *logger.Logger, - retryer *retry.Retryer, srcDB *mongo.Database, collName string, collDocCount int64, @@ -576,48 +582,53 @@ func getMidIDBounds( // Get a cursor for the $sample and $bucketAuto aggregation. var midIDBounds []interface{} - agRetryer := retryer.WithErrorCodes(util.SampleTooManyDuplicates) - err := agRetryer.Run(ctx, logger, func(ctx context.Context, ri *retry.FuncInfo) error { - ri.Log(logger.Logger, "aggregate", "source", srcDB.Name(), collName, "Retrieving mid _id partition bounds using $sample.") - cursor, cmdErr := - srcDB.RunCommandCursor(ctx, bson.D{ - {"aggregate", collName}, - {"pipeline", pipeline}, - {"allowDiskUse", true}, - {"cursor", bson.D{}}, - }) - - if cmdErr != nil { - return errors.Wrapf(cmdErr, "failed to $sample and $bucketAuto documents for source namespace '%s.%s'", srcDB.Name(), collName) - } - - defer cursor.Close(ctx) - - // Iterate through all $bucketAuto documents of the form: - // { - // "_id" : { - // "min" : ... , - // "max" : ... - // }, - // "count" : ... - // } - midIDBounds = make([]interface{}, 0, numPartitions) - for cursor.Next(ctx) { - // Get a mid _id bound using the $bucketAuto document's max value. - bucketAutoDoc := make(bson.Raw, len(cursor.Current)) - copy(bucketAutoDoc, cursor.Current) - bound, err := bucketAutoDoc.LookupErr("_id", "max") - if err != nil { - return errors.Wrapf(err, "failed to lookup '_id.max' key in $bucketAuto document for source namespace '%s.%s'", srcDB.Name(), collName) - } - - // Append the copied bound to the other mid _id bounds. - midIDBounds = append(midIDBounds, bound) - ri.NoteSuccess() - } - - return cursor.Err() - }) + agRetryer := retry.New().WithErrorCodes(util.SampleTooManyDuplicates) + err := agRetryer. + WithCallback( + func(ctx context.Context, ri *retry.FuncInfo) error { + ri.Log(logger.Logger, "aggregate", "source", srcDB.Name(), collName, "Retrieving mid _id partition bounds using $sample.") + cursor, cmdErr := + srcDB.RunCommandCursor(ctx, bson.D{ + {"aggregate", collName}, + {"pipeline", pipeline}, + {"allowDiskUse", true}, + {"cursor", bson.D{}}, + }) + + if cmdErr != nil { + return errors.Wrapf(cmdErr, "failed to $sample and $bucketAuto documents for source namespace '%s.%s'", srcDB.Name(), collName) + } + + defer cursor.Close(ctx) + + // Iterate through all $bucketAuto documents of the form: + // { + // "_id" : { + // "min" : ... , + // "max" : ... + // }, + // "count" : ... + // } + midIDBounds = make([]interface{}, 0, numPartitions) + for cursor.Next(ctx) { + // Get a mid _id bound using the $bucketAuto document's max value. 
+ bucketAutoDoc := make(bson.Raw, len(cursor.Current)) + copy(bucketAutoDoc, cursor.Current) + bound, err := bucketAutoDoc.LookupErr("_id", "max") + if err != nil { + return errors.Wrapf(err, "failed to lookup '_id.max' key in $bucketAuto document for source namespace '%s.%s'", srcDB.Name(), collName) + } + + // Append the copied bound to the other mid _id bounds. + midIDBounds = append(midIDBounds, bound) + ri.NoteSuccess() + } + + return cursor.Err() + }, + "finding %#q's _id partition boundaries", + srcDB.Name()+"."+collName, + ).Run(ctx, logger) if err != nil { return nil, false, errors.Wrapf(err, "encountered a problem in the cursor when trying to $sample and $bucketAuto aggregation for source namespace '%s.%s'", srcDB.Name(), collName) diff --git a/internal/retry/retry.go b/internal/retry/retry.go index a4b9fb18..f4eaea17 100644 --- a/internal/retry/retry.go +++ b/internal/retry/retry.go @@ -2,30 +2,22 @@ package retry import ( "context" - "errors" "fmt" - "math/rand" "time" "github.com/10gen/migration-verifier/internal/logger" + "github.com/10gen/migration-verifier/internal/reportutils" "github.com/10gen/migration-verifier/internal/util" + "github.com/10gen/migration-verifier/mmongo" + "github.com/10gen/migration-verifier/msync" + "github.com/pkg/errors" + "github.com/rs/zerolog" "github.com/samber/lo" "golang.org/x/sync/errgroup" ) type RetryCallback = func(context.Context, *FuncInfo) error -// Retry is a convenience that creates a retryer and executes it. -// See RunForTransientErrorsOnly for argument details. -func Retry( - ctx context.Context, - logger *logger.Logger, - callbacks ...RetryCallback, -) error { - retryer := New(DefaultDurationLimit) - return retryer.Run(ctx, logger, callbacks...) -} - // Run() runs each given callback in parallel. If none of them fail, // then no error is returned. // @@ -54,43 +46,54 @@ func Retry( // // This returns an error if the duration limit is reached, or if f() returns a // non-transient error. -func (r *Retryer) Run( - ctx context.Context, logger *logger.Logger, funcs ...RetryCallback, -) error { - return r.runRetryLoop(ctx, logger, funcs) +func (r *Retryer) Run(ctx context.Context, logger *logger.Logger) error { + return r.runRetryLoop(ctx, logger) } // runRetryLoop contains the core logic for the retry loops. func (r *Retryer) runRetryLoop( ctx context.Context, logger *logger.Logger, - funcs []RetryCallback, ) error { var err error + if len(r.callbacks) == 0 { + return errors.Errorf("retryer (%s) run with no callbacks", r.description) + } + startTime := time.Now() li := &LoopInfo{ durationLimit: r.retryLimit, } funcinfos := lo.RepeatBy( - len(funcs), + len(r.callbacks), func(_ int) *FuncInfo { return &FuncInfo{ - lastResetTime: startTime, - loopInfo: li, + lastResetTime: msync.NewTypedAtomic(startTime), + loopDescription: r.description, + loopInfo: li, } }, ) sleepTime := minSleepTime for { + if li.attemptsSoFar > 0 { + r.addDescriptionToEvent(logger.Info()). + Int("attemptsSoFar", li.attemptsSoFar). 
+ Msg("Retrying after failure.") + } + if beforeFunc, hasBefore := r.before.Get(); hasBefore { beforeFunc() } eg, egCtx := errgroup.WithContext(ctx) - for i, curFunc := range funcs { + + for i, curCbInfo := range r.callbacks { + curFunc := curCbInfo.callback + if curFunc == nil { panic("curFunc should be non-nil") } @@ -99,6 +102,31 @@ func (r *Retryer) runRetryLoop( } eg.Go(func() error { + cbDoneChan := make(chan struct{}) + defer close(cbDoneChan) + + go func() { + ticker := time.NewTicker(time.Minute) + defer ticker.Stop() + + for { + lastSuccessTime := funcinfos[i].lastResetTime.Load() + + select { + case <-cbDoneChan: + return + case <-ticker.C: + if funcinfos[i].lastResetTime.Load() == lastSuccessTime { + logger.Warn(). + Str("callbackDescription", curCbInfo.description). + Time("lastSuccessAt", lastSuccessTime). + Str("elapsedTime", reportutils.DurationToHMS(time.Since(lastSuccessTime))). + Msg("Operation has not reported success for a while.") + } + } + } + }() + err := curFunc(egCtx, funcinfos[i]) if err != nil { @@ -124,19 +152,21 @@ func (r *Retryer) runRetryLoop( panic(fmt.Sprintf("Error should be a %T, not %T: %v", groupErr, err, err)) } + failedFuncInfo := funcinfos[groupErr.funcNum] + // Not a transient error? Fail immediately. - if !r.shouldRetryWithSleep(logger, sleepTime, groupErr.errFromCallback) { + if !r.shouldRetryWithSleep(logger, sleepTime, *failedFuncInfo, groupErr.errFromCallback) { return groupErr.errFromCallback } - li.attemptNumber++ + li.attemptsSoFar++ // Our error is transient. If we've exhausted the allowed time // then fail. - failedFuncInfo := funcinfos[groupErr.funcNum] + if failedFuncInfo.GetDurationSoFar() > li.durationLimit { return RetryDurationLimitExceededErr{ - attempts: li.attemptNumber, + attempts: li.attemptsSoFar, duration: failedFuncInfo.GetDurationSoFar(), lastErr: groupErr.errFromCallback, } @@ -146,7 +176,9 @@ func (r *Retryer) runRetryLoop( // up to maxSleepTime. select { case <-ctx.Done(): - logger.Error().Err(ctx.Err()).Msg("Context was canceled. Aborting retry loop.") + r.addDescriptionToEvent(logger.Error()). + Err(ctx.Err()). + Msg("Context was canceled. Aborting retry loop.") return ctx.Err() case <-time.After(sleepTime): sleepTime *= sleepTimeMultiplier @@ -160,12 +192,27 @@ func (r *Retryer) runRetryLoop( // Set all of the funcs that did *not* fail as having just succeeded. for i, curInfo := range funcinfos { if i != groupErr.funcNum { - curInfo.lastResetTime = now + curInfo.lastResetTime.Store(now) } } } } +func (r *Retryer) addDescriptionToEvent(event *zerolog.Event) *zerolog.Event { + if description, hasDesc := r.description.Get(); hasDesc { + event.Str("description", description) + } else { + event.Strs("description", lo.Map( + r.callbacks, + func(cbInfo retryCallbackInfo, _ int) string { + return cbInfo.description + }, + )) + } + + return event +} + // // For the above function, there have historically been concerns regarding majority write concern // upon retrying a write operation to the server. Mongomirror explicitly handled this: @@ -179,36 +226,41 @@ func (r *Retryer) runRetryLoop( func (r *Retryer) shouldRetryWithSleep( logger *logger.Logger, sleepTime time.Duration, + funcinfo FuncInfo, err error, ) bool { - // Randomly retry approximately 1 in 100 calls to the wrapped - // function. This is only enabled in tests. 
- if r.retryRandomly && rand.Int()%100 == 0 { - logger.Debug().Msgf("Waiting %s seconds to retry operation because of test code forcing a retry.", sleepTime) - return true - } - if err == nil { - return false + panic("nil error should not get here") } - errCode := util.GetErrorCode(err) - if util.IsTransientError(err) { - logger.Warn().Int("error code", errCode).Err(err).Msgf( - "Waiting %s seconds to retry operation after transient error.", sleepTime) - return true + isTransient := util.IsTransientError(err) || lo.SomeBy( + r.additionalErrorCodes, + func(code int) bool { + return mmongo.ErrorHasCode(err, code) + }, + ) + + event := logger.WithLevel( + lo.Ternary(isTransient, zerolog.InfoLevel, zerolog.WarnLevel), + ) + + if loopDesc, hasLoopDesc := r.description.Get(); hasLoopDesc { + event.Str("operationDescription", loopDesc) } - for _, code := range r.additionalErrorCodes { - if code == errCode { - logger.Warn().Int("error code", errCode).Err(err).Msgf( - "Waiting %s seconds to retry operation after an error because it is in our additional codes list.", sleepTime) - return true - } + event.Str("callbackDescription", funcinfo.description). + Int("error code", util.GetErrorCode(err)). + Err(err) + + if isTransient { + event. + Stringer("delay", sleepTime). + Msg("Pausing before retrying after transient error.") + + return true } - logger.Debug().Err(err).Int("error code", errCode). - Msg("Not retrying on error because it is not transient nor is it in our additional codes list.") + event.Msg("Non-transient error occurred.") return false } diff --git a/internal/retry/retry_info.go b/internal/retry/retry_info.go index 9f96e22a..ea325ac7 100644 --- a/internal/retry/retry_info.go +++ b/internal/retry/retry_info.go @@ -4,6 +4,8 @@ import ( "time" "github.com/10gen/migration-verifier/internal/reportutils" + "github.com/10gen/migration-verifier/msync" + "github.com/10gen/migration-verifier/option" "github.com/rs/zerolog" ) @@ -13,14 +15,15 @@ import ( // The attempt number is 0-indexed (0 means this is the first attempt). // The duration tracks the duration of retrying for transient errors only. type LoopInfo struct { - attemptNumber int + attemptsSoFar int durationLimit time.Duration } type FuncInfo struct { - loopInfo *LoopInfo - - lastResetTime time.Time + loopInfo *LoopInfo + description string + loopDescription option.Option[string] + lastResetTime *msync.TypedAtomic[time.Time] } // Log will log a debug-level message for the current Info values and the provided strings. @@ -60,13 +63,13 @@ func (fi *FuncInfo) Log(logger *zerolog.Logger, cmdName string, clientType strin // GetAttemptNumber returns the Info's current attempt number (0-indexed). func (fi *FuncInfo) GetAttemptNumber() int { - return fi.loopInfo.attemptNumber + return fi.loopInfo.attemptsSoFar } // GetDurationSoFar returns the Info's current duration so far. This duration // applies to the duration of retrying for transient errors only. func (fi *FuncInfo) GetDurationSoFar() time.Duration { - return time.Since(fi.lastResetTime) + return time.Since(fi.lastResetTime.Load()) } // NoteSuccess is used to tell the retry util to reset its measurement @@ -76,5 +79,5 @@ func (fi *FuncInfo) GetDurationSoFar() time.Duration { // Call this after every successful command in a multi-command callback. // (It’s useless--but harmless--in a single-command callback.) 
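// For illustration, a multi-command callback might use it like this
// (doFirstCommand and doSecondCommand are hypothetical helpers; the builder
// calls are the retryer API introduced in this patch):
//
//	err := retry.New().WithCallback(
//		func(ctx context.Context, fi *retry.FuncInfo) error {
//			if err := doFirstCommand(ctx); err != nil {
//				return err
//			}
//			fi.NoteSuccess() // first command succeeded; reset the duration clock
//			return doSecondCommand(ctx)
//		},
//		"running two commands",
//	).Run(ctx, logger)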
func (i *FuncInfo) NoteSuccess() { - i.lastResetTime = time.Now() + i.lastResetTime.Store(time.Now()) } diff --git a/internal/retry/retryer.go b/internal/retry/retryer.go index 6c269113..926319d6 100644 --- a/internal/retry/retryer.go +++ b/internal/retry/retryer.go @@ -1,30 +1,31 @@ package retry import ( + "fmt" "time" "github.com/10gen/migration-verifier/option" ) +type retryCallbackInfo struct { + callback RetryCallback + description string +} + // Retryer handles retrying operations that fail because of network failures. type Retryer struct { retryLimit time.Duration retryRandomly bool before option.Option[func()] + callbacks []retryCallbackInfo + description option.Option[string] additionalErrorCodes []int } // New returns a new retryer. -func New(retryLimit time.Duration) *Retryer { - return NewWithRandomlyRetries(retryLimit, false) -} - -// NewWithRandomlyRetries returns a new retryer, but allows the option of setting the -// retryRandomly field. -func NewWithRandomlyRetries(retryLimit time.Duration, retryRandomly bool) *Retryer { +func New() *Retryer { return &Retryer{ - retryLimit: retryLimit, - retryRandomly: retryRandomly, + retryLimit: DefaultDurationLimit, } } @@ -39,6 +40,13 @@ func (r *Retryer) WithErrorCodes(codes ...int) *Retryer { return &r2 } +func (r *Retryer) WithRetryLimit(limit time.Duration) *Retryer { + r2 := *r + r2.retryLimit = limit + + return &r2 +} + // WithBefore sets a callback that always runs before any retryer callback. // // This is useful if there are multiple callbacks and you need to reset some @@ -50,3 +58,27 @@ func (r *Retryer) WithBefore(todo func()) *Retryer { return &r2 } + +func (r *Retryer) WithDescription(msg string, args ...any) *Retryer { + r2 := *r + r2.description = option.Some(fmt.Sprintf(msg, args...)) + + return &r2 +} + +func (r *Retryer) WithCallback( + callback RetryCallback, + msg string, args ...any, +) *Retryer { + r2 := *r + + r2.callbacks = append( + r2.callbacks, + retryCallbackInfo{ + callback: callback, + description: fmt.Sprintf(msg, args...), + }, + ) + + return &r2 +} diff --git a/internal/retry/retryer_test.go b/internal/retry/retryer_test.go index 44b5e3fb..8b430407 100644 --- a/internal/retry/retryer_test.go +++ b/internal/retry/retryer_test.go @@ -19,7 +19,7 @@ var someNetworkError = &mongo.CommandError{ var badError = errors.New("I am fatal!") func (suite *UnitTestSuite) TestRetryer() { - retryer := New(DefaultDurationLimit) + retryer := New() logger := suite.Logger() suite.Run("with a function that immediately succeeds", func() { @@ -29,7 +29,7 @@ func (suite *UnitTestSuite) TestRetryer() { return nil } - err := retryer.Run(suite.Context(), logger, f) + err := retryer.WithCallback(f, "f").Run(suite.Context(), logger) suite.NoError(err) suite.Equal(0, attemptNumber) @@ -38,7 +38,7 @@ func (suite *UnitTestSuite) TestRetryer() { return nil } - err = retryer.Run(suite.Context(), logger, f2) + err = retryer.WithCallback(f2, "f2").Run(suite.Context(), logger) suite.NoError(err) suite.Equal(0, attemptNumber) }) @@ -53,7 +53,7 @@ func (suite *UnitTestSuite) TestRetryer() { return nil } - err := retryer.Run(suite.Context(), logger, f) + err := retryer.WithCallback(f, "f").Run(suite.Context(), logger) suite.NoError(err) suite.Equal(2, attemptNumber) @@ -66,14 +66,14 @@ func (suite *UnitTestSuite) TestRetryer() { return nil } - err = retryer.Run(suite.Context(), logger, f2) + err = retryer.WithCallback(f2, "f2").Run(suite.Context(), logger) suite.NoError(err) suite.Equal(2, attemptNumber) }) } func (suite *UnitTestSuite) 
TestRetryerDurationLimitIsZero() { - retryer := New(0) + retryer := New().WithRetryLimit(0) attemptNumber := -1 f := func(_ context.Context, ri *FuncInfo) error { @@ -81,13 +81,13 @@ func (suite *UnitTestSuite) TestRetryerDurationLimitIsZero() { return someNetworkError } - err := retryer.Run(suite.Context(), suite.Logger(), f) + err := retryer.WithCallback(f, "f").Run(suite.Context(), suite.Logger()) suite.Assert().ErrorIs(err, someNetworkError) suite.Assert().Equal(0, attemptNumber) } func (suite *UnitTestSuite) TestRetryerDurationReset() { - retryer := New(DefaultDurationLimit) + retryer := New() logger := suite.Logger() // In this test, the given function f takes longer than the durationLimit @@ -99,7 +99,9 @@ func (suite *UnitTestSuite) TestRetryerDurationReset() { noSuccessIterations := 0 f1 := func(_ context.Context, ri *FuncInfo) error { // Artificially advance how much time was taken. - ri.lastResetTime = ri.lastResetTime.Add(-2 * ri.loopInfo.durationLimit) + ri.lastResetTime.Store( + ri.lastResetTime.Load().Add(-2 * ri.loopInfo.durationLimit), + ) noSuccessIterations++ if noSuccessIterations == 1 { @@ -109,7 +111,7 @@ func (suite *UnitTestSuite) TestRetryerDurationReset() { return nil } - err := retryer.Run(suite.Context(), logger, f1) + err := retryer.WithCallback(f1, "f1").Run(suite.Context(), logger) // The error should be the limit-exceeded error, with the // last-noted error being the transient error. @@ -122,7 +124,9 @@ func (suite *UnitTestSuite) TestRetryerDurationReset() { successIterations := 0 f2 := func(_ context.Context, ri *FuncInfo) error { // Artificially advance how much time was taken. - ri.lastResetTime = ri.lastResetTime.Add(-2 * ri.loopInfo.durationLimit) + ri.lastResetTime.Store( + ri.lastResetTime.Load().Add(-2 * ri.loopInfo.durationLimit), + ) ri.NoteSuccess() @@ -134,13 +138,13 @@ func (suite *UnitTestSuite) TestRetryerDurationReset() { return nil } - err = retryer.Run(suite.Context(), logger, f2) + err = retryer.WithCallback(f2, "f2").Run(suite.Context(), logger) suite.Assert().NoError(err) suite.Assert().Equal(2, successIterations) } func (suite *UnitTestSuite) TestCancelViaContext() { - retryer := New(DefaultDurationLimit) + retryer := New() logger := suite.Logger() counter := 0 @@ -160,7 +164,7 @@ func (suite *UnitTestSuite) TestCancelViaContext() { // retry code will see the cancel before the timer it sets expires. 
cancel() go func() { - err := retryer.Run(ctx, logger, f) + err := retryer.WithCallback(f, "f").Run(ctx, logger) suite.ErrorIs(err, context.Canceled) suite.Equal(1, counter) wg.Done() @@ -187,32 +191,32 @@ func (suite *UnitTestSuite) TestRetryerAdditionalErrorCodes() { } suite.Run("with no additional error codes", func() { - retryer := New(DefaultDurationLimit) - err := retryer.Run(suite.Context(), logger, f) + retryer := New() + err := retryer.WithCallback(f, "f").Run(suite.Context(), logger) suite.Equal(42, util.GetErrorCode(err)) suite.Equal(0, attemptNumber) }) suite.Run("with one additional error code", func() { - retryer := New(DefaultDurationLimit) + retryer := New() retryer = retryer.WithErrorCodes(42) - err := retryer.Run(suite.Context(), logger, f) + err := retryer.WithCallback(f, "f").Run(suite.Context(), logger) suite.NoError(err) suite.Equal(1, attemptNumber) }) suite.Run("with multiple additional error codes", func() { - retryer := New(DefaultDurationLimit) + retryer := New() retryer = retryer.WithErrorCodes(42, 43, 44) - err := retryer.Run(suite.Context(), logger, f) + err := retryer.WithCallback(f, "f").Run(suite.Context(), logger) suite.NoError(err) suite.Equal(1, attemptNumber) }) suite.Run("with multiple additional error codes that don't match error", func() { - retryer := New(DefaultDurationLimit) + retryer := New() retryer = retryer.WithErrorCodes(41, 43, 44) - err := retryer.Run(suite.Context(), logger, f) + err := retryer.WithCallback(f, "f").Run(suite.Context(), logger) suite.Equal(42, util.GetErrorCode(err)) suite.Equal(0, attemptNumber) }) @@ -222,11 +226,7 @@ func (suite *UnitTestSuite) TestMulti_NonTransient() { ctx := suite.Context() logger := suite.Logger() - retryer := New(DefaultDurationLimit) - - err := retryer.Run( - ctx, - logger, + err := New().WithCallback( func(ctx context.Context, _ *FuncInfo) error { timer := time.NewTimer(10 * time.Second) select { @@ -236,10 +236,13 @@ func (suite *UnitTestSuite) TestMulti_NonTransient() { return nil } }, + "slow", + ).WithCallback( func(_ context.Context, _ *FuncInfo) error { return badError }, - ) + "fails quickly", + ).Run(ctx, logger) suite.Assert().ErrorIs(err, badError) } @@ -252,20 +255,17 @@ func (suite *UnitTestSuite) TestMulti_Transient() { suite.Run( fmt.Sprintf("final error: %v", finalErr), func() { - retryer := New(DefaultDurationLimit) cb1Attempts := 0 cb2Attempts := 0 - err := retryer.Run( - ctx, - logger, - - // This one succeeds every time. 
+ err := New().WithCallback( func(ctx context.Context, _ *FuncInfo) error { cb1Attempts++ return nil }, + "succeeds every time", + ).WithCallback( func(_ context.Context, _ *FuncInfo) error { cb2Attempts++ @@ -276,7 +276,8 @@ func (suite *UnitTestSuite) TestMulti_Transient() { return finalErr } }, - ) + "fails variously", + ).Run(ctx, logger) if finalErr == nil { suite.Assert().NoError(err) @@ -300,13 +301,11 @@ func (suite *UnitTestSuite) TestMulti_LongRunningSuccess() { startTime := time.Now() retryerLimit := 2 * time.Second - retryer := New(retryerLimit) + retryer := New().WithRetryLimit(retryerLimit) succeedPastTime := startTime.Add(retryerLimit + 1*time.Second) - err := retryer.Run( - ctx, - logger, + err := retryer.WithCallback( func(ctx context.Context, fi *FuncInfo) error { fi.NoteSuccess() @@ -317,6 +316,8 @@ func (suite *UnitTestSuite) TestMulti_LongRunningSuccess() { return nil }, + "quick success, then fail; all success after a bit", + ).WithCallback( func(ctx context.Context, fi *FuncInfo) error { if time.Now().Before(succeedPastTime) { <-ctx.Done() @@ -325,7 +326,8 @@ func (suite *UnitTestSuite) TestMulti_LongRunningSuccess() { return nil }, - ) + "long-running: hangs then succeeds", + ).Run(ctx, logger) suite.Assert().NoError(err) } diff --git a/internal/uuidutil/get_uuid.go b/internal/uuidutil/get_uuid.go index 86996933..1a694d77 100644 --- a/internal/uuidutil/get_uuid.go +++ b/internal/uuidutil/get_uuid.go @@ -27,8 +27,8 @@ type NamespaceAndUUID struct { CollName string } -func GetCollectionNamespaceAndUUID(ctx context.Context, logger *logger.Logger, retryer *retry.Retryer, db *mongo.Database, collName string) (*NamespaceAndUUID, error) { - binaryUUID, uuidErr := GetCollectionUUID(ctx, logger, retryer, db, collName) +func GetCollectionNamespaceAndUUID(ctx context.Context, logger *logger.Logger, db *mongo.Database, collName string) (*NamespaceAndUUID, error) { + binaryUUID, uuidErr := GetCollectionUUID(ctx, logger, db, collName) if uuidErr != nil { return nil, uuidErr } @@ -39,20 +39,21 @@ func GetCollectionNamespaceAndUUID(ctx context.Context, logger *logger.Logger, r }, nil } -func GetCollectionUUID(ctx context.Context, logger *logger.Logger, retryer *retry.Retryer, db *mongo.Database, collName string) (*primitive.Binary, error) { +func GetCollectionUUID(ctx context.Context, logger *logger.Logger, db *mongo.Database, collName string) (*primitive.Binary, error) { filter := bson.D{{"name", collName}} opts := options.ListCollections().SetNameOnly(false) var collSpecs []*mongo.CollectionSpecification - err := retryer.Run( - ctx, - logger, + err := retry.New().WithCallback( func(_ context.Context, ri *retry.FuncInfo) error { ri.Log(logger.Logger, "ListCollectionSpecifications", db.Name(), collName, "Getting collection UUID.", "") var driverErr error collSpecs, driverErr = db.ListCollectionSpecifications(ctx, filter, opts) return driverErr - }) + }, + "getting namespace %#q's specification", + db.Name()+"."+collName, + ).Run(ctx, logger) if err != nil { return nil, errors.Wrapf(err, "failed to list collections specification") } diff --git a/internal/verifier/change_stream.go b/internal/verifier/change_stream.go index e48a22d5..4451dad1 100644 --- a/internal/verifier/change_stream.go +++ b/internal/verifier/change_stream.go @@ -493,14 +493,11 @@ func (csr *ChangeStreamReader) StartChangeStream(ctx context.Context) error { // notifies the verifier's change event handler to exit. 
defer close(csr.changeEventBatchChan) - retryer := retry.New(retry.DefaultDurationLimit) - retryer = retryer.WithErrorCodes(util.CursorKilled) + retryer := retry.New().WithErrorCodes(util.CursorKilled) parentThreadWaiting := true - err := retryer.Run( - ctx, - csr.logger, + err := retryer.WithCallback( func(ctx context.Context, ri *retry.FuncInfo) error { changeStream, startTs, err := csr.createChangeStream(ctx) if err != nil { @@ -522,7 +519,8 @@ func (csr *ChangeStreamReader) StartChangeStream(ctx context.Context) error { return csr.iterateChangeStream(ctx, ri, changeStream) }, - ) + "running %s", csr, + ).Run(ctx, csr.logger) if err != nil { // NB: This failure always happens after the initial change stream diff --git a/internal/verifier/check.go b/internal/verifier/check.go index 3207791f..e48b7c96 100644 --- a/internal/verifier/check.go +++ b/internal/verifier/check.go @@ -190,9 +190,7 @@ func (verifier *Verifier) CheckDriver(ctx context.Context, filter map[string]any return err } } - err = retry.Retry( - ctx, - verifier.logger, + err = retry.New().WithCallback( func(ctx context.Context, _ *retry.FuncInfo) error { err = verifier.AddMetaIndexes(ctx) if err != nil { @@ -211,7 +209,8 @@ func (verifier *Verifier) CheckDriver(ctx context.Context, filter map[string]any return nil }, - ) + "setting up verifier metadata", + ).Run(ctx, verifier.logger) if err != nil { return err @@ -325,13 +324,12 @@ func (verifier *Verifier) CheckDriver(ctx context.Context, filter map[string]any // Generation of recheck tasks can partial-fail. The following will // cause a full redo in that case, which is inefficient but simple. // Such failures seem unlikely anyhow. - err = retry.Retry( - ctx, - verifier.logger, + err = retry.New().WithCallback( func(ctx context.Context, _ *retry.FuncInfo) error { return verifier.GenerateRecheckTasksWhileLocked(ctx) }, - ) + "generating recheck tasks", + ).Run(ctx, verifier.logger) if err != nil { verifier.mux.Unlock() return err @@ -437,9 +435,7 @@ func FetchFailedAndIncompleteTasks( ) ([]VerificationTask, []VerificationTask, error) { var FailedTasks, allTasks, IncompleteTasks []VerificationTask - err := retry.Retry( - ctx, - logger, + err := retry.New().WithCallback( func(ctx context.Context, _ *retry.FuncInfo) error { cur, err := coll.Find(ctx, bson.D{ bson.E{Key: "type", Value: taskType}, @@ -463,7 +459,9 @@ func FetchFailedAndIncompleteTasks( return nil }, - ) + "fetching generation %d's failed & incomplete tasks", + generation, + ).Run(ctx, logger) return FailedTasks, IncompleteTasks, err } diff --git a/internal/verifier/clustertime.go b/internal/verifier/clustertime.go index 6fdbf279..116b0669 100644 --- a/internal/verifier/clustertime.go +++ b/internal/verifier/clustertime.go @@ -25,15 +25,11 @@ func GetNewClusterTime( logger *logger.Logger, client *mongo.Client, ) (primitive.Timestamp, error) { - retryer := retry.New(retry.DefaultDurationLimit) - var clusterTime primitive.Timestamp // First we just fetch the latest cluster time among all shards without // updating any shards’ oplogs. 
- err := retryer.Run( - ctx, - logger, + err := retry.New().WithCallback( func(ctx context.Context, _ *retry.FuncInfo) error { var err error clusterTime, err = runAppendOplogNote( @@ -44,7 +40,8 @@ func GetNewClusterTime( ) return err }, - ) + "appending oplog note to get cluster time", + ).Run(ctx, logger) if err != nil { return primitive.Timestamp{}, err @@ -53,9 +50,7 @@ func GetNewClusterTime( // fetchClusterTime() will have taught the mongos about the most current // shard’s cluster time. Now we tell that mongos to update all lagging // shards to that time. - err = retryer.Run( - ctx, - logger, + err = retry.New().WithCallback( func(ctx context.Context, _ *retry.FuncInfo) error { var err error _, err = runAppendOplogNote( @@ -66,7 +61,8 @@ func GetNewClusterTime( ) return err }, - ) + "appending oplog note to synchronize cluster", + ).Run(ctx, logger) if err != nil { // This isn't serious enough even for info-level. logger.Debug().Err(err). diff --git a/internal/verifier/compare.go b/internal/verifier/compare.go index 93234fc7..a5ebab3b 100644 --- a/internal/verifier/compare.go +++ b/internal/verifier/compare.go @@ -33,22 +33,30 @@ func (verifier *Verifier) FetchAndCompareDocuments( var docCount types.DocumentCount var byteCount types.ByteCount - retryer := retry.New(retry.DefaultDurationLimit) + retryer := retry.New().WithDescription( + "reading task %v's documents (namespace: %s)", + task.PrimaryKey, + task.QueryFilter.Namespace, + ) err := retryer. WithBefore(func() { srcChannel, dstChannel, readSrcCallback, readDstCallback = verifier.getFetcherChannelsAndCallbacks(task) }). WithErrorCodes(util.CursorKilled). - Run( - givenCtx, - verifier.logger, + WithCallback( func(ctx context.Context, fi *retry.FuncInfo) error { return readSrcCallback(ctx, fi) }, + "reading from source", + ). + WithCallback( func(ctx context.Context, fi *retry.FuncInfo) error { return readDstCallback(ctx, fi) }, + "reading from destination", + ). + WithCallback( func(ctx context.Context, _ *retry.FuncInfo) error { var err error results, docCount, byteCount, err = verifier.compareDocsFromChannels( @@ -60,7 +68,8 @@ func (verifier *Verifier) FetchAndCompareDocuments( return err }, - ) + "comparing documents", + ).Run(givenCtx, verifier.logger) return results, docCount, byteCount, err } diff --git a/internal/verifier/migration_verifier.go b/internal/verifier/migration_verifier.go index cd158af0..45ff9542 100644 --- a/internal/verifier/migration_verifier.go +++ b/internal/verifier/migration_verifier.go @@ -750,9 +750,8 @@ func (verifier *Verifier) getShardKeyFields( // 2. Fetch shard keys. // 3. Fetch the size: # of docs, and # of bytes. 
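// For illustration, a caller might invoke it like this ("mydb.mycoll" is an
// arbitrary example namespace):
//
//	partitionList, shardKeys, docCount, byteCount, err :=
//		verifier.partitionAndInspectNamespace(ctx, "mydb.mycoll")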
func (verifier *Verifier) partitionAndInspectNamespace(ctx context.Context, namespace string) ([]*partitions.Partition, []string, types.DocumentCount, types.ByteCount, error) { - retryer := retry.New(retry.DefaultDurationLimit) dbName, collName := SplitNamespace(namespace) - namespaceAndUUID, err := uuidutil.GetCollectionNamespaceAndUUID(ctx, verifier.logger, retryer, + namespaceAndUUID, err := uuidutil.GetCollectionNamespaceAndUUID(ctx, verifier.logger, verifier.srcClientDatabase(dbName), collName) if err != nil { return nil, nil, 0, 0, err @@ -767,7 +766,7 @@ func (verifier *Verifier) partitionAndInspectNamespace(ctx context.Context, name replicator1 := partitions.Replicator{ID: "verifier"} replicators := []partitions.Replicator{replicator1} partitionList, srcDocs, srcBytes, err := partitions.PartitionCollectionWithSize( - ctx, namespaceAndUUID, retryer, verifier.srcClient, replicators, verifier.logger, verifier.partitionSizeInBytes, verifier.globalFilter) + ctx, namespaceAndUUID, verifier.srcClient, replicators, verifier.logger, verifier.partitionSizeInBytes, verifier.globalFilter) if err != nil { return nil, nil, 0, 0, err } @@ -1239,9 +1238,7 @@ func (verifier *Verifier) GetVerificationStatus(ctx context.Context) (*Verificat var results []bson.Raw - err := retry.Retry( - ctx, - verifier.logger, + err := retry.New().WithCallback( func(ctx context.Context, _ *retry.FuncInfo) error { cursor, err := taskCollection.Aggregate( ctx, @@ -1266,7 +1263,10 @@ func (verifier *Verifier) GetVerificationStatus(ctx context.Context) (*Verificat return cursor.All(ctx, &results) }, - ) + "counting generation %d's (non-primary) tasks by status", + generation, + ).Run(ctx, verifier.logger) + if err != nil { return nil, err } diff --git a/internal/verifier/mongos_refresh.go b/internal/verifier/mongos_refresh.go index ec8fa70f..80fdb511 100644 --- a/internal/verifier/mongos_refresh.go +++ b/internal/verifier/mongos_refresh.go @@ -33,8 +33,6 @@ func RefreshAllMongosInstances( Strs("hosts", hosts). Msgf("Refreshing all %d mongos instance(s) on the source.", len(hosts)) - r := retry.New(retry.DefaultDurationLimit) - for _, host := range hosts { singleHostClientOpts := *clientOpts @@ -52,16 +50,13 @@ func RefreshAllMongosInstances( shardConnStr, err := getAnyExistingShardConnectionStr( ctx, l, - r, singleHostClient, ) if err != nil { return err } - err = r.Run( - ctx, - l, + err = retry.New().WithCallback( func(ctx context.Context, _ *retry.FuncInfo) error { // Query a collection on the config server with linearizable read concern to advance the config // server primary's majority-committed optime. This populates the $configOpTime. 
@@ -112,7 +107,9 @@ func RefreshAllMongosInstances( } return nil - }) + }, + "refreshing mongos shard cache", + ).Run(ctx, l) if err != nil { return err @@ -137,10 +134,9 @@ func RefreshAllMongosInstances( func getAnyExistingShardConnectionStr( ctx context.Context, l *logger.Logger, - r *retry.Retryer, client *mongo.Client, ) (string, error) { - res, err := runListShards(ctx, l, r, client) + res, err := runListShards(ctx, l, client) if err != nil { return "", err } @@ -169,17 +165,15 @@ func getAnyExistingShardConnectionStr( func runListShards( ctx context.Context, l *logger.Logger, - r *retry.Retryer, client *mongo.Client, ) (*mongo.SingleResult, error) { var res *mongo.SingleResult - err := r.Run( - ctx, - l, + err := retry.New().WithCallback( func(ctx context.Context, _ *retry.FuncInfo) error { res = client.Database("admin").RunCommand(ctx, bson.D{{"listShards", 1}}) return res.Err() }, - ) + "listing shards", + ).Run(ctx, l) return res, err } diff --git a/internal/verifier/recheck.go b/internal/verifier/recheck.go index 0ecbe20e..e4191655 100644 --- a/internal/verifier/recheck.go +++ b/internal/verifier/recheck.go @@ -128,10 +128,8 @@ func (verifier *Verifier) insertRecheckDocs( SetUpsert(true) } - retryer := retry.New(retry.DefaultDurationLimit) - err := retryer.Run( - groupCtx, - verifier.logger, + retryer := retry.New() + err := retryer.WithCallback( func(retryCtx context.Context, _ *retry.FuncInfo) error { _, err := verifier.verificationDatabase().Collection(recheckQueue).BulkWrite( retryCtx, @@ -141,7 +139,9 @@ func (verifier *Verifier) insertRecheckDocs( return err }, - ) + "persisting %d recheck(s)", + len(models), + ).Run(groupCtx, verifier.logger) return errors.Wrapf(err, "failed to persist %d recheck(s) for generation %d", len(models), generation) }) @@ -177,9 +177,7 @@ func (verifier *Verifier) ClearRecheckDocsWhileLocked(ctx context.Context) error Int("previousGeneration", prevGeneration). 
Msg("Deleting previous generation's enqueued rechecks.") - return retry.Retry( - ctx, - verifier.logger, + return retry.New().WithCallback( func(ctx context.Context, i *retry.FuncInfo) error { _, err := verifier.verificationDatabase().Collection(recheckQueue).DeleteMany( ctx, @@ -188,7 +186,9 @@ func (verifier *Verifier) ClearRecheckDocsWhileLocked(ctx context.Context) error return err }, - ) + "deleting generation %d's enqueued rechecks", + prevGeneration, + ).Run(ctx, verifier.logger) } func (verifier *Verifier) getPreviousGenerationWhileLocked() int { diff --git a/internal/verifier/verification_task.go b/internal/verifier/verification_task.go index c2b7bad4..3eeb4d29 100644 --- a/internal/verifier/verification_task.go +++ b/internal/verifier/verification_task.go @@ -138,14 +138,14 @@ func (verifier *Verifier) insertCollectionVerificationTask( }, } - err := retry.Retry( - ctx, - verifier.logger, + err := retry.New().WithCallback( func(ctx context.Context, _ *retry.FuncInfo) error { _, err := verifier.verificationTaskCollection().InsertOne(ctx, verificationTask) return err }, - ) + "persisting namespace %#q's verification task", + srcNamespace, + ).Run(ctx, verifier.logger) return &verificationTask, err } @@ -185,15 +185,17 @@ func (verifier *Verifier) InsertPartitionVerificationTask( }, } - err := retry.Retry( - ctx, - verifier.logger, + err := retry.New().WithCallback( func(ctx context.Context, _ *retry.FuncInfo) error { _, err := verifier.verificationTaskCollection().InsertOne(ctx, &task) return err }, - ) + "persisting partition verification task for %#q (%v to %v)", + task.QueryFilter.Namespace, + task.QueryFilter.Partition.Key.Lower, + task.QueryFilter.Partition.Upper, + ).Run(ctx, verifier.logger) return &task, err } @@ -227,11 +229,16 @@ func (verifier *Verifier) InsertDocumentRecheckTask( SourceByteCount: dataSize, } - err := retry.Retry(ctx, verifier.logger, func(ctx context.Context, _ *retry.FuncInfo) error { - _, err := verifier.verificationTaskCollection().InsertOne(ctx, &task) + err := retry.New().WithCallback( + func(ctx context.Context, _ *retry.FuncInfo) error { + _, err := verifier.verificationTaskCollection().InsertOne(ctx, &task) - return err - }) + return err + }, + "persisting recheck task for namespace %#q (%d document(s))", + task.QueryFilter.Namespace, + len(ids), + ).Run(ctx, verifier.logger) return &task, err } @@ -241,9 +248,7 @@ func (verifier *Verifier) FindNextVerifyTaskAndUpdate( ) (option.Option[VerificationTask], error) { task := &VerificationTask{} - err := retry.Retry( - ctx, - verifier.logger, + err := retry.New().WithCallback( func(ctx context.Context, _ *retry.FuncInfo) error { err := verifier.verificationTaskCollection().FindOneAndUpdate( @@ -282,15 +287,15 @@ func (verifier *Verifier) FindNextVerifyTaskAndUpdate( return err }, - ) + "finding next task to do in generation %d", + verifier.generation, + ).Run(ctx, verifier.logger) return option.FromPointer(task), err } func (verifier *Verifier) UpdateVerificationTask(ctx context.Context, task *VerificationTask) error { - return retry.Retry( - ctx, - verifier.logger, + return retry.New().WithCallback( func(ctx context.Context, _ *retry.FuncInfo) error { result, err := verifier.verificationTaskCollection().UpdateOne( ctx, @@ -317,7 +322,10 @@ func (verifier *Verifier) UpdateVerificationTask(ctx context.Context, task *Veri return err }, - ) + "updating task %v (namespace %#q)", + task.PrimaryKey, + task.QueryFilter.Namespace, + ).Run(ctx, verifier.logger) } func (verifier *Verifier) 
CreatePrimaryTaskIfNeeded(ctx context.Context) (bool, error) { @@ -325,9 +333,7 @@ func (verifier *Verifier) CreatePrimaryTaskIfNeeded(ctx context.Context) (bool, var created bool - err := retry.Retry( - ctx, - verifier.logger, + err := retry.New().WithCallback( func(ctx context.Context, _ *retry.FuncInfo) error { result, err := verifier.verificationTaskCollection().UpdateOne( ctx, @@ -349,15 +355,14 @@ func (verifier *Verifier) CreatePrimaryTaskIfNeeded(ctx context.Context) (bool, return nil }, - ) + "ensuring primary task's existence", + ).Run(ctx, verifier.logger) return created, err } func (verifier *Verifier) UpdatePrimaryTaskComplete(ctx context.Context) error { - return retry.Retry( - ctx, - verifier.logger, + return retry.New().WithCallback( func(ctx context.Context, _ *retry.FuncInfo) error { result, err := verifier.verificationTaskCollection().UpdateMany( ctx, @@ -380,5 +385,6 @@ func (verifier *Verifier) UpdatePrimaryTaskComplete(ctx context.Context) error { return nil }, - ) + "noting completion of primary task", + ).Run(ctx, verifier.logger) } diff --git a/msync/typed_atomic.go b/msync/typed_atomic.go new file mode 100644 index 00000000..219fa34f --- /dev/null +++ b/msync/typed_atomic.go @@ -0,0 +1,50 @@ +package msync + +import "sync/atomic" + +// TypedAtomic is a type-safe wrapper around the standard-library atomic.Value. +// TypedAtomic serves largely the same purpose as atomic.Pointer but stores +// the value itself rather than a pointer to it. This is often more ergonomic +// than an atomic.Pointer: it can be used to store constants directly (where +// taking a pointer is inconvenient), and it defaults to the type's zero value +// rather than a nil pointer. +type TypedAtomic[T any] struct { + v atomic.Value +} + +// NewTypedAtomic returns a new TypedAtomic, initialized to val. +func NewTypedAtomic[T any](val T) *TypedAtomic[T] { + var v atomic.Value + v.Store(val) + return &TypedAtomic[T]{v} +} + +// Load returns the value set by the most recent Store. It returns the zero +// value for the type if there has been no call to Store. +func (ta *TypedAtomic[T]) Load() T { + return orZero[T](ta.v.Load()) +} + +// Store sets the value TypedAtomic to val. Store(nil) panics. +func (ta *TypedAtomic[T]) Store(val T) { + ta.v.Store(val) +} + +// Swap stores newVal into the TypedAtomic and returns the previous value. It +// returns the zero value for the type if the value is empty. +func (ta *TypedAtomic[T]) Swap(newVal T) T { + return orZero[T](ta.v.Swap(newVal)) +} + +// CompareAndSwap executes the compare-and-swap operation for the TypedAtomic. +func (ta *TypedAtomic[T]) CompareAndSwap(oldVal, newVal T) bool { + return ta.v.CompareAndSwap(oldVal, newVal) +} + +func orZero[T any](val any) T { + if val == nil { + return *new(T) + } + + return val.(T) +} diff --git a/msync/typed_atomic_test.go b/msync/typed_atomic_test.go new file mode 100644 index 00000000..d4789137 --- /dev/null +++ b/msync/typed_atomic_test.go @@ -0,0 +1,59 @@ +package msync + +import ( + "sync" +) + +func (s *unitTestSuite) TestTypedAtomic() { + ta := NewTypedAtomic(42) + + s.Require().Equal(42, ta.Load()) + s.Require().False(ta.CompareAndSwap(17, 99)) + s.Require().True(ta.CompareAndSwap(42, 99)) + s.Require().Equal(99, ta.Load()) + s.Require().Equal(99, ta.Swap(42)) + s.Require().Equal(42, ta.Load()) + + ta.Store(17) + s.Require().Equal(17, ta.Load()) + + // This block is for race detection under -race. 
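+	// A hundred goroutines hammer Load and Store concurrently; running the
+	// tests under `go test -race` would flag any unsynchronized access to
+	// the wrapped atomic.Value.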
+ var wg sync.WaitGroup + for i := range 100 { + wg.Add(1) + go func() { + defer wg.Done() + ta.Load() + ta.Store(i) + }() + } + wg.Wait() +} + +func (s *unitTestSuite) TestAtomicZeroValues() { + s.Run("string", func() { + var ta TypedAtomic[string] + s.Require().Equal("", ta.Load()) + s.Require().Equal("", ta.Swap("foo")) + s.Require().Equal("foo", ta.Load()) + }) + + s.Run("int", func() { + var ta TypedAtomic[int] + s.Require().Equal(0, ta.Load()) + s.Require().Equal(0, ta.Swap(42)) + s.Require().Equal(42, ta.Load()) + }) + + s.Run("arbitrary data", func() { + type data struct { + I int + S string + } + + var ta TypedAtomic[data] + s.Require().Equal(data{}, ta.Load()) + s.Require().Equal(data{}, ta.Swap(data{76, "trombones"})) + s.Require().Equal(data{76, "trombones"}, ta.Load()) + }) +} From 9c0cf51a021590f593b923b9754de42fe848f74b Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Thu, 5 Dec 2024 10:55:24 -0500 Subject: [PATCH 03/28] note success too --- internal/retry/retry.go | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/internal/retry/retry.go b/internal/retry/retry.go index f4eaea17..6630bbe0 100644 --- a/internal/retry/retry.go +++ b/internal/retry/retry.go @@ -58,7 +58,10 @@ func (r *Retryer) runRetryLoop( var err error if len(r.callbacks) == 0 { - return errors.Errorf("retryer (%s) run with no callbacks", r.description) + return errors.Errorf( + "retryer (%s) run with no callbacks", + r.description.OrElse("no description"), + ) } startTime := time.Now() @@ -141,8 +144,16 @@ func (r *Retryer) runRetryLoop( } err = eg.Wait() + li.attemptsSoFar++ + // No error? Success! if err == nil { + if li.attemptsSoFar > 1 { + r.addDescriptionToEvent(logger.Info()). + Int("attempts", li.attemptsSoFar). + Msg("Retried operation succeeded.") + } + return nil } @@ -159,8 +170,6 @@ func (r *Retryer) runRetryLoop( return groupErr.errFromCallback } - li.attemptsSoFar++ - // Our error is transient. If we've exhausted the allowed time // then fail. From 5dfafab93e0daccbd3b2ab3f1b58068e241b409a Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Thu, 5 Dec 2024 11:11:01 -0500 Subject: [PATCH 04/28] real clones --- internal/retry/retryer.go | 41 ++++++++++++++++++++++++++------------- 1 file changed, 28 insertions(+), 13 deletions(-) diff --git a/internal/retry/retryer.go b/internal/retry/retryer.go index 926319d6..76b20df3 100644 --- a/internal/retry/retryer.go +++ b/internal/retry/retryer.go @@ -2,6 +2,7 @@ package retry import ( "fmt" + "slices" "time" "github.com/10gen/migration-verifier/option" @@ -15,14 +16,13 @@ type retryCallbackInfo struct { // Retryer handles retrying operations that fail because of network failures. type Retryer struct { retryLimit time.Duration - retryRandomly bool before option.Option[func()] callbacks []retryCallbackInfo description option.Option[string] additionalErrorCodes []int } -// New returns a new retryer. +// New returns a new Retryer with DefaultDurationLimit as its time limit. func New() *Retryer { return &Retryer{ retryLimit: DefaultDurationLimit, @@ -30,47 +30,51 @@ func New() *Retryer { } // WithErrorCodes returns a new Retryer that will retry on the codes passed to -// this method. This allows for a single function to customize the codes it +// this method. This allows for a single retryer to customize the codes it // wants to retry on. Note that if the Retryer already has additional custom // error codes set, these are _replaced_ when this method is called. 
func (r *Retryer) WithErrorCodes(codes ...int) *Retryer { - r2 := *r + r2 := r.clone() r2.additionalErrorCodes = codes - return &r2 + return r2 } +// WithRetryLimit returns a new retryer with the specified time limit. func (r *Retryer) WithRetryLimit(limit time.Duration) *Retryer { - r2 := *r + r2 := r.clone() r2.retryLimit = limit - return &r2 + return r2 } -// WithBefore sets a callback that always runs before any retryer callback. +// WithBefore returns a new retryer with a callback that always runs before +// any retryer callback. // // This is useful if there are multiple callbacks and you need to reset some // condition before each retryer iteration. (In the single-callback case it’s // largely redundant.) func (r *Retryer) WithBefore(todo func()) *Retryer { - r2 := *r + r2 := r.clone() r2.before = option.Some(todo) - return &r2 + return r2 } +// WithDescription returns a new retryer with the given description. func (r *Retryer) WithDescription(msg string, args ...any) *Retryer { - r2 := *r + r2 := r.clone() r2.description = option.Some(fmt.Sprintf(msg, args...)) - return &r2 + return r2 } +// WithCallback returns a new retryer with the additional callback. func (r *Retryer) WithCallback( callback RetryCallback, msg string, args ...any, ) *Retryer { - r2 := *r + r2 := r.clone() r2.callbacks = append( r2.callbacks, @@ -80,5 +84,16 @@ func (r *Retryer) WithCallback( }, ) + return r2 +} + +func (r *Retryer) clone() *Retryer { + r2 := *r + + r2.before = option.FromPointer(r.before.ToPointer()) + r2.description = option.FromPointer(r.description.ToPointer()) + r2.callbacks = slices.Clone(r.callbacks) + r2.additionalErrorCodes = slices.Clone(r.additionalErrorCodes) + return &r2 } From 16527783d27b2cd9a3631363865bde5dbeb83768 Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Thu, 5 Dec 2024 16:21:55 -0500 Subject: [PATCH 05/28] a bit more debugging --- internal/verifier/check.go | 4 ++++ internal/verifier/migration_verifier.go | 6 +++++- 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/internal/verifier/check.go b/internal/verifier/check.go index e48b7c96..365cd987 100644 --- a/internal/verifier/check.go +++ b/internal/verifier/check.go @@ -123,6 +123,10 @@ func (verifier *Verifier) CheckWorker(ctxIn context.Context) error { ) } + verifier.logger.Debug(). + Interface("taskCountsByStatus", verificationStatus). 
+ Send() + if waitForTaskCreation%2 == 0 { if generation > 0 || verifier.gen0PendingCollectionTasks.Load() == 0 { verifier.PrintVerificationSummary(ctx, GenerationInProgress) diff --git a/internal/verifier/migration_verifier.go b/internal/verifier/migration_verifier.go index 45ff9542..e47fd3e6 100644 --- a/internal/verifier/migration_verifier.go +++ b/internal/verifier/migration_verifier.go @@ -1268,7 +1268,11 @@ func (verifier *Verifier) GetVerificationStatus(ctx context.Context) (*Verificat ).Run(ctx, verifier.logger) if err != nil { - return nil, err + return nil, errors.Wrapf( + err, + "failed to count generation %d's tasks by status", + generation, + ) } verificationStatus := VerificationStatus{} From b8ed20d90f8ae310986b0a14aea882a5dd771119 Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Thu, 5 Dec 2024 16:28:03 -0500 Subject: [PATCH 06/28] wording tweaks, and a return that was missing --- internal/verifier/compare.go | 2 +- internal/verifier/migration_verifier.go | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/internal/verifier/compare.go b/internal/verifier/compare.go index a5ebab3b..3fdcb12e 100644 --- a/internal/verifier/compare.go +++ b/internal/verifier/compare.go @@ -34,7 +34,7 @@ func (verifier *Verifier) FetchAndCompareDocuments( var byteCount types.ByteCount retryer := retry.New().WithDescription( - "reading task %v's documents (namespace: %s)", + "comparing task %v's documents (namespace: %s)", task.PrimaryKey, task.QueryFilter.Namespace, ) diff --git a/internal/verifier/migration_verifier.go b/internal/verifier/migration_verifier.go index e47fd3e6..c5764772 100644 --- a/internal/verifier/migration_verifier.go +++ b/internal/verifier/migration_verifier.go @@ -1501,6 +1501,7 @@ func (verifier *Verifier) PrintVerificationSummary(ctx context.Context, genstatu if err != nil { verifier.logger.Err(err).Msgf("Failed to report per-namespace statistics") + return } verifier.printChangeEventStatistics(strBuilder) From ac170b46ab90e3b5718cef6a465e3d2980ea161c Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Thu, 5 Dec 2024 16:37:46 -0500 Subject: [PATCH 07/28] s/failed/discrepancies --- internal/verifier/migration_verifier.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/verifier/migration_verifier.go b/internal/verifier/migration_verifier.go index c5764772..78b9e7d3 100644 --- a/internal/verifier/migration_verifier.go +++ b/internal/verifier/migration_verifier.go @@ -612,7 +612,7 @@ func (verifier *Verifier) ProcessVerifyTask(ctx context.Context, workerNum int, Interface("task", task.PrimaryKey). Str("namespace", task.QueryFilter.Namespace). Int("mismatchesCount", len(problems)). - Msg("Document comparison task failed, but it may pass in the next generation.") + Msg("Discrepancies found. Will recheck in the next generation.") var mismatches []VerificationResult var missingIds []interface{} From 5f9733cd463e2597131debacd2cc517a52903d71 Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Thu, 5 Dec 2024 23:21:31 -0500 Subject: [PATCH 08/28] fix/beef-up --- internal/retry/retry.go | 25 +++++++++++++++++++------ internal/verifier/migration_verifier.go | 5 +++++ 2 files changed, 24 insertions(+), 6 deletions(-) diff --git a/internal/retry/retry.go b/internal/retry/retry.go index 6630bbe0..6c8d6267 100644 --- a/internal/retry/retry.go +++ b/internal/retry/retry.go @@ -166,7 +166,7 @@ func (r *Retryer) runRetryLoop( failedFuncInfo := funcinfos[groupErr.funcNum] // Not a transient error? Fail immediately. 
- if !r.shouldRetryWithSleep(logger, sleepTime, *failedFuncInfo, groupErr.errFromCallback) { + if !r.shouldRetryWithSleep(logger, sleepTime, r.callbacks[groupErr.funcNum], groupErr.errFromCallback) { return groupErr.errFromCallback } @@ -235,7 +235,7 @@ func (r *Retryer) addDescriptionToEvent(event *zerolog.Event) *zerolog.Event { func (r *Retryer) shouldRetryWithSleep( logger *logger.Logger, sleepTime time.Duration, - funcinfo FuncInfo, + cbInfo retryCallbackInfo, err error, ) bool { if err == nil { @@ -250,26 +250,39 @@ func (r *Retryer) shouldRetryWithSleep( ) event := logger.WithLevel( - lo.Ternary(isTransient, zerolog.InfoLevel, zerolog.WarnLevel), + lo.Ternary( + // If it’s transient, surface it as info. + isTransient, + zerolog.InfoLevel, + + lo.Ternary( + // Context cancellation is unimportant, so debug. + errors.Is(err, context.Canceled), + zerolog.DebugLevel, + + // Other non-retryables are serious, so warn. + zerolog.WarnLevel, + ), + ), ) if loopDesc, hasLoopDesc := r.description.Get(); hasLoopDesc { event.Str("operationDescription", loopDesc) } - event.Str("callbackDescription", funcinfo.description). + event.Str("callbackDescription", cbInfo.description). Int("error code", util.GetErrorCode(err)). Err(err) if isTransient { event. Stringer("delay", sleepTime). - Msg("Pausing before retrying after transient error.") + Msg("Got retryable error. Pausing, then will retry.") return true } - event.Msg("Non-transient error occurred.") + event.Msg("Non-retryable error occurred.") return false } diff --git a/internal/verifier/migration_verifier.go b/internal/verifier/migration_verifier.go index 78b9e7d3..b8799b1e 100644 --- a/internal/verifier/migration_verifier.go +++ b/internal/verifier/migration_verifier.go @@ -1236,6 +1236,11 @@ func (verifier *Verifier) GetVerificationStatus(ctx context.Context) (*Verificat taskCollection := verifier.verificationTaskCollection() generation, _ := verifier.getGeneration() + // XXX REMOVE ME + verifier.logger.Debug(). + Int("generation", generation). + Msg("Running GetVerificationStatus().") + var results []bson.Raw err := retry.New().WithCallback( From 90aa0359423718d2db8694a31a94600e3e4c39bb Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Fri, 6 Dec 2024 08:22:12 -0500 Subject: [PATCH 09/28] add more detail to retries --- internal/partitions/partitions.go | 2 +- internal/retry/retry.go | 43 +++++++++++++++---------- internal/retry/retry_info.go | 34 ++++++++++++++++--- internal/retry/retryer_test.go | 19 +++++++---- internal/verifier/change_stream.go | 2 +- internal/verifier/change_stream_test.go | 1 - internal/verifier/compare.go | 6 ++-- 7 files changed, 73 insertions(+), 34 deletions(-) diff --git a/internal/partitions/partitions.go b/internal/partitions/partitions.go index 022a54e4..acb02c3a 100644 --- a/internal/partitions/partitions.go +++ b/internal/partitions/partitions.go @@ -621,7 +621,7 @@ func getMidIDBounds( // Append the copied bound to the other mid _id bounds. 
midIDBounds = append(midIDBounds, bound) - ri.NoteSuccess() + ri.NoteSuccess("received an ID partition") } return cursor.Err() diff --git a/internal/retry/retry.go b/internal/retry/retry.go index 6c8d6267..34c79d45 100644 --- a/internal/retry/retry.go +++ b/internal/retry/retry.go @@ -69,11 +69,14 @@ func (r *Retryer) runRetryLoop( li := &LoopInfo{ durationLimit: r.retryLimit, } - funcinfos := lo.RepeatBy( - len(r.callbacks), - func(_ int) *FuncInfo { + funcinfos := lo.Map( + r.callbacks, + func(cb retryCallbackInfo, _ int) *FuncInfo { return &FuncInfo{ - lastResetTime: msync.NewTypedAtomic(startTime), + lastReset: msync.NewTypedAtomic(lastResetInfo{ + time: startTime, + }), + description: cb.description, loopDescription: r.description, loopInfo: li, } @@ -113,17 +116,25 @@ func (r *Retryer) runRetryLoop( defer ticker.Stop() for { - lastSuccessTime := funcinfos[i].lastResetTime.Load() + lastReset := funcinfos[i].lastReset.Load() select { case <-cbDoneChan: return case <-ticker.C: - if funcinfos[i].lastResetTime.Load() == lastSuccessTime { - logger.Warn(). + if funcinfos[i].lastReset.Load() == lastReset { + event := logger.Warn(). Str("callbackDescription", curCbInfo.description). - Time("lastSuccessAt", lastSuccessTime). - Str("elapsedTime", reportutils.DurationToHMS(time.Since(lastSuccessTime))). + Time("since", lastReset.time) + + if successDesc, hasDesc := lastReset.description.Get(); hasDesc { + event. + Str("successDescription", successDesc). + Uint64("successesSoFar", lastReset.resetsSoFar) + } + + event. + Str("elapsedTime", reportutils.DurationToHMS(time.Since(lastReset.time))). Msg("Operation has not reported success for a while.") } } @@ -164,9 +175,11 @@ func (r *Retryer) runRetryLoop( } failedFuncInfo := funcinfos[groupErr.funcNum] + descriptions := failedFuncInfo.GetDescriptions() + cbErr := groupErr.errFromCallback // Not a transient error? Fail immediately. - if !r.shouldRetryWithSleep(logger, sleepTime, r.callbacks[groupErr.funcNum], groupErr.errFromCallback) { + if !r.shouldRetryWithSleep(logger, sleepTime, descriptions, cbErr) { return groupErr.errFromCallback } @@ -201,7 +214,7 @@ func (r *Retryer) runRetryLoop( // Set all of the funcs that did *not* fail as having just succeeded. for i, curInfo := range funcinfos { if i != groupErr.funcNum { - curInfo.lastResetTime.Store(now) + curInfo.lastReset.Store(lastResetInfo{time: now}) } } } @@ -235,7 +248,7 @@ func (r *Retryer) addDescriptionToEvent(event *zerolog.Event) *zerolog.Event { func (r *Retryer) shouldRetryWithSleep( logger *logger.Logger, sleepTime time.Duration, - cbInfo retryCallbackInfo, + descriptions []string, err error, ) bool { if err == nil { @@ -266,11 +279,7 @@ func (r *Retryer) shouldRetryWithSleep( ), ) - if loopDesc, hasLoopDesc := r.description.Get(); hasLoopDesc { - event.Str("operationDescription", loopDesc) - } - - event.Str("callbackDescription", cbInfo.description). + event.Strs("description", descriptions). Int("error code", util.GetErrorCode(err)). 
Err(err) diff --git a/internal/retry/retry_info.go b/internal/retry/retry_info.go index ea325ac7..8be4fce3 100644 --- a/internal/retry/retry_info.go +++ b/internal/retry/retry_info.go @@ -1,9 +1,11 @@ package retry import ( + "slices" "time" "github.com/10gen/migration-verifier/internal/reportutils" + "github.com/10gen/migration-verifier/mslices" "github.com/10gen/migration-verifier/msync" "github.com/10gen/migration-verifier/option" "github.com/rs/zerolog" @@ -19,11 +21,20 @@ type LoopInfo struct { durationLimit time.Duration } +type lastResetInfo struct { + time time.Time + + // These go into logs to facilitate debugging. + description option.Option[string] + resetsSoFar uint64 +} + type FuncInfo struct { loopInfo *LoopInfo description string loopDescription option.Option[string] - lastResetTime *msync.TypedAtomic[time.Time] + + lastReset *msync.TypedAtomic[lastResetInfo] } // Log will log a debug-level message for the current Info values and the provided strings. @@ -69,7 +80,7 @@ func (fi *FuncInfo) GetAttemptNumber() int { // GetDurationSoFar returns the Info's current duration so far. This duration // applies to the duration of retrying for transient errors only. func (fi *FuncInfo) GetDurationSoFar() time.Duration { - return time.Since(fi.lastResetTime.Load()) + return time.Since(fi.lastReset.Load().time) } // NoteSuccess is used to tell the retry util to reset its measurement @@ -78,6 +89,21 @@ func (fi *FuncInfo) GetDurationSoFar() time.Duration { // // Call this after every successful command in a multi-command callback. // (It’s useless--but harmless--in a single-command callback.) -func (i *FuncInfo) NoteSuccess() { - i.lastResetTime.Store(time.Now()) +func (i *FuncInfo) NoteSuccess(description string) { + totalResets := i.lastReset.Load().resetsSoFar + + i.lastReset.Store(lastResetInfo{ + description: option.Some(description), + time: time.Now(), + resetsSoFar: 1 + totalResets, + }) +} + +func (i *FuncInfo) GetDescriptions() []string { + descriptions := mslices.Of(i.description) + if loopDesc, hasDesc := i.loopDescription.Get(); hasDesc { + descriptions = slices.Insert(descriptions, 0, loopDesc) + } + + return descriptions } diff --git a/internal/retry/retryer_test.go b/internal/retry/retryer_test.go index 8b430407..38a76fc0 100644 --- a/internal/retry/retryer_test.go +++ b/internal/retry/retryer_test.go @@ -8,6 +8,7 @@ import ( "time" "github.com/10gen/migration-verifier/internal/util" + "github.com/10gen/migration-verifier/option" "go.mongodb.org/mongo-driver/mongo" ) @@ -99,8 +100,11 @@ func (suite *UnitTestSuite) TestRetryerDurationReset() { noSuccessIterations := 0 f1 := func(_ context.Context, ri *FuncInfo) error { // Artificially advance how much time was taken. - ri.lastResetTime.Store( - ri.lastResetTime.Load().Add(-2 * ri.loopInfo.durationLimit), + ri.lastReset.Store( + lastResetInfo{ + time: ri.lastReset.Load().time.Add(-2 * ri.loopInfo.durationLimit), + description: option.Some("artificially rewinding time"), + }, ) noSuccessIterations++ @@ -124,12 +128,13 @@ func (suite *UnitTestSuite) TestRetryerDurationReset() { successIterations := 0 f2 := func(_ context.Context, ri *FuncInfo) error { // Artificially advance how much time was taken. 
- ri.lastResetTime.Store( - ri.lastResetTime.Load().Add(-2 * ri.loopInfo.durationLimit), + ri.lastReset.Store( + lastResetInfo{ + time: ri.lastReset.Load().time.Add(-2 * ri.loopInfo.durationLimit), + description: option.Some("artificially rewinding time"), + }, ) - ri.NoteSuccess() - successIterations++ if successIterations == 1 { return someNetworkError @@ -307,7 +312,7 @@ func (suite *UnitTestSuite) TestMulti_LongRunningSuccess() { err := retryer.WithCallback( func(ctx context.Context, fi *FuncInfo) error { - fi.NoteSuccess() + fi.NoteSuccess("success right away") if time.Now().Before(succeedPastTime) { time.Sleep(1 * time.Second) diff --git a/internal/verifier/change_stream.go b/internal/verifier/change_stream.go index 4c5bc1d9..b4eb71cb 100644 --- a/internal/verifier/change_stream.go +++ b/internal/verifier/change_stream.go @@ -299,7 +299,7 @@ func (csr *ChangeStreamReader) readAndHandleOneChangeEventBatch( eventsRead++ } - ri.NoteSuccess() + ri.NoteSuccess("received a batch of change events") if eventsRead == 0 { return nil diff --git a/internal/verifier/change_stream_test.go b/internal/verifier/change_stream_test.go index 6da20876..6eeaf7a1 100644 --- a/internal/verifier/change_stream_test.go +++ b/internal/verifier/change_stream_test.go @@ -172,7 +172,6 @@ func (suite *IntegrationTestSuite) TestChangeStreamFilter_WithNamespaces() { events = append(events, newEvent) } - suite.T().Logf("Change stream op time (got event? %v): %v", gotEvent, csOpTime) if csOpTime.After(*changeStreamStopTime) { break } diff --git a/internal/verifier/compare.go b/internal/verifier/compare.go index 3fdcb12e..d6f8ccae 100644 --- a/internal/verifier/compare.go +++ b/internal/verifier/compare.go @@ -324,7 +324,7 @@ func (verifier *Verifier) getFetcherChannelsAndCallbacks( ) if err == nil { - state.NoteSuccess() + state.NoteSuccess("opened src find cursor") err = errors.Wrap( iterateCursorToChannel(ctx, state, cursor, srcChannel), @@ -350,7 +350,7 @@ func (verifier *Verifier) getFetcherChannelsAndCallbacks( ) if err == nil { - state.NoteSuccess() + state.NoteSuccess("opened dst find cursor") err = errors.Wrap( iterateCursorToChannel(ctx, state, cursor, dstChannel), @@ -376,7 +376,7 @@ func iterateCursorToChannel( writer chan<- bson.Raw, ) error { for cursor.Next(ctx) { - state.NoteSuccess() + state.NoteSuccess("received a document") writer <- slices.Clone(cursor.Current) } From 6bc2e10c67dec168bc71ad4faae173958dcac8e7 Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Fri, 6 Dec 2024 12:24:06 -0500 Subject: [PATCH 10/28] tweak --- internal/retry/retry.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/internal/retry/retry.go b/internal/retry/retry.go index 34c79d45..396e29a3 100644 --- a/internal/retry/retry.go +++ b/internal/retry/retry.go @@ -125,12 +125,12 @@ func (r *Retryer) runRetryLoop( if funcinfos[i].lastReset.Load() == lastReset { event := logger.Warn(). Str("callbackDescription", curCbInfo.description). - Time("since", lastReset.time) + Time("noSuccessSince", lastReset.time). + Uint64("successesSoFar", lastReset.resetsSoFar) if successDesc, hasDesc := lastReset.description.Get(); hasDesc { event. - Str("successDescription", successDesc). - Uint64("successesSoFar", lastReset.resetsSoFar) + Str("lastSuccessDescription", successDesc) } event. From 7dda460b29eff15095520f6173550ffc7ed0d5d2 Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Fri, 6 Dec 2024 16:05:48 -0500 Subject: [PATCH 11/28] Convert change stream err and writesOffTs channels to Eventuals. 
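An Eventual is written exactly once and can then be read any number of
times: readers select on Ready() and call Get() afterward. Illustrative
sketch of the consumer side (not part of this diff; ts, ctx, and the
enclosing error-returning function are placeholders), using the API added
below:

    writesOffTs := util.NewEventual[primitive.Timestamp]()

    // Writer: publish the timestamp exactly once.
    writesOffTs.Set(ts)

    // Reader: await readiness, then read the value as often as needed.
    select {
    case <-writesOffTs.Ready():
        finalTs := writesOffTs.Get().MustGet()
        _ = finalTs
    case <-ctx.Done():
        return ctx.Err()
    }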
--- internal/util/eventual.go | 55 +++++++++++++++++++++++++ internal/verifier/change_stream.go | 24 +++++------ internal/verifier/change_stream_test.go | 4 +- internal/verifier/check.go | 9 ++-- internal/verifier/migration_verifier.go | 12 ++++-- 5 files changed, 82 insertions(+), 22 deletions(-) create mode 100644 internal/util/eventual.go diff --git a/internal/util/eventual.go b/internal/util/eventual.go new file mode 100644 index 00000000..3968cafe --- /dev/null +++ b/internal/util/eventual.go @@ -0,0 +1,55 @@ +package util + +import ( + "sync" + + "github.com/10gen/migration-verifier/option" +) + +// Eventual represents a value that isn’t available when this struct is created +// but can be awaited via a channel. +// +// This is much like how context.Context’s Done() and Err() methods work. +// It’s useful to await a value’s readiness via channel but then read it +// multiple times. +type Eventual[T any] struct { + ready chan struct{} + val option.Option[T] + mux sync.RWMutex +} + +// NewEventual creates an Eventual and returns a pointer +// to it. +func NewEventual[T any]() *Eventual[T] { + return &Eventual[T]{ + ready: make(chan struct{}), + } +} + +// Ready returns a channel that closes once the Eventual’s value is ready. +func (e *Eventual[T]) Ready() <-chan struct{} { + return e.ready +} + +// Get returns an option that contains the Eventual’s value, or +// empty if the value isn’t ready yet. +func (e *Eventual[T]) Get() option.Option[T] { + e.mux.RLock() + defer e.mux.RUnlock() + + return e.val +} + +// Set +func (e *Eventual[T]) Set(val T) { + e.mux.Lock() + defer e.mux.Unlock() + + if e.val.IsSome() { + panic("Double set on eventual!") + } + + e.val = option.Some(val) + + close(e.ready) +} diff --git a/internal/verifier/change_stream.go b/internal/verifier/change_stream.go index e48a22d5..e116334c 100644 --- a/internal/verifier/change_stream.go +++ b/internal/verifier/change_stream.go @@ -66,8 +66,8 @@ type ChangeStreamReader struct { changeStreamRunning bool changeEventBatchChan chan []ParsedEvent - writesOffTsChan chan primitive.Timestamp - errChan chan error + writesOffTs *util.Eventual[primitive.Timestamp] + error *util.Eventual[error] doneChan chan struct{} startAtTs *primitive.Timestamp @@ -83,8 +83,8 @@ func (verifier *Verifier) initializeChangeStreamReaders() { clusterInfo: *verifier.srcClusterInfo, changeStreamRunning: false, changeEventBatchChan: make(chan []ParsedEvent), - writesOffTsChan: make(chan primitive.Timestamp), - errChan: make(chan error), + writesOffTs: util.NewEventual[primitive.Timestamp](), + error: util.NewEventual[error](), doneChan: make(chan struct{}), } verifier.dstChangeStreamReader = &ChangeStreamReader{ @@ -96,8 +96,8 @@ func (verifier *Verifier) initializeChangeStreamReaders() { clusterInfo: *verifier.dstClusterInfo, changeStreamRunning: false, changeEventBatchChan: make(chan []ParsedEvent), - writesOffTsChan: make(chan primitive.Timestamp), - errChan: make(chan error), + writesOffTs: util.NewEventual[primitive.Timestamp](), + error: util.NewEventual[error](), doneChan: make(chan struct{}), } } @@ -117,7 +117,6 @@ func (verifier *Verifier) StartChangeEventHandler(ctx context.Context, reader *C verifier.logger.Trace().Msgf("Verifier is handling a change event batch from %s: %v", reader, batch) err := verifier.HandleChangeStreamEvents(ctx, batch, reader.readerType) if err != nil { - reader.errChan <- err return err } } @@ -336,7 +335,9 @@ func (csr *ChangeStreamReader) iterateChangeStream( // source writes are ended and the migration tool is 
finished / committed. // This means we should exit rather than continue reading the change stream // since there should be no more events. - case writesOffTs := <-csr.writesOffTsChan: + case <-csr.writesOffTs.Ready(): + writesOffTs := csr.writesOffTs.Get().MustGet() + csr.logger.Debug(). Interface("writesOffTimestamp", writesOffTs). Msgf("%s thread received writesOff timestamp. Finalizing change stream.", csr) @@ -389,7 +390,7 @@ func (csr *ChangeStreamReader) iterateChangeStream( } // since we have started Recheck, we must signal that we have // finished the change stream changes so that Recheck can continue. - csr.doneChan <- struct{}{} + close(csr.doneChan) break } } @@ -525,10 +526,7 @@ func (csr *ChangeStreamReader) StartChangeStream(ctx context.Context) error { ) if err != nil { - // NB: This failure always happens after the initial change stream - // creation. - csr.errChan <- err - close(csr.errChan) + csr.error.Set(err) } }() diff --git a/internal/verifier/change_stream_test.go b/internal/verifier/change_stream_test.go index 6da20876..714c8332 100644 --- a/internal/verifier/change_stream_test.go +++ b/internal/verifier/change_stream_test.go @@ -319,7 +319,7 @@ func (suite *IntegrationTestSuite) TestStartAtTimeNoChanges() { suite.Require().NotNil(origStartTs) suite.startSrcChangeStreamReaderAndHandler(ctx, verifier) suite.Require().Equal(verifier.srcChangeStreamReader.startAtTs, origStartTs) - verifier.srcChangeStreamReader.writesOffTsChan <- *origStartTs + verifier.srcChangeStreamReader.writesOffTs.Set(*origStartTs) <-verifier.srcChangeStreamReader.doneChan suite.Require().Equal(verifier.srcChangeStreamReader.startAtTs, origStartTs) } @@ -365,7 +365,7 @@ func (suite *IntegrationTestSuite) TestStartAtTimeWithChanges() { "session time after events should exceed the original", ) - verifier.srcChangeStreamReader.writesOffTsChan <- *postEventsSessionTime + verifier.srcChangeStreamReader.writesOffTs.Set(*postEventsSessionTime) <-verifier.srcChangeStreamReader.doneChan suite.Assert().Equal( diff --git a/internal/verifier/check.go b/internal/verifier/check.go index 3207791f..b524309d 100644 --- a/internal/verifier/check.go +++ b/internal/verifier/check.go @@ -49,7 +49,8 @@ func (verifier *Verifier) waitForChangeStream(ctx context.Context, csr *ChangeSt select { case <-ctx.Done(): return ctx.Err() - case err := <-csr.errChan: + case <-csr.error.Ready(): + err := csr.error.Get().MustGet() verifier.logger.Warn().Err(err). Msgf("Received error from %s.", csr) return err @@ -87,9 +88,11 @@ func (verifier *Verifier) CheckWorker(ctxIn context.Context) error { // If the change stream fails, everything should stop. 
eg.Go(func() error { select { - case err := <-verifier.srcChangeStreamReader.errChan: + case <-verifier.srcChangeStreamReader.error.Ready(): + err := verifier.srcChangeStreamReader.error.Get().MustGet() return errors.Wrapf(err, "%s failed", verifier.srcChangeStreamReader) - case err := <-verifier.dstChangeStreamReader.errChan: + case <-verifier.dstChangeStreamReader.error.Ready(): + err := verifier.dstChangeStreamReader.error.Get().MustGet() return errors.Wrapf(err, "%s failed", verifier.dstChangeStreamReader) case <-ctx.Done(): return nil diff --git a/internal/verifier/migration_verifier.go b/internal/verifier/migration_verifier.go index cd158af0..df76df8a 100644 --- a/internal/verifier/migration_verifier.go +++ b/internal/verifier/migration_verifier.go @@ -279,15 +279,19 @@ func (verifier *Verifier) WritesOff(ctx context.Context) error { // might be inserting docs into the recheck queue, which happens // under the lock. select { - case verifier.srcChangeStreamReader.writesOffTsChan <- srcFinalTs: - case err := <-verifier.srcChangeStreamReader.errChan: + case <-verifier.srcChangeStreamReader.error.Ready(): + err := verifier.srcChangeStreamReader.error.Get().MustGet() return errors.Wrapf(err, "tried to send writes-off timestamp to %s, but change stream already failed", verifier.srcChangeStreamReader) + default: + verifier.srcChangeStreamReader.writesOffTs.Set(srcFinalTs) } select { - case verifier.dstChangeStreamReader.writesOffTsChan <- dstFinalTs: - case err := <-verifier.dstChangeStreamReader.errChan: + case <-verifier.dstChangeStreamReader.error.Ready(): + err := verifier.dstChangeStreamReader.error.Get().MustGet() return errors.Wrapf(err, "tried to send writes-off timestamp to %s, but change stream already failed", verifier.dstChangeStreamReader) + default: + verifier.dstChangeStreamReader.writesOffTs.Set(dstFinalTs) } return nil From 78c609f78db261a28482f0fa522d02688ad2c2ea Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Fri, 6 Dec 2024 16:11:34 -0500 Subject: [PATCH 12/28] add test for Eventual --- internal/util/eventual_test.go | 37 ++++++++++++++++++++++++++++++++++ 1 file changed, 37 insertions(+) create mode 100644 internal/util/eventual_test.go diff --git a/internal/util/eventual_test.go b/internal/util/eventual_test.go new file mode 100644 index 00000000..871a2179 --- /dev/null +++ b/internal/util/eventual_test.go @@ -0,0 +1,37 @@ +package util + +import ( + "time" + + "github.com/10gen/migration-verifier/option" +) + +func (s *UnitTestSuite) TestEventual() { + eventual := NewEventual[int]() + + s.Assert().Equal( + option.None[int](), + eventual.Get(), + "Get() should return empty", + ) + + select { + case <-eventual.Ready(): + s.Require().Fail("should not be ready") + case <-time.NewTimer(time.Second).C: + } + + eventual.Set(123) + + select { + case <-eventual.Ready(): + case <-time.NewTimer(time.Second).C: + s.Require().Fail("should be ready") + } + + s.Assert().Equal( + option.Some(123), + eventual.Get(), + "Get() should return the value", + ) +} From 9c17faf31faa19bdfc614985f3d400a12484eaea Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Fri, 6 Dec 2024 16:36:33 -0500 Subject: [PATCH 13/28] trace for test that flapped --- internal/verifier/change_stream_test.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/internal/verifier/change_stream_test.go b/internal/verifier/change_stream_test.go index 714c8332..6910acbb 100644 --- a/internal/verifier/change_stream_test.go +++ b/internal/verifier/change_stream_test.go @@ -307,6 +307,8 @@ func (suite 
*IntegrationTestSuite) fetchVerifierRechecks(ctx context.Context, ve } func (suite *IntegrationTestSuite) TestStartAtTimeNoChanges() { + zerolog.SetGlobalLevel(zerolog.TraceLevel) + verifier := suite.BuildVerifier() ctx := suite.Context() sess, err := suite.srcMongoClient.StartSession() From fa63ff7ab737a6c82d6c4a94dd4334b7bbbc5e20 Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Fri, 6 Dec 2024 16:40:01 -0500 Subject: [PATCH 14/28] godoc --- internal/util/eventual.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/internal/util/eventual.go b/internal/util/eventual.go index 3968cafe..ade2b7d2 100644 --- a/internal/util/eventual.go +++ b/internal/util/eventual.go @@ -40,7 +40,8 @@ func (e *Eventual[T]) Get() option.Option[T] { return e.val } -// Set +// Set sets the Eventual’s value. It may be called only once; +// if called again it will panic. func (e *Eventual[T]) Set(val T) { e.mux.Lock() defer e.mux.Unlock() From 28b8525b93cc6d1fdccc6ecff2d511aa88ba1ea2 Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Fri, 6 Dec 2024 23:16:55 -0500 Subject: [PATCH 15/28] fix case where context closes docs reader --- internal/verifier/compare.go | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/internal/verifier/compare.go b/internal/verifier/compare.go index d6f8ccae..c92489f1 100644 --- a/internal/verifier/compare.go +++ b/internal/verifier/compare.go @@ -375,12 +375,17 @@ func iterateCursorToChannel( cursor *mongo.Cursor, writer chan<- bson.Raw, ) error { + defer close(writer) + for cursor.Next(ctx) { state.NoteSuccess("received a document") - writer <- slices.Clone(cursor.Current) - } - close(writer) + select { + case <-ctx.Done(): + return ctx.Err() + case writer <- slices.Clone(cursor.Current): + } + } return errors.Wrap(cursor.Err(), "failed to iterate cursor") } From 490b2d65aca6364d085844d95476cd3de6a6d807 Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Mon, 9 Dec 2024 08:04:57 -0500 Subject: [PATCH 16/28] add test --- internal/verifier/integration_test_suite.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/internal/verifier/integration_test_suite.go b/internal/verifier/integration_test_suite.go index 0ba77ff2..b87e874b 100644 --- a/internal/verifier/integration_test_suite.go +++ b/internal/verifier/integration_test_suite.go @@ -79,6 +79,8 @@ func (suite *IntegrationTestSuite) SetupSuite() { func (suite *IntegrationTestSuite) SetupTest() { ctx, canceller := context.WithCancelCause(context.Background()) + suite.testContext, suite.contextCanceller = ctx, canceller + suite.zerologGlobalLogLevel = zerolog.GlobalLevel() dbname := suite.DBNameForTest() @@ -112,9 +114,6 @@ func (suite *IntegrationTestSuite) SetupTest() { suite.initialDbNames.Add(dbName) } } - - suite.testContext, suite.contextCanceller = ctx, canceller - suite.zerologGlobalLogLevel = zerolog.GlobalLevel() } func (suite *IntegrationTestSuite) TearDownTest() { From c1990096054628b2a7709c388dde8f191dc9365c Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Mon, 9 Dec 2024 08:11:54 -0500 Subject: [PATCH 17/28] revert retryer changes --- internal/partitions/partitions.go | 2 +- internal/retry/retry.go | 62 ++++++++++-------------------- internal/retry/retry_info.go | 34 ++-------------- internal/retry/retryer_test.go | 19 ++++----- internal/verifier/change_stream.go | 2 +- internal/verifier/compare.go | 6 +-- 6 files changed, 36 insertions(+), 89 deletions(-) diff --git a/internal/partitions/partitions.go b/internal/partitions/partitions.go index 
acb02c3a..022a54e4 100644 --- a/internal/partitions/partitions.go +++ b/internal/partitions/partitions.go @@ -621,7 +621,7 @@ func getMidIDBounds( // Append the copied bound to the other mid _id bounds. midIDBounds = append(midIDBounds, bound) - ri.NoteSuccess("received an ID partition") + ri.NoteSuccess() } return cursor.Err() diff --git a/internal/retry/retry.go b/internal/retry/retry.go index 396e29a3..6630bbe0 100644 --- a/internal/retry/retry.go +++ b/internal/retry/retry.go @@ -69,14 +69,11 @@ func (r *Retryer) runRetryLoop( li := &LoopInfo{ durationLimit: r.retryLimit, } - funcinfos := lo.Map( - r.callbacks, - func(cb retryCallbackInfo, _ int) *FuncInfo { + funcinfos := lo.RepeatBy( + len(r.callbacks), + func(_ int) *FuncInfo { return &FuncInfo{ - lastReset: msync.NewTypedAtomic(lastResetInfo{ - time: startTime, - }), - description: cb.description, + lastResetTime: msync.NewTypedAtomic(startTime), loopDescription: r.description, loopInfo: li, } @@ -116,25 +113,17 @@ func (r *Retryer) runRetryLoop( defer ticker.Stop() for { - lastReset := funcinfos[i].lastReset.Load() + lastSuccessTime := funcinfos[i].lastResetTime.Load() select { case <-cbDoneChan: return case <-ticker.C: - if funcinfos[i].lastReset.Load() == lastReset { - event := logger.Warn(). + if funcinfos[i].lastResetTime.Load() == lastSuccessTime { + logger.Warn(). Str("callbackDescription", curCbInfo.description). - Time("noSuccessSince", lastReset.time). - Uint64("successesSoFar", lastReset.resetsSoFar) - - if successDesc, hasDesc := lastReset.description.Get(); hasDesc { - event. - Str("lastSuccessDescription", successDesc) - } - - event. - Str("elapsedTime", reportutils.DurationToHMS(time.Since(lastReset.time))). + Time("lastSuccessAt", lastSuccessTime). + Str("elapsedTime", reportutils.DurationToHMS(time.Since(lastSuccessTime))). Msg("Operation has not reported success for a while.") } } @@ -175,11 +164,9 @@ func (r *Retryer) runRetryLoop( } failedFuncInfo := funcinfos[groupErr.funcNum] - descriptions := failedFuncInfo.GetDescriptions() - cbErr := groupErr.errFromCallback // Not a transient error? Fail immediately. - if !r.shouldRetryWithSleep(logger, sleepTime, descriptions, cbErr) { + if !r.shouldRetryWithSleep(logger, sleepTime, *failedFuncInfo, groupErr.errFromCallback) { return groupErr.errFromCallback } @@ -214,7 +201,7 @@ func (r *Retryer) runRetryLoop( // Set all of the funcs that did *not* fail as having just succeeded. for i, curInfo := range funcinfos { if i != groupErr.funcNum { - curInfo.lastReset.Store(lastResetInfo{time: now}) + curInfo.lastResetTime.Store(now) } } } @@ -248,7 +235,7 @@ func (r *Retryer) addDescriptionToEvent(event *zerolog.Event) *zerolog.Event { func (r *Retryer) shouldRetryWithSleep( logger *logger.Logger, sleepTime time.Duration, - descriptions []string, + funcinfo FuncInfo, err error, ) bool { if err == nil { @@ -263,35 +250,26 @@ func (r *Retryer) shouldRetryWithSleep( ) event := logger.WithLevel( - lo.Ternary( - // If it’s transient, surface it as info. - isTransient, - zerolog.InfoLevel, - - lo.Ternary( - // Context cancellation is unimportant, so debug. - errors.Is(err, context.Canceled), - zerolog.DebugLevel, - - // Other non-retryables are serious, so warn. - zerolog.WarnLevel, - ), - ), + lo.Ternary(isTransient, zerolog.InfoLevel, zerolog.WarnLevel), ) - event.Strs("description", descriptions). + if loopDesc, hasLoopDesc := r.description.Get(); hasLoopDesc { + event.Str("operationDescription", loopDesc) + } + + event.Str("callbackDescription", funcinfo.description). 
Int("error code", util.GetErrorCode(err)). Err(err) if isTransient { event. Stringer("delay", sleepTime). - Msg("Got retryable error. Pausing, then will retry.") + Msg("Pausing before retrying after transient error.") return true } - event.Msg("Non-retryable error occurred.") + event.Msg("Non-transient error occurred.") return false } diff --git a/internal/retry/retry_info.go b/internal/retry/retry_info.go index 8be4fce3..ea325ac7 100644 --- a/internal/retry/retry_info.go +++ b/internal/retry/retry_info.go @@ -1,11 +1,9 @@ package retry import ( - "slices" "time" "github.com/10gen/migration-verifier/internal/reportutils" - "github.com/10gen/migration-verifier/mslices" "github.com/10gen/migration-verifier/msync" "github.com/10gen/migration-verifier/option" "github.com/rs/zerolog" @@ -21,20 +19,11 @@ type LoopInfo struct { durationLimit time.Duration } -type lastResetInfo struct { - time time.Time - - // These go into logs to facilitate debugging. - description option.Option[string] - resetsSoFar uint64 -} - type FuncInfo struct { loopInfo *LoopInfo description string loopDescription option.Option[string] - - lastReset *msync.TypedAtomic[lastResetInfo] + lastResetTime *msync.TypedAtomic[time.Time] } // Log will log a debug-level message for the current Info values and the provided strings. @@ -80,7 +69,7 @@ func (fi *FuncInfo) GetAttemptNumber() int { // GetDurationSoFar returns the Info's current duration so far. This duration // applies to the duration of retrying for transient errors only. func (fi *FuncInfo) GetDurationSoFar() time.Duration { - return time.Since(fi.lastReset.Load().time) + return time.Since(fi.lastResetTime.Load()) } // NoteSuccess is used to tell the retry util to reset its measurement @@ -89,21 +78,6 @@ func (fi *FuncInfo) GetDurationSoFar() time.Duration { // // Call this after every successful command in a multi-command callback. // (It’s useless--but harmless--in a single-command callback.) -func (i *FuncInfo) NoteSuccess(description string) { - totalResets := i.lastReset.Load().resetsSoFar - - i.lastReset.Store(lastResetInfo{ - description: option.Some(description), - time: time.Now(), - resetsSoFar: 1 + totalResets, - }) -} - -func (i *FuncInfo) GetDescriptions() []string { - descriptions := mslices.Of(i.description) - if loopDesc, hasDesc := i.loopDescription.Get(); hasDesc { - descriptions = slices.Insert(descriptions, 0, loopDesc) - } - - return descriptions +func (i *FuncInfo) NoteSuccess() { + i.lastResetTime.Store(time.Now()) } diff --git a/internal/retry/retryer_test.go b/internal/retry/retryer_test.go index 38a76fc0..8b430407 100644 --- a/internal/retry/retryer_test.go +++ b/internal/retry/retryer_test.go @@ -8,7 +8,6 @@ import ( "time" "github.com/10gen/migration-verifier/internal/util" - "github.com/10gen/migration-verifier/option" "go.mongodb.org/mongo-driver/mongo" ) @@ -100,11 +99,8 @@ func (suite *UnitTestSuite) TestRetryerDurationReset() { noSuccessIterations := 0 f1 := func(_ context.Context, ri *FuncInfo) error { // Artificially advance how much time was taken. - ri.lastReset.Store( - lastResetInfo{ - time: ri.lastReset.Load().time.Add(-2 * ri.loopInfo.durationLimit), - description: option.Some("artificially rewinding time"), - }, + ri.lastResetTime.Store( + ri.lastResetTime.Load().Add(-2 * ri.loopInfo.durationLimit), ) noSuccessIterations++ @@ -128,13 +124,12 @@ func (suite *UnitTestSuite) TestRetryerDurationReset() { successIterations := 0 f2 := func(_ context.Context, ri *FuncInfo) error { // Artificially advance how much time was taken. 
- ri.lastReset.Store( - lastResetInfo{ - time: ri.lastReset.Load().time.Add(-2 * ri.loopInfo.durationLimit), - description: option.Some("artificially rewinding time"), - }, + ri.lastResetTime.Store( + ri.lastResetTime.Load().Add(-2 * ri.loopInfo.durationLimit), ) + ri.NoteSuccess() + successIterations++ if successIterations == 1 { return someNetworkError @@ -312,7 +307,7 @@ func (suite *UnitTestSuite) TestMulti_LongRunningSuccess() { err := retryer.WithCallback( func(ctx context.Context, fi *FuncInfo) error { - fi.NoteSuccess("success right away") + fi.NoteSuccess() if time.Now().Before(succeedPastTime) { time.Sleep(1 * time.Second) diff --git a/internal/verifier/change_stream.go b/internal/verifier/change_stream.go index 2a7ba265..09c5f415 100644 --- a/internal/verifier/change_stream.go +++ b/internal/verifier/change_stream.go @@ -298,7 +298,7 @@ func (csr *ChangeStreamReader) readAndHandleOneChangeEventBatch( eventsRead++ } - ri.NoteSuccess("received a batch of change events") + ri.NoteSuccess() if eventsRead == 0 { return nil diff --git a/internal/verifier/compare.go b/internal/verifier/compare.go index c92489f1..d999f047 100644 --- a/internal/verifier/compare.go +++ b/internal/verifier/compare.go @@ -324,7 +324,7 @@ func (verifier *Verifier) getFetcherChannelsAndCallbacks( ) if err == nil { - state.NoteSuccess("opened src find cursor") + state.NoteSuccess() err = errors.Wrap( iterateCursorToChannel(ctx, state, cursor, srcChannel), @@ -350,7 +350,7 @@ func (verifier *Verifier) getFetcherChannelsAndCallbacks( ) if err == nil { - state.NoteSuccess("opened dst find cursor") + state.NoteSuccess() err = errors.Wrap( iterateCursorToChannel(ctx, state, cursor, dstChannel), @@ -378,7 +378,7 @@ func iterateCursorToChannel( defer close(writer) for cursor.Next(ctx) { - state.NoteSuccess("received a document") + state.NoteSuccess() select { case <-ctx.Done(): From bab60a525ad8fdfba831e168b279d4f374d0513f Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Mon, 9 Dec 2024 08:27:58 -0500 Subject: [PATCH 18/28] docs tweak --- internal/util/eventual.go | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/internal/util/eventual.go b/internal/util/eventual.go index ade2b7d2..e9da4b0b 100644 --- a/internal/util/eventual.go +++ b/internal/util/eventual.go @@ -6,12 +6,12 @@ import ( "github.com/10gen/migration-verifier/option" ) -// Eventual represents a value that isn’t available when this struct is created -// but can be awaited via a channel. +// Eventual solves the “one writer, many readers” problem: a value gets +// written once, then the readers will see that the value is `Ready()` and +// can then `Get()` it. // -// This is much like how context.Context’s Done() and Err() methods work. -// It’s useful to await a value’s readiness via channel but then read it -// multiple times. +// It’s like how `context.Context`’s `Done()` and `Err()` methods work, but +// generalized to any data type. type Eventual[T any] struct { ready chan struct{} val option.Option[T] @@ -47,9 +47,11 @@ func (e *Eventual[T]) Set(val T) { defer e.mux.Unlock() if e.val.IsSome() { - panic("Double set on eventual!") + panic("Tried to set an eventual twice!") } + // NB: This *must* happen before the close(), or else a fast reader may + // not see this value. 
e.val = option.Some(val) close(e.ready) From 250a659be15d08d5e61fe362b0ad4dc6d80c92f5 Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Mon, 9 Dec 2024 08:28:27 -0500 Subject: [PATCH 19/28] add test --- internal/verifier/compare_test.go | 78 +++++++++++++++++++++++++++++++ 1 file changed, 78 insertions(+) create mode 100644 internal/verifier/compare_test.go diff --git a/internal/verifier/compare_test.go b/internal/verifier/compare_test.go new file mode 100644 index 00000000..88c817e8 --- /dev/null +++ b/internal/verifier/compare_test.go @@ -0,0 +1,78 @@ +package verifier + +import ( + "context" + "math/rand" + "sync/atomic" + "time" + + "github.com/10gen/migration-verifier/internal/partitions" + "github.com/10gen/migration-verifier/mslices" + "github.com/pkg/errors" + "github.com/samber/lo" + "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/bson/primitive" +) + +// TestFetchAndCompareDocuments_ContextCancellation ensures that nothing hangs +// when a context is canceled during FetchAndCompareDocuments(). +func (s *IntegrationTestSuite) TestFetchAndCompareDocuments_Context() { + ctx := s.Context() + + for _, client := range mslices.Of(s.srcMongoClient, s.dstMongoClient) { + docs := lo.RepeatBy( + 10_000, + func(i int) bson.D { + return bson.D{} + }, + ) + + _, err := client.Database(s.DBNameForTest()).Collection("stuff"). + InsertMany(ctx, lo.ToAnySlice(docs)) + + s.Require().NoError(err) + } + + task := VerificationTask{ + PrimaryKey: primitive.NewObjectID(), + Type: verificationTaskVerifyDocuments, + Status: verificationTaskProcessing, + QueryFilter: QueryFilter{ + Namespace: s.DBNameForTest() + ".stuff", + Partition: &partitions.Partition{ + Key: partitions.PartitionKey{ + Lower: primitive.MinKey{}, + }, + Upper: primitive.MaxKey{}, + }, + }, + } + + verifier := s.BuildVerifier() + + for range 100 { + cancelableCtx, cancel := context.WithCancelCause(ctx) + + var done atomic.Bool + go func() { + verifier.FetchAndCompareDocuments( + cancelableCtx, + &task, + ) + done.Store(true) + }() + + delay := time.Duration(100 * float64(time.Millisecond) * rand.Float64()) + time.Sleep(delay) + cancel(errors.Errorf("canceled after %s", delay)) + + s.Assert().Eventually( + func() bool { + return done.Load() + }, + time.Minute, + 10*time.Millisecond, + "cancellation after %s should not cause hang", + ) + } +} From be2638e77b96b9fd677fc03344e1f798c105df4d Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Mon, 9 Dec 2024 08:56:03 -0500 Subject: [PATCH 20/28] change stream test --- internal/testutil/testutil.go | 67 ++++++++++++++++ internal/verifier/change_stream_test.go | 102 +++++++++++++----------- internal/verifier/migration_verifier.go | 4 +- 3 files changed, 124 insertions(+), 49 deletions(-) diff --git a/internal/testutil/testutil.go b/internal/testutil/testutil.go index e7c367c9..9a8c5897 100644 --- a/internal/testutil/testutil.go +++ b/internal/testutil/testutil.go @@ -1,8 +1,14 @@ package testutil import ( + "context" + "testing" + + "github.com/pkg/errors" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/mongo" + "go.mongodb.org/mongo-driver/mongo/options" + "go.mongodb.org/mongo-driver/mongo/readconcern" ) // Marshal wraps `bsonMarshal` with a panic on failure. @@ -34,3 +40,64 @@ func convertDocsToAnys(docs []bson.D) []any { return anys } + +func KillApplicationChangeStreams( + ctx context.Context, + t *testing.T, + client *mongo.Client, + appName string, +) error { + // Kill verifier’s change stream. 
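+	// List this application's change-stream cursors via $currentOp (with
+	// idleCursors so idle cursors are included), matching aggregations whose
+	// originating pipeline begins with $_internalChangeStreamOplogMatch.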
+ cursor, err := client.Database( + "admin", + options.Database().SetReadConcern(readconcern.Local()), + ).Aggregate( + ctx, + mongo.Pipeline{ + { + {"$currentOp", bson.D{ + {"idleCursors", true}, + }}, + }, + { + {"$match", bson.D{ + {"clientMetadata.application.name", appName}, + {"command.collection", "$cmd.aggregate"}, + {"cursor.originatingCommand.pipeline.0.$_internalChangeStreamOplogMatch", + bson.D{{"$type", "object"}}, + }, + }}, + }, + }, + ) + if err != nil { + return errors.Wrapf(err, "failed to list %#q's change streams", appName) + } + + ops := []struct { + Opid any + }{} + err = cursor.All(ctx, &ops) + if err != nil { + return errors.Wrapf(err, "failed to decode %#q's change streams", appName) + } + + for _, op := range ops { + t.Logf("Killing change stream op %+v", op.Opid) + + err := + client.Database("admin").RunCommand( + ctx, + bson.D{ + {"killOp", 1}, + {"op", op.Opid}, + }, + ).Err() + + if err != nil { + return errors.Wrapf(err, "failed to kill change stream with opId %#q", op.Opid) + } + } + + return nil +} diff --git a/internal/verifier/change_stream_test.go b/internal/verifier/change_stream_test.go index 944a4405..47f23b16 100644 --- a/internal/verifier/change_stream_test.go +++ b/internal/verifier/change_stream_test.go @@ -15,8 +15,6 @@ import ( "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/bson/primitive" "go.mongodb.org/mongo-driver/mongo" - "go.mongodb.org/mongo-driver/mongo/options" - "go.mongodb.org/mongo-driver/mongo/readconcern" ) func (suite *IntegrationTestSuite) TestChangeStreamFilter_NoNamespaces() { @@ -428,6 +426,52 @@ func (suite *IntegrationTestSuite) TestWithChangeEventsBatching() { ) } +func (suite *IntegrationTestSuite) TestWritesOffCursorKilledResilience() { + ctx := suite.Context() + + coll := suite.srcMongoClient. + Database(suite.DBNameForTest()). + Collection("mycoll") + + suite.Require().NoError( + coll.Database().CreateCollection( + ctx, + coll.Name(), + ), + ) + + suite.Require().NoError( + suite.dstMongoClient. + Database(coll.Database().Name()). + CreateCollection( + ctx, + coll.Name(), + ), + ) + + for range 100 { + verifier := suite.BuildVerifier() + + docs := lo.RepeatBy(1_000, func(_ int) bson.D { return bson.D{} }) + _, err := coll.InsertMany( + ctx, + lo.ToAnySlice(docs), + ) + suite.Require().NoError(err) + + suite.Require().NoError(verifier.WritesOff(ctx)) + + suite.Require().NoError( + testutil.KillApplicationChangeStreams( + suite.Context(), + suite.T(), + suite.srcMongoClient, + clientAppName, + ), + ) + } +} + func (suite *IntegrationTestSuite) TestCursorKilledResilience() { ctx := suite.Context() @@ -445,54 +489,16 @@ func (suite *IntegrationTestSuite) TestCursorKilledResilience() { // wait for generation 0 to end suite.Require().NoError(verifierRunner.AwaitGenerationEnd()) - const mvName = "Migration Verifier" - - // Kill verifier’s change stream. 
- cursor, err := suite.srcMongoClient.Database( - "admin", - options.Database().SetReadConcern(readconcern.Local()), - ).Aggregate( - ctx, - mongo.Pipeline{ - { - {"$currentOp", bson.D{ - {"idleCursors", true}, - }}, - }, - { - {"$match", bson.D{ - {"clientMetadata.application.name", mvName}, - {"command.collection", "$cmd.aggregate"}, - {"cursor.originatingCommand.pipeline.0.$_internalChangeStreamOplogMatch", - bson.D{{"$type", "object"}}, - }, - }}, - }, - }, + suite.Require().NoError( + testutil.KillApplicationChangeStreams( + suite.Context(), + suite.T(), + suite.srcMongoClient, + clientAppName, + ), ) - suite.Require().NoError(err) - - var ops []bson.Raw - suite.Require().NoError(cursor.All(ctx, &ops)) - - for _, cursorRaw := range ops { - opId, err := cursorRaw.LookupErr("opid") - suite.Require().NoError(err, "should get opid from op") - - suite.T().Logf("Killing change stream op %+v", opId) - - suite.Require().NoError( - suite.srcMongoClient.Database("admin").RunCommand( - ctx, - bson.D{ - {"killOp", 1}, - {"op", opId}, - }, - ).Err(), - ) - } - _, err = coll.InsertOne( + _, err := coll.InsertOne( ctx, bson.D{{"_id", "after kill"}}, ) diff --git a/internal/verifier/migration_verifier.go b/internal/verifier/migration_verifier.go index b4228f1b..6eff1949 100644 --- a/internal/verifier/migration_verifier.go +++ b/internal/verifier/migration_verifier.go @@ -68,6 +68,8 @@ const ( okSymbol = "\u2705" // white heavy check mark infoSymbol = "\u24d8" // circled Latin small letter I notOkSymbol = "\u2757" // heavy exclamation mark symbol + + clientAppName = "Migration Verifier" ) type whichCluster string @@ -221,7 +223,7 @@ func (verifier *Verifier) ConfigureReadConcern(setting ReadConcernSetting) { } func (verifier *Verifier) getClientOpts(uri string) *options.ClientOptions { - appName := "Migration Verifier" + appName := clientAppName opts := &options.ClientOptions{ AppName: &appName, } From 41229d1ccc079e4b75745abbd12bb05a44b998ed Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Mon, 9 Dec 2024 10:07:52 -0500 Subject: [PATCH 21/28] Fix flapping TestStartAtTimeNoChanges. --- internal/verifier/change_stream.go | 31 +++++++-------- internal/verifier/change_stream_test.go | 50 +++++++++++++++++-------- 2 files changed, 47 insertions(+), 34 deletions(-) diff --git a/internal/verifier/change_stream.go b/internal/verifier/change_stream.go index 09c5f415..39290ee6 100644 --- a/internal/verifier/change_stream.go +++ b/internal/verifier/change_stream.go @@ -47,6 +47,8 @@ const ( metadataChangeStreamCollectionName = "changeStream" ) +var maxChangeStreamAwaitTime = time.Second + type UnknownEventError struct { Event *ParsedEvent } @@ -293,6 +295,9 @@ func (csr *ChangeStreamReader) readAndHandleOneChangeEventBatch( (csr.lastChangeEventTime == nil || csr.lastChangeEventTime.Before(*changeEventBatch[eventsRead].ClusterTime)) { csr.lastChangeEventTime = changeEventBatch[eventsRead].ClusterTime + csr.logger.Trace(). + Interface("event", changeEventBatch[eventsRead]). + Msg("Updated lastChangeEventTime.") } eventsRead++ @@ -431,7 +436,7 @@ func (csr *ChangeStreamReader) createChangeStream( ) (*mongo.ChangeStream, mongo.Session, primitive.Timestamp, error) { pipeline := csr.GetChangeStreamFilter() opts := options.ChangeStream(). - SetMaxAwaitTime(1 * time.Second). + SetMaxAwaitTime(maxChangeStreamAwaitTime). 
SetFullDocument(options.UpdateLookup) if csr.clusterInfo.VersionArray[0] >= 6 { @@ -488,11 +493,17 @@ func (csr *ChangeStreamReader) createChangeStream( // With sharded clusters the resume token might lead the cluster time // by 1 increment. In that case we need the actual cluster time; // otherwise we will get errors. - clusterTime, err := getClusterTimeFromSession(sess) + clusterTime, err := util.GetClusterTimeFromSession(sess) if err != nil { return nil, nil, primitive.Timestamp{}, errors.Wrap(err, "failed to read cluster time from session") } + csr.logger.Debug(). + Interface("resumeTokenTimestamp", startTs). + Interface("clusterTime", clusterTime). + Stringer("changeStreamReader", csr). + Msg("Using earlier time as start timestamp.") + if startTs.After(clusterTime) { startTs = clusterTime } @@ -659,19 +670,3 @@ func extractTimestampFromResumeToken(resumeToken bson.Raw) (primitive.Timestamp, return resumeTokenTime, nil } - -func getClusterTimeFromSession(sess mongo.Session) (primitive.Timestamp, error) { - ctStruct := struct { - ClusterTime struct { - ClusterTime primitive.Timestamp `bson:"clusterTime"` - } `bson:"$clusterTime"` - }{} - - clusterTimeRaw := sess.ClusterTime() - err := bson.Unmarshal(sess.ClusterTime(), &ctStruct) - if err != nil { - return primitive.Timestamp{}, errors.Wrapf(err, "failed to find clusterTime in session cluster time document (%v)", clusterTimeRaw) - } - - return ctStruct.ClusterTime.ClusterTime, nil -} diff --git a/internal/verifier/change_stream_test.go b/internal/verifier/change_stream_test.go index 47f23b16..c3cf5d15 100644 --- a/internal/verifier/change_stream_test.go +++ b/internal/verifier/change_stream_test.go @@ -283,7 +283,7 @@ func (suite *IntegrationTestSuite) getClusterTime(ctx context.Context, client *m sctx := mongo.NewSessionContext(ctx, sess) suite.Require().NoError(sess.Client().Ping(sctx, nil)) - newTime, err := getClusterTimeFromSession(sess) + newTime, err := util.GetClusterTimeFromSession(sess) suite.Require().NoError(err, "should fetch cluster time") return newTime @@ -306,21 +306,39 @@ func (suite *IntegrationTestSuite) fetchVerifierRechecks(ctx context.Context, ve func (suite *IntegrationTestSuite) TestStartAtTimeNoChanges() { zerolog.SetGlobalLevel(zerolog.TraceLevel) - verifier := suite.BuildVerifier() - ctx := suite.Context() - sess, err := suite.srcMongoClient.StartSession() - suite.Require().NoError(err) - sctx := mongo.NewSessionContext(ctx, sess) - _, err = suite.srcMongoClient.Database("testDb").Collection("testColl").InsertOne( - sctx, bson.D{{"_id", 0}}) - suite.Require().NoError(err) - origStartTs := sess.OperationTime() - suite.Require().NotNil(origStartTs) - suite.startSrcChangeStreamReaderAndHandler(ctx, verifier) - suite.Require().Equal(verifier.srcChangeStreamReader.startAtTs, origStartTs) - verifier.srcChangeStreamReader.writesOffTs.Set(*origStartTs) - <-verifier.srcChangeStreamReader.doneChan - suite.Require().Equal(verifier.srcChangeStreamReader.startAtTs, origStartTs) + // Each of these takes ~1s, so don’t do too many of them. 
+ for range 5 { + verifier := suite.BuildVerifier() + ctx := suite.Context() + sess, err := suite.srcMongoClient.StartSession() + suite.Require().NoError(err) + sctx := mongo.NewSessionContext(ctx, sess) + _, err = suite.srcMongoClient.Database("testDb").Collection("testColl").InsertOne( + sctx, bson.D{}) + suite.Require().NoError(err, "should insert doc") + + insertTs, err := util.GetClusterTimeFromSession(sess) + suite.Require().NoError(err, "should get cluster time") + + suite.startSrcChangeStreamReaderAndHandler(ctx, verifier) + + startAtTs := verifier.srcChangeStreamReader.startAtTs + suite.Require().NotNil(startAtTs) + + suite.Require().False( + startAtTs.After(insertTs), + "change stream should start no later than the last operation", + ) + + verifier.srcChangeStreamReader.writesOffTs.Set(insertTs) + + <-verifier.srcChangeStreamReader.doneChan + + suite.Require().False( + verifier.srcChangeStreamReader.startAtTs.Before(*startAtTs), + "new startAtTs should be no earlier than last one", + ) + } } func (suite *IntegrationTestSuite) TestStartAtTimeWithChanges() { From b4dad36bb6cc571fbddd6b591feb483e1e4ef367 Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Mon, 9 Dec 2024 10:21:08 -0500 Subject: [PATCH 22/28] cluster_time.go --- internal/util/cluster_time.go | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) create mode 100644 internal/util/cluster_time.go diff --git a/internal/util/cluster_time.go b/internal/util/cluster_time.go new file mode 100644 index 00000000..761cdc38 --- /dev/null +++ b/internal/util/cluster_time.go @@ -0,0 +1,24 @@ +package util + +import ( + "github.com/pkg/errors" + "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/bson/primitive" + "go.mongodb.org/mongo-driver/mongo" +) + +func GetClusterTimeFromSession(sess mongo.Session) (primitive.Timestamp, error) { + ctStruct := struct { + ClusterTime struct { + ClusterTime primitive.Timestamp `bson:"clusterTime"` + } `bson:"$clusterTime"` + }{} + + clusterTimeRaw := sess.ClusterTime() + err := bson.Unmarshal(sess.ClusterTime(), &ctStruct) + if err != nil { + return primitive.Timestamp{}, errors.Wrapf(err, "failed to find clusterTime in session cluster time document (%v)", clusterTimeRaw) + } + + return ctStruct.ClusterTime.ClusterTime, nil +} From e602c9efde01744e8ed7aed75a05e63faac301e4 Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Mon, 9 Dec 2024 10:33:20 -0500 Subject: [PATCH 23/28] check err --- internal/verifier/compare_test.go | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/internal/verifier/compare_test.go b/internal/verifier/compare_test.go index 88c817e8..fcc59e59 100644 --- a/internal/verifier/compare_test.go +++ b/internal/verifier/compare_test.go @@ -55,10 +55,17 @@ func (s *IntegrationTestSuite) TestFetchAndCompareDocuments_Context() { var done atomic.Bool go func() { - verifier.FetchAndCompareDocuments( + _, _, _, err := verifier.FetchAndCompareDocuments( cancelableCtx, &task, ) + if err != nil { + s.Assert().ErrorIs( + err, + context.Canceled, + "only failure should be context cancellation", + ) + } done.Store(true) }() From 3b2ff9ffd719eb787a25d628ba3a0eb649af3d99 Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Mon, 9 Dec 2024 10:39:15 -0500 Subject: [PATCH 24/28] only log once --- internal/verifier/change_stream.go | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/internal/verifier/change_stream.go b/internal/verifier/change_stream.go index 39290ee6..5847233c 100644 --- a/internal/verifier/change_stream.go 
+++ b/internal/verifier/change_stream.go @@ -44,11 +44,10 @@ type DocKey struct { const ( minChangeStreamPersistInterval = time.Second * 10 + maxChangeStreamAwaitTime = time.Second metadataChangeStreamCollectionName = "changeStream" ) -var maxChangeStreamAwaitTime = time.Second - type UnknownEventError struct { Event *ParsedEvent } @@ -269,6 +268,8 @@ func (csr *ChangeStreamReader) readAndHandleOneChangeEventBatch( eventsRead := 0 var changeEventBatch []ParsedEvent + latestEvent := option.None[ParsedEvent]() + for hasEventInBatch := true; hasEventInBatch; hasEventInBatch = cs.RemainingBatchLength() > 0 { gotEvent := cs.TryNext(ctx) @@ -294,10 +295,9 @@ func (csr *ChangeStreamReader) readAndHandleOneChangeEventBatch( if changeEventBatch[eventsRead].ClusterTime != nil && (csr.lastChangeEventTime == nil || csr.lastChangeEventTime.Before(*changeEventBatch[eventsRead].ClusterTime)) { + csr.lastChangeEventTime = changeEventBatch[eventsRead].ClusterTime - csr.logger.Trace(). - Interface("event", changeEventBatch[eventsRead]). - Msg("Updated lastChangeEventTime.") + latestEvent = option.Some(changeEventBatch[eventsRead]) } eventsRead++ @@ -309,6 +309,12 @@ func (csr *ChangeStreamReader) readAndHandleOneChangeEventBatch( return nil } + if event, has := latestEvent.Get(); has { + csr.logger.Trace(). + Interface("event", event). + Msg("Updated lastChangeEventTime.") + } + var curTs primitive.Timestamp curTs, err := extractTimestampFromResumeToken(cs.ResumeToken()) if err == nil { From d11cf4a74c22417ec0a70e165b178e15888200da Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Mon, 9 Dec 2024 10:40:51 -0500 Subject: [PATCH 25/28] pruning --- internal/verifier/migration_verifier.go | 5 ----- 1 file changed, 5 deletions(-) diff --git a/internal/verifier/migration_verifier.go b/internal/verifier/migration_verifier.go index 6eff1949..a6fcb736 100644 --- a/internal/verifier/migration_verifier.go +++ b/internal/verifier/migration_verifier.go @@ -1242,11 +1242,6 @@ func (verifier *Verifier) GetVerificationStatus(ctx context.Context) (*Verificat taskCollection := verifier.verificationTaskCollection() generation, _ := verifier.getGeneration() - // XXX REMOVE ME - verifier.logger.Debug(). - Int("generation", generation). - Msg("Running GetVerificationStatus().") - var results []bson.Raw err := retry.New().WithCallback( From 42376f5cebbd023945c454cb050d85ef4bc3c2d7 Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Mon, 9 Dec 2024 11:33:19 -0500 Subject: [PATCH 26/28] Make Eventual Get() panic if the value is unset. --- internal/util/eventual.go | 13 +++++++++---- internal/util/eventual_test.go | 11 ++++------- internal/verifier/change_stream.go | 2 +- internal/verifier/check.go | 6 +++--- internal/verifier/migration_verifier.go | 4 ++-- 5 files changed, 19 insertions(+), 17 deletions(-) diff --git a/internal/util/eventual.go b/internal/util/eventual.go index e9da4b0b..ad2c6dd7 100644 --- a/internal/util/eventual.go +++ b/internal/util/eventual.go @@ -31,13 +31,18 @@ func (e *Eventual[T]) Ready() <-chan struct{} { return e.ready } -// Get returns an option that contains the Eventual’s value, or -// empty if the value isn’t ready yet. -func (e *Eventual[T]) Get() option.Option[T] { +// Get returns the Eventual’s value if it’s ready. +// It panics otherwise. +func (e *Eventual[T]) Get() T { e.mux.RLock() defer e.mux.RUnlock() - return e.val + val, has := e.val.Get() + if has { + return val + } + + panic("Eventual's Get() called before value was ready.") } // Set sets the Eventual’s value. 
It may be called only once; diff --git a/internal/util/eventual_test.go b/internal/util/eventual_test.go index 871a2179..17f4c4b7 100644 --- a/internal/util/eventual_test.go +++ b/internal/util/eventual_test.go @@ -2,17 +2,14 @@ package util import ( "time" - - "github.com/10gen/migration-verifier/option" ) func (s *UnitTestSuite) TestEventual() { eventual := NewEventual[int]() - s.Assert().Equal( - option.None[int](), - eventual.Get(), - "Get() should return empty", + s.Assert().Panics( + func() { eventual.Get() }, + "Get() should panic before the value is set", ) select { @@ -30,7 +27,7 @@ func (s *UnitTestSuite) TestEventual() { } s.Assert().Equal( - option.Some(123), + 123, eventual.Get(), "Get() should return the value", ) diff --git a/internal/verifier/change_stream.go b/internal/verifier/change_stream.go index 5847233c..04f1a2c9 100644 --- a/internal/verifier/change_stream.go +++ b/internal/verifier/change_stream.go @@ -366,7 +366,7 @@ func (csr *ChangeStreamReader) iterateChangeStream( // This means we should exit rather than continue reading the change stream // since there should be no more events. case <-csr.writesOffTs.Ready(): - writesOffTs := csr.writesOffTs.Get().MustGet() + writesOffTs := csr.writesOffTs.Get() csr.logger.Debug(). Interface("writesOffTimestamp", writesOffTs). diff --git a/internal/verifier/check.go b/internal/verifier/check.go index 2db00c15..910d2796 100644 --- a/internal/verifier/check.go +++ b/internal/verifier/check.go @@ -50,7 +50,7 @@ func (verifier *Verifier) waitForChangeStream(ctx context.Context, csr *ChangeSt case <-ctx.Done(): return ctx.Err() case <-csr.error.Ready(): - err := csr.error.Get().MustGet() + err := csr.error.Get() verifier.logger.Warn().Err(err). Msgf("Received error from %s.", csr) return err @@ -89,10 +89,10 @@ func (verifier *Verifier) CheckWorker(ctxIn context.Context) error { eg.Go(func() error { select { case <-verifier.srcChangeStreamReader.error.Ready(): - err := verifier.srcChangeStreamReader.error.Get().MustGet() + err := verifier.srcChangeStreamReader.error.Get() return errors.Wrapf(err, "%s failed", verifier.srcChangeStreamReader) case <-verifier.dstChangeStreamReader.error.Ready(): - err := verifier.dstChangeStreamReader.error.Get().MustGet() + err := verifier.dstChangeStreamReader.error.Get() return errors.Wrapf(err, "%s failed", verifier.dstChangeStreamReader) case <-ctx.Done(): return nil diff --git a/internal/verifier/migration_verifier.go b/internal/verifier/migration_verifier.go index a6fcb736..eaab6511 100644 --- a/internal/verifier/migration_verifier.go +++ b/internal/verifier/migration_verifier.go @@ -282,7 +282,7 @@ func (verifier *Verifier) WritesOff(ctx context.Context) error { // under the lock. 
select { case <-verifier.srcChangeStreamReader.error.Ready(): - err := verifier.srcChangeStreamReader.error.Get().MustGet() + err := verifier.srcChangeStreamReader.error.Get() return errors.Wrapf(err, "tried to send writes-off timestamp to %s, but change stream already failed", verifier.srcChangeStreamReader) default: verifier.srcChangeStreamReader.writesOffTs.Set(srcFinalTs) @@ -290,7 +290,7 @@ func (verifier *Verifier) WritesOff(ctx context.Context) error { select { case <-verifier.dstChangeStreamReader.error.Ready(): - err := verifier.dstChangeStreamReader.error.Get().MustGet() + err := verifier.dstChangeStreamReader.error.Get() return errors.Wrapf(err, "tried to send writes-off timestamp to %s, but change stream already failed", verifier.dstChangeStreamReader) default: verifier.dstChangeStreamReader.writesOffTs.Set(dstFinalTs) From 5b5e871864b89340fcd11f94a3be6bf5f1c27898 Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Mon, 9 Dec 2024 11:34:01 -0500 Subject: [PATCH 27/28] use DBNameForTest --- internal/verifier/change_stream_test.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/internal/verifier/change_stream_test.go b/internal/verifier/change_stream_test.go index c3cf5d15..6c1ded7e 100644 --- a/internal/verifier/change_stream_test.go +++ b/internal/verifier/change_stream_test.go @@ -313,8 +313,10 @@ func (suite *IntegrationTestSuite) TestStartAtTimeNoChanges() { sess, err := suite.srcMongoClient.StartSession() suite.Require().NoError(err) sctx := mongo.NewSessionContext(ctx, sess) - _, err = suite.srcMongoClient.Database("testDb").Collection("testColl").InsertOne( - sctx, bson.D{}) + _, err = suite.srcMongoClient. + Database(suite.DBNameForTest()). + Collection("testColl"). + InsertOne(sctx, bson.D{}) suite.Require().NoError(err, "should insert doc") insertTs, err := util.GetClusterTimeFromSession(sess) From f26d63da27d2401f61a020d11cb5199e2e6a6a58 Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Mon, 9 Dec 2024 13:00:49 -0500 Subject: [PATCH 28/28] =?UTF-8?q?beef=20up=20test=20description=20?= =?UTF-8?q?=E2=80=A6=20why=20flapping=3F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- internal/verifier/migration_verifier_test.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/internal/verifier/migration_verifier_test.go b/internal/verifier/migration_verifier_test.go index 2dd5569a..a889ac28 100644 --- a/internal/verifier/migration_verifier_test.go +++ b/internal/verifier/migration_verifier_test.go @@ -1622,6 +1622,8 @@ func (suite *IntegrationTestSuite) TestChangesOnDstBeforeSrc() { suite.Assert().Equal( 1, status.FailedTasks, + "failed tasks as expected (status=%+v)", + status, ) // Patch up the other mismatched document in generation 4.
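
A minimal usage sketch of the Eventual API as it stands after patches 18 and 26: the single writer calls Set exactly once, and any number of readers block on Ready() before calling Get(), which now panics if the value has not been stored yet. This sketch is not part of the patch series; it assumes the exported NewEventual constructor exercised in eventual_test.go, and the package util_test / Example_eventual wrapping is illustrative only.

package util_test

import (
	"fmt"
	"sync"
	"time"

	"github.com/10gen/migration-verifier/internal/util"
)

// Example_eventual: one writer, many readers. Readers wait for Ready(),
// then call Get(); Get() before Set() would panic, as would a second Set().
func Example_eventual() {
	ev := util.NewEventual[int]()

	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()

			select {
			case <-ev.Ready():
				// Safe: Ready() is closed only after the value is stored.
				fmt.Println("got", ev.Get())
			case <-time.After(time.Minute):
				fmt.Println("timed out")
			}
		}()
	}

	ev.Set(42) // exactly once

	wg.Wait()
	// Unordered output:
	// got 42
	// got 42
	// got 42
}

Because Set stores the value before closing the ready channel, a reader woken by Ready() can call Get() without racing the writer, which is the ordering the NB comment added to Set in patch 18 insists on; it is also why check.go and WritesOff can select on error.Ready() and only then call error.Get() without MustGet.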