Skip to content

Commit 5447ce5

Browse files
authored
REP-5337 Flush mongos router configs. (#64)
This accommodates SERVER-32198 by: a) flushing all mongos configs whenever it connects to a pre-v5 sharded cluster b) forbidding SRV connection strings for pre-v5 clusters
1 parent baea449 commit 5447ce5

15 files changed

+413
-97
lines changed

internal/partitions/partition.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,7 @@ func (p *Partition) FindCmd(
137137
// (e.g. use the partitions on the source to read the destination for verification)
138138
// If the passed-in buildinfo indicates a mongodb version < 5.0, type bracketing is not used.
139139
// filterAndPredicates is a slice of filter criteria that's used to construct the "filter" field in the find option.
140-
func (p *Partition) GetFindOptions(buildInfo *util.BuildInfo, filterAndPredicates bson.A) bson.D {
140+
func (p *Partition) GetFindOptions(clusterInfo *util.ClusterInfo, filterAndPredicates bson.A) bson.D {
141141
if p == nil {
142142
if len(filterAndPredicates) > 0 {
143143
return bson.D{{"filter", bson.D{{"$and", filterAndPredicates}}}}
@@ -158,11 +158,11 @@ func (p *Partition) GetFindOptions(buildInfo *util.BuildInfo, filterAndPredicate
158158
// For non-capped collections, the cursor should use the ID filter and the _id index.
159159
// Get the bounded query filter from the partition to be used in the Find command.
160160
allowTypeBracketing := false
161-
if buildInfo != nil {
161+
if clusterInfo != nil {
162162
allowTypeBracketing = true
163163

164-
if buildInfo.VersionArray != nil {
165-
allowTypeBracketing = buildInfo.VersionArray[0] < 5
164+
if clusterInfo.VersionArray != nil {
165+
allowTypeBracketing = clusterInfo.VersionArray[0] < 5
166166
}
167167
}
168168
if !allowTypeBracketing {

internal/partitions/partition_test.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -78,37 +78,37 @@ func (suite *UnitTestSuite) TestVersioning() {
7878
suite.Require().Equal(expectedFilter, filter)
7979

8080
// 6.0
81-
findOptions = partition.GetFindOptions(&util.BuildInfo{VersionArray: []int{6, 0, 0}}, nil)
81+
findOptions = partition.GetFindOptions(&util.ClusterInfo{VersionArray: []int{6, 0, 0}}, nil)
8282
filter = getFilterFromFindOptions(findOptions)
8383
suite.Require().Equal(expectedFilter, filter)
8484

8585
// 5.3.0.9
86-
findOptions = partition.GetFindOptions(&util.BuildInfo{VersionArray: []int{5, 3, 0, 9}}, nil)
86+
findOptions = partition.GetFindOptions(&util.ClusterInfo{VersionArray: []int{5, 3, 0, 9}}, nil)
8787
filter = getFilterFromFindOptions(findOptions)
8888
suite.Require().Equal(expectedFilter, filter)
8989

9090
// 7.1.3.5
91-
findOptions = partition.GetFindOptions(&util.BuildInfo{VersionArray: []int{7, 1, 3, 5}}, nil)
91+
findOptions = partition.GetFindOptions(&util.ClusterInfo{VersionArray: []int{7, 1, 3, 5}}, nil)
9292
filter = getFilterFromFindOptions(findOptions)
9393
suite.Require().Equal(expectedFilter, filter)
9494

9595
// 4.4 (int64)
96-
findOptions = partition.GetFindOptions(&util.BuildInfo{VersionArray: []int{4, 4, 0, 0}}, nil)
96+
findOptions = partition.GetFindOptions(&util.ClusterInfo{VersionArray: []int{4, 4, 0, 0}}, nil)
9797
filter = getFilterFromFindOptions(findOptions)
9898
suite.Require().Equal(expectedFilterWithTypeBracketing, filter)
9999

100100
// 4.4
101-
findOptions = partition.GetFindOptions(&util.BuildInfo{VersionArray: []int{4, 4, 0, 0}}, nil)
101+
findOptions = partition.GetFindOptions(&util.ClusterInfo{VersionArray: []int{4, 4, 0, 0}}, nil)
102102
filter = getFilterFromFindOptions(findOptions)
103103
suite.Require().Equal(expectedFilterWithTypeBracketing, filter)
104104

105105
// 4.2
106-
findOptions = partition.GetFindOptions(&util.BuildInfo{VersionArray: []int{4, 2, 0, 0}}, nil)
106+
findOptions = partition.GetFindOptions(&util.ClusterInfo{VersionArray: []int{4, 2, 0, 0}}, nil)
107107
filter = getFilterFromFindOptions(findOptions)
108108
suite.Require().Equal(expectedFilterWithTypeBracketing, filter)
109109

110110
// No version array -- assume old, require type bracketing.
111-
findOptions = partition.GetFindOptions(&util.BuildInfo{}, nil)
111+
findOptions = partition.GetFindOptions(&util.ClusterInfo{}, nil)
112112
filter = getFilterFromFindOptions(findOptions)
113113
suite.Require().Equal(expectedFilterWithTypeBracketing, filter)
114114
}

internal/util/buildinfo.go

Lines changed: 0 additions & 31 deletions
This file was deleted.

internal/util/clusterinfo.go

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
package util
2+
3+
import (
4+
"context"
5+
6+
"github.com/10gen/migration-verifier/mbson"
7+
"github.com/pkg/errors"
8+
"github.com/samber/lo"
9+
"go.mongodb.org/mongo-driver/bson"
10+
"go.mongodb.org/mongo-driver/mongo"
11+
)
12+
13+
type ClusterTopology string
14+
15+
type ClusterInfo struct {
16+
VersionArray []int
17+
Topology ClusterTopology
18+
}
19+
20+
const (
21+
TopologySharded ClusterTopology = "sharded"
22+
TopologyReplset ClusterTopology = "replset"
23+
)
24+
25+
func GetClusterInfo(ctx context.Context, client *mongo.Client) (ClusterInfo, error) {
26+
va, err := getVersionArray(ctx, client)
27+
if err != nil {
28+
return ClusterInfo{}, errors.Wrap(err, "failed to fetch version array")
29+
}
30+
31+
topology, err := getTopology(ctx, client)
32+
if err != nil {
33+
return ClusterInfo{}, errors.Wrap(err, "failed to determine topology")
34+
}
35+
36+
return ClusterInfo{
37+
VersionArray: va,
38+
Topology: topology,
39+
}, nil
40+
}
41+
42+
func getVersionArray(ctx context.Context, client *mongo.Client) ([]int, error) {
43+
commandResult := client.Database("admin").RunCommand(ctx, bson.D{{"buildinfo", 1}})
44+
45+
rawResp, err := commandResult.Raw()
46+
if err != nil {
47+
return nil, errors.Wrapf(err, "failed to run %#q", "buildinfo")
48+
}
49+
50+
var va []int
51+
_, err = mbson.RawLookup(rawResp, &va, "versionArray")
52+
if err != nil {
53+
return nil, errors.Wrap(err, "failed to decode build info version array")
54+
}
55+
56+
return va, nil
57+
}
58+
59+
func getTopology(ctx context.Context, client *mongo.Client) (ClusterTopology, error) {
60+
resp := client.Database("admin").RunCommand(
61+
ctx,
62+
bson.D{{"hello", 1}},
63+
)
64+
65+
hello := struct {
66+
Msg string
67+
}{}
68+
69+
if err := resp.Decode(&hello); err != nil {
70+
return "", errors.Wrapf(
71+
err,
72+
"failed to decode %#q response",
73+
"hello",
74+
)
75+
}
76+
77+
return lo.Ternary(hello.Msg == "isdbgrid", TopologySharded, TopologyReplset), nil
78+
}

internal/verifier/change_stream.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -315,7 +315,7 @@ func (verifier *Verifier) createChangeStream(
315315
SetMaxAwaitTime(1 * time.Second).
316316
SetFullDocument(options.UpdateLookup)
317317

318-
if verifier.srcBuildInfo.VersionArray[0] >= 6 {
318+
if verifier.srcClusterInfo.VersionArray[0] >= 6 {
319319
opts = opts.SetCustomPipeline(bson.M{"showExpandedEvents": true})
320320
}
321321

internal/verifier/change_stream_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -416,7 +416,7 @@ func (suite *IntegrationTestSuite) testInsertsBeforeWritesOff(docsCount int) {
416416

417417
func (suite *IntegrationTestSuite) TestCreateForbidden() {
418418
ctx := suite.Context()
419-
buildInfo, err := util.GetBuildInfo(ctx, suite.srcMongoClient)
419+
buildInfo, err := util.GetClusterInfo(ctx, suite.srcMongoClient)
420420
suite.Require().NoError(err)
421421

422422
if buildInfo.VersionArray[0] < 6 {

internal/verifier/compare.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -294,7 +294,7 @@ func (verifier *Verifier) getFetcherChannels(
294294
cursor, err := verifier.getDocumentsCursor(
295295
ctx,
296296
verifier.srcClientCollection(task),
297-
verifier.srcBuildInfo,
297+
verifier.srcClusterInfo,
298298
verifier.srcStartAtTs,
299299
task,
300300
)
@@ -318,7 +318,7 @@ func (verifier *Verifier) getFetcherChannels(
318318
cursor, err := verifier.getDocumentsCursor(
319319
ctx,
320320
verifier.dstClientCollection(task),
321-
verifier.dstBuildInfo,
321+
verifier.dstClusterInfo,
322322
nil, //startAtTs
323323
task,
324324
)

internal/verifier/integration_test_suite.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -161,7 +161,7 @@ func (suite *IntegrationTestSuite) BuildVerifier() *Verifier {
161161
qfilter := QueryFilter{Namespace: "keyhole.dealers"}
162162
task := VerificationTask{QueryFilter: qfilter}
163163

164-
verifier := NewVerifier(VerifierSettings{})
164+
verifier := NewVerifier(VerifierSettings{}, "stderr")
165165
//verifier.SetStartClean(true)
166166
verifier.SetNumWorkers(3)
167167
verifier.SetGenerationPauseDelayMillis(0)
@@ -183,7 +183,6 @@ func (suite *IntegrationTestSuite) BuildVerifier() *Verifier {
183183
verifier.SetMetaURI(ctx, suite.metaConnStr),
184184
"should set metadata connection string",
185185
)
186-
verifier.SetLogger("stderr")
187186
verifier.SetMetaDBName(metaDBName)
188187

189188
suite.Require().NoError(verifier.srcClientCollection(&task).Drop(ctx))

internal/verifier/migration_verifier.go

Lines changed: 10 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -90,8 +90,8 @@ type Verifier struct {
9090
metaClient *mongo.Client
9191
srcClient *mongo.Client
9292
dstClient *mongo.Client
93-
srcBuildInfo *util.BuildInfo
94-
dstBuildInfo *util.BuildInfo
93+
srcClusterInfo *util.ClusterInfo
94+
dstClusterInfo *util.ClusterInfo
9595
numWorkers int
9696
failureDisplaySize int64
9797

@@ -187,13 +187,18 @@ type VerifierSettings struct {
187187
}
188188

189189
// NewVerifier creates a new Verifier
190-
func NewVerifier(settings VerifierSettings) *Verifier {
190+
func NewVerifier(settings VerifierSettings, logPath string) *Verifier {
191191
readConcern := settings.ReadConcernSetting
192192
if readConcern == "" {
193193
readConcern = ReadConcernMajority
194194
}
195195

196+
logger, logWriter := getLoggerAndWriter(logPath)
197+
196198
return &Verifier{
199+
logger: logger,
200+
writer: logWriter,
201+
197202
phase: Idle,
198203
numWorkers: NumWorkers,
199204
readPreference: readpref.Primary(),
@@ -311,40 +316,6 @@ func (verifier *Verifier) AddMetaIndexes(ctx context.Context) error {
311316
return err
312317
}
313318

314-
func (verifier *Verifier) SetSrcURI(ctx context.Context, uri string) error {
315-
opts := verifier.getClientOpts(uri)
316-
var err error
317-
verifier.srcClient, err = mongo.Connect(ctx, opts)
318-
if err != nil {
319-
return errors.Wrapf(err, "failed to connect to source %#q", uri)
320-
}
321-
322-
buildInfo, err := util.GetBuildInfo(ctx, verifier.srcClient)
323-
if err != nil {
324-
return errors.Wrap(err, "failed to read source build info")
325-
}
326-
327-
verifier.srcBuildInfo = &buildInfo
328-
return nil
329-
}
330-
331-
func (verifier *Verifier) SetDstURI(ctx context.Context, uri string) error {
332-
opts := verifier.getClientOpts(uri)
333-
var err error
334-
verifier.dstClient, err = mongo.Connect(ctx, opts)
335-
if err != nil {
336-
return errors.Wrapf(err, "failed to connect to destination %#q", uri)
337-
}
338-
339-
buildInfo, err := util.GetBuildInfo(ctx, verifier.dstClient)
340-
if err != nil {
341-
return errors.Wrap(err, "failed to read destination build info")
342-
}
343-
344-
verifier.dstBuildInfo = &buildInfo
345-
return nil
346-
}
347-
348319
func (verifier *Verifier) SetServerPort(port int) {
349320
verifier.port = port
350321
}
@@ -366,10 +337,6 @@ func (verifier *Verifier) SetPartitionSizeMB(partitionSizeMB uint32) {
366337
verifier.partitionSizeInBytes = int64(partitionSizeMB) * 1024 * 1024
367338
}
368339

369-
func (verifier *Verifier) SetLogger(logPath string) {
370-
verifier.logger, verifier.writer = getLoggerAndWriter(logPath)
371-
}
372-
373340
func (verifier *Verifier) SetSrcNamespaces(arg []string) {
374341
verifier.srcNamespaces = arg
375342
}
@@ -473,7 +440,7 @@ func (verifier *Verifier) maybeAppendGlobalFilterToPredicates(predicates bson.A)
473440
return append(predicates, verifier.globalFilter)
474441
}
475442

476-
func (verifier *Verifier) getDocumentsCursor(ctx context.Context, collection *mongo.Collection, buildInfo *util.BuildInfo,
443+
func (verifier *Verifier) getDocumentsCursor(ctx context.Context, collection *mongo.Collection, clusterInfo *util.ClusterInfo,
477444
startAtTs *primitive.Timestamp, task *VerificationTask) (*mongo.Cursor, error) {
478445
var findOptions bson.D
479446
runCommandOptions := options.RunCmd()
@@ -486,7 +453,7 @@ func (verifier *Verifier) getDocumentsCursor(ctx context.Context, collection *mo
486453
bson.E{"filter", bson.D{{"$and", andPredicates}}},
487454
}
488455
} else {
489-
findOptions = task.QueryFilter.Partition.GetFindOptions(buildInfo, verifier.maybeAppendGlobalFilterToPredicates(andPredicates))
456+
findOptions = task.QueryFilter.Partition.GetFindOptions(clusterInfo, verifier.maybeAppendGlobalFilterToPredicates(andPredicates))
490457
}
491458
if verifier.readPreference.Mode() != readpref.PrimaryMode {
492459
runCommandOptions = runCommandOptions.SetReadPreference(verifier.readPreference)

internal/verifier/migration_verifier_bench_test.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ func BenchmarkGeneric(t *testing.B) {
5454
fmt.Printf("Running with %s as the meta db name. Specify META_DB_NAME= to change\n", metaDBName)
5555
// fmt.Printf("Running with %s as the namespace. Specify META_DB_NAME= to change\n", metaDBName)
5656

57-
verifier := NewVerifier(VerifierSettings{})
57+
verifier := NewVerifier(VerifierSettings{}, "stderr")
5858
verifier.SetNumWorkers(numWorkers)
5959
verifier.SetGenerationPauseDelayMillis(0)
6060
verifier.SetWorkerSleepDelayMillis(0)
@@ -71,7 +71,6 @@ func BenchmarkGeneric(t *testing.B) {
7171
if err != nil {
7272
t.Fatal(err)
7373
}
74-
verifier.SetLogger("stderr")
7574
verifier.SetMetaDBName(metaDBName)
7675
err = verifier.verificationTaskCollection().Drop(context.Background())
7776
if err != nil {

0 commit comments

Comments
 (0)