internal/partitions/partitions.go (261 changes: 136 additions & 125 deletions)
@@ -119,7 +119,6 @@ const (
func PartitionCollectionWithSize(
ctx context.Context,
uuidEntry *uuidutil.NamespaceAndUUID,
retryer *retry.Retryer,
srcClient *mongo.Client,
replicatorList []Replicator,
subLogger *logger.Logger,
@@ -137,7 +136,6 @@ func PartitionCollectionWithSize(
partitions, docCount, byteCount, err := PartitionCollectionWithParameters(
ctx,
uuidEntry,
retryer,
srcClient,
replicatorList,
defaultSampleRate,
@@ -153,7 +151,6 @@ func PartitionCollectionWithSize(
return PartitionCollectionWithParameters(
ctx,
uuidEntry,
retryer,
srcClient,
replicatorList,
defaultSampleRate,
@@ -174,7 +171,6 @@ func PartitionCollectionWithSize(
func PartitionCollectionWithParameters(
ctx context.Context,
uuidEntry *uuidutil.NamespaceAndUUID,
retryer *retry.Retryer,
srcClient *mongo.Client,
replicatorList []Replicator,
sampleRate float64,
@@ -191,13 +187,13 @@ func PartitionCollectionWithParameters(

// Get the collection's size in bytes and its document count. It is okay if these return zero since there might still be
// items in the collection. Rely on getOuterIDBound to do a majority read to determine if we continue processing the collection.
collSizeInBytes, collDocCount, isCapped, err := GetSizeAndDocumentCount(ctx, subLogger, retryer, srcColl)
collSizeInBytes, collDocCount, isCapped, err := GetSizeAndDocumentCount(ctx, subLogger, srcColl)
if err != nil {
return nil, 0, 0, err
}

// The lower bound for the collection. There is no partitioning to do if the bound is nil.
minIDBound, err := getOuterIDBound(ctx, subLogger, retryer, minBound, srcDB, uuidEntry.CollName, globalFilter)
minIDBound, err := getOuterIDBound(ctx, subLogger, minBound, srcDB, uuidEntry.CollName, globalFilter)
if err != nil {
return nil, 0, 0, err
}
@@ -210,7 +206,7 @@ }
}

// The upper bound for the collection. There is no partitioning to do if the bound is nil.
maxIDBound, err := getOuterIDBound(ctx, subLogger, retryer, maxBound, srcDB, uuidEntry.CollName, globalFilter)
maxIDBound, err := getOuterIDBound(ctx, subLogger, maxBound, srcDB, uuidEntry.CollName, globalFilter)
if err != nil {
return nil, 0, 0, err
}
@@ -232,7 +228,7 @@

// If a filter is used for partitioning, number of partitions is calculated with the ratio of filtered documents.
if len(globalFilter) > 0 {
numFilteredDocs, filteredCntErr := GetDocumentCountAfterFiltering(ctx, subLogger, retryer, srcColl, globalFilter)
numFilteredDocs, filteredCntErr := GetDocumentCountAfterFiltering(ctx, subLogger, srcColl, globalFilter)
if filteredCntErr == nil {
numPartitions = getNumPartitions(collSizeInBytes, partitionSizeInBytes, float64(numFilteredDocs)/float64(collDocCount))
} else {
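A rough worked example of that ratio, using hypothetical figures (getNumPartitions' exact rounding is outside this hunk): if a quarter of the documents match the filter, the partition count drops to roughly a quarter of the unfiltered count.

// Hypothetical figures, illustrating the filteredRatio argument above; the
// scale-then-divide step is an assumption about getNumPartitions, which is
// not shown in this hunk.
func approxFilteredPartitionCount() int64 {
	collSizeInBytes := int64(100 << 30)      // 100 GiB collection
	partitionSizeInBytes := int64(400 << 20) // 400 MiB target partition size
	numFilteredDocs, collDocCount := int64(25_000_000), int64(100_000_000)

	filteredRatio := float64(numFilteredDocs) / float64(collDocCount) // 0.25
	return int64(float64(collSizeInBytes) * filteredRatio / float64(partitionSizeInBytes)) // 64
}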
@@ -251,7 +247,6 @@ func PartitionCollectionWithParameters(
midIDBounds, collDropped, err := getMidIDBounds(
ctx,
subLogger,
retryer,
srcDB,
uuidEntry.CollName,
collDocCount,
@@ -314,7 +309,7 @@ func PartitionCollectionWithParameters(
// capped status, in that order.
//
// Exported for usage in integration tests.
func GetSizeAndDocumentCount(ctx context.Context, logger *logger.Logger, retryer *retry.Retryer, srcColl *mongo.Collection) (int64, int64, bool, error) {
func GetSizeAndDocumentCount(ctx context.Context, logger *logger.Logger, srcColl *mongo.Collection) (int64, int64, bool, error) {
srcDB := srcColl.Database()
collName := srcColl.Name()

@@ -324,39 +319,43 @@ func GetSizeAndDocumentCount(ctx context.Context, logger *logger.Logger, retryer
Capped bool `bson:"capped"`
}{}

err := retryer.Run(ctx, logger, func(ctx context.Context, ri *retry.FuncInfo) error {
ri.Log(logger.Logger, "collStats", "source", srcDB.Name(), collName, "Retrieving collection size and document count.")
request := bson.D{
{"aggregate", collName},
{"pipeline", mongo.Pipeline{
bson.D{{"$collStats", bson.D{
{"storageStats", bson.E{"scale", 1}},
}}},
// The "$group" here behaves as a project and rename when there's only one
// document (non-sharded case). When there are multiple documents (one for
// each shard) it correctly sums the counts and sizes from each shard.
bson.D{{"$group", bson.D{
{"_id", "ns"},
{"count", bson.D{{"$sum", "$storageStats.count"}}},
{"size", bson.D{{"$sum", "$storageStats.size"}}},
{"capped", bson.D{{"$first", "$capped"}}}}}},
}},
{"cursor", bson.D{}},
}
err := retry.New().WithCallback(
func(ctx context.Context, ri *retry.FuncInfo) error {
ri.Log(logger.Logger, "collStats", "source", srcDB.Name(), collName, "Retrieving collection size and document count.")
request := bson.D{
{"aggregate", collName},
{"pipeline", mongo.Pipeline{
bson.D{{"$collStats", bson.D{
{"storageStats", bson.E{"scale", 1}},
}}},
// The "$group" here behaves as a project and rename when there's only one
// document (non-sharded case). When there are multiple documents (one for
// each shard) it correctly sums the counts and sizes from each shard.
bson.D{{"$group", bson.D{
{"_id", "ns"},
{"count", bson.D{{"$sum", "$storageStats.count"}}},
{"size", bson.D{{"$sum", "$storageStats.size"}}},
{"capped", bson.D{{"$first", "$capped"}}}}}},
}},
{"cursor", bson.D{}},
}

cursor, driverErr := srcDB.RunCommandCursor(ctx, request)
if driverErr != nil {
return driverErr
}
cursor, driverErr := srcDB.RunCommandCursor(ctx, request)
if driverErr != nil {
return driverErr
}

defer cursor.Close(ctx)
if cursor.Next(ctx) {
if err := cursor.Decode(&value); err != nil {
return errors.Wrapf(err, "failed to decode $collStats response for source namespace %s.%s", srcDB.Name(), collName)
defer cursor.Close(ctx)
if cursor.Next(ctx) {
if err := cursor.Decode(&value); err != nil {
return errors.Wrapf(err, "failed to decode $collStats response for source namespace %s.%s", srcDB.Name(), collName)
}
}
}
return nil
})
return nil
},
"retrieving %#q's statistics",
srcDB.Name()+"."+collName,
).Run(ctx, logger)

// TODO (REP-960): remove this check.
// If we get NamespaceNotFoundError then return 0,0 since we won't do any partitioning with those returns
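Every retry call site added in this diff follows the same fluent shape. The sketch below is a minimal, assumed-context illustration of that shape: only New, WithCallback (a callback plus a printf-style operation description), and Run are taken from the changes above. The ping command, the description string, and the surrounding function are invented for illustration, and imports of the internal retry and logger packages are omitted.

// Minimal sketch of the fluent retry pattern used throughout this diff.
func pingWithRetry(ctx context.Context, lg *logger.Logger, db *mongo.Database) error {
	return retry.New().WithCallback(
		func(ctx context.Context, ri *retry.FuncInfo) error {
			// ri.Log mirrors the call sites above: zerolog logger, command name,
			// cluster role, database, collection, and a human-readable message.
			ri.Log(lg.Logger, "ping", "source", db.Name(), "", "Pinging the source cluster.")
			return db.RunCommand(ctx, bson.D{{"ping", 1}}).Err()
		},
		"pinging %#q", // operation description, formatted with the trailing arguments
		db.Name(),
	).Run(ctx, lg)
}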
@@ -380,7 +379,7 @@ func GetSizeAndDocumentCount(ctx context.Context, logger *logger.Logger, retryer
//
// This function could take a long time, especially if the collection does not have an index
// on the filtered fields.
func GetDocumentCountAfterFiltering(ctx context.Context, logger *logger.Logger, retryer *retry.Retryer, srcColl *mongo.Collection, filter map[string]any) (int64, error) {
func GetDocumentCountAfterFiltering(ctx context.Context, logger *logger.Logger, srcColl *mongo.Collection, filter map[string]any) (int64, error) {
srcDB := srcColl.Database()
collName := srcColl.Name()

@@ -395,27 +394,31 @@ func GetDocumentCountAfterFiltering(ctx context.Context, logger *logger.Logger,
}
pipeline = append(pipeline, bson.D{{"$count", "numFilteredDocs"}})

err := retryer.Run(ctx, logger, func(ctx context.Context, ri *retry.FuncInfo) error {
ri.Log(logger.Logger, "count", "source", srcDB.Name(), collName, "Counting filtered documents.")
request := bson.D{
{"aggregate", collName},
{"pipeline", pipeline},
{"cursor", bson.D{}},
}
err := retry.New().WithCallback(
func(ctx context.Context, ri *retry.FuncInfo) error {
ri.Log(logger.Logger, "count", "source", srcDB.Name(), collName, "Counting filtered documents.")
request := bson.D{
{"aggregate", collName},
{"pipeline", pipeline},
{"cursor", bson.D{}},
}

cursor, driverErr := srcDB.RunCommandCursor(ctx, request)
if driverErr != nil {
return driverErr
}
cursor, driverErr := srcDB.RunCommandCursor(ctx, request)
if driverErr != nil {
return driverErr
}

defer cursor.Close(ctx)
if cursor.Next(ctx) {
if err := cursor.Decode(&value); err != nil {
return errors.Wrapf(err, "failed to decode $count response (%+v) for source namespace %s.%s after filter (%+v)", cursor.Current, srcDB.Name(), collName, filter)
defer cursor.Close(ctx)
if cursor.Next(ctx) {
if err := cursor.Decode(&value); err != nil {
return errors.Wrapf(err, "failed to decode $count response (%+v) for source namespace %s.%s after filter (%+v)", cursor.Current, srcDB.Name(), collName, filter)
}
}
}
return nil
})
return nil
},
"counting %#q's filtered documents",
srcDB.Name()+"."+collName,
).Run(ctx, logger)

// TODO (REP-960): remove this check.
// If we get NamespaceNotFoundError then return 0 since we won't do any partitioning with those returns
@@ -458,7 +461,6 @@ func getNumPartitions(collSizeInBytes, partitionSizeInBytes int64, filteredRatio
func getOuterIDBound(
ctx context.Context,
subLogger *logger.Logger,
retryer *retry.Retryer,
minOrMaxBound minOrMaxBound,
srcDB *mongo.Database,
collName string,
@@ -488,30 +490,35 @@ func getOuterIDBound(
}...)

// Get one document containing only the smallest or largest _id value in the collection.
err := retryer.Run(ctx, subLogger, func(ctx context.Context, ri *retry.FuncInfo) error {
ri.Log(subLogger.Logger, "aggregate", "source", srcDB.Name(), collName, fmt.Sprintf("getting %s _id partition bound", minOrMaxBound))
cursor, cmdErr :=
srcDB.RunCommandCursor(ctx, bson.D{
{"aggregate", collName},
{"pipeline", pipeline},
{"hint", bson.D{{"_id", 1}}},
{"cursor", bson.D{}},
})

if cmdErr != nil {
return cmdErr
}
err := retry.New().WithCallback(
func(ctx context.Context, ri *retry.FuncInfo) error {
ri.Log(subLogger.Logger, "aggregate", "source", srcDB.Name(), collName, fmt.Sprintf("getting %s _id partition bound", minOrMaxBound))
cursor, cmdErr :=
srcDB.RunCommandCursor(ctx, bson.D{
{"aggregate", collName},
{"pipeline", pipeline},
{"hint", bson.D{{"_id", 1}}},
{"cursor", bson.D{}},
})

if cmdErr != nil {
return cmdErr
}

// If we don't have at least one document, the collection is either empty or was dropped.
defer cursor.Close(ctx)
if !cursor.Next(ctx) {
return nil
}
// If we don't have at least one document, the collection is either empty or was dropped.
defer cursor.Close(ctx)
if !cursor.Next(ctx) {
return nil
}

// Return the _id value from that document.
docID, cmdErr = cursor.Current.LookupErr("_id")
return cmdErr
})
// Return the _id value from that document.
docID, cmdErr = cursor.Current.LookupErr("_id")
return cmdErr
},
"finding %#q's %s _id",
srcDB.Name()+"."+collName,
minOrMaxBound,
).Run(ctx, subLogger)

if err != nil {
return nil, errors.Wrapf(err, "could not get %s _id bound for source collection '%s.%s'", minOrMaxBound, srcDB.Name(), collName)
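The pipeline passed to that aggregate is assembled above this hunk and is not part of the diff. A generic sketch of how such a bound query is usually expressed (sort on _id in the wanted direction, keep one document, project only its _id) could look like the following; the stages are an assumption, not the file's actual pipeline.

// Illustrative only; the real pipeline is built outside this hunk.
sortDirection := 1 // 1 for the min bound, -1 for the max bound
boundPipeline := mongo.Pipeline{
	{{"$sort", bson.D{{"_id", sortDirection}}}},
	{{"$limit", 1}},
	{{"$project", bson.D{{"_id", 1}}}},
}
_ = boundPipeline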
@@ -528,7 +535,6 @@ func getOuterIDBound(
func getMidIDBounds(
ctx context.Context,
logger *logger.Logger,
retryer *retry.Retryer,
srcDB *mongo.Database,
collName string,
collDocCount int64,
@@ -576,48 +582,53 @@ func getMidIDBounds(

// Get a cursor for the $sample and $bucketAuto aggregation.
var midIDBounds []interface{}
agRetryer := retryer.WithErrorCodes(util.SampleTooManyDuplicates)
err := agRetryer.Run(ctx, logger, func(ctx context.Context, ri *retry.FuncInfo) error {
ri.Log(logger.Logger, "aggregate", "source", srcDB.Name(), collName, "Retrieving mid _id partition bounds using $sample.")
cursor, cmdErr :=
srcDB.RunCommandCursor(ctx, bson.D{
{"aggregate", collName},
{"pipeline", pipeline},
{"allowDiskUse", true},
{"cursor", bson.D{}},
})

if cmdErr != nil {
return errors.Wrapf(cmdErr, "failed to $sample and $bucketAuto documents for source namespace '%s.%s'", srcDB.Name(), collName)
}

defer cursor.Close(ctx)

// Iterate through all $bucketAuto documents of the form:
// {
// "_id" : {
// "min" : ... ,
// "max" : ...
// },
// "count" : ...
// }
midIDBounds = make([]interface{}, 0, numPartitions)
for cursor.Next(ctx) {
// Get a mid _id bound using the $bucketAuto document's max value.
bucketAutoDoc := make(bson.Raw, len(cursor.Current))
copy(bucketAutoDoc, cursor.Current)
bound, err := bucketAutoDoc.LookupErr("_id", "max")
if err != nil {
return errors.Wrapf(err, "failed to lookup '_id.max' key in $bucketAuto document for source namespace '%s.%s'", srcDB.Name(), collName)
}

// Append the copied bound to the other mid _id bounds.
midIDBounds = append(midIDBounds, bound)
ri.NoteSuccess()
}

return cursor.Err()
})
agRetryer := retry.New().WithErrorCodes(util.SampleTooManyDuplicates)
err := agRetryer.
WithCallback(
func(ctx context.Context, ri *retry.FuncInfo) error {
ri.Log(logger.Logger, "aggregate", "source", srcDB.Name(), collName, "Retrieving mid _id partition bounds using $sample.")
cursor, cmdErr :=
srcDB.RunCommandCursor(ctx, bson.D{
{"aggregate", collName},
{"pipeline", pipeline},
{"allowDiskUse", true},
{"cursor", bson.D{}},
})

if cmdErr != nil {
return errors.Wrapf(cmdErr, "failed to $sample and $bucketAuto documents for source namespace '%s.%s'", srcDB.Name(), collName)
}

defer cursor.Close(ctx)

// Iterate through all $bucketAuto documents of the form:
// {
// "_id" : {
// "min" : ... ,
// "max" : ...
// },
// "count" : ...
// }
midIDBounds = make([]interface{}, 0, numPartitions)
for cursor.Next(ctx) {
// Get a mid _id bound using the $bucketAuto document's max value.
bucketAutoDoc := make(bson.Raw, len(cursor.Current))
copy(bucketAutoDoc, cursor.Current)
bound, err := bucketAutoDoc.LookupErr("_id", "max")
if err != nil {
return errors.Wrapf(err, "failed to lookup '_id.max' key in $bucketAuto document for source namespace '%s.%s'", srcDB.Name(), collName)
}

// Append the copied bound to the other mid _id bounds.
midIDBounds = append(midIDBounds, bound)
ri.NoteSuccess()
}

return cursor.Err()
},
"finding %#q's _id partition boundaries",
srcDB.Name()+"."+collName,
).Run(ctx, logger)

if err != nil {
return nil, false, errors.Wrapf(err, "encountered a problem in the cursor when trying to $sample and $bucketAuto aggregation for source namespace '%s.%s'", srcDB.Name(), collName)
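The $sample and $bucketAuto pipeline is likewise assembled above this hunk and does not appear in the diff. A generic sketch of the technique (sample documents, then let $bucketAuto split the sampled _id values into numPartitions buckets whose max values become the mid bounds described in the comment above) might read as follows; sampleSize is hypothetical and the stages are an assumption, not the file's actual pipeline.

// Illustrative only; the real pipeline is built outside this hunk.
samplePipeline := mongo.Pipeline{
	{{"$sample", bson.D{{"size", sampleSize}}}},
	{{"$project", bson.D{{"_id", 1}}}},
	{{"$bucketAuto", bson.D{
		{"groupBy", "$_id"},
		{"buckets", numPartitions},
	}}},
}
_ = samplePipeline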