Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 30 additions & 57 deletions internal/partitions/partitions.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,13 +191,13 @@ func PartitionCollectionWithParameters(

// Get the collection's size in bytes and its document count. It is okay if these return zero since there might still be
// items in the collection. Rely on getOuterIDBound to do a majority read to determine if we continue processing the collection.
collSizeInBytes, collDocCount, isCapped, err := GetSizeAndDocumentCount(ctx, subLogger, retryer, srcColl, uuidEntry.UUID)
collSizeInBytes, collDocCount, isCapped, err := GetSizeAndDocumentCount(ctx, subLogger, retryer, srcColl)
if err != nil {
return nil, 0, 0, err
}

// The lower bound for the collection. There is no partitioning to do if the bound is nil.
minIDBound, err := getOuterIDBound(ctx, subLogger, retryer, minBound, srcDB, uuidEntry.CollName, uuidEntry.UUID, globalFilter)
minIDBound, err := getOuterIDBound(ctx, subLogger, retryer, minBound, srcDB, uuidEntry.CollName, globalFilter)
if err != nil {
return nil, 0, 0, err
}
Expand All @@ -210,7 +210,7 @@ func PartitionCollectionWithParameters(
}

// The upper bound for the collection. There is no partitioning to do if the bound is nil.
maxIDBound, err := getOuterIDBound(ctx, subLogger, retryer, maxBound, srcDB, uuidEntry.CollName, uuidEntry.UUID, globalFilter)
maxIDBound, err := getOuterIDBound(ctx, subLogger, retryer, maxBound, srcDB, uuidEntry.CollName, globalFilter)
if err != nil {
return nil, 0, 0, err
}
Expand All @@ -232,7 +232,7 @@ func PartitionCollectionWithParameters(

// If a filter is used for partitioning, number of partitions is calculated with the ratio of filtered documents.
if len(globalFilter) > 0 {
numFilteredDocs, filteredCntErr := GetDocumentCountAfterFiltering(ctx, subLogger, retryer, srcColl, uuidEntry.UUID, globalFilter)
numFilteredDocs, filteredCntErr := GetDocumentCountAfterFiltering(ctx, subLogger, retryer, srcColl, globalFilter)
if filteredCntErr == nil {
numPartitions = getNumPartitions(collSizeInBytes, partitionSizeInBytes, float64(numFilteredDocs)/float64(collDocCount))
} else {
Expand All @@ -254,7 +254,6 @@ func PartitionCollectionWithParameters(
retryer,
srcDB,
uuidEntry.CollName,
uuidEntry.UUID,
collDocCount,
numPartitions,
sampleMinNumDocs,
Expand Down Expand Up @@ -315,7 +314,7 @@ func PartitionCollectionWithParameters(
// capped status, in that order.
//
// Exported for usage in integration tests.
func GetSizeAndDocumentCount(ctx context.Context, logger *logger.Logger, retryer *retry.Retryer, srcColl *mongo.Collection, collUUID util.UUID) (int64, int64, bool, error) {
func GetSizeAndDocumentCount(ctx context.Context, logger *logger.Logger, retryer *retry.Retryer, srcColl *mongo.Collection) (int64, int64, bool, error) {
srcDB := srcColl.Database()
collName := srcColl.Name()

Expand All @@ -325,10 +324,10 @@ func GetSizeAndDocumentCount(ctx context.Context, logger *logger.Logger, retryer
Capped bool `bson:"capped"`
}{}

currCollName, err := retryer.RunForUUIDAndTransientErrors(ctx, logger, collName, func(ri *retry.Info, collectionName string) error {
ri.Log(logger.Logger, "collStats", "source", srcDB.Name(), collectionName, "Retrieving collection size and document count.")
request := retryer.RequestWithUUID(bson.D{
{"aggregate", collectionName},
err := retryer.Run(ctx, logger, func(ctx context.Context, ri *retry.FuncInfo) error {
ri.Log(logger.Logger, "collStats", "source", srcDB.Name(), collName, "Retrieving collection size and document count.")
request := bson.D{
{"aggregate", collName},
{"pipeline", mongo.Pipeline{
bson.D{{"$collStats", bson.D{
{"storageStats", bson.E{"scale", 1}},
Expand All @@ -343,7 +342,7 @@ func GetSizeAndDocumentCount(ctx context.Context, logger *logger.Logger, retryer
{"capped", bson.D{{"$first", "$capped"}}}}}},
}},
{"cursor", bson.D{}},
}, collUUID)
}

cursor, driverErr := srcDB.RunCommandCursor(ctx, request)
if driverErr != nil {
Expand Down Expand Up @@ -371,15 +370,8 @@ func GetSizeAndDocumentCount(ctx context.Context, logger *logger.Logger, retryer
return 0, 0, false, errors.Wrapf(err, "failed to run aggregation for $collStats for source namespace %s.%s", srcDB.Name(), collName)
}

// CollectionUUIDMismatch where the collection does not exist will return a nil cursor and nil
// error.
if currCollName == "" {
// Return 0, 0, nil as CollectionUUIDMismatch should not cause an initial sync error.
return 0, 0, false, nil
}

logger.Debug().Msgf("Collection %s.%s size: %d, document count: %d, capped: %v",
srcDB.Name(), currCollName, value.Size, value.Count, value.Capped)
srcDB.Name(), collName, value.Size, value.Count, value.Capped)

return value.Size, value.Count, value.Capped, nil
}
Expand All @@ -388,7 +380,7 @@ func GetSizeAndDocumentCount(ctx context.Context, logger *logger.Logger, retryer
//
// This function could take a long time, especially if the collection does not have an index
// on the filtered fields.
func GetDocumentCountAfterFiltering(ctx context.Context, logger *logger.Logger, retryer *retry.Retryer, srcColl *mongo.Collection, collUUID util.UUID, filter map[string]any) (int64, error) {
func GetDocumentCountAfterFiltering(ctx context.Context, logger *logger.Logger, retryer *retry.Retryer, srcColl *mongo.Collection, filter map[string]any) (int64, error) {
srcDB := srcColl.Database()
collName := srcColl.Name()

Expand All @@ -403,13 +395,13 @@ func GetDocumentCountAfterFiltering(ctx context.Context, logger *logger.Logger,
}
pipeline = append(pipeline, bson.D{{"$count", "numFilteredDocs"}})

currCollName, err := retryer.RunForUUIDAndTransientErrors(ctx, logger, collName, func(ri *retry.Info, collectionName string) error {
ri.Log(logger.Logger, "count", "source", srcDB.Name(), collectionName, "Counting filtered documents.")
request := retryer.RequestWithUUID(bson.D{
{"aggregate", collectionName},
err := retryer.Run(ctx, logger, func(ctx context.Context, ri *retry.FuncInfo) error {
ri.Log(logger.Logger, "count", "source", srcDB.Name(), collName, "Counting filtered documents.")
request := bson.D{
{"aggregate", collName},
{"pipeline", pipeline},
{"cursor", bson.D{}},
}, collUUID)
}

cursor, driverErr := srcDB.RunCommandCursor(ctx, request)
if driverErr != nil {
Expand Down Expand Up @@ -437,15 +429,8 @@ func GetDocumentCountAfterFiltering(ctx context.Context, logger *logger.Logger,
return 0, errors.Wrapf(err, "failed to run aggregation $count for source namespace %s.%s after filter (%+v)", srcDB.Name(), collName, filter)
}

// CollectionUUIDMismatch where the collection does not exist will return a nil cursor and nil
// error.
if currCollName == "" {
// Return 0, nil as CollectionUUIDMismatch should not cause an initial sync error.
return 0, nil
}

logger.Debug().Msgf("Collection %s.%s filtered document count: %d, filter: %+v",
srcDB.Name(), currCollName, value.Count, filter)
srcDB.Name(), collName, value.Count, filter)

return value.Count, nil
}
Expand Down Expand Up @@ -477,7 +462,6 @@ func getOuterIDBound(
minOrMaxBound minOrMaxBound,
srcDB *mongo.Database,
collName string,
collUUID util.UUID,
globalFilter map[string]any,
) (interface{}, error) {
// Choose a sort direction based on the minOrMaxBound.
Expand All @@ -504,15 +488,15 @@ func getOuterIDBound(
}...)

// Get one document containing only the smallest or largest _id value in the collection.
currCollName, err := retryer.RunForUUIDAndTransientErrors(ctx, subLogger, collName, func(ri *retry.Info, collName string) error {
err := retryer.Run(ctx, subLogger, func(ctx context.Context, ri *retry.FuncInfo) error {
ri.Log(subLogger.Logger, "aggregate", "source", srcDB.Name(), collName, fmt.Sprintf("getting %s _id partition bound", minOrMaxBound))
cursor, cmdErr :=
srcDB.RunCommandCursor(ctx, retryer.RequestWithUUID(bson.D{
srcDB.RunCommandCursor(ctx, bson.D{
{"aggregate", collName},
{"pipeline", pipeline},
{"hint", bson.D{{"_id", 1}}},
{"cursor", bson.D{}},
}, collUUID))
})

if cmdErr != nil {
return cmdErr
Expand All @@ -530,12 +514,7 @@ func getOuterIDBound(
})

if err != nil {
return nil, errors.Wrapf(err, "could not get %s _id bound for source collection '%s.%s', UUID %s", minOrMaxBound, srcDB.Name(), collName, collUUID.String())
}

if currCollName == "" {
subLogger.Debug().Msgf("Not getting %s _id bound for source collection '%s.%s', UUID %s, because it was dropped", minOrMaxBound, srcDB.Name(), collName, collUUID.String())
return nil, nil
return nil, errors.Wrapf(err, "could not get %s _id bound for source collection '%s.%s'", minOrMaxBound, srcDB.Name(), collName)
}

return docID, nil
Expand All @@ -552,7 +531,6 @@ func getMidIDBounds(
retryer *retry.Retryer,
srcDB *mongo.Database,
collName string,
collUUID util.UUID,
collDocCount int64,
numPartitions, sampleMinNumDocs int,
sampleRate float64,
Expand Down Expand Up @@ -585,7 +563,7 @@ func getMidIDBounds(
msg = fmt.Sprintf("Sampling %d documents", numDocsToSample)
}

logger.Info().Msgf("%s to make %d partitions for collection '%s.%s', UUID %s", msg, numPartitions, srcDB.Name(), collName, collUUID.String())
logger.Info().Msgf("%s to make %d partitions for collection '%s.%s'", msg, numPartitions, srcDB.Name(), collName)
pipeline = append(pipeline, []bson.D{
{{"$sample", bson.D{{"size", numDocsToSample}}}},
{{"$project", bson.D{{"_id", 1}}}},
Expand All @@ -599,18 +577,18 @@ func getMidIDBounds(
// Get a cursor for the $sample and $bucketAuto aggregation.
var midIDBounds []interface{}
agRetryer := retryer.WithErrorCodes(util.SampleTooManyDuplicates)
currCollName, err := agRetryer.RunForUUIDAndTransientErrors(ctx, logger, collName, func(ri *retry.Info, collName string) error {
err := agRetryer.Run(ctx, logger, func(ctx context.Context, ri *retry.FuncInfo) error {
ri.Log(logger.Logger, "aggregate", "source", srcDB.Name(), collName, "Retrieving mid _id partition bounds using $sample.")
cursor, cmdErr :=
srcDB.RunCommandCursor(ctx, retryer.RequestWithUUID(bson.D{
srcDB.RunCommandCursor(ctx, bson.D{
{"aggregate", collName},
{"pipeline", pipeline},
{"allowDiskUse", true},
{"cursor", bson.D{}},
}, collUUID))
})

if cmdErr != nil {
return errors.Wrapf(cmdErr, "failed to $sample and $bucketAuto documents for source namespace '%s.%s', UUID %s", srcDB.Name(), collName, collUUID.String())
return errors.Wrapf(cmdErr, "failed to $sample and $bucketAuto documents for source namespace '%s.%s'", srcDB.Name(), collName)
}

defer cursor.Close(ctx)
Expand All @@ -630,24 +608,19 @@ func getMidIDBounds(
copy(bucketAutoDoc, cursor.Current)
bound, err := bucketAutoDoc.LookupErr("_id", "max")
if err != nil {
return errors.Wrapf(err, "failed to lookup '_id.max' key in $bucketAuto document for source namespace '%s.%s', UUID %s", srcDB.Name(), collName, collUUID.String())
return errors.Wrapf(err, "failed to lookup '_id.max' key in $bucketAuto document for source namespace '%s.%s'", srcDB.Name(), collName)
}

// Append the copied bound to the other mid _id bounds.
midIDBounds = append(midIDBounds, bound)
ri.IterationSuccess()
ri.NoteSuccess()
}

return cursor.Err()
})

if err != nil {
return nil, false, errors.Wrapf(err, "encountered a problem in the cursor when trying to $sample and $bucketAuto aggregation for source namespace '%s.%s', UUID %s", srcDB.Name(), collName, collUUID.String())
}

if currCollName == "" {
// The collection was dropped on the source, so we return a nil mid ID bound.
return nil, true, nil
return nil, false, errors.Wrapf(err, "encountered a problem in the cursor when trying to $sample and $bucketAuto aggregation for source namespace '%s.%s'", srcDB.Name(), collName)
}

if len(midIDBounds) == 0 {
Expand Down
10 changes: 1 addition & 9 deletions internal/retry/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,9 @@ package retry
import "time"

const (
// DefaultDurationLimit is the default time limit for all retries. This is
// currently 10 minutes.
// DefaultDurationLimit is the default time limit for all retries.
DefaultDurationLimit = 10 * time.Minute

// The number of CollectionUUIDMismatch error retries we do
// before logging a warning. Under normal circumstances, we
// don't ever expect more than a couple. Anything more than
// that requires collection renames to interleave between
// retry attempts.
numCollectionUUIDRetriesBeforeWarning = 10

// Constants for spacing out the retry attempts.
// See: https://en.wikipedia.org/wiki/Exponential_backoff
//
Expand Down
39 changes: 39 additions & 0 deletions internal/retry/error.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package retry

import (
"fmt"
"time"

"github.com/10gen/migration-verifier/internal/reportutils"
)

type RetryDurationLimitExceededErr struct {
lastErr error
attempts int
duration time.Duration
}

func (rde RetryDurationLimitExceededErr) Error() string {
return fmt.Sprintf(
"retryable function did not succeed after %d attempt(s) over %s; last error was: %v",
rde.attempts,
reportutils.DurationToHMS(rde.duration),
rde.lastErr,
)
}

func (rde RetryDurationLimitExceededErr) Unwrap() error {
return rde.lastErr
}

// errgroupErr is an internal error type that we return from errgroup
// callbacks. It allows us to know (reliably) which error is the one
// that triggers the errgroup's failure
type errgroupErr struct {
funcNum int
errFromCallback error
}

func (ege errgroupErr) Error() string {
return fmt.Sprintf("func %d failed: %v", ege.funcNum, ege.errFromCallback)
}
Loading
Loading