Skip to content
Merged
Show file tree
Hide file tree
Changes from 20 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 9 additions & 3 deletions .github/workflows/all.yml
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,14 @@ jobs:
srcConnStr: mongodb://localhost:27020,localhost:27021,localhost:27022
dstConnStr: mongodb://localhost:27030,localhost:27031,localhost:27032

- name: replset-to-sharded
dstArgs: --sharded 2
srcConnStr: mongodb://localhost:27020,localhost:27021,localhost:27022
dstConnStr: mongodb://localhost:27030

- name: sharded
args: --sharded 2
srcArgs: --sharded 2
dstArgs: --sharded 2
srcConnStr: mongodb://localhost:27020
dstConnStr: mongodb://localhost:27030

Expand Down Expand Up @@ -82,8 +88,8 @@ jobs:
- name: Start clusters
run: |-
{
echo mlaunch init --binarypath $(cat .srcpath) --port 27020 --dir src --replicaset ${{ matrix.topology.args }}
echo mlaunch init --binarypath $(cat .dstpath) --port 27030 --dir dst --replicaset ${{ matrix.topology.args }}
echo mlaunch init --binarypath $(cat .srcpath) --port 27020 --dir src --replicaset ${{ matrix.topology.srcArgs }}
echo mlaunch init --binarypath $(cat .dstpath) --port 27030 --dir dst --replicaset ${{ matrix.topology.dstArgs }}
echo mlaunch init --binarypath $(cat .metapath) --port 27040 --dir meta --replicaset --nodes 1
} | parallel
Expand Down
63 changes: 63 additions & 0 deletions internal/util/askserver.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package util

import (
"context"
"slices"

"github.com/10gen/migration-verifier/mslices"
"github.com/10gen/migration-verifier/option"
"github.com/pkg/errors"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
)

// ServerThinksTheseMatch runs an aggregation on the server that determines
// whether the server thinks a & b are equal. This allows you, e.g., to
Comment on lines +14 to +15
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The comment is not very accurate. It has a shortcut that compares the values in memory first.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That’s an implementation, though. It seems trivially true that two binary-equivalent BSON values are ones that the server would also consider equivalent.

// ignore BSON type differences for equivalent numbers.
//
// tinker is an optional pipeline that operates on the documents in the
// pipeline (`a` and `b`, respectively) before they're compared.
func ServerThinksTheseMatch(
ctx context.Context,
client *mongo.Client,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like we want to restrict this function to only run on the meta cluster. Can we make it a method of the verifier and make sure that the query only runs on the meta cluster?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I actually think it’s better as a static function so that there’s better separation of concerns. The more things we “hang” on Verifier, the less modular it all is.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add a comment that to the function that explains it expects the client to only be the meta client for most of the cases?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

a, b any,
tinker option.Option[mongo.Pipeline],
) (bool, error) {
pipeline := mongo.Pipeline{
{{"$documents", []bson.D{
{
{"a", bson.D{{"$literal", a}}},
{"b", bson.D{{"$literal", b}}},
},
}}},

// Now check to be sure that those specs match.
{{"$match", bson.D{
{"$expr", bson.D{
{"$eq", mslices.Of("$a", "$b")},
}},
}}},
}

if extra, hasExtra := tinker.Get(); hasExtra {
pipeline = slices.Insert(
pipeline,
1,
extra...,
)
}

cursor, err := client.Database("admin").Aggregate(ctx, pipeline)

if err == nil {
defer cursor.Close(ctx)

if cursor.Next(ctx) {
return true, nil
}

err = cursor.Err()
}

return false, errors.Wrapf(err, "failed to ask server if a (%v) matches b (%v)", a, b)
}
66 changes: 66 additions & 0 deletions internal/util/sharding.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package util

import (
"context"

"github.com/10gen/migration-verifier/option"
"github.com/pkg/errors"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/x/bsonx/bsoncore"
)

const (
configDBName = "config"
collsCollName = "collections"
)

// GetShardKey returns the collection's shard key, or an empty option
// if the collection is unsharded.
func GetShardKey(
ctx context.Context,
coll *mongo.Collection,
) (option.Option[bson.Raw], error) {
namespace := coll.Database().Name() + "." + coll.Name()

configCollectionsColl := coll.Database().Client().
Database(configDBName).
Collection(collsCollName)

rawResult, err := configCollectionsColl.FindOne(ctx, bson.D{{"_id", namespace}}).Raw()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we define a struct to consume the sharding config doc? I think it will make the code more straightforward.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

if errors.Is(err, mongo.ErrNoDocuments) {
return option.None[bson.Raw](), nil
} else if err != nil {
return option.None[bson.Raw](), errors.Wrapf(
err,
"failed to find sharding info for %#q",
namespace,
)
}

keyAsVal, err := rawResult.LookupErr("key")
if errors.Is(err, bsoncore.ErrElementNotFound) {
return option.None[bson.Raw](), nil
} else if err != nil {
return option.None[bson.Raw](), errors.Wrapf(
err,
"failed to find %#q in %#q's %#q entry",
"key",
namespace,
FullName(configCollectionsColl),
)
}

keyAsRaw, isDoc := keyAsVal.DocumentOK()
if !isDoc {
return option.None[bson.Raw](), errors.Errorf(
"%#q in %#q's %#q entry is of type %#q, not an object",
"key",
namespace,
FullName(configCollectionsColl),
keyAsVal.Type,
)
}

return option.Some(keyAsRaw), nil
}
4 changes: 2 additions & 2 deletions internal/verifier/check.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ const (
GenerationComplete GenerationStatus = "complete"
)

