Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 17 additions & 14 deletions internal/verifier/migration_verifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -1082,22 +1082,25 @@ func (verifier *Verifier) verifyMetadataAndPartitionCollection(ctx context.Conte
task.Status = verificationTaskMetadataMismatch
}

partitions, shardKeys, docsCount, bytesCount, err := verifier.partitionAndInspectNamespace(ctx, srcNs)
if err != nil {
task.Status = verificationTaskFailed
verifier.logger.Error().Msgf("[Worker %d] Error partitioning collection: %+v", workerNum, err)
return
}
verifier.logger.Debug().Msgf("[Worker %d] split collection “%s” into %d partitions", workerNum, srcNs, len(partitions))

task.SourceDocumentCount = docsCount
task.SourceByteCount = bytesCount

for _, partition := range partitions {
_, err := verifier.InsertPartitionVerificationTask(partition, shardKeys, dstNs)
if task.Generation == 0 {
partitions, shardKeys, docsCount, bytesCount, err := verifier.partitionAndInspectNamespace(ctx, srcNs)
if err != nil {
task.Status = verificationTaskFailed
verifier.logger.Error().Msgf("[Worker %d] Error inserting verifier tasks: %+v", workerNum, err)

verifier.logger.Error().Msgf("[Worker %d] Error partitioning collection: %+v", workerNum, err)
return
}
verifier.logger.Debug().Msgf("[Worker %d] split collection “%s” into %d partitions", workerNum, srcNs, len(partitions))

task.SourceDocumentCount = docsCount
task.SourceByteCount = bytesCount

for _, partition := range partitions {
_, err := verifier.InsertPartitionVerificationTask(partition, shardKeys, dstNs)
if err != nil {
task.Status = verificationTaskFailed
verifier.logger.Error().Msgf("[Worker %d] Error inserting verifier tasks: %+v", workerNum, err)
}
}
}

Expand Down
63 changes: 63 additions & 0 deletions internal/verifier/migration_verifier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (

"github.com/10gen/migration-verifier/internal/partitions"
"github.com/10gen/migration-verifier/internal/testutil"
"github.com/10gen/migration-verifier/mslices"
"github.com/cespare/permute/v2"
"github.com/rs/zerolog"
"github.com/samber/lo"
Expand Down Expand Up @@ -1283,6 +1284,68 @@ func (suite *IntegrationTestSuite) TestVerificationStatus() {
suite.Equal(1, status.CompletedTasks, "completed tasks not equal")
}

func (suite *IntegrationTestSuite) TestMetadataMismatchAndPartitioning() {
ctx := suite.Context()

srcColl := suite.srcMongoClient.Database(suite.DBNameForTest()).Collection("coll")
dstColl := suite.dstMongoClient.Database(suite.DBNameForTest()).Collection("coll")

verifier := suite.BuildVerifier()

ns := srcColl.Database().Name() + "." + srcColl.Name()
verifier.SetSrcNamespaces([]string{ns})
verifier.SetDstNamespaces([]string{ns})
verifier.SetNamespaceMap()

for _, coll := range mslices.Of(srcColl, dstColl) {
_, err := coll.InsertOne(ctx, bson.M{"_id": 1, "x": 42})
suite.Require().NoError(err)
}

_, err := srcColl.Indexes().CreateOne(
ctx,
mongo.IndexModel{
Keys: bson.D{{"foo", 1}},
},
)
suite.Require().NoError(err)

runner := RunVerifierCheck(ctx, suite.T(), verifier)
runner.AwaitGenerationEnd()

cursor, err := verifier.verificationTaskCollection().Find(
ctx,
bson.M{"generation": 0},
options.Find().SetSort(bson.M{"type": 1}),
)
suite.Require().NoError(err)

var tasks []VerificationTask
suite.Require().NoError(cursor.All(ctx, &tasks))

suite.Require().Len(tasks, 2)
suite.Require().Equal(verificationTaskVerifyDocuments, tasks[0].Type)
suite.Require().Equal(verificationTaskCompleted, tasks[0].Status)
suite.Require().Equal(verificationTaskVerifyCollection, tasks[1].Type)
suite.Require().Equal(verificationTaskMetadataMismatch, tasks[1].Status)

runner.StartNextGeneration()
runner.AwaitGenerationEnd()

cursor, err = verifier.verificationTaskCollection().Find(
ctx,
bson.M{"generation": 1},
options.Find().SetSort(bson.M{"type": 1}),
)
suite.Require().NoError(err)

suite.Require().NoError(cursor.All(ctx, &tasks))

suite.Require().Len(tasks, 1, "generation 1 should only have done 1 task")
suite.Require().Equal(verificationTaskVerifyCollection, tasks[0].Type)
suite.Require().Equal(verificationTaskMetadataMismatch, tasks[0].Status)
}

func (suite *IntegrationTestSuite) TestGenerationalRechecking() {
zerolog.SetGlobalLevel(zerolog.DebugLevel)
verifier := suite.BuildVerifier()
Expand Down
11 changes: 11 additions & 0 deletions mslices/slices.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package mslices

// This package complements the Go standard library’s package of the
// same name with broadly-useful tools that the standard library lacks.

// Of returns a slice out of the given arguments. It’s syntactic sugar
// to capitalize on Go’s type inference, similar to
// [this declined feature proposal](https://github.com/golang/go/issues/47709).
func Of[T any](pieces ...T) []T {
return append([]T{}, pieces...)
}
27 changes: 27 additions & 0 deletions mslices/slices_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package mslices

import (
"testing"

"github.com/stretchr/testify/suite"
)

type mySuite struct {
suite.Suite
}

func TestUnitTestSuite(t *testing.T) {
suite.Run(t, &mySuite{})
}

func (s *mySuite) Test_Of() {
slc := Of(12, 23, 34)

s.Assert().IsType([]int{}, slc, "expected type")

a := []int{1, 2, 3}
b := Of(a...)
a[0] = 4

s.Assert().Equal(1, b[0], "should copy slice")
}