Skip to content

Commit f398a89

Browse files
authored
REP-6492 Switch to $sampleRate-style partitioning when possible (#128)
$sample-based partitioning has proven problematic for some years now because it often creates highly imbalanced partitions. This changeset switches partitioning to use $sampleRate instead. Because this entails a full index scan, it tends to be slower; we offset that by creating partition tasks immediately as we receive sampled partition boundaries rather than all at once at the end of the aggregation. Because MongoDB 4.2 lacks $sampleRate (and $rand as well), the legacy partitioning logic remains for use with that server version. Both legacy & $sampleRate partitioning are made to use `available` read concern and `secondaryPreferred` read preference. These aggregations don’t need consistency, but they benefit substantially from speed & from minimizing workload on the primary. A few simplifications are made here as well. For example, `MongosyncID` is removed from the PartitionKey struct since it’s never actually relevant, and certain parameters to the legacy partitioner are made constants (since every caller always passed the same values). The `util.Divide` function is renamed `util.DivideToF64` to clarify that its return value is a float64.
1 parent b7a46a6 commit f398a89

File tree

9 files changed

+398
-79
lines changed

9 files changed

+398
-79
lines changed

internal/partitions/partition.go

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package partitions
33
import (
44
"fmt"
55
"slices"
6+
"time"
67

78
"github.com/10gen/migration-verifier/internal/util"
89
"github.com/10gen/migration-verifier/mbson"
@@ -13,13 +14,27 @@ import (
1314
"go.mongodb.org/mongo-driver/bson"
1415
"go.mongodb.org/mongo-driver/bson/primitive"
1516
"go.mongodb.org/mongo-driver/mongo"
17+
"go.mongodb.org/mongo-driver/mongo/options"
18+
"go.mongodb.org/mongo-driver/mongo/readconcern"
19+
"go.mongodb.org/mongo-driver/mongo/readpref"
1620
)
1721

22+
func ForPartitionAggregation(coll *mongo.Collection) *mongo.Collection {
23+
return coll.
24+
Database().
25+
Collection(
26+
coll.Name(),
27+
options.Collection().
28+
SetReadConcern(readconcern.Available()).
29+
SetReadPreference(
30+
readpref.SecondaryPreferred(readpref.WithMaxStaleness(90*time.Second))),
31+
)
32+
}
33+
1834
// PartitionKey represents the _id of a partition document stored in the destination.
1935
type PartitionKey struct {
20-
SourceUUID util.UUID `bson:"srcUUID"`
21-
MongosyncID string `bson:"id"`
22-
Lower any `bson:"lowerBound"`
36+
SourceUUID util.UUID `bson:"srcUUID"`
37+
Lower any `bson:"lowerBound"`
2338
}
2439

2540
// Namespace stores the database and collection name of the namespace being copied.
@@ -46,8 +61,8 @@ type Partition struct {
4661
// String returns a string representation of the partition.
4762
func (p *Partition) String() string {
4863
return fmt.Sprintf(
49-
"{db: %s, coll: %s, collUUID: %s, mongosyncID: %s, lower: %s, upper: %s}",
50-
p.Ns.DB, p.Ns.Coll, p.Key.SourceUUID, p.Key.MongosyncID, p.GetLowerBoundString(), p.GetUpperBoundString())
64+
"{db: %s, coll: %s, collUUID: %s, lower: %s, upper: %s}",
65+
p.Ns.DB, p.Ns.Coll, p.Key.SourceUUID, p.GetLowerBoundString(), p.GetUpperBoundString())
5166
}
5267

5368
// GetLowerBoundString returns the string representation of this partition's lower bound.

internal/partitions/partition_test.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -103,9 +103,8 @@ func getFilterFromFindOptions(opts bson.D) any {
103103
func (suite *UnitTestSuite) makeTestPartition() (Partition, bson.D) {
104104
partition := Partition{
105105
Key: PartitionKey{
106-
SourceUUID: util.NewUUID(),
107-
Lower: primitive.ObjectID([12]byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}),
108-
MongosyncID: "",
106+
SourceUUID: util.NewUUID(),
107+
Lower: primitive.ObjectID([12]byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}),
109108
},
110109
Ns: &Namespace{DB: "testDB", Coll: "testColl"},
111110
Upper: primitive.ObjectID([12]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2}),

internal/partitions/partitions.go

Lines changed: 16 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ package partitions
33
import (
44
"context"
55
"fmt"
6-
"math/rand"
76

87
"github.com/10gen/migration-verifier/internal/logger"
98
"github.com/10gen/migration-verifier/internal/reportutils"
@@ -15,6 +14,7 @@ import (
1514
"github.com/pkg/errors"
1615
"go.mongodb.org/mongo-driver/bson"
1716
"go.mongodb.org/mongo-driver/mongo"
17+
"go.mongodb.org/mongo-driver/mongo/options"
1818
)
1919

2020
const (
@@ -29,13 +29,13 @@ const (
2929
// possible that $sample does a collection scan if the number of documents increases very quickly, but
3030
// that should be very rare.
3131
//
32-
defaultSampleRate = 0.04
32+
sampleRate = 0.04
3333

3434
//
3535
// The minimum number of documents $sample requires in order to use a pseudo-random cursor.
3636
// See: https://docs.mongodb.com/manual/reference/operator/aggregation/sample/#behavior
3737
//
38-
defaultSampleMinNumDocs = 101
38+
sampleMinNumDocs = 101
3939

4040
//
4141
// The maximum number of documents to sample per partition. Previously this was set to 10.
@@ -70,13 +70,6 @@ const (
7070
defaultPartitionSizeInBytes = 400 * 1024 * 1024 // = 400 MB
7171
)
7272

73-
// Replicator contains the id of a mongosync replicator.
74-
// It is used here to avoid changing the interface of partitioning (from the mongosync version)
75-
// overmuch.
76-
type Replicator struct {
77-
ID string `bson:"id"`
78-
}
79-
8073
// Partitions is a slice of partitions.
8174
type Partitions struct {
8275
logger *logger.Logger
@@ -123,7 +116,6 @@ func PartitionCollectionWithSize(
123116
ctx context.Context,
124117
uuidEntry *uuidutil.NamespaceAndUUID,
125118
srcClient *mongo.Client,
126-
replicatorList []Replicator,
127119
subLogger *logger.Logger,
128120
partitionSizeInBytes int64,
129121
globalFilter bson.D,
@@ -138,13 +130,10 @@ func PartitionCollectionWithSize(
138130
partitionSizeInBytes = defaultPartitionSizeInBytes
139131
}
140132

141-
partitions, docCount, byteCount, err := PartitionCollectionWithParameters(
133+
partitions, docCount, byteCount, err := partitionCollectionWithParameters(
142134
ctx,
143135
uuidEntry,
144136
srcClient,
145-
replicatorList,
146-
defaultSampleRate,
147-
defaultSampleMinNumDocs,
148137
partitionSizeInBytes,
149138
subLogger,
150139
globalFilter,
@@ -157,13 +146,10 @@ func PartitionCollectionWithSize(
157146
Str("filter", fmt.Sprintf("%+v", globalFilter)).
158147
Msg("Timed out while partitioning with filter. Continuing by partitioning without the filter.")
159148

160-
return PartitionCollectionWithParameters(
149+
return partitionCollectionWithParameters(
161150
ctx,
162151
uuidEntry,
163152
srcClient,
164-
replicatorList,
165-
defaultSampleRate,
166-
defaultSampleMinNumDocs,
167153
partitionSizeInBytes,
168154
subLogger,
169155
nil,
@@ -173,17 +159,14 @@ func PartitionCollectionWithSize(
173159
return partitions, docCount, byteCount, err
174160
}
175161

176-
// PartitionCollectionWithParameters is the implementation for
162+
// partitionCollectionWithParameters is the implementation for
177163
// PartitionCollection. It is only directly used in integration tests.
178-
// See PartitionCollectionWithParameters for a description of inputs
164+
// See PartitionCollectionWithSize (the exported caller) for a description of inputs
179165
// & outputs. (Alas, the parameter order differs slightly here …)
180-
func PartitionCollectionWithParameters(
166+
func partitionCollectionWithParameters(
181167
ctx context.Context,
182168
uuidEntry *uuidutil.NamespaceAndUUID,
183169
srcClient *mongo.Client,
184-
replicatorList []Replicator,
185-
sampleRate float64,
186-
sampleMinNumDocs int,
187170
partitionSizeInBytes int64,
188171
subLogger *logger.Logger,
189172
globalFilter bson.D,
@@ -315,9 +298,6 @@ func PartitionCollectionWithParameters(
315298
Msg("_id bounds should outnumber partitions by 1.")
316299
}
317300

318-
// Choose a random index to start to avoid over-assigning partitions to a specific replicator.
319-
// rand.Int() generates non-negative integers only.
320-
replIndex := rand.Int() % len(replicatorList)
321301
subLogger.Debug().
322302
Int("numPartitions", len(allIDBounds)-1).
323303
Str("namespace", uuidEntry.DBName+"."+uuidEntry.CollName).
@@ -329,9 +309,8 @@ func PartitionCollectionWithParameters(
329309

330310
for i := 0; i < len(allIDBounds)-1; i++ {
331311
partitionKey := PartitionKey{
332-
SourceUUID: uuidEntry.UUID,
333-
MongosyncID: replicatorList[replIndex].ID,
334-
Lower: allIDBounds[i],
312+
SourceUUID: uuidEntry.UUID,
313+
Lower: allIDBounds[i],
335314
}
336315
partition := &Partition{
337316
Key: partitionKey,
@@ -340,8 +319,6 @@ func PartitionCollectionWithParameters(
340319
IsCapped: isCapped,
341320
}
342321
partitions = append(partitions, partition)
343-
344-
replIndex = (replIndex + 1) % len(replicatorList)
345322
}
346323

347324
return partitions, types.DocumentCount(collDocCount), types.ByteCount(collSizeInBytes), nil
@@ -641,13 +618,12 @@ func getMidIDBounds(
641618
WithCallback(
642619
func(ctx context.Context, ri *retry.FuncInfo) error {
643620
ri.Log(logger.Logger, "aggregate", "source", srcDB.Name(), collName, "Retrieving mid _id partition bounds using $sample.")
644-
cursor, cmdErr :=
645-
srcDB.RunCommandCursor(ctx, bson.D{
646-
{"aggregate", collName},
647-
{"pipeline", pipeline},
648-
{"allowDiskUse", true},
649-
{"cursor", bson.D{}},
650-
})
621+
cursor, cmdErr := ForPartitionAggregation(srcDB.Collection(collName)).
622+
Aggregate(
623+
ctx,
624+
pipeline,
625+
options.Aggregate().SetAllowDiskUse(true),
626+
)
651627

652628
if cmdErr != nil {
653629
return errors.Wrapf(cmdErr, "failed to $sample and $bucketAuto documents for source namespace '%s.%s'", srcDB.Name(), collName)

internal/reportutils/reportutils.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ func BytesToUnit[T num16Plus](count T, unit DataUnit) string {
9595
panic(fmt.Sprintf("Missing unit in unitSize: %s", unit))
9696
}
9797

98-
return FmtReal(util.Divide(count, myUnitSize))
98+
return FmtReal(util.DivideToF64(count, myUnitSize))
9999
}
100100

101101
// FmtReal provides a standard formatting of real numbers, with a consistent
@@ -128,7 +128,7 @@ func roundFloat(val float64, precision uint) float64 {
128128
}
129129

130130
func fmtQuotient[T, U realNum](dividend T, divisor U) string {
131-
return FmtReal(util.Divide(dividend, divisor))
131+
return FmtReal(util.DivideToF64(dividend, divisor))
132132
}
133133

134134
// FmtPercent returns a stringified percentage without a trailing `%`,

internal/util/math.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ package util
22

33
import "github.com/10gen/migration-verifier/internal/types"
44

5-
// Divide is syntactic sugar around float64(numerator) / float64(denominator).
6-
func Divide[N types.RealNumber, D types.RealNumber](numerator N, denominator D) float64 {
5+
// DivideToF64 is syntactic sugar around float64(numerator) / float64(denominator).
6+
func DivideToF64[N types.RealNumber, D types.RealNumber](numerator N, denominator D) float64 {
77
return float64(numerator) / float64(denominator)
88
}

internal/verifier/migration_verifier.go

Lines changed: 39 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -757,21 +757,17 @@ func (verifier *Verifier) partitionAndInspectNamespace(ctx context.Context, name
757757
return nil, nil, 0, 0, err
758758
}
759759

760-
// The partitioner doles out ranges to replicators; we don't use that functionality so we just pass
761-
// one "replicator".
762-
replicator1 := partitions.Replicator{ID: "verifier"}
763-
replicators := []partitions.Replicator{replicator1}
764760
partitionList, srcDocs, srcBytes, err := partitions.PartitionCollectionWithSize(
765-
ctx, namespaceAndUUID, verifier.srcClient, replicators, verifier.logger, verifier.partitionSizeInBytes, verifier.globalFilter)
761+
ctx, namespaceAndUUID, verifier.srcClient, verifier.logger, verifier.partitionSizeInBytes, verifier.globalFilter)
766762
if err != nil {
767763
return nil, nil, 0, 0, err
768764
}
769765
// TODO: Test the empty collection (which returns no partitions)
770766
if len(partitionList) == 0 {
771767
partitionList = []*partitions.Partition{{
772768
Key: partitions.PartitionKey{
773-
SourceUUID: namespaceAndUUID.UUID,
774-
MongosyncID: "verifier"},
769+
SourceUUID: namespaceAndUUID.UUID,
770+
},
775771
Ns: &partitions.Namespace{
776772
DB: namespaceAndUUID.DBName,
777773
Coll: namespaceAndUUID.CollName}}}
@@ -1228,33 +1224,50 @@ func (verifier *Verifier) verifyMetadataAndPartitionCollection(
12281224
// matches between source & destination. Now we can partition the collection.
12291225

12301226
if task.Generation == 0 {
1231-
partitions, shardKeys, docsCount, bytesCount, err := verifier.partitionAndInspectNamespace(ctx, srcNs)
1232-
if err != nil {
1233-
return errors.Wrapf(
1234-
err,
1235-
"failed to partition collection %#q",
1236-
srcNs,
1237-
)
1227+
var partitionsCount int
1228+
var docsCount types.DocumentCount
1229+
var bytesCount types.ByteCount
1230+
1231+
if verifier.srcHasSampleRate() {
1232+
var err error
1233+
partitionsCount, docsCount, bytesCount, err = verifier.createPartitionTasksWithSampleRate(ctx, task)
1234+
if err != nil {
1235+
return errors.Wrapf(err, "partitioning %#q via $sampleRate", srcNs)
1236+
}
1237+
} else {
1238+
verifier.logger.Warn().
1239+
Msg("Source MongoDB version lacks $sampleRate. Using legacy partitioning logic. This may cause imbalanced partitions, which will impede performance.")
1240+
1241+
var partitions []*partitions.Partition
1242+
var shardKeys []string
1243+
1244+
partitions, shardKeys, docsCount, bytesCount, err = verifier.partitionAndInspectNamespace(ctx, srcNs)
1245+
if err != nil {
1246+
return errors.Wrapf(err, "partitioning %#q via $sample", srcNs)
1247+
}
1248+
1249+
partitionsCount = len(partitions)
1250+
1251+
for _, partition := range partitions {
1252+
_, err := verifier.InsertPartitionVerificationTask(ctx, partition, shardKeys, dstNs)
1253+
if err != nil {
1254+
return errors.Wrapf(
1255+
err,
1256+
"failed to insert a partition task for namespace %#q",
1257+
srcNs,
1258+
)
1259+
}
1260+
}
12381261
}
1262+
12391263
verifier.logger.Debug().
12401264
Int("workerNum", workerNum).
12411265
Str("namespace", srcNs).
1242-
Int("partitionsCount", len(partitions)).
1266+
Int("partitionsCount", partitionsCount).
12431267
Msg("Divided collection into partitions.")
12441268

12451269
task.SourceDocumentCount = docsCount
12461270
task.SourceByteCount = bytesCount
1247-
1248-
for _, partition := range partitions {
1249-
_, err := verifier.InsertPartitionVerificationTask(ctx, partition, shardKeys, dstNs)
1250-
if err != nil {
1251-
return errors.Wrapf(
1252-
err,
1253-
"failed to insert a partition task for namespace %#q",
1254-
srcNs,
1255-
)
1256-
}
1257-
}
12581271
}
12591272

12601273
if task.Status == verificationTaskProcessing {

internal/verifier/migration_verifier_test.go

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,56 @@ func TestIntegration(t *testing.T) {
6868
suite.Run(t, testSuite)
6969
}
7070

71+
func (suite *IntegrationTestSuite) TestPartitionEmptyCollection() {
72+
ctx := suite.Context()
73+
require := require.New(suite.T())
74+
75+
verifier := suite.BuildVerifier()
76+
77+
db := suite.srcMongoClient.Database(suite.DBNameForTest())
78+
collName := "stuff"
79+
require.NoError(db.CreateCollection(ctx, collName))
80+
81+
task := &VerificationTask{
82+
PrimaryKey: primitive.NewObjectID(),
83+
Generation: 0,
84+
Status: verificationTaskAdded,
85+
Type: verificationTaskVerifyCollection,
86+
QueryFilter: QueryFilter{
87+
Namespace: db.Name() + "." + collName,
88+
To: db.Name() + "." + collName,
89+
},
90+
}
91+
92+
partitions, docs, bytes, err := verifier.createPartitionTasksWithSampleRate(ctx, task)
93+
require.NoError(err, "should partition collection")
94+
95+
assert.EqualValues(suite.T(), 1, partitions, "should be 1 partition")
96+
assert.Zero(suite.T(), docs, "should be 0 docs")
97+
assert.Zero(suite.T(), bytes, "should be 0 bytes")
98+
99+
taskOpt, err := verifier.FindNextVerifyTaskAndUpdate(ctx)
100+
require.NoError(err, "should look up task")
101+
102+
foundTask, gotTask := taskOpt.Get()
103+
require.True(gotTask, "should find task")
104+
105+
require.Equal(verificationTaskVerifyDocuments, foundTask.Type, "task type")
106+
assert.Equal(
107+
suite.T(),
108+
primitive.MinKey{},
109+
foundTask.QueryFilter.Partition.Key.Lower,
110+
"min bound",
111+
)
112+
113+
assert.Equal(
114+
suite.T(),
115+
primitive.MaxKey{},
116+
foundTask.QueryFilter.Partition.Upper,
117+
"max bound",
118+
)
119+
}
120+
71121
func (suite *IntegrationTestSuite) TestProcessVerifyTask_Failure() {
72122
verifier := suite.BuildVerifier()
73123
ctx := suite.Context()

0 commit comments

Comments
 (0)