var failedStatus = mapset.NewSet(
var failedStatuses = mapset.NewSet(
verificationTaskFailed,
verificationTaskMetadataMismatch,
)
Expand Down Expand Up @@ -406,7 +406,7 @@ func FetchFailedAndIncompleteTasks(ctx context.Context, coll *mongo.Collection,
return FailedTasks, IncompleteTasks, err
}
for _, t := range allTasks {
if failedStatus.Contains(t.Status) {
if failedStatuses.Contains(t.Status) {
FailedTasks = append(FailedTasks, t)
} else if t.Status != verificationTaskCompleted {
IncompleteTasks = append(IncompleteTasks, t)
Expand Down
27 changes: 7 additions & 20 deletions internal/verifier/integration_test_suite.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@ import (
"strings"
"time"

"github.com/10gen/migration-verifier/internal/util"
mapset "github.com/deckarep/golang-set/v2"
"github.com/pkg/errors"
"github.com/samber/lo"
"github.com/stretchr/testify/suite"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
Expand All @@ -16,13 +16,8 @@ import (
"go.mongodb.org/mongo-driver/mongo/writeconcern"
)

type TestTopology string

const (
metaDBName = "VERIFIER_TEST_META"
topologyEnvVar = "MVTEST_TOPOLOGY"
TopologyReplset TestTopology = "replset"
TopologySharded TestTopology = "sharded"
metaDBName = "VERIFIER_TEST_META"
)

type IntegrationTestSuite struct {
Expand Down Expand Up @@ -139,22 +134,14 @@ func (suite *IntegrationTestSuite) TearDownTest() {
}
}

func (suite *IntegrationTestSuite) GetSrcTopology() TestTopology {
hello := struct {
Msg string
}{}

resp := suite.srcMongoClient.Database("admin").RunCommand(
func (suite *IntegrationTestSuite) GetTopology(client *mongo.Client) util.ClusterTopology {
clusterInfo, err := util.GetClusterInfo(
suite.Context(),
bson.D{{"hello", 1}},
)

suite.Require().NoError(
resp.Decode(&hello),
"should fetch & decode hello",
client,
)
suite.Require().NoError(err, "should fetch src cluster info")

return lo.Ternary(hello.Msg == "isdbgrid", TopologySharded, "")
return clusterInfo.Topology
}

func (suite *IntegrationTestSuite) BuildVerifier() *Verifier {
Expand Down
93 changes: 93 additions & 0 deletions internal/verifier/metadata.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package verifier

import (
"bytes"
"context"
"fmt"

"github.com/10gen/migration-verifier/internal/util"
"github.com/10gen/migration-verifier/option"
"github.com/pkg/errors"
"github.com/samber/lo"
"go.mongodb.org/mongo-driver/mongo"
)

// This is the Field for a VerificationResult for shard key mismatches.
const ShardKeyField = "Shard Key"

func (verifier *Verifier) verifyShardingIfNeeded(
ctx context.Context,
srcColl, dstColl *mongo.Collection,
) ([]VerificationResult, error) {

// We only need to compare if both clusters are sharded
srcSharded := verifier.srcClusterInfo.Topology == util.TopologySharded
dstSharded := verifier.dstClusterInfo.Topology == util.TopologySharded

if !srcSharded || !dstSharded {
return nil, nil
}

srcShardOpt, err := util.GetShardKey(ctx, srcColl)
if err != nil {
return nil, errors.Wrapf(
err,
"failed to fetch %#q's shard key on source",
FullName(srcColl),
)
}

dstShardOpt, err := util.GetShardKey(ctx, dstColl)
if err != nil {
return nil, errors.Wrapf(
err,
"failed to fetch %#q's shard key on destination",
FullName(dstColl),
)
}

srcKey, srcIsSharded := srcShardOpt.Get()
dstKey, dstIsSharded := dstShardOpt.Get()

if !srcIsSharded && !dstIsSharded {
return nil, nil
}

if srcIsSharded != dstIsSharded {
return []VerificationResult{{
Field: ShardKeyField,
Cluster: lo.Ternary(srcIsSharded, ClusterTarget, ClusterSource),
Details: Missing,
NameSpace: FullName(srcColl),
}}, nil
}

if bytes.Equal(srcKey, dstKey) {
return nil, nil
}

areEqual, err := util.ServerThinksTheseMatch(
ctx,
verifier.metaClient,
srcKey, dstKey,
option.None[mongo.Pipeline](),
)
if err != nil {
return nil, errors.Wrapf(
err,
"failed to ask server if shard keys (src %v; dst: %v) match",
srcKey,
dstKey,
)
}

if !areEqual {
return []VerificationResult{{
Field: ShardKeyField,
Details: fmt.Sprintf("%s: src=%v; dst=%v", Mismatch, srcKey, dstKey),
NameSpace: FullName(srcColl),
}}, nil
}

return nil, nil
}
Loading
Loading