Skip to content

Commit bfc4b82

Browse files
committed
rename & plug logic in
1 parent 59def23 commit bfc4b82

File tree

9 files changed

+132
-55
lines changed

9 files changed

+132
-55
lines changed

internal/partitions/partition.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,7 @@ func (p *Partition) FindCmd(
137137
// (e.g. use the partitions on the source to read the destination for verification)
138138
// If the passed-in buildinfo indicates a mongodb version < 5.0, type bracketing is not used.
139139
// filterAndPredicates is a slice of filter criteria that's used to construct the "filter" field in the find option.
140-
func (p *Partition) GetFindOptions(buildInfo *util.BuildInfo, filterAndPredicates bson.A) bson.D {
140+
func (p *Partition) GetFindOptions(clusterInfo *util.ClusterInfo, filterAndPredicates bson.A) bson.D {
141141
if p == nil {
142142
if len(filterAndPredicates) > 0 {
143143
return bson.D{{"filter", bson.D{{"$and", filterAndPredicates}}}}
@@ -158,11 +158,11 @@ func (p *Partition) GetFindOptions(buildInfo *util.BuildInfo, filterAndPredicate
158158
// For non-capped collections, the cursor should use the ID filter and the _id index.
159159
// Get the bounded query filter from the partition to be used in the Find command.
160160
allowTypeBracketing := false
161-
if buildInfo != nil {
161+
if clusterInfo != nil {
162162
allowTypeBracketing = true
163163

164-
if buildInfo.VersionArray != nil {
165-
allowTypeBracketing = buildInfo.VersionArray[0] < 5
164+
if clusterInfo.VersionArray != nil {
165+
allowTypeBracketing = clusterInfo.VersionArray[0] < 5
166166
}
167167
}
168168
if !allowTypeBracketing {

internal/partitions/partition_test.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -78,37 +78,37 @@ func (suite *UnitTestSuite) TestVersioning() {
7878
suite.Require().Equal(expectedFilter, filter)
7979

8080
// 6.0
81-
findOptions = partition.GetFindOptions(&util.BuildInfo{VersionArray: []int{6, 0, 0}}, nil)
81+
findOptions = partition.GetFindOptions(&util.ClusterInfo{VersionArray: []int{6, 0, 0}}, nil)
8282
filter = getFilterFromFindOptions(findOptions)
8383
suite.Require().Equal(expectedFilter, filter)
8484

8585
// 5.3.0.9
86-
findOptions = partition.GetFindOptions(&util.BuildInfo{VersionArray: []int{5, 3, 0, 9}}, nil)
86+
findOptions = partition.GetFindOptions(&util.ClusterInfo{VersionArray: []int{5, 3, 0, 9}}, nil)
8787
filter = getFilterFromFindOptions(findOptions)
8888
suite.Require().Equal(expectedFilter, filter)
8989

9090
// 7.1.3.5
91-
findOptions = partition.GetFindOptions(&util.BuildInfo{VersionArray: []int{7, 1, 3, 5}}, nil)
91+
findOptions = partition.GetFindOptions(&util.ClusterInfo{VersionArray: []int{7, 1, 3, 5}}, nil)
9292
filter = getFilterFromFindOptions(findOptions)
9393
suite.Require().Equal(expectedFilter, filter)
9494

9595
// 4.4 (int64)
96-
findOptions = partition.GetFindOptions(&util.BuildInfo{VersionArray: []int{4, 4, 0, 0}}, nil)
96+
findOptions = partition.GetFindOptions(&util.ClusterInfo{VersionArray: []int{4, 4, 0, 0}}, nil)
9797
filter = getFilterFromFindOptions(findOptions)
9898
suite.Require().Equal(expectedFilterWithTypeBracketing, filter)
9999

100100
// 4.4
101-
findOptions = partition.GetFindOptions(&util.BuildInfo{VersionArray: []int{4, 4, 0, 0}}, nil)
101+
findOptions = partition.GetFindOptions(&util.ClusterInfo{VersionArray: []int{4, 4, 0, 0}}, nil)
102102
filter = getFilterFromFindOptions(findOptions)
103103
suite.Require().Equal(expectedFilterWithTypeBracketing, filter)
104104

105105
// 4.2
106-
findOptions = partition.GetFindOptions(&util.BuildInfo{VersionArray: []int{4, 2, 0, 0}}, nil)
106+
findOptions = partition.GetFindOptions(&util.ClusterInfo{VersionArray: []int{4, 2, 0, 0}}, nil)
107107
filter = getFilterFromFindOptions(findOptions)
108108
suite.Require().Equal(expectedFilterWithTypeBracketing, filter)
109109

110110
// No version array -- assume old, require type bracketing.
111-
findOptions = partition.GetFindOptions(&util.BuildInfo{}, nil)
111+
findOptions = partition.GetFindOptions(&util.ClusterInfo{}, nil)
112112
filter = getFilterFromFindOptions(findOptions)
113113
suite.Require().Equal(expectedFilterWithTypeBracketing, filter)
114114
}

internal/util/buildinfo.go

Lines changed: 0 additions & 31 deletions
This file was deleted.

internal/util/clusterinfo.go

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
package util
2+
3+
import (
4+
"context"
5+
6+
"github.com/10gen/migration-verifier/mbson"
7+
"github.com/pkg/errors"
8+
"github.com/samber/lo"
9+
"go.mongodb.org/mongo-driver/bson"
10+
"go.mongodb.org/mongo-driver/mongo"
11+
)
12+
13+
type ClusterTopology string
14+
15+
type ClusterInfo struct {
16+
VersionArray []int
17+
Topology ClusterTopology
18+
}
19+
20+
const (
21+
TopologySharded ClusterTopology = "sharded"
22+
TopologyReplset ClusterTopology = "replset"
23+
)
24+
25+
func GetClusterInfo(ctx context.Context, client *mongo.Client) (ClusterInfo, error) {
26+
va, err := getVersionArray(ctx, client)
27+
if err != nil {
28+
return ClusterInfo{}, errors.Wrap(err, "failed to fetch version array")
29+
}
30+
31+
topology, err := getTopology(ctx, client)
32+
if err != nil {
33+
return ClusterInfo{}, errors.Wrap(err, "failed to determine topology")
34+
}
35+
36+
return ClusterInfo{
37+
VersionArray: va,
38+
Topology: topology,
39+
}, nil
40+
}
41+
42+
func getVersionArray(ctx context.Context, client *mongo.Client) ([]int, error) {
43+
commandResult := client.Database("admin").RunCommand(ctx, bson.D{{"buildinfo", 1}})
44+
45+
rawResp, err := commandResult.Raw()
46+
if err != nil {
47+
return nil, errors.Wrapf(err, "failed to run %#q", "buildinfo")
48+
}
49+
50+
var va []int
51+
_, err = mbson.RawLookup(rawResp, &va, "versionArray")
52+
if err != nil {
53+
return nil, errors.Wrap(err, "failed to decode build info version array")
54+
}
55+
56+
return va, nil
57+
}
58+
59+
func getTopology(ctx context.Context, client *mongo.Client) (ClusterTopology, error) {
60+
resp := client.Database("admin").RunCommand(
61+
ctx,
62+
bson.D{{"hello", 1}},
63+
)
64+
65+
hello := struct {
66+
Msg string
67+
}{}
68+
69+
if err := resp.Decode(&hello); err != nil {
70+
return "", errors.Wrapf(
71+
err,
72+
"failed to decode %#q response",
73+
"hello",
74+
)
75+
}
76+
77+
return lo.Ternary(hello.Msg == "isdbgrid", TopologySharded, TopologyReplset), nil
78+
}

internal/verifier/change_stream.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -315,7 +315,7 @@ func (verifier *Verifier) createChangeStream(
315315
SetMaxAwaitTime(1 * time.Second).
316316
SetFullDocument(options.UpdateLookup)
317317

318-
if verifier.srcBuildInfo.VersionArray[0] >= 6 {
318+
if verifier.srcClusterInfo.VersionArray[0] >= 6 {
319319
opts = opts.SetCustomPipeline(bson.M{"showExpandedEvents": true})
320320
}
321321

internal/verifier/change_stream_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -416,7 +416,7 @@ func (suite *IntegrationTestSuite) testInsertsBeforeWritesOff(docsCount int) {
416416

417417
func (suite *IntegrationTestSuite) TestCreateForbidden() {
418418
ctx := suite.Context()
419-
buildInfo, err := util.GetBuildInfo(ctx, suite.srcMongoClient)
419+
buildInfo, err := util.GetClusterInfo(ctx, suite.srcMongoClient)
420420
suite.Require().NoError(err)
421421

422422
if buildInfo.VersionArray[0] < 6 {

internal/verifier/compare.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -266,7 +266,7 @@ func (verifier *Verifier) getFetcherChannels(
266266
cursor, err := verifier.getDocumentsCursor(
267267
ctx,
268268
verifier.srcClientCollection(task),
269-
verifier.srcBuildInfo,
269+
verifier.srcClusterInfo,
270270
verifier.srcStartAtTs,
271271
task,
272272
)
@@ -290,7 +290,7 @@ func (verifier *Verifier) getFetcherChannels(
290290
cursor, err := verifier.getDocumentsCursor(
291291
ctx,
292292
verifier.dstClientCollection(task),
293-
verifier.dstBuildInfo,
293+
verifier.dstClusterInfo,
294294
nil, //startAtTs
295295
task,
296296
)

internal/verifier/migration_verifier.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -90,8 +90,8 @@ type Verifier struct {
9090
metaClient *mongo.Client
9191
srcClient *mongo.Client
9292
dstClient *mongo.Client
93-
srcBuildInfo *util.BuildInfo
94-
dstBuildInfo *util.BuildInfo
93+
srcClusterInfo *util.BuildInfo
94+
dstClusterInfo *util.BuildInfo
9595
numWorkers int
9696
failureDisplaySize int64
9797

internal/verifier/uri.go

Lines changed: 37 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,29 @@ func (verifier *Verifier) SetSrcURI(ctx context.Context, uri string) error {
1717
return errors.Wrapf(err, "failed to connect to source %#q", uri)
1818
}
1919

20-
buildInfo, err := util.GetBuildInfo(ctx, verifier.srcClient)
20+
clusterInfo, err := util.GetClusterInfo(ctx, verifier.srcClient)
2121
if err != nil {
2222
return errors.Wrap(err, "failed to read source build info")
2323
}
2424

25-
verifier.srcBuildInfo = &buildInfo
25+
verifier.srcClusterInfo = &clusterInfo
2626

27-
return checkURIAgainstServerVersion(uri, buildInfo)
27+
if clusterInfo.Topology == util.TopologySharded {
28+
err := RefreshAllMongosInstances(
29+
ctx,
30+
verifier.logger,
31+
opts,
32+
)
33+
34+
if err != nil {
35+
return errors.Wrap(
36+
err,
37+
"failed to refresh source mongos instances",
38+
)
39+
}
40+
}
41+
42+
return checkURIAgainstServerVersion(uri, clusterInfo)
2843
}
2944

3045
func (verifier *Verifier) SetDstURI(ctx context.Context, uri string) error {
@@ -35,17 +50,32 @@ func (verifier *Verifier) SetDstURI(ctx context.Context, uri string) error {
3550
return errors.Wrapf(err, "failed to connect to destination %#q", uri)
3651
}
3752

38-
buildInfo, err := util.GetBuildInfo(ctx, verifier.dstClient)
53+
clusterInfo, err := util.GetClusterInfo(ctx, verifier.dstClient)
3954
if err != nil {
4055
return errors.Wrap(err, "failed to read destination build info")
4156
}
4257

43-
verifier.dstBuildInfo = &buildInfo
58+
verifier.dstClusterInfo = &clusterInfo
59+
60+
if clusterInfo.Topology == util.TopologySharded {
61+
err := RefreshAllMongosInstances(
62+
ctx,
63+
verifier.logger,
64+
opts,
65+
)
66+
67+
if err != nil {
68+
return errors.Wrap(
69+
err,
70+
"failed to refresh source mongos instances",
71+
)
72+
}
73+
}
4474

45-
return checkURIAgainstServerVersion(uri, buildInfo)
75+
return checkURIAgainstServerVersion(uri, clusterInfo)
4676
}
4777

48-
func checkURIAgainstServerVersion(uri string, bi util.BuildInfo) error {
78+
func checkURIAgainstServerVersion(uri string, bi util.ClusterInfo) error {
4979
if bi.VersionArray[0] >= 5 {
5080
return nil
5181
}

0 commit comments

Comments
 (0)