Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 30 additions & 9 deletions internal/verifier/change_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,17 +116,38 @@ func (verifier *Verifier) HandleChangeStreamEvents(ctx context.Context, batch []
// and omit fullDocument, but $bsonSize was new in MongoDB 4.4, and we still
// want to verify migrations from 4.2. fullDocument is unlikely to be a
// bottleneck anyway.
func (verifier *Verifier) GetChangeStreamFilter() []bson.D {
func (verifier *Verifier) GetChangeStreamFilter() (pipeline mongo.Pipeline) {

if len(verifier.srcNamespaces) == 0 {
return []bson.D{{bson.E{"$match", bson.D{{"ns.db", bson.D{{"$ne", verifier.metaDBName}}}}}}}
}
filter := bson.A{}
for _, ns := range verifier.srcNamespaces {
db, coll := SplitNamespace(ns)
filter = append(filter, bson.D{{"ns", bson.D{{"db", db}, {"coll", coll}}}})
pipeline = mongo.Pipeline{
{{"$match", bson.D{
{"ns.db", bson.D{{"$ne", verifier.metaDBName}}},
}}},
}
} else {
filter := []bson.D{}
for _, ns := range verifier.srcNamespaces {
db, coll := SplitNamespace(ns)
filter = append(filter, bson.D{
{"ns", bson.D{
{"db", db},
{"coll", coll},
}},
})
}
pipeline = mongo.Pipeline{
{{"$match", bson.D{{"$or", filter}}}},
}
}
stage := bson.D{{"$match", bson.D{{"$or", filter}}}}
return []bson.D{stage}

return append(
pipeline,
bson.D{
{"$unset", []string{
"updateDescription",
}},
},
)
}

// This function reads a single `getMore` response into a slice.
Expand Down
58 changes: 58 additions & 0 deletions internal/verifier/change_stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@ package verifier

import (
"context"
"strings"
"testing"
"time"

"github.com/10gen/migration-verifier/internal/testutil"
"github.com/10gen/migration-verifier/internal/util"
"github.com/10gen/migration-verifier/mslices"
"github.com/pkg/errors"
Expand Down Expand Up @@ -445,3 +447,59 @@ func (suite *IntegrationTestSuite) TestCreateForbidden() {
suite.Require().ErrorAs(err, &eventErr)
suite.Assert().Equal("create", eventErr.Event.OpType)
}

func (suite *IntegrationTestSuite) TestLargeEvents() {
ctx := suite.Context()

docID := 123

makeDoc := func(char string, len int) bson.D {
return bson.D{{"_id", docID}, {"str", strings.Repeat(char, len)}}
}

smallDoc := testutil.MustMarshal(makeDoc("a", 1))
suite.T().Logf("small size: %v", len(smallDoc))
maxBSONSize := 16 * 1024 * 1024

maxStringLen := maxBSONSize - len(smallDoc) - 1

db := suite.srcMongoClient.Database(suite.DBNameForTest())
suite.Require().NoError(db.CreateCollection(ctx, "mystuff"))

verifier := suite.BuildVerifier()
verifierRunner := RunVerifierCheck(suite.Context(), suite.T(), verifier)
suite.Require().NoError(verifierRunner.AwaitGenerationEnd())

coll := db.Collection("mystuff")
_, err := coll.InsertOne(
ctx,
makeDoc("a", maxStringLen),
)
suite.Require().NoError(err, "should insert")

updated, err := coll.UpdateByID(
ctx,
docID,
bson.D{
{"$set", bson.D{
// smallDoc happens to be the minimum length to subtract
// in order to satisfy the server’s requirements on
// document sizes in updates.
{"str", strings.Repeat("b", maxStringLen-len(smallDoc))},
}},
},
)
suite.Require().NoError(err, "should update")
suite.Require().EqualValues(1, updated.ModifiedCount)

replaced, err := coll.ReplaceOne(
ctx,
bson.D{{"_id", docID}},
makeDoc("c", maxStringLen-len(smallDoc)),
)
suite.Require().NoError(err, "should replace")
suite.Require().EqualValues(1, replaced.ModifiedCount)

suite.Require().NoError(verifier.WritesOff(ctx))
suite.Require().NoError(verifierRunner.Await())
}
Loading