
Commit ff0c7e9

Merge branch 'main' into REP-5317-add-dst-change-stream

2 parents d15b020 + 31cc703

30 files changed: +1152, -776 lines

internal/partitions/partition.go

Lines changed: 4 additions & 4 deletions

@@ -137,7 +137,7 @@ func (p *Partition) FindCmd(
 // (e.g. use the partitions on the source to read the destination for verification)
 // If the passed-in buildinfo indicates a mongodb version < 5.0, type bracketing is not used.
 // filterAndPredicates is a slice of filter criteria that's used to construct the "filter" field in the find option.
-func (p *Partition) GetFindOptions(buildInfo *util.BuildInfo, filterAndPredicates bson.A) bson.D {
+func (p *Partition) GetFindOptions(clusterInfo *util.ClusterInfo, filterAndPredicates bson.A) bson.D {
     if p == nil {
         if len(filterAndPredicates) > 0 {
             return bson.D{{"filter", bson.D{{"$and", filterAndPredicates}}}}
@@ -158,11 +158,11 @@ func (p *Partition) GetFindOptions(buildInfo *util.BuildInfo, filterAndPredicate
     // For non-capped collections, the cursor should use the ID filter and the _id index.
     // Get the bounded query filter from the partition to be used in the Find command.
     allowTypeBracketing := false
-    if buildInfo != nil {
+    if clusterInfo != nil {
         allowTypeBracketing = true

-        if buildInfo.VersionArray != nil {
-            allowTypeBracketing = buildInfo.VersionArray[0] < 5
+        if clusterInfo.VersionArray != nil {
+            allowTypeBracketing = clusterInfo.VersionArray[0] < 5
         }
     }
     if !allowTypeBracketing {
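
For orientation, a hedged caller-side sketch of the renamed parameter. Only the GetFindOptions signature and the version check come from the hunks above; the example package, the helper name, and the internal import paths are assumptions based on the repo layout (internal packages are importable only from inside the module).

package example // hypothetical package, not part of this commit

import (
    "github.com/10gen/migration-verifier/internal/partitions"
    "github.com/10gen/migration-verifier/internal/util"
    "go.mongodb.org/mongo-driver/bson"
)

// buildFindOptions shows the new signature: GetFindOptions now takes a
// *util.ClusterInfo where it previously took a *util.BuildInfo.
func buildFindOptions(p *partitions.Partition, predicates bson.A) bson.D {
    // Per the logic above and the updated unit tests: a VersionArray whose
    // first element is < 5 yields the type-bracketed filter, a first element
    // >= 5 (or a nil ClusterInfo) yields the non-bracketed one, and a non-nil
    // ClusterInfo with a nil VersionArray is treated as an old server.
    clusterInfo := &util.ClusterInfo{VersionArray: []int{4, 4, 0, 0}}
    return p.GetFindOptions(clusterInfo, predicates)
}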

internal/partitions/partition_test.go

Lines changed: 7 additions & 7 deletions

@@ -78,37 +78,37 @@ func (suite *UnitTestSuite) TestVersioning() {
     suite.Require().Equal(expectedFilter, filter)

     // 6.0
-    findOptions = partition.GetFindOptions(&util.BuildInfo{VersionArray: []int{6, 0, 0}}, nil)
+    findOptions = partition.GetFindOptions(&util.ClusterInfo{VersionArray: []int{6, 0, 0}}, nil)
     filter = getFilterFromFindOptions(findOptions)
     suite.Require().Equal(expectedFilter, filter)

     // 5.3.0.9
-    findOptions = partition.GetFindOptions(&util.BuildInfo{VersionArray: []int{5, 3, 0, 9}}, nil)
+    findOptions = partition.GetFindOptions(&util.ClusterInfo{VersionArray: []int{5, 3, 0, 9}}, nil)
     filter = getFilterFromFindOptions(findOptions)
     suite.Require().Equal(expectedFilter, filter)

     // 7.1.3.5
-    findOptions = partition.GetFindOptions(&util.BuildInfo{VersionArray: []int{7, 1, 3, 5}}, nil)
+    findOptions = partition.GetFindOptions(&util.ClusterInfo{VersionArray: []int{7, 1, 3, 5}}, nil)
     filter = getFilterFromFindOptions(findOptions)
     suite.Require().Equal(expectedFilter, filter)

     // 4.4 (int64)
-    findOptions = partition.GetFindOptions(&util.BuildInfo{VersionArray: []int{4, 4, 0, 0}}, nil)
+    findOptions = partition.GetFindOptions(&util.ClusterInfo{VersionArray: []int{4, 4, 0, 0}}, nil)
     filter = getFilterFromFindOptions(findOptions)
     suite.Require().Equal(expectedFilterWithTypeBracketing, filter)

     // 4.4
-    findOptions = partition.GetFindOptions(&util.BuildInfo{VersionArray: []int{4, 4, 0, 0}}, nil)
+    findOptions = partition.GetFindOptions(&util.ClusterInfo{VersionArray: []int{4, 4, 0, 0}}, nil)
     filter = getFilterFromFindOptions(findOptions)
     suite.Require().Equal(expectedFilterWithTypeBracketing, filter)

     // 4.2
-    findOptions = partition.GetFindOptions(&util.BuildInfo{VersionArray: []int{4, 2, 0, 0}}, nil)
+    findOptions = partition.GetFindOptions(&util.ClusterInfo{VersionArray: []int{4, 2, 0, 0}}, nil)
     filter = getFilterFromFindOptions(findOptions)
     suite.Require().Equal(expectedFilterWithTypeBracketing, filter)

     // No version array -- assume old, require type bracketing.
-    findOptions = partition.GetFindOptions(&util.BuildInfo{}, nil)
+    findOptions = partition.GetFindOptions(&util.ClusterInfo{}, nil)
     filter = getFilterFromFindOptions(findOptions)
     suite.Require().Equal(expectedFilterWithTypeBracketing, filter)
 }

internal/partitions/partitions.go

Lines changed: 30 additions & 57 deletions

@@ -191,13 +191,13 @@ func PartitionCollectionWithParameters(

     // Get the collection's size in bytes and its document count. It is okay if these return zero since there might still be
     // items in the collection. Rely on getOuterIDBound to do a majority read to determine if we continue processing the collection.
-    collSizeInBytes, collDocCount, isCapped, err := GetSizeAndDocumentCount(ctx, subLogger, retryer, srcColl, uuidEntry.UUID)
+    collSizeInBytes, collDocCount, isCapped, err := GetSizeAndDocumentCount(ctx, subLogger, retryer, srcColl)
     if err != nil {
         return nil, 0, 0, err
     }

     // The lower bound for the collection. There is no partitioning to do if the bound is nil.
-    minIDBound, err := getOuterIDBound(ctx, subLogger, retryer, minBound, srcDB, uuidEntry.CollName, uuidEntry.UUID, globalFilter)
+    minIDBound, err := getOuterIDBound(ctx, subLogger, retryer, minBound, srcDB, uuidEntry.CollName, globalFilter)
     if err != nil {
         return nil, 0, 0, err
     }
@@ -210,7 +210,7 @@ func PartitionCollectionWithParameters(
     }

     // The upper bound for the collection. There is no partitioning to do if the bound is nil.
-    maxIDBound, err := getOuterIDBound(ctx, subLogger, retryer, maxBound, srcDB, uuidEntry.CollName, uuidEntry.UUID, globalFilter)
+    maxIDBound, err := getOuterIDBound(ctx, subLogger, retryer, maxBound, srcDB, uuidEntry.CollName, globalFilter)
     if err != nil {
         return nil, 0, 0, err
     }
@@ -232,7 +232,7 @@ func PartitionCollectionWithParameters(

     // If a filter is used for partitioning, number of partitions is calculated with the ratio of filtered documents.
     if len(globalFilter) > 0 {
-        numFilteredDocs, filteredCntErr := GetDocumentCountAfterFiltering(ctx, subLogger, retryer, srcColl, uuidEntry.UUID, globalFilter)
+        numFilteredDocs, filteredCntErr := GetDocumentCountAfterFiltering(ctx, subLogger, retryer, srcColl, globalFilter)
         if filteredCntErr == nil {
             numPartitions = getNumPartitions(collSizeInBytes, partitionSizeInBytes, float64(numFilteredDocs)/float64(collDocCount))
         } else {
@@ -254,7 +254,6 @@ func PartitionCollectionWithParameters(
         retryer,
         srcDB,
         uuidEntry.CollName,
-        uuidEntry.UUID,
         collDocCount,
         numPartitions,
         sampleMinNumDocs,
@@ -315,7 +314,7 @@ func PartitionCollectionWithParameters(
 // capped status, in that order.
 //
 // Exported for usage in integration tests.
-func GetSizeAndDocumentCount(ctx context.Context, logger *logger.Logger, retryer *retry.Retryer, srcColl *mongo.Collection, collUUID util.UUID) (int64, int64, bool, error) {
+func GetSizeAndDocumentCount(ctx context.Context, logger *logger.Logger, retryer *retry.Retryer, srcColl *mongo.Collection) (int64, int64, bool, error) {
     srcDB := srcColl.Database()
     collName := srcColl.Name()

@@ -325,10 +324,10 @@ func GetSizeAndDocumentCount(ctx context.Context, logger *logger.Logger, retryer
         Capped bool `bson:"capped"`
     }{}

-    currCollName, err := retryer.RunForUUIDAndTransientErrors(ctx, logger, collName, func(ri *retry.Info, collectionName string) error {
-        ri.Log(logger.Logger, "collStats", "source", srcDB.Name(), collectionName, "Retrieving collection size and document count.")
-        request := retryer.RequestWithUUID(bson.D{
-            {"aggregate", collectionName},
+    err := retryer.Run(ctx, logger, func(ctx context.Context, ri *retry.FuncInfo) error {
+        ri.Log(logger.Logger, "collStats", "source", srcDB.Name(), collName, "Retrieving collection size and document count.")
+        request := bson.D{
+            {"aggregate", collName},
             {"pipeline", mongo.Pipeline{
                 bson.D{{"$collStats", bson.D{
                     {"storageStats", bson.E{"scale", 1}},
@@ -343,7 +342,7 @@ func GetSizeAndDocumentCount(ctx context.Context, logger *logger.Logger, retryer
                 {"capped", bson.D{{"$first", "$capped"}}}}}},
             }},
             {"cursor", bson.D{}},
-        }, collUUID)
+        }

         cursor, driverErr := srcDB.RunCommandCursor(ctx, request)
         if driverErr != nil {
@@ -371,15 +370,8 @@ func GetSizeAndDocumentCount(ctx context.Context, logger *logger.Logger, retryer
         return 0, 0, false, errors.Wrapf(err, "failed to run aggregation for $collStats for source namespace %s.%s", srcDB.Name(), collName)
     }

-    // CollectionUUIDMismatch where the collection does not exist will return a nil cursor and nil
-    // error.
-    if currCollName == "" {
-        // Return 0, 0, nil as CollectionUUIDMismatch should not cause an initial sync error.
-        return 0, 0, false, nil
-    }
-
     logger.Debug().Msgf("Collection %s.%s size: %d, document count: %d, capped: %v",
-        srcDB.Name(), currCollName, value.Size, value.Count, value.Capped)
+        srcDB.Name(), collName, value.Size, value.Count, value.Capped)

     return value.Size, value.Count, value.Capped, nil
 }
@@ -388,7 +380,7 @@ func GetSizeAndDocumentCount(ctx context.Context, logger *logger.Logger, retryer
 //
 // This function could take a long time, especially if the collection does not have an index
 // on the filtered fields.
-func GetDocumentCountAfterFiltering(ctx context.Context, logger *logger.Logger, retryer *retry.Retryer, srcColl *mongo.Collection, collUUID util.UUID, filter map[string]any) (int64, error) {
+func GetDocumentCountAfterFiltering(ctx context.Context, logger *logger.Logger, retryer *retry.Retryer, srcColl *mongo.Collection, filter map[string]any) (int64, error) {
     srcDB := srcColl.Database()
     collName := srcColl.Name()

@@ -403,13 +395,13 @@ func GetDocumentCountAfterFiltering(ctx context.Context, logger *logger.Logger,
     }
     pipeline = append(pipeline, bson.D{{"$count", "numFilteredDocs"}})

-    currCollName, err := retryer.RunForUUIDAndTransientErrors(ctx, logger, collName, func(ri *retry.Info, collectionName string) error {
-        ri.Log(logger.Logger, "count", "source", srcDB.Name(), collectionName, "Counting filtered documents.")
-        request := retryer.RequestWithUUID(bson.D{
-            {"aggregate", collectionName},
+    err := retryer.Run(ctx, logger, func(ctx context.Context, ri *retry.FuncInfo) error {
+        ri.Log(logger.Logger, "count", "source", srcDB.Name(), collName, "Counting filtered documents.")
+        request := bson.D{
+            {"aggregate", collName},
             {"pipeline", pipeline},
             {"cursor", bson.D{}},
-        }, collUUID)
+        }

         cursor, driverErr := srcDB.RunCommandCursor(ctx, request)
         if driverErr != nil {
@@ -437,15 +429,8 @@ func GetDocumentCountAfterFiltering(ctx context.Context, logger *logger.Logger,
         return 0, errors.Wrapf(err, "failed to run aggregation $count for source namespace %s.%s after filter (%+v)", srcDB.Name(), collName, filter)
     }

-    // CollectionUUIDMismatch where the collection does not exist will return a nil cursor and nil
-    // error.
-    if currCollName == "" {
-        // Return 0, nil as CollectionUUIDMismatch should not cause an initial sync error.
-        return 0, nil
-    }
-
     logger.Debug().Msgf("Collection %s.%s filtered document count: %d, filter: %+v",
-        srcDB.Name(), currCollName, value.Count, filter)
+        srcDB.Name(), collName, value.Count, filter)

     return value.Count, nil
 }
@@ -477,7 +462,6 @@ func getOuterIDBound(
     minOrMaxBound minOrMaxBound,
     srcDB *mongo.Database,
     collName string,
-    collUUID util.UUID,
     globalFilter map[string]any,
 ) (interface{}, error) {
     // Choose a sort direction based on the minOrMaxBound.
@@ -504,15 +488,15 @@
     }...)

     // Get one document containing only the smallest or largest _id value in the collection.
-    currCollName, err := retryer.RunForUUIDAndTransientErrors(ctx, subLogger, collName, func(ri *retry.Info, collName string) error {
+    err := retryer.Run(ctx, subLogger, func(ctx context.Context, ri *retry.FuncInfo) error {
         ri.Log(subLogger.Logger, "aggregate", "source", srcDB.Name(), collName, fmt.Sprintf("getting %s _id partition bound", minOrMaxBound))
         cursor, cmdErr :=
-            srcDB.RunCommandCursor(ctx, retryer.RequestWithUUID(bson.D{
+            srcDB.RunCommandCursor(ctx, bson.D{
                 {"aggregate", collName},
                 {"pipeline", pipeline},
                 {"hint", bson.D{{"_id", 1}}},
                 {"cursor", bson.D{}},
-            }, collUUID))
+            })

         if cmdErr != nil {
             return cmdErr
@@ -530,12 +514,7 @@
     })

     if err != nil {
-        return nil, errors.Wrapf(err, "could not get %s _id bound for source collection '%s.%s', UUID %s", minOrMaxBound, srcDB.Name(), collName, collUUID.String())
-    }
-
-    if currCollName == "" {
-        subLogger.Debug().Msgf("Not getting %s _id bound for source collection '%s.%s', UUID %s, because it was dropped", minOrMaxBound, srcDB.Name(), collName, collUUID.String())
-        return nil, nil
+        return nil, errors.Wrapf(err, "could not get %s _id bound for source collection '%s.%s'", minOrMaxBound, srcDB.Name(), collName)
     }

     return docID, nil
@@ -552,7 +531,6 @@ func getMidIDBounds(
     retryer *retry.Retryer,
     srcDB *mongo.Database,
     collName string,
-    collUUID util.UUID,
     collDocCount int64,
     numPartitions, sampleMinNumDocs int,
     sampleRate float64,
@@ -585,7 +563,7 @@
         msg = fmt.Sprintf("Sampling %d documents", numDocsToSample)
     }

-    logger.Info().Msgf("%s to make %d partitions for collection '%s.%s', UUID %s", msg, numPartitions, srcDB.Name(), collName, collUUID.String())
+    logger.Info().Msgf("%s to make %d partitions for collection '%s.%s'", msg, numPartitions, srcDB.Name(), collName)
     pipeline = append(pipeline, []bson.D{
         {{"$sample", bson.D{{"size", numDocsToSample}}}},
         {{"$project", bson.D{{"_id", 1}}}},
@@ -599,18 +577,18 @@
     // Get a cursor for the $sample and $bucketAuto aggregation.
     var midIDBounds []interface{}
     agRetryer := retryer.WithErrorCodes(util.SampleTooManyDuplicates)
-    currCollName, err := agRetryer.RunForUUIDAndTransientErrors(ctx, logger, collName, func(ri *retry.Info, collName string) error {
+    err := agRetryer.Run(ctx, logger, func(ctx context.Context, ri *retry.FuncInfo) error {
         ri.Log(logger.Logger, "aggregate", "source", srcDB.Name(), collName, "Retrieving mid _id partition bounds using $sample.")
         cursor, cmdErr :=
-            srcDB.RunCommandCursor(ctx, retryer.RequestWithUUID(bson.D{
+            srcDB.RunCommandCursor(ctx, bson.D{
                 {"aggregate", collName},
                 {"pipeline", pipeline},
                 {"allowDiskUse", true},
                 {"cursor", bson.D{}},
-            }, collUUID))
+            })

         if cmdErr != nil {
-            return errors.Wrapf(cmdErr, "failed to $sample and $bucketAuto documents for source namespace '%s.%s', UUID %s", srcDB.Name(), collName, collUUID.String())
+            return errors.Wrapf(cmdErr, "failed to $sample and $bucketAuto documents for source namespace '%s.%s'", srcDB.Name(), collName)
         }

         defer cursor.Close(ctx)
@@ -630,24 +608,19 @@
             copy(bucketAutoDoc, cursor.Current)
             bound, err := bucketAutoDoc.LookupErr("_id", "max")
             if err != nil {
-                return errors.Wrapf(err, "failed to lookup '_id.max' key in $bucketAuto document for source namespace '%s.%s', UUID %s", srcDB.Name(), collName, collUUID.String())
+                return errors.Wrapf(err, "failed to lookup '_id.max' key in $bucketAuto document for source namespace '%s.%s'", srcDB.Name(), collName)
             }

             // Append the copied bound to the other mid _id bounds.
             midIDBounds = append(midIDBounds, bound)
-            ri.IterationSuccess()
+            ri.NoteSuccess()
         }

         return cursor.Err()
     })

     if err != nil {
-        return nil, false, errors.Wrapf(err, "encountered a problem in the cursor when trying to $sample and $bucketAuto aggregation for source namespace '%s.%s', UUID %s", srcDB.Name(), collName, collUUID.String())
-    }
-
-    if currCollName == "" {
-        // The collection was dropped on the source, so we return a nil mid ID bound.
-        return nil, true, nil
+        return nil, false, errors.Wrapf(err, "encountered a problem in the cursor when trying to $sample and $bucketAuto aggregation for source namespace '%s.%s'", srcDB.Name(), collName)
     }

     if len(midIDBounds) == 0 {
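
The recurring change in this file is the retry API: callbacks move from retryer.RunForUUIDAndTransientErrors (which passed a collection name into the callback and wrapped commands with RequestWithUUID) to retryer.Run with a context-and-FuncInfo callback, dropping the UUID plumbing and the currCollName == "" handling. Below is a hedged sketch of the new shape; the example function, its arguments, and the internal import paths are assumptions, while the Run signature, the name-only aggregate command, and the ri.NoteSuccess call mirror the hunks above.

package example // hypothetical, for illustration only

import (
    "context"

    "github.com/10gen/migration-verifier/internal/logger"
    "github.com/10gen/migration-verifier/internal/retry"
    "go.mongodb.org/mongo-driver/bson"
    "go.mongodb.org/mongo-driver/mongo"
)

// runNamedAggregate sketches the new retry pattern: the command targets the
// collection by name only, with no RequestWithUUID wrapper or UUID argument.
func runNamedAggregate(
    ctx context.Context,
    lgr *logger.Logger,
    retryer *retry.Retryer,
    srcDB *mongo.Database,
    collName string,
    pipeline mongo.Pipeline,
) error {
    return retryer.Run(ctx, lgr, func(ctx context.Context, ri *retry.FuncInfo) error {
        cursor, err := srcDB.RunCommandCursor(ctx, bson.D{
            {"aggregate", collName},
            {"pipeline", pipeline},
            {"cursor", bson.D{}},
        })
        if err != nil {
            return err
        }
        defer cursor.Close(ctx)

        for cursor.Next(ctx) {
            // Report per-document progress to the retryer, mirroring the
            // ri.NoteSuccess() call in getMidIDBounds above.
            ri.NoteSuccess()
        }
        return cursor.Err()
    })
}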

internal/reportutils/reportutils.go

Lines changed: 2 additions & 1 deletion

@@ -183,5 +183,6 @@ func FindBestUnit[T num16Plus](count T) DataUnit {
 // FmtBytes is a convenience that combines BytesToUnit with FindBestUnit.
 // Use it to format a single count of bytes.
 func FmtBytes[T num16Plus](count T) string {
-    return BytesToUnit(count, FindBestUnit(count))
+    unit := FindBestUnit(count)
+    return BytesToUnit(count, unit) + " " + string(unit)
 }
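
A hedged usage sketch of the changed helper: FmtBytes now appends the unit name to the scaled value instead of returning the bare number. The example package is hypothetical, the exact rendered unit string depends on DataUnit (not shown in this diff), and int64 is assumed to satisfy the num16Plus constraint.

package example // hypothetical

import (
    "fmt"

    "github.com/10gen/migration-verifier/internal/reportutils"
)

func printCollectionSize(sizeInBytes int64) {
    // Previously this printed only the scaled number; after this change it
    // also carries the unit chosen by FindBestUnit (exact spelling depends
    // on DataUnit, which is defined elsewhere in reportutils).
    fmt.Println(reportutils.FmtBytes(sizeInBytes))
}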

internal/retry/constants.go

Lines changed: 1 addition & 9 deletions

@@ -3,17 +3,9 @@ package retry
 import "time"

 const (
-    // DefaultDurationLimit is the default time limit for all retries. This is
-    // currently 10 minutes.
+    // DefaultDurationLimit is the default time limit for all retries.
     DefaultDurationLimit = 10 * time.Minute

-    // The number of CollectionUUIDMismatch error retries we do
-    // before logging a warning. Under normal circumstances, we
-    // don't ever expect more than a couple. Anything more than
-    // that requires collection renames to interleave between
-    // retry attempts.
-    numCollectionUUIDRetriesBeforeWarning = 10
-
     // Constants for spacing out the retry attempts.
     // See: https://en.wikipedia.org/wiki/Exponential_backoff
     //

internal/retry/error.go

Lines changed: 39 additions & 0 deletions

@@ -0,0 +1,39 @@
+package retry
+
+import (
+    "fmt"
+    "time"
+
+    "github.com/10gen/migration-verifier/internal/reportutils"
+)
+
+type RetryDurationLimitExceededErr struct {
+    lastErr  error
+    attempts int
+    duration time.Duration
+}
+
+func (rde RetryDurationLimitExceededErr) Error() string {
+    return fmt.Sprintf(
+        "retryable function did not succeed after %d attempt(s) over %s; last error was: %v",
+        rde.attempts,
+        reportutils.DurationToHMS(rde.duration),
+        rde.lastErr,
+    )
+}
+
+func (rde RetryDurationLimitExceededErr) Unwrap() error {
+    return rde.lastErr
+}
+
+// errgroupErr is an internal error type that we return from errgroup
+// callbacks. It allows us to know (reliably) which error is the one
+// that triggers the errgroup's failure.
+type errgroupErr struct {
+    funcNum         int
+    errFromCallback error
+}
+
+func (ege errgroupErr) Error() string {
+    return fmt.Sprintf("func %d failed: %v", ege.funcNum, ege.errFromCallback)
+}
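
The new exported RetryDurationLimitExceededErr wraps the last error and reports the attempt count and elapsed time. Below is a hedged sketch of how a caller might detect it; that retryer.Run surfaces this type when the duration limit is exhausted is an assumption suggested by the name and by DefaultDurationLimit rather than something this diff shows, and the helper names here are hypothetical.

package example // hypothetical

import (
    "context"
    "errors"
    "log"

    "github.com/10gen/migration-verifier/internal/logger"
    "github.com/10gen/migration-verifier/internal/retry"
)

func runWithRetryReporting(ctx context.Context, lgr *logger.Logger, retryer *retry.Retryer) {
    err := retryer.Run(ctx, lgr, func(ctx context.Context, ri *retry.FuncInfo) error {
        return flakyOperation(ctx) // hypothetical retryable work
    })

    // Error() already embeds the attempt count, duration (via
    // reportutils.DurationToHMS), and last error; Unwrap exposes the cause.
    var limitErr retry.RetryDurationLimitExceededErr
    if errors.As(err, &limitErr) {
        log.Printf("retries exhausted: %v (cause: %v)", limitErr, errors.Unwrap(limitErr))
    }
}

func flakyOperation(ctx context.Context) error { return nil }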
