Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions internal/partitions/partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ func (p *Partition) FindCmd(
// (e.g. use the partitions on the source to read the destination for verification)
// If the passed-in buildinfo indicates a mongodb version < 5.0, type bracketing is not used.
// filterAndPredicates is a slice of filter criteria that's used to construct the "filter" field in the find option.
func (p *Partition) GetFindOptions(buildInfo *util.BuildInfo, filterAndPredicates bson.A) bson.D {
func (p *Partition) GetFindOptions(clusterInfo *util.ClusterInfo, filterAndPredicates bson.A) bson.D {
if p == nil {
if len(filterAndPredicates) > 0 {
return bson.D{{"filter", bson.D{{"$and", filterAndPredicates}}}}
Expand All @@ -158,11 +158,11 @@ func (p *Partition) GetFindOptions(buildInfo *util.BuildInfo, filterAndPredicate
// For non-capped collections, the cursor should use the ID filter and the _id index.
// Get the bounded query filter from the partition to be used in the Find command.
allowTypeBracketing := false
if buildInfo != nil {
if clusterInfo != nil {
allowTypeBracketing = true

if buildInfo.VersionArray != nil {
allowTypeBracketing = buildInfo.VersionArray[0] < 5
if clusterInfo.VersionArray != nil {
allowTypeBracketing = clusterInfo.VersionArray[0] < 5
}
}
if !allowTypeBracketing {
Expand Down
14 changes: 7 additions & 7 deletions internal/partitions/partition_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,37 +78,37 @@ func (suite *UnitTestSuite) TestVersioning() {
suite.Require().Equal(expectedFilter, filter)

// 6.0
findOptions = partition.GetFindOptions(&util.BuildInfo{VersionArray: []int{6, 0, 0}}, nil)
findOptions = partition.GetFindOptions(&util.ClusterInfo{VersionArray: []int{6, 0, 0}}, nil)
filter = getFilterFromFindOptions(findOptions)
suite.Require().Equal(expectedFilter, filter)

// 5.3.0.9
findOptions = partition.GetFindOptions(&util.BuildInfo{VersionArray: []int{5, 3, 0, 9}}, nil)
findOptions = partition.GetFindOptions(&util.ClusterInfo{VersionArray: []int{5, 3, 0, 9}}, nil)
filter = getFilterFromFindOptions(findOptions)
suite.Require().Equal(expectedFilter, filter)

// 7.1.3.5
findOptions = partition.GetFindOptions(&util.BuildInfo{VersionArray: []int{7, 1, 3, 5}}, nil)
findOptions = partition.GetFindOptions(&util.ClusterInfo{VersionArray: []int{7, 1, 3, 5}}, nil)
filter = getFilterFromFindOptions(findOptions)
suite.Require().Equal(expectedFilter, filter)

// 4.4 (int64)
findOptions = partition.GetFindOptions(&util.BuildInfo{VersionArray: []int{4, 4, 0, 0}}, nil)
findOptions = partition.GetFindOptions(&util.ClusterInfo{VersionArray: []int{4, 4, 0, 0}}, nil)
filter = getFilterFromFindOptions(findOptions)
suite.Require().Equal(expectedFilterWithTypeBracketing, filter)

// 4.4
findOptions = partition.GetFindOptions(&util.BuildInfo{VersionArray: []int{4, 4, 0, 0}}, nil)
findOptions = partition.GetFindOptions(&util.ClusterInfo{VersionArray: []int{4, 4, 0, 0}}, nil)
filter = getFilterFromFindOptions(findOptions)
suite.Require().Equal(expectedFilterWithTypeBracketing, filter)

// 4.2
findOptions = partition.GetFindOptions(&util.BuildInfo{VersionArray: []int{4, 2, 0, 0}}, nil)
findOptions = partition.GetFindOptions(&util.ClusterInfo{VersionArray: []int{4, 2, 0, 0}}, nil)
filter = getFilterFromFindOptions(findOptions)
suite.Require().Equal(expectedFilterWithTypeBracketing, filter)

// No version array -- assume old, require type bracketing.
findOptions = partition.GetFindOptions(&util.BuildInfo{}, nil)
findOptions = partition.GetFindOptions(&util.ClusterInfo{}, nil)
filter = getFilterFromFindOptions(findOptions)
suite.Require().Equal(expectedFilterWithTypeBracketing, filter)
}
Expand Down
31 changes: 0 additions & 31 deletions internal/util/buildinfo.go

This file was deleted.

78 changes: 78 additions & 0 deletions internal/util/clusterinfo.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package util

import (
"context"

"github.com/10gen/migration-verifier/mbson"
"github.com/pkg/errors"
"github.com/samber/lo"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
)

type ClusterTopology string

type ClusterInfo struct {
VersionArray []int
Topology ClusterTopology
}

const (
TopologySharded ClusterTopology = "sharded"
TopologyReplset ClusterTopology = "replset"
)

func GetClusterInfo(ctx context.Context, client *mongo.Client) (ClusterInfo, error) {
va, err := getVersionArray(ctx, client)
if err != nil {
return ClusterInfo{}, errors.Wrap(err, "failed to fetch version array")
}

topology, err := getTopology(ctx, client)
if err != nil {
return ClusterInfo{}, errors.Wrap(err, "failed to determine topology")
}

return ClusterInfo{
VersionArray: va,
Topology: topology,
}, nil
}

func getVersionArray(ctx context.Context, client *mongo.Client) ([]int, error) {
commandResult := client.Database("admin").RunCommand(ctx, bson.D{{"buildinfo", 1}})

rawResp, err := commandResult.Raw()
if err != nil {
return nil, errors.Wrapf(err, "failed to run %#q", "buildinfo")
}

var va []int
_, err = mbson.RawLookup(rawResp, &va, "versionArray")
if err != nil {
return nil, errors.Wrap(err, "failed to decode build info version array")
}

return va, nil
}

func getTopology(ctx context.Context, client *mongo.Client) (ClusterTopology, error) {
resp := client.Database("admin").RunCommand(
ctx,
bson.D{{"hello", 1}},
)

hello := struct {
Msg string
}{}

if err := resp.Decode(&hello); err != nil {
return "", errors.Wrapf(
err,
"failed to decode %#q response",
"hello",
)
}

return lo.Ternary(hello.Msg == "isdbgrid", TopologySharded, TopologyReplset), nil
}
2 changes: 1 addition & 1 deletion internal/verifier/change_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,7 @@ func (verifier *Verifier) createChangeStream(
SetMaxAwaitTime(1 * time.Second).
SetFullDocument(options.UpdateLookup)

if verifier.srcBuildInfo.VersionArray[0] >= 6 {
if verifier.srcClusterInfo.VersionArray[0] >= 6 {
opts = opts.SetCustomPipeline(bson.M{"showExpandedEvents": true})
}

Expand Down
2 changes: 1 addition & 1 deletion internal/verifier/change_stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -416,7 +416,7 @@ func (suite *IntegrationTestSuite) testInsertsBeforeWritesOff(docsCount int) {

func (suite *IntegrationTestSuite) TestCreateForbidden() {
ctx := suite.Context()
buildInfo, err := util.GetBuildInfo(ctx, suite.srcMongoClient)
buildInfo, err := util.GetClusterInfo(ctx, suite.srcMongoClient)
suite.Require().NoError(err)

if buildInfo.VersionArray[0] < 6 {
Expand Down
4 changes: 2 additions & 2 deletions internal/verifier/compare.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ func (verifier *Verifier) getFetcherChannels(
cursor, err := verifier.getDocumentsCursor(
ctx,
verifier.srcClientCollection(task),
verifier.srcBuildInfo,
verifier.srcClusterInfo,
verifier.srcStartAtTs,
task,
)
Expand All @@ -290,7 +290,7 @@ func (verifier *Verifier) getFetcherChannels(
cursor, err := verifier.getDocumentsCursor(
ctx,
verifier.dstClientCollection(task),
verifier.dstBuildInfo,
verifier.dstClusterInfo,
nil, //startAtTs
task,
)
Expand Down
3 changes: 1 addition & 2 deletions internal/verifier/integration_test_suite.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ func (suite *IntegrationTestSuite) BuildVerifier() *Verifier {
qfilter := QueryFilter{Namespace: "keyhole.dealers"}
task := VerificationTask{QueryFilter: qfilter}

verifier := NewVerifier(VerifierSettings{})
verifier := NewVerifier(VerifierSettings{}, "stderr")
//verifier.SetStartClean(true)
verifier.SetNumWorkers(3)
verifier.SetGenerationPauseDelayMillis(0)
Expand All @@ -183,7 +183,6 @@ func (suite *IntegrationTestSuite) BuildVerifier() *Verifier {
verifier.SetMetaURI(ctx, suite.metaConnStr),
"should set metadata connection string",
)
verifier.SetLogger("stderr")
verifier.SetMetaDBName(metaDBName)

suite.Require().NoError(verifier.srcClientCollection(&task).Drop(ctx))
Expand Down
53 changes: 10 additions & 43 deletions internal/verifier/migration_verifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,8 @@ type Verifier struct {
metaClient *mongo.Client
srcClient *mongo.Client
dstClient *mongo.Client
srcBuildInfo *util.BuildInfo
dstBuildInfo *util.BuildInfo
srcClusterInfo *util.ClusterInfo
dstClusterInfo *util.ClusterInfo
numWorkers int
failureDisplaySize int64

Expand Down Expand Up @@ -187,13 +187,18 @@ type VerifierSettings struct {
}

// NewVerifier creates a new Verifier
func NewVerifier(settings VerifierSettings) *Verifier {
func NewVerifier(settings VerifierSettings, logPath string) *Verifier {
readConcern := settings.ReadConcernSetting
if readConcern == "" {
readConcern = ReadConcernMajority
}

logger, logWriter := getLoggerAndWriter(logPath)

return &Verifier{
logger: logger,
writer: logWriter,

phase: Idle,
numWorkers: NumWorkers,
readPreference: readpref.Primary(),
Expand Down Expand Up @@ -311,40 +316,6 @@ func (verifier *Verifier) AddMetaIndexes(ctx context.Context) error {
return err
}

func (verifier *Verifier) SetSrcURI(ctx context.Context, uri string) error {
opts := verifier.getClientOpts(uri)
var err error
verifier.srcClient, err = mongo.Connect(ctx, opts)
if err != nil {
return errors.Wrapf(err, "failed to connect to source %#q", uri)
}

buildInfo, err := util.GetBuildInfo(ctx, verifier.srcClient)
if err != nil {
return errors.Wrap(err, "failed to read source build info")
}

verifier.srcBuildInfo = &buildInfo
return nil
}

func (verifier *Verifier) SetDstURI(ctx context.Context, uri string) error {
opts := verifier.getClientOpts(uri)
var err error
verifier.dstClient, err = mongo.Connect(ctx, opts)
if err != nil {
return errors.Wrapf(err, "failed to connect to destination %#q", uri)
}

buildInfo, err := util.GetBuildInfo(ctx, verifier.dstClient)
if err != nil {
return errors.Wrap(err, "failed to read destination build info")
}

verifier.dstBuildInfo = &buildInfo
return nil
}

func (verifier *Verifier) SetServerPort(port int) {
verifier.port = port
}
Expand All @@ -366,10 +337,6 @@ func (verifier *Verifier) SetPartitionSizeMB(partitionSizeMB uint32) {
verifier.partitionSizeInBytes = int64(partitionSizeMB) * 1024 * 1024
}

func (verifier *Verifier) SetLogger(logPath string) {
verifier.logger, verifier.writer = getLoggerAndWriter(logPath)
}

func (verifier *Verifier) SetSrcNamespaces(arg []string) {
verifier.srcNamespaces = arg
}
Expand Down Expand Up @@ -473,7 +440,7 @@ func (verifier *Verifier) maybeAppendGlobalFilterToPredicates(predicates bson.A)
return append(predicates, verifier.globalFilter)
}

func (verifier *Verifier) getDocumentsCursor(ctx context.Context, collection *mongo.Collection, buildInfo *util.BuildInfo,
func (verifier *Verifier) getDocumentsCursor(ctx context.Context, collection *mongo.Collection, clusterInfo *util.ClusterInfo,
startAtTs *primitive.Timestamp, task *VerificationTask) (*mongo.Cursor, error) {
var findOptions bson.D
runCommandOptions := options.RunCmd()
Expand All @@ -486,7 +453,7 @@ func (verifier *Verifier) getDocumentsCursor(ctx context.Context, collection *mo
bson.E{"filter", bson.D{{"$and", andPredicates}}},
}
} else {
findOptions = task.QueryFilter.Partition.GetFindOptions(buildInfo, verifier.maybeAppendGlobalFilterToPredicates(andPredicates))
findOptions = task.QueryFilter.Partition.GetFindOptions(clusterInfo, verifier.maybeAppendGlobalFilterToPredicates(andPredicates))
}
if verifier.readPreference.Mode() != readpref.PrimaryMode {
runCommandOptions = runCommandOptions.SetReadPreference(verifier.readPreference)
Expand Down
3 changes: 1 addition & 2 deletions internal/verifier/migration_verifier_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func BenchmarkGeneric(t *testing.B) {
fmt.Printf("Running with %s as the meta db name. Specify META_DB_NAME= to change\n", metaDBName)
// fmt.Printf("Running with %s as the namespace. Specify META_DB_NAME= to change\n", metaDBName)

verifier := NewVerifier(VerifierSettings{})
verifier := NewVerifier(VerifierSettings{}, "stderr")
verifier.SetNumWorkers(numWorkers)
verifier.SetGenerationPauseDelayMillis(0)
verifier.SetWorkerSleepDelayMillis(0)
Expand All @@ -71,7 +71,6 @@ func BenchmarkGeneric(t *testing.B) {
if err != nil {
t.Fatal(err)
}
verifier.SetLogger("stderr")
verifier.SetMetaDBName(metaDBName)
err = verifier.verificationTaskCollection().Drop(context.Background())
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion internal/verifier/migration_verifier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -456,7 +456,7 @@ func (suite *IntegrationTestSuite) TestFailedVerificationTaskInsertions() {

func TestVerifierCompareDocs(t *testing.T) {
id := rand.Intn(1000)
verifier := NewVerifier(VerifierSettings{})
verifier := NewVerifier(VerifierSettings{}, "stderr")
verifier.SetIgnoreBSONFieldOrder(true)

type compareTest struct {
Expand Down
Loading
Loading