Skip to content

Commit 7c547be

Browse files
committed
detect sharding mismatches
1 parent 2c7fc0c commit 7c547be

File tree

10 files changed

+286
-89
lines changed

10 files changed

+286
-89
lines changed

internal/partitions/partition.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,7 @@ func (p *Partition) FindCmd(
137137
// (e.g. use the partitions on the source to read the destination for verification)
138138
// If the passed-in buildinfo indicates a mongodb version < 5.0, type bracketing is not used.
139139
// filterAndPredicates is a slice of filter criteria that's used to construct the "filter" field in the find option.
140-
func (p *Partition) GetFindOptions(buildInfo *util.BuildInfo, filterAndPredicates bson.A) bson.D {
140+
func (p *Partition) GetFindOptions(buildInfo *util.ClusterInfo, filterAndPredicates bson.A) bson.D {
141141
if p == nil {
142142
if len(filterAndPredicates) > 0 {
143143
return bson.D{{"filter", bson.D{{"$and", filterAndPredicates}}}}

internal/partitions/partition_test.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -78,37 +78,37 @@ func (suite *UnitTestSuite) TestVersioning() {
7878
suite.Require().Equal(expectedFilter, filter)
7979

8080
// 6.0
81-
findOptions = partition.GetFindOptions(&util.BuildInfo{VersionArray: []int{6, 0, 0}}, nil)
81+
findOptions = partition.GetFindOptions(&util.ClusterInfo{VersionArray: []int{6, 0, 0}}, nil)
8282
filter = getFilterFromFindOptions(findOptions)
8383
suite.Require().Equal(expectedFilter, filter)
8484

8585
// 5.3.0.9
86-
findOptions = partition.GetFindOptions(&util.BuildInfo{VersionArray: []int{5, 3, 0, 9}}, nil)
86+
findOptions = partition.GetFindOptions(&util.ClusterInfo{VersionArray: []int{5, 3, 0, 9}}, nil)
8787
filter = getFilterFromFindOptions(findOptions)
8888
suite.Require().Equal(expectedFilter, filter)
8989

9090
// 7.1.3.5
91-
findOptions = partition.GetFindOptions(&util.BuildInfo{VersionArray: []int{7, 1, 3, 5}}, nil)
91+
findOptions = partition.GetFindOptions(&util.ClusterInfo{VersionArray: []int{7, 1, 3, 5}}, nil)
9292
filter = getFilterFromFindOptions(findOptions)
9393
suite.Require().Equal(expectedFilter, filter)
9494

9595
// 4.4 (int64)
96-
findOptions = partition.GetFindOptions(&util.BuildInfo{VersionArray: []int{4, 4, 0, 0}}, nil)
96+
findOptions = partition.GetFindOptions(&util.ClusterInfo{VersionArray: []int{4, 4, 0, 0}}, nil)
9797
filter = getFilterFromFindOptions(findOptions)
9898
suite.Require().Equal(expectedFilterWithTypeBracketing, filter)
9999

100100
// 4.4
101-
findOptions = partition.GetFindOptions(&util.BuildInfo{VersionArray: []int{4, 4, 0, 0}}, nil)
101+
findOptions = partition.GetFindOptions(&util.ClusterInfo{VersionArray: []int{4, 4, 0, 0}}, nil)
102102
filter = getFilterFromFindOptions(findOptions)
103103
suite.Require().Equal(expectedFilterWithTypeBracketing, filter)
104104

105105
// 4.2
106-
findOptions = partition.GetFindOptions(&util.BuildInfo{VersionArray: []int{4, 2, 0, 0}}, nil)
106+
findOptions = partition.GetFindOptions(&util.ClusterInfo{VersionArray: []int{4, 2, 0, 0}}, nil)
107107
filter = getFilterFromFindOptions(findOptions)
108108
suite.Require().Equal(expectedFilterWithTypeBracketing, filter)
109109

110110
// No version array -- assume old, require type bracketing.
111-
findOptions = partition.GetFindOptions(&util.BuildInfo{}, nil)
111+
findOptions = partition.GetFindOptions(&util.ClusterInfo{}, nil)
112112
filter = getFilterFromFindOptions(findOptions)
113113
suite.Require().Equal(expectedFilterWithTypeBracketing, filter)
114114
}

internal/retry/retry.go

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,12 @@ import (
1111
"go.mongodb.org/mongo-driver/bson"
1212
)
1313

14-
// RunForUUIDAndTransientErrors retries f() for the CollectionUUIDMismatch error and for transient errors.
14+
// RunForUUIDAndTransientErrors should never be used. Its presence here
15+
// is a vestige of the retryer's origin in mongosync. Since migration-verifier
16+
// forbids DDL operations, a namespace should be just as stable as its UUID; thus,
17+
// this function is needless complexity.
18+
//
19+
// This function retries f() for the CollectionUUIDMismatch error and for transient errors.
1520
// This should be used to run a driver operation that optionally specifies the `collectionUUID` parameter
1621
// for a collection that may have been:
1722
//
@@ -34,15 +39,20 @@ import (
3439
// f() is provided with a collection name string, which is the one that should be used in the body
3540
// of f() when a collection name is needed. The initial value of this string is expectedCollName.
3641
//
37-
// RunForUUIDAndTransientErrors always returns the collection's current name. It returns
42+
// This function always returns the collection's current name. It returns
3843
// an error if the duration limit is reached, or if f() returns a non-transient error.
3944
func (r *Retryer) RunForUUIDAndTransientErrors(
4045
ctx context.Context, logger *logger.Logger, expectedCollName string, f func(*Info, string) error,
4146
) (string, error) {
4247
return r.runRetryLoop(ctx, logger, expectedCollName, f, true, true)
4348
}
4449

45-
// RunForUUIDErrorOnly retries f() for the CollectionUUIDMismatch error only. This should primarily
50+
// RunForUUIDErrorOnly should never be used. Its presence here
51+
// is a vestige of the retryer's origin in mongosync. Since migration-verifier
52+
// forbids DDL operations, a namespace should be just as stable as its UUID; thus,
53+
// this function is needless complexity.
54+
//
55+
// This function retries f() for the CollectionUUIDMismatch error only. This should primarily
4656
// be used to wrap a transaction callback containing an operation that specifies the `collectionUUID`
4757
// parameter for a collection that may have been:
4858
//
@@ -57,7 +67,7 @@ func (r *Retryer) RunForUUIDAndTransientErrors(
5767
// f() is provided with a collection name string, which is the one that should be used in the body
5868
// of f() where a collection name is needed. The initial value of this string is expectedCollName.
5969
//
60-
// RunForUUIDErrorOnly returns the collection's current name in all cases.
70+
// This function returns the collection's current name in all cases.
6171
func (r *Retryer) RunForUUIDErrorOnly(
6272
ctx context.Context, logger *logger.Logger, expectedCollName string, f func(*Info, string) error,
6373
) (string, error) {

internal/util/buildinfo.go

Lines changed: 0 additions & 31 deletions
This file was deleted.

internal/util/clusterinfo.go

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
package util
2+
3+
import (
4+
"context"
5+
6+
"github.com/10gen/migration-verifier/mbson"
7+
"github.com/pkg/errors"
8+
"github.com/samber/lo"
9+
"go.mongodb.org/mongo-driver/bson"
10+
"go.mongodb.org/mongo-driver/mongo"
11+
)
12+
13+
type ClusterTopology string
14+
15+
type ClusterInfo struct {
16+
VersionArray []int
17+
Topology ClusterTopology
18+
}
19+
20+
const (
21+
TopologySharded ClusterTopology = "sharded"
22+
TopologyReplset ClusterTopology = "replset"
23+
)
24+
25+
func GetClusterInfo(ctx context.Context, client *mongo.Client) (ClusterInfo, error) {
26+
va, err := getVersionArray(ctx, client)
27+
if err != nil {
28+
return ClusterInfo{}, errors.Wrap(err, "failed to fetch version array")
29+
}
30+
31+
topology, err := getTopology(ctx, client)
32+
if err != nil {
33+
return ClusterInfo{}, errors.Wrap(err, "failed to determine topology")
34+
}
35+
36+
return ClusterInfo{
37+
VersionArray: va,
38+
Topology: topology,
39+
}, nil
40+
}
41+
42+
func getVersionArray(ctx context.Context, client *mongo.Client) ([]int, error) {
43+
commandResult := client.Database("admin").RunCommand(ctx, bson.D{{"buildinfo", 1}})
44+
45+
rawResp, err := commandResult.Raw()
46+
if err != nil {
47+
return nil, errors.Wrapf(err, "failed to run %#q", "buildinfo")
48+
}
49+
50+
var va []int
51+
_, err = mbson.RawLookup(rawResp, &va, "versionArray")
52+
if err != nil {
53+
return nil, errors.Wrap(err, "failed to decode build info version array")
54+
}
55+
56+
return va, nil
57+
}
58+
59+
func getTopology(ctx context.Context, client *mongo.Client) (ClusterTopology, error) {
60+
resp := client.Database("admin").RunCommand(
61+
ctx,
62+
bson.D{{"hello", 1}},
63+
)
64+
65+
hello := struct {
66+
Msg string
67+
}{}
68+
69+
if err := resp.Decode(&hello); err != nil {
70+
return "", errors.Wrapf(
71+
err,
72+
"failed to decode %#q response",
73+
"hello",
74+
)
75+
}
76+
77+
return lo.Ternary(hello.Msg == "isdbgrid", TopologySharded, TopologyReplset), nil
78+
}

internal/util/sharding.go

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
package util
2+
3+
import (
4+
"context"
5+
6+
"github.com/10gen/migration-verifier/option"
7+
"github.com/pkg/errors"
8+
"go.mongodb.org/mongo-driver/bson"
9+
"go.mongodb.org/mongo-driver/mongo"
10+
"go.mongodb.org/mongo-driver/x/bsonx/bsoncore"
11+
)
12+
13+
const (
14+
configDBName = "config"
15+
collsCollName = "collections"
16+
)
17+
18+
// GetShardKey returns the collection's shard key, or an empty option
19+
// if the collection is unsharded.
20+
func GetShardKey(
21+
ctx context.Context,
22+
coll *mongo.Collection,
23+
) (option.Option[bson.Raw], error) {
24+
namespace := coll.Database().Name() + "." + coll.Name()
25+
26+
configCollectionsColl := coll.Database().Client().
27+
Database(configDBName).
28+
Collection(collsCollName)
29+
30+
rawResult, err := configCollectionsColl.FindOne(ctx, bson.D{{"_id", namespace}}).Raw()
31+
if errors.Is(err, mongo.ErrNoDocuments) {
32+
return option.None[bson.Raw](), nil
33+
} else if err != nil {
34+
return option.None[bson.Raw](), errors.Wrapf(
35+
err,
36+
"failed to find sharding info for %#q",
37+
namespace,
38+
)
39+
}
40+
41+
keyAsVal, err := rawResult.LookupErr("key")
42+
if errors.Is(err, bsoncore.ErrElementNotFound) {
43+
return option.None[bson.Raw](), nil
44+
} else if err != nil {
45+
return option.None[bson.Raw](), errors.Wrapf(
46+
err,
47+
"failed to find %#q in %#q's %#q entry",
48+
"key",
49+
namespace,
50+
FullName(configCollectionsColl),
51+
)
52+
}
53+
54+
keyAsRaw, isDoc := keyAsVal.DocumentOK()
55+
if !isDoc {
56+
return option.None[bson.Raw](), errors.Errorf(
57+
"%#q in %#q's %#q entry is of type %#q, not an object",
58+
"key",
59+
namespace,
60+
FullName(configCollectionsColl),
61+
keyAsVal.Type,
62+
)
63+
}
64+
65+
return option.Some(keyAsRaw), nil
66+
}

internal/verifier/change_stream.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -315,7 +315,7 @@ func (verifier *Verifier) createChangeStream(
315315
SetMaxAwaitTime(1 * time.Second).
316316
SetFullDocument(options.UpdateLookup)
317317

318-
if verifier.srcBuildInfo.VersionArray[0] >= 6 {
318+
if verifier.srcClusterInfo.VersionArray[0] >= 6 {
319319
opts = opts.SetCustomPipeline(bson.M{"showExpandedEvents": true})
320320
}
321321

internal/verifier/change_stream_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -416,7 +416,7 @@ func (suite *IntegrationTestSuite) testInsertsBeforeWritesOff(docsCount int) {
416416

417417
func (suite *IntegrationTestSuite) TestCreateForbidden() {
418418
ctx := suite.Context()
419-
buildInfo, err := util.GetBuildInfo(ctx, suite.srcMongoClient)
419+
buildInfo, err := util.GetClusterInfo(ctx, suite.srcMongoClient)
420420
suite.Require().NoError(err)
421421

422422
if buildInfo.VersionArray[0] < 6 {

internal/verifier/compare.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -266,7 +266,7 @@ func (verifier *Verifier) getFetcherChannels(
266266
cursor, err := verifier.getDocumentsCursor(
267267
ctx,
268268
verifier.srcClientCollection(task),
269-
verifier.srcBuildInfo,
269+
verifier.srcClusterInfo,
270270
verifier.srcStartAtTs,
271271
task,
272272
)
@@ -290,7 +290,7 @@ func (verifier *Verifier) getFetcherChannels(
290290
cursor, err := verifier.getDocumentsCursor(
291291
ctx,
292292
verifier.dstClientCollection(task),
293-
verifier.dstBuildInfo,
293+
verifier.dstClusterInfo,
294294
nil, //startAtTs
295295
task,
296296
)

0 commit comments

Comments
 (0)