Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions internal/partitions/partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ func (p *Partition) FindCmd(
// (e.g. use the partitions on the source to read the destination for verification)
// If the passed-in buildinfo indicates a mongodb version < 5.0, type bracketing is not used.
// filterAndPredicates is a slice of filter criteria that's used to construct the "filter" field in the find option.
func (p *Partition) GetFindOptions(buildInfo *util.BuildInfo, filterAndPredicates bson.A) bson.D {
func (p *Partition) GetFindOptions(clusterInfo *util.ClusterInfo, filterAndPredicates bson.A) bson.D {
if p == nil {
if len(filterAndPredicates) > 0 {
return bson.D{{"filter", bson.D{{"$and", filterAndPredicates}}}}
Expand All @@ -158,11 +158,11 @@ func (p *Partition) GetFindOptions(buildInfo *util.BuildInfo, filterAndPredicate
// For non-capped collections, the cursor should use the ID filter and the _id index.
// Get the bounded query filter from the partition to be used in the Find command.
allowTypeBracketing := false
if buildInfo != nil {
if clusterInfo != nil {
allowTypeBracketing = true

if buildInfo.VersionArray != nil {
allowTypeBracketing = buildInfo.VersionArray[0] < 5
if clusterInfo.VersionArray != nil {
allowTypeBracketing = clusterInfo.VersionArray[0] < 5
}
}
if !allowTypeBracketing {
Expand Down
14 changes: 7 additions & 7 deletions internal/partitions/partition_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,37 +78,37 @@ func (suite *UnitTestSuite) TestVersioning() {
suite.Require().Equal(expectedFilter, filter)

// 6.0
findOptions = partition.GetFindOptions(&util.BuildInfo{VersionArray: []int{6, 0, 0}}, nil)
findOptions = partition.GetFindOptions(&util.ClusterInfo{VersionArray: []int{6, 0, 0}}, nil)
filter = getFilterFromFindOptions(findOptions)
suite.Require().Equal(expectedFilter, filter)

// 5.3.0.9
findOptions = partition.GetFindOptions(&util.BuildInfo{VersionArray: []int{5, 3, 0, 9}}, nil)
findOptions = partition.GetFindOptions(&util.ClusterInfo{VersionArray: []int{5, 3, 0, 9}}, nil)
filter = getFilterFromFindOptions(findOptions)
suite.Require().Equal(expectedFilter, filter)

// 7.1.3.5
findOptions = partition.GetFindOptions(&util.BuildInfo{VersionArray: []int{7, 1, 3, 5}}, nil)
findOptions = partition.GetFindOptions(&util.ClusterInfo{VersionArray: []int{7, 1, 3, 5}}, nil)
filter = getFilterFromFindOptions(findOptions)
suite.Require().Equal(expectedFilter, filter)

// 4.4 (int64)
findOptions = partition.GetFindOptions(&util.BuildInfo{VersionArray: []int{4, 4, 0, 0}}, nil)
findOptions = partition.GetFindOptions(&util.ClusterInfo{VersionArray: []int{4, 4, 0, 0}}, nil)
filter = getFilterFromFindOptions(findOptions)
suite.Require().Equal(expectedFilterWithTypeBracketing, filter)

// 4.4
findOptions = partition.GetFindOptions(&util.BuildInfo{VersionArray: []int{4, 4, 0, 0}}, nil)
findOptions = partition.GetFindOptions(&util.ClusterInfo{VersionArray: []int{4, 4, 0, 0}}, nil)
filter = getFilterFromFindOptions(findOptions)
suite.Require().Equal(expectedFilterWithTypeBracketing, filter)

// 4.2
findOptions = partition.GetFindOptions(&util.BuildInfo{VersionArray: []int{4, 2, 0, 0}}, nil)
findOptions = partition.GetFindOptions(&util.ClusterInfo{VersionArray: []int{4, 2, 0, 0}}, nil)
filter = getFilterFromFindOptions(findOptions)
suite.Require().Equal(expectedFilterWithTypeBracketing, filter)

// No version array -- assume old, require type bracketing.
findOptions = partition.GetFindOptions(&util.BuildInfo{}, nil)
findOptions = partition.GetFindOptions(&util.ClusterInfo{}, nil)
filter = getFilterFromFindOptions(findOptions)
suite.Require().Equal(expectedFilterWithTypeBracketing, filter)
}
Expand Down
31 changes: 0 additions & 31 deletions internal/util/buildinfo.go

This file was deleted.

78 changes: 78 additions & 0 deletions internal/util/clusterinfo.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package util

import (
"context"

"github.com/10gen/migration-verifier/mbson"
"github.com/pkg/errors"
"github.com/samber/lo"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
)

type ClusterTopology string

type ClusterInfo struct {
VersionArray []int
Topology ClusterTopology
}

const (
TopologySharded ClusterTopology = "sharded"
TopologyReplset ClusterTopology = "replset"
)

func GetClusterInfo(ctx context.Context, client *mongo.Client) (ClusterInfo, error) {
va, err := getVersionArray(ctx, client)
if err != nil {
return ClusterInfo{}, errors.Wrap(err, "failed to fetch version array")
}

topology, err := getTopology(ctx, client)
if err != nil {
return ClusterInfo{}, errors.Wrap(err, "failed to determine topology")
}

return ClusterInfo{
VersionArray: va,
Topology: topology,
}, nil
}

func getVersionArray(ctx context.Context, client *mongo.Client) ([]int, error) {
commandResult := client.Database("admin").RunCommand(ctx, bson.D{{"buildinfo", 1}})

rawResp, err := commandResult.Raw()
if err != nil {
return nil, errors.Wrapf(err, "failed to run %#q", "buildinfo")
}

var va []int
_, err = mbson.RawLookup(rawResp, &va, "versionArray")
if err != nil {
return nil, errors.Wrap(err, "failed to decode build info version array")
}

return va, nil
}

func getTopology(ctx context.Context, client *mongo.Client) (ClusterTopology, error) {
resp := client.Database("admin").RunCommand(
ctx,
bson.D{{"hello", 1}},
)

hello := struct {
Msg string
}{}

if err := resp.Decode(&hello); err != nil {
return "", errors.Wrapf(
err,
"failed to decode %#q response",
"hello",
)
}

return lo.Ternary(hello.Msg == "isdbgrid", TopologySharded, TopologyReplset), nil
}
2 changes: 1 addition & 1 deletion internal/verifier/change_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,7 @@ func (verifier *Verifier) createChangeStream(
SetMaxAwaitTime(1 * time.Second).
SetFullDocument(options.UpdateLookup)

if verifier.srcBuildInfo.VersionArray[0] >= 6 {
if verifier.srcClusterInfo.VersionArray[0] >= 6 {
opts = opts.SetCustomPipeline(bson.M{"showExpandedEvents": true})
}

Expand Down
2 changes: 1 addition & 1 deletion internal/verifier/change_stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -416,7 +416,7 @@ func (suite *IntegrationTestSuite) testInsertsBeforeWritesOff(docsCount int) {

func (suite *IntegrationTestSuite) TestCreateForbidden() {
ctx := suite.Context()
buildInfo, err := util.GetBuildInfo(ctx, suite.srcMongoClient)
buildInfo, err := util.GetClusterInfo(ctx, suite.srcMongoClient)
suite.Require().NoError(err)

if buildInfo.VersionArray[0] < 6 {
Expand Down
4 changes: 2 additions & 2 deletions internal/verifier/compare.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ func (verifier *Verifier) getFetcherChannels(
cursor, err := verifier.getDocumentsCursor(
ctx,
verifier.srcClientCollection(task),
verifier.srcBuildInfo,
verifier.srcClusterInfo,
verifier.srcStartAtTs,
task,
)
Expand All @@ -290,7 +290,7 @@ func (verifier *Verifier) getFetcherChannels(
cursor, err := verifier.getDocumentsCursor(
ctx,
verifier.dstClientCollection(task),
verifier.dstBuildInfo,
verifier.dstClusterInfo,
nil, //startAtTs
task,
)
Expand Down
3 changes: 1 addition & 2 deletions internal/verifier/integration_test_suite.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ func (suite *IntegrationTestSuite) BuildVerifier() *Verifier {
qfilter := QueryFilter{Namespace: "keyhole.dealers"}
task := VerificationTask{QueryFilter: qfilter}

verifier := NewVerifier(VerifierSettings{})
verifier := NewVerifier(VerifierSettings{}, "stderr")
//verifier.SetStartClean(true)
verifier.SetNumWorkers(3)
verifier.SetGenerationPauseDelayMillis(0)
Expand All @@ -183,7 +183,6 @@ func (suite *IntegrationTestSuite) BuildVerifier() *Verifier {
verifier.SetMetaURI(ctx, suite.metaConnStr),
"should set metadata connection string",
)
verifier.SetLogger("stderr")
verifier.SetMetaDBName(metaDBName)

suite.Require().NoError(verifier.srcClientCollection(&task).Drop(ctx))
Expand Down
53 changes: 10 additions & 43 deletions internal/verifier/migration_verifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,8 @@ type Verifier struct {
metaClient *mongo.Client
srcClient *mongo.Client
dstClient *mongo.Client
srcBuildInfo *util.BuildInfo
dstBuildInfo *util.BuildInfo
srcClusterInfo *util.ClusterInfo
dstClusterInfo *util.ClusterInfo
numWorkers int
failureDisplaySize int64

Expand Down Expand Up @@ -187,13 +187,18 @@ type VerifierSettings struct {
}

// NewVerifier creates a new Verifier
func NewVerifier(settings VerifierSettings) *Verifier {
func NewVerifier(settings VerifierSettings, logPath string) *Verifier {
readConcern := settings.ReadConcernSetting
if readConcern == "" {
readConcern = ReadConcernMajority
}

logger, logWriter := getLoggerAndWriter(logPath)

return &Verifier{
logger: logger,
writer: logWriter,

phase: Idle,
numWorkers: NumWorkers,
readPreference: readpref.Primary(),
Expand Down Expand Up @@ -311,40 +316,6 @@ func (verifier *Verifier) AddMetaIndexes(ctx context.Context) error {
return err
}

func (verifier *Verifier) SetSrcURI(ctx context.Context, uri string) error {
opts := verifier.getClientOpts(uri)
var err error
verifier.srcClient, err = mongo.Connect(ctx, opts)
if err != nil {
return errors.Wrapf(err, "failed to connect to source %#q", uri)
}

buildInfo, err := util.GetBuildInfo(ctx, verifier.srcClient)
if err != nil {
return errors.Wrap(err, "failed to read source build info")
}

verifier.srcBuildInfo = &buildInfo
return nil
}

func (verifier *Verifier) SetDstURI(ctx context.Context, uri string) error {
opts := verifier.getClientOpts(uri)
var err error
verifier.dstClient, err = mongo.Connect(ctx, opts)
if err != nil {
return errors.Wrapf(err, "failed to connect to destination %#q", uri)
}

buildInfo, err := util.GetBuildInfo(ctx, verifier.dstClient)
if err != nil {
return errors.Wrap(err, "failed to read destination build info")
}

verifier.dstBuildInfo = &buildInfo
return nil
}

func (verifier *Verifier) SetServerPort(port int) {
verifier.port = port
}
Expand All @@ -366,10 +337,6 @@ func (verifier *Verifier) SetPartitionSizeMB(partitionSizeMB uint32) {
verifier.partitionSizeInBytes = int64(partitionSizeMB) * 1024 * 1024
}

func (verifier *Verifier) SetLogger(logPath string) {
verifier.logger, verifier.writer = getLoggerAndWriter(logPath)
}

func (verifier *Verifier) SetSrcNamespaces(arg []string) {
verifier.srcNamespaces = arg
}
Expand Down Expand Up @@ -473,7 +440,7 @@ func (verifier *Verifier) maybeAppendGlobalFilterToPredicates(predicates bson.A)
return append(predicates, verifier.globalFilter)
}

func (verifier *Verifier) getDocumentsCursor(ctx context.Context, collection *mongo.Collection, buildInfo *util.BuildInfo,
func (verifier *Verifier) getDocumentsCursor(ctx context.Context, collection *mongo.Collection, clusterInfo *util.ClusterInfo,
startAtTs *primitive.Timestamp, task *VerificationTask) (*mongo.Cursor, error) {
var findOptions bson.D
runCommandOptions := options.RunCmd()
Expand All @@ -486,7 +453,7 @@ func (verifier *Verifier) getDocumentsCursor(ctx context.Context, collection *mo
bson.E{"filter", bson.D{{"$and", andPredicates}}},
}
} else {
findOptions = task.QueryFilter.Partition.GetFindOptions(buildInfo, verifier.maybeAppendGlobalFilterToPredicates(andPredicates))
findOptions = task.QueryFilter.Partition.GetFindOptions(clusterInfo, verifier.maybeAppendGlobalFilterToPredicates(andPredicates))
}
if verifier.readPreference.Mode() != readpref.PrimaryMode {
runCommandOptions = runCommandOptions.SetReadPreference(verifier.readPreference)
Expand Down
3 changes: 1 addition & 2 deletions internal/verifier/migration_verifier_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func BenchmarkGeneric(t *testing.B) {
fmt.Printf("Running with %s as the meta db name. Specify META_DB_NAME= to change\n", metaDBName)
// fmt.Printf("Running with %s as the namespace. Specify META_DB_NAME= to change\n", metaDBName)

verifier := NewVerifier(VerifierSettings{})
verifier := NewVerifier(VerifierSettings{}, "stderr")
verifier.SetNumWorkers(numWorkers)
verifier.SetGenerationPauseDelayMillis(0)
verifier.SetWorkerSleepDelayMillis(0)
Expand All @@ -71,7 +71,6 @@ func BenchmarkGeneric(t *testing.B) {
if err != nil {
t.Fatal(err)
}
verifier.SetLogger("stderr")
verifier.SetMetaDBName(metaDBName)
err = verifier.verificationTaskCollection().Drop(context.Background())
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion internal/verifier/migration_verifier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -456,7 +456,7 @@ func (suite *IntegrationTestSuite) TestFailedVerificationTaskInsertions() {

func TestVerifierCompareDocs(t *testing.T) {
id := rand.Intn(1000)
verifier := NewVerifier(VerifierSettings{})
verifier := NewVerifier(VerifierSettings{}, "stderr")
verifier.SetIgnoreBSONFieldOrder(true)

type compareTest struct {
Expand Down
Loading