
Commit aea6160

revamp retryer
1 parent 4302d79 commit aea6160

16 files changed: +539 −328 lines changed

internal/partitions/partitions.go

Lines changed: 136 additions & 125 deletions
@@ -119,7 +119,6 @@ const (
 func PartitionCollectionWithSize(
     ctx context.Context,
     uuidEntry *uuidutil.NamespaceAndUUID,
-    retryer *retry.Retryer,
     srcClient *mongo.Client,
     replicatorList []Replicator,
     subLogger *logger.Logger,
@@ -137,7 +136,6 @@ func PartitionCollectionWithSize(
     partitions, docCount, byteCount, err := PartitionCollectionWithParameters(
         ctx,
         uuidEntry,
-        retryer,
         srcClient,
         replicatorList,
         defaultSampleRate,
@@ -153,7 +151,6 @@ func PartitionCollectionWithSize(
     return PartitionCollectionWithParameters(
         ctx,
         uuidEntry,
-        retryer,
         srcClient,
         replicatorList,
         defaultSampleRate,
@@ -174,7 +171,6 @@ func PartitionCollectionWithSize(
 func PartitionCollectionWithParameters(
     ctx context.Context,
     uuidEntry *uuidutil.NamespaceAndUUID,
-    retryer *retry.Retryer,
     srcClient *mongo.Client,
     replicatorList []Replicator,
     sampleRate float64,
@@ -191,13 +187,13 @@ func PartitionCollectionWithParameters(
 
     // Get the collection's size in bytes and its document count. It is okay if these return zero since there might still be
     // items in the collection. Rely on getOuterIDBound to do a majority read to determine if we continue processing the collection.
-    collSizeInBytes, collDocCount, isCapped, err := GetSizeAndDocumentCount(ctx, subLogger, retryer, srcColl)
+    collSizeInBytes, collDocCount, isCapped, err := GetSizeAndDocumentCount(ctx, subLogger, srcColl)
     if err != nil {
         return nil, 0, 0, err
     }
 
     // The lower bound for the collection. There is no partitioning to do if the bound is nil.
-    minIDBound, err := getOuterIDBound(ctx, subLogger, retryer, minBound, srcDB, uuidEntry.CollName, globalFilter)
+    minIDBound, err := getOuterIDBound(ctx, subLogger, minBound, srcDB, uuidEntry.CollName, globalFilter)
     if err != nil {
         return nil, 0, 0, err
     }
@@ -210,7 +206,7 @@ func PartitionCollectionWithParameters(
     }
 
     // The upper bound for the collection. There is no partitioning to do if the bound is nil.
-    maxIDBound, err := getOuterIDBound(ctx, subLogger, retryer, maxBound, srcDB, uuidEntry.CollName, globalFilter)
+    maxIDBound, err := getOuterIDBound(ctx, subLogger, maxBound, srcDB, uuidEntry.CollName, globalFilter)
     if err != nil {
         return nil, 0, 0, err
     }
@@ -232,7 +228,7 @@ func PartitionCollectionWithParameters(
 
     // If a filter is used for partitioning, number of partitions is calculated with the ratio of filtered documents.
     if len(globalFilter) > 0 {
-        numFilteredDocs, filteredCntErr := GetDocumentCountAfterFiltering(ctx, subLogger, retryer, srcColl, globalFilter)
+        numFilteredDocs, filteredCntErr := GetDocumentCountAfterFiltering(ctx, subLogger, srcColl, globalFilter)
         if filteredCntErr == nil {
             numPartitions = getNumPartitions(collSizeInBytes, partitionSizeInBytes, float64(numFilteredDocs)/float64(collDocCount))
         } else {
@@ -251,7 +247,6 @@ func PartitionCollectionWithParameters(
     midIDBounds, collDropped, err := getMidIDBounds(
         ctx,
         subLogger,
-        retryer,
         srcDB,
         uuidEntry.CollName,
         collDocCount,
@@ -314,7 +309,7 @@ func PartitionCollectionWithParameters(
 // capped status, in that order.
 //
 // Exported for usage in integration tests.
-func GetSizeAndDocumentCount(ctx context.Context, logger *logger.Logger, retryer *retry.Retryer, srcColl *mongo.Collection) (int64, int64, bool, error) {
+func GetSizeAndDocumentCount(ctx context.Context, logger *logger.Logger, srcColl *mongo.Collection) (int64, int64, bool, error) {
     srcDB := srcColl.Database()
     collName := srcColl.Name()
 
@@ -324,39 +319,43 @@ func GetSizeAndDocumentCount(ctx context.Context, logger *logger.Logger, retryer
         Capped bool `bson:"capped"`
     }{}
 
-    err := retryer.Run(ctx, logger, func(ctx context.Context, ri *retry.FuncInfo) error {
-        ri.Log(logger.Logger, "collStats", "source", srcDB.Name(), collName, "Retrieving collection size and document count.")
-        request := bson.D{
-            {"aggregate", collName},
-            {"pipeline", mongo.Pipeline{
-                bson.D{{"$collStats", bson.D{
-                    {"storageStats", bson.E{"scale", 1}},
-                }}},
-                // The "$group" here behaves as a project and rename when there's only one
-                // document (non-sharded case). When there are multiple documents (one for
-                // each shard) it correctly sums the counts and sizes from each shard.
-                bson.D{{"$group", bson.D{
-                    {"_id", "ns"},
-                    {"count", bson.D{{"$sum", "$storageStats.count"}}},
-                    {"size", bson.D{{"$sum", "$storageStats.size"}}},
-                    {"capped", bson.D{{"$first", "$capped"}}}}}},
-            }},
-            {"cursor", bson.D{}},
-        }
+    err := retry.New().WithCallback(
+        func(ctx context.Context, ri *retry.FuncInfo) error {
+            ri.Log(logger.Logger, "collStats", "source", srcDB.Name(), collName, "Retrieving collection size and document count.")
+            request := bson.D{
+                {"aggregate", collName},
+                {"pipeline", mongo.Pipeline{
+                    bson.D{{"$collStats", bson.D{
+                        {"storageStats", bson.E{"scale", 1}},
+                    }}},
+                    // The "$group" here behaves as a project and rename when there's only one
+                    // document (non-sharded case). When there are multiple documents (one for
+                    // each shard) it correctly sums the counts and sizes from each shard.
+                    bson.D{{"$group", bson.D{
+                        {"_id", "ns"},
+                        {"count", bson.D{{"$sum", "$storageStats.count"}}},
+                        {"size", bson.D{{"$sum", "$storageStats.size"}}},
+                        {"capped", bson.D{{"$first", "$capped"}}}}}},
+                }},
+                {"cursor", bson.D{}},
+            }
 
-        cursor, driverErr := srcDB.RunCommandCursor(ctx, request)
-        if driverErr != nil {
-            return driverErr
-        }
+            cursor, driverErr := srcDB.RunCommandCursor(ctx, request)
+            if driverErr != nil {
+                return driverErr
+            }
 
-        defer cursor.Close(ctx)
-        if cursor.Next(ctx) {
-            if err := cursor.Decode(&value); err != nil {
-                return errors.Wrapf(err, "failed to decode $collStats response for source namespace %s.%s", srcDB.Name(), collName)
+            defer cursor.Close(ctx)
+            if cursor.Next(ctx) {
+                if err := cursor.Decode(&value); err != nil {
+                    return errors.Wrapf(err, "failed to decode $collStats response for source namespace %s.%s", srcDB.Name(), collName)
+                }
             }
-        }
-        return nil
-    })
+            return nil
+        },
+        "retrieving %#q's statistics",
+        srcDB.Name()+"."+collName,
+    ).Run(ctx, logger)
 
     // TODO (REP-960): remove this check.
     // If we get NamespaceNotFoundError then return 0,0 since we won't do any partitioning with those returns
@@ -380,7 +379,7 @@ func GetSizeAndDocumentCount(ctx context.Context, logger *logger.Logger, retryer
 //
 // This function could take a long time, especially if the collection does not have an index
 // on the filtered fields.
-func GetDocumentCountAfterFiltering(ctx context.Context, logger *logger.Logger, retryer *retry.Retryer, srcColl *mongo.Collection, filter map[string]any) (int64, error) {
+func GetDocumentCountAfterFiltering(ctx context.Context, logger *logger.Logger, srcColl *mongo.Collection, filter map[string]any) (int64, error) {
     srcDB := srcColl.Database()
     collName := srcColl.Name()
 
@@ -395,27 +394,31 @@ func GetDocumentCountAfterFiltering(ctx context.Context, logger *logger.Logger,
     }
     pipeline = append(pipeline, bson.D{{"$count", "numFilteredDocs"}})
 
-    err := retryer.Run(ctx, logger, func(ctx context.Context, ri *retry.FuncInfo) error {
-        ri.Log(logger.Logger, "count", "source", srcDB.Name(), collName, "Counting filtered documents.")
-        request := bson.D{
-            {"aggregate", collName},
-            {"pipeline", pipeline},
-            {"cursor", bson.D{}},
-        }
+    err := retry.New().WithCallback(
+        func(ctx context.Context, ri *retry.FuncInfo) error {
+            ri.Log(logger.Logger, "count", "source", srcDB.Name(), collName, "Counting filtered documents.")
+            request := bson.D{
+                {"aggregate", collName},
+                {"pipeline", pipeline},
+                {"cursor", bson.D{}},
+            }
 
-        cursor, driverErr := srcDB.RunCommandCursor(ctx, request)
-        if driverErr != nil {
-            return driverErr
-        }
+            cursor, driverErr := srcDB.RunCommandCursor(ctx, request)
+            if driverErr != nil {
+                return driverErr
+            }
 
-        defer cursor.Close(ctx)
-        if cursor.Next(ctx) {
-            if err := cursor.Decode(&value); err != nil {
-                return errors.Wrapf(err, "failed to decode $count response (%+v) for source namespace %s.%s after filter (%+v)", cursor.Current, srcDB.Name(), collName, filter)
+            defer cursor.Close(ctx)
+            if cursor.Next(ctx) {
+                if err := cursor.Decode(&value); err != nil {
+                    return errors.Wrapf(err, "failed to decode $count response (%+v) for source namespace %s.%s after filter (%+v)", cursor.Current, srcDB.Name(), collName, filter)
+                }
             }
-        }
-        return nil
-    })
+            return nil
+        },
+        "counting %#q's filtered documents",
+        srcDB.Name()+"."+collName,
+    ).Run(ctx, logger)
 
     // TODO (REP-960): remove this check.
     // If we get NamespaceNotFoundError then return 0 since we won't do any partitioning with those returns
@@ -458,7 +461,6 @@ func getNumPartitions(collSizeInBytes, partitionSizeInBytes int64, filteredRatio
 func getOuterIDBound(
     ctx context.Context,
     subLogger *logger.Logger,
-    retryer *retry.Retryer,
     minOrMaxBound minOrMaxBound,
     srcDB *mongo.Database,
     collName string,
@@ -488,30 +490,35 @@ func getOuterIDBound(
     }...)
 
     // Get one document containing only the smallest or largest _id value in the collection.
-    err := retryer.Run(ctx, subLogger, func(ctx context.Context, ri *retry.FuncInfo) error {
-        ri.Log(subLogger.Logger, "aggregate", "source", srcDB.Name(), collName, fmt.Sprintf("getting %s _id partition bound", minOrMaxBound))
-        cursor, cmdErr :=
-            srcDB.RunCommandCursor(ctx, bson.D{
-                {"aggregate", collName},
-                {"pipeline", pipeline},
-                {"hint", bson.D{{"_id", 1}}},
-                {"cursor", bson.D{}},
-            })
-
-        if cmdErr != nil {
-            return cmdErr
-        }
+    err := retry.New().WithCallback(
+        func(ctx context.Context, ri *retry.FuncInfo) error {
+            ri.Log(subLogger.Logger, "aggregate", "source", srcDB.Name(), collName, fmt.Sprintf("getting %s _id partition bound", minOrMaxBound))
+            cursor, cmdErr :=
+                srcDB.RunCommandCursor(ctx, bson.D{
+                    {"aggregate", collName},
+                    {"pipeline", pipeline},
+                    {"hint", bson.D{{"_id", 1}}},
+                    {"cursor", bson.D{}},
+                })
+
+            if cmdErr != nil {
+                return cmdErr
+            }
 
-        // If we don't have at least one document, the collection is either empty or was dropped.
-        defer cursor.Close(ctx)
-        if !cursor.Next(ctx) {
-            return nil
-        }
+            // If we don't have at least one document, the collection is either empty or was dropped.
+            defer cursor.Close(ctx)
+            if !cursor.Next(ctx) {
+                return nil
+            }
 
-        // Return the _id value from that document.
-        docID, cmdErr = cursor.Current.LookupErr("_id")
-        return cmdErr
-    })
+            // Return the _id value from that document.
+            docID, cmdErr = cursor.Current.LookupErr("_id")
+            return cmdErr
+        },
+        "finding %#q's %s _id",
+        srcDB.Name()+"."+collName,
+        minOrMaxBound,
+    ).Run(ctx, subLogger)
 
     if err != nil {
         return nil, errors.Wrapf(err, "could not get %s _id bound for source collection '%s.%s'", minOrMaxBound, srcDB.Name(), collName)
@@ -528,7 +535,6 @@ func getOuterIDBound(
 func getMidIDBounds(
     ctx context.Context,
     logger *logger.Logger,
-    retryer *retry.Retryer,
     srcDB *mongo.Database,
     collName string,
     collDocCount int64,
@@ -576,48 +582,53 @@ func getMidIDBounds(
 
     // Get a cursor for the $sample and $bucketAuto aggregation.
     var midIDBounds []interface{}
-    agRetryer := retryer.WithErrorCodes(util.SampleTooManyDuplicates)
-    err := agRetryer.Run(ctx, logger, func(ctx context.Context, ri *retry.FuncInfo) error {
-        ri.Log(logger.Logger, "aggregate", "source", srcDB.Name(), collName, "Retrieving mid _id partition bounds using $sample.")
-        cursor, cmdErr :=
-            srcDB.RunCommandCursor(ctx, bson.D{
-                {"aggregate", collName},
-                {"pipeline", pipeline},
-                {"allowDiskUse", true},
-                {"cursor", bson.D{}},
-            })
-
-        if cmdErr != nil {
-            return errors.Wrapf(cmdErr, "failed to $sample and $bucketAuto documents for source namespace '%s.%s'", srcDB.Name(), collName)
-        }
-
-        defer cursor.Close(ctx)
-
-        // Iterate through all $bucketAuto documents of the form:
-        // {
-        //     "_id" : {
-        //         "min" : ... ,
-        //         "max" : ...
-        //     },
-        //     "count" : ...
-        // }
-        midIDBounds = make([]interface{}, 0, numPartitions)
-        for cursor.Next(ctx) {
-            // Get a mid _id bound using the $bucketAuto document's max value.
-            bucketAutoDoc := make(bson.Raw, len(cursor.Current))
-            copy(bucketAutoDoc, cursor.Current)
-            bound, err := bucketAutoDoc.LookupErr("_id", "max")
-            if err != nil {
-                return errors.Wrapf(err, "failed to lookup '_id.max' key in $bucketAuto document for source namespace '%s.%s'", srcDB.Name(), collName)
-            }
-
-            // Append the copied bound to the other mid _id bounds.
-            midIDBounds = append(midIDBounds, bound)
-            ri.NoteSuccess()
-        }
-
-        return cursor.Err()
-    })
+    agRetryer := retry.New().WithErrorCodes(util.SampleTooManyDuplicates)
+    err := agRetryer.
+        WithCallback(
+            func(ctx context.Context, ri *retry.FuncInfo) error {
+                ri.Log(logger.Logger, "aggregate", "source", srcDB.Name(), collName, "Retrieving mid _id partition bounds using $sample.")
+                cursor, cmdErr :=
+                    srcDB.RunCommandCursor(ctx, bson.D{
+                        {"aggregate", collName},
+                        {"pipeline", pipeline},
+                        {"allowDiskUse", true},
+                        {"cursor", bson.D{}},
+                    })
+
+                if cmdErr != nil {
+                    return errors.Wrapf(cmdErr, "failed to $sample and $bucketAuto documents for source namespace '%s.%s'", srcDB.Name(), collName)
+                }
+
+                defer cursor.Close(ctx)
+
+                // Iterate through all $bucketAuto documents of the form:
+                // {
+                //     "_id" : {
+                //         "min" : ... ,
+                //         "max" : ...
+                //     },
+                //     "count" : ...
+                // }
+                midIDBounds = make([]interface{}, 0, numPartitions)
+                for cursor.Next(ctx) {
+                    // Get a mid _id bound using the $bucketAuto document's max value.
+                    bucketAutoDoc := make(bson.Raw, len(cursor.Current))
+                    copy(bucketAutoDoc, cursor.Current)
+                    bound, err := bucketAutoDoc.LookupErr("_id", "max")
+                    if err != nil {
+                        return errors.Wrapf(err, "failed to lookup '_id.max' key in $bucketAuto document for source namespace '%s.%s'", srcDB.Name(), collName)
+                    }
+
+                    // Append the copied bound to the other mid _id bounds.
+                    midIDBounds = append(midIDBounds, bound)
+                    ri.NoteSuccess()
+                }
+
+                return cursor.Err()
+            },
+            "finding %#q's _id partition boundaries",
+            srcDB.Name()+"."+collName,
+        ).Run(ctx, logger)
 
     if err != nil {
         return nil, false, errors.Wrapf(err, "encountered a problem in the cursor when trying to $sample and $bucketAuto aggregation for source namespace '%s.%s'", srcDB.Name(), collName)
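
All of the hunks above converge on the same new call shape: instead of receiving a *retry.Retryer parameter and calling retryer.Run(ctx, logger, callback), each call site now builds a retryer in place and attaches the callback plus a printf-style description to it. The following is a minimal sketch of that pattern, assembled only from the methods visible in this diff (retry.New, WithErrorCodes, WithCallback, Run, and FuncInfo's Log and NoteSuccess); the function name runWithRetry, the dbName/collName parameters, and the omitted imports of the repo's internal retry, logger, and util packages are illustrative assumptions, not part of the commit.

// Sketch only: the retryer call pattern this commit converges on, inferred from
// the hunks above. runWithRetry, dbName, and collName are placeholder names.
func runWithRetry(ctx context.Context, lg *logger.Logger, dbName, collName string) error {
    return retry.New().
        // Optional: treat additional server error codes as retryable, as getMidIDBounds does.
        WithErrorCodes(util.SampleTooManyDuplicates).
        WithCallback(
            func(ctx context.Context, ri *retry.FuncInfo) error {
                // One attempt: log progress through the FuncInfo, run the driver
                // command, and note success so the retryer sees forward progress.
                ri.Log(lg.Logger, "aggregate", "source", dbName, collName, "Running the retried operation.")
                // ... issue the command and iterate its cursor here ...
                ri.NoteSuccess()
                return nil
            },
            // A printf-style description of the operation plus its arguments,
            // now carried by the retryer itself rather than passed to Run.
            "running %#q's operation",
            dbName+"."+collName,
        ).
        Run(ctx, lg)
}

Because the description and its arguments now travel with the retryer built at the call site, the *retry.Retryer no longer needs to be threaded through the partitioning functions, which is exactly what the signature hunks above remove.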
