diff --git a/internal/verifier/change_stream.go b/internal/verifier/change_stream.go index 8aea43f9..483af491 100644 --- a/internal/verifier/change_stream.go +++ b/internal/verifier/change_stream.go @@ -116,17 +116,38 @@ func (verifier *Verifier) HandleChangeStreamEvents(ctx context.Context, batch [] // and omit fullDocument, but $bsonSize was new in MongoDB 4.4, and we still // want to verify migrations from 4.2. fullDocument is unlikely to be a // bottleneck anyway. -func (verifier *Verifier) GetChangeStreamFilter() []bson.D { +func (verifier *Verifier) GetChangeStreamFilter() (pipeline mongo.Pipeline) { + if len(verifier.srcNamespaces) == 0 { - return []bson.D{{bson.E{"$match", bson.D{{"ns.db", bson.D{{"$ne", verifier.metaDBName}}}}}}} - } - filter := bson.A{} - for _, ns := range verifier.srcNamespaces { - db, coll := SplitNamespace(ns) - filter = append(filter, bson.D{{"ns", bson.D{{"db", db}, {"coll", coll}}}}) + pipeline = mongo.Pipeline{ + {{"$match", bson.D{ + {"ns.db", bson.D{{"$ne", verifier.metaDBName}}}, + }}}, + } + } else { + filter := []bson.D{} + for _, ns := range verifier.srcNamespaces { + db, coll := SplitNamespace(ns) + filter = append(filter, bson.D{ + {"ns", bson.D{ + {"db", db}, + {"coll", coll}, + }}, + }) + } + pipeline = mongo.Pipeline{ + {{"$match", bson.D{{"$or", filter}}}}, + } } - stage := bson.D{{"$match", bson.D{{"$or", filter}}}} - return []bson.D{stage} + + return append( + pipeline, + bson.D{ + {"$unset", []string{ + "updateDescription", + }}, + }, + ) } // This function reads a single `getMore` response into a slice. diff --git a/internal/verifier/change_stream_test.go b/internal/verifier/change_stream_test.go index 741c1738..14990cb8 100644 --- a/internal/verifier/change_stream_test.go +++ b/internal/verifier/change_stream_test.go @@ -2,13 +2,16 @@ package verifier import ( "context" + "strings" "testing" "time" + "github.com/10gen/migration-verifier/internal/testutil" "github.com/10gen/migration-verifier/internal/util" "github.com/10gen/migration-verifier/mslices" "github.com/pkg/errors" "github.com/samber/lo" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/bson/primitive" @@ -20,19 +23,24 @@ import ( func TestChangeStreamFilter(t *testing.T) { verifier := Verifier{} verifier.SetMetaDBName("metadb") - require.Equal(t, []bson.D{{{"$match", bson.D{{"ns.db", bson.D{{"$ne", "metadb"}}}}}}}, - verifier.GetChangeStreamFilter()) + assert.Contains(t, + verifier.GetChangeStreamFilter(), + bson.D{ + {"$match", bson.D{{"ns.db", bson.D{{"$ne", "metadb"}}}}}, + }, + ) verifier.srcNamespaces = []string{"foo.bar", "foo.baz", "test.car", "test.chaz"} - require.Equal(t, []bson.D{ - {{"$match", bson.D{ - {"$or", bson.A{ - bson.D{{"ns", bson.D{{"db", "foo"}, {"coll", "bar"}}}}, - bson.D{{"ns", bson.D{{"db", "foo"}, {"coll", "baz"}}}}, - bson.D{{"ns", bson.D{{"db", "test"}, {"coll", "car"}}}}, - bson.D{{"ns", bson.D{{"db", "test"}, {"coll", "chaz"}}}}, + assert.Contains(t, + verifier.GetChangeStreamFilter(), + bson.D{{"$match", bson.D{ + {"$or", []bson.D{ + {{"ns", bson.D{{"db", "foo"}, {"coll", "bar"}}}}, + {{"ns", bson.D{{"db", "foo"}, {"coll", "baz"}}}}, + {{"ns", bson.D{{"db", "test"}, {"coll", "car"}}}}, + {{"ns", bson.D{{"db", "test"}, {"coll", "chaz"}}}}, }}, }}}, - }, verifier.GetChangeStreamFilter()) + ) } // TestChangeStreamResumability creates a verifier, starts its change stream, @@ -445,3 +453,59 @@ func (suite *IntegrationTestSuite) TestCreateForbidden() { suite.Require().ErrorAs(err, &eventErr) suite.Assert().Equal("create", eventErr.Event.OpType) } + +func (suite *IntegrationTestSuite) TestLargeEvents() { + ctx := suite.Context() + + docID := 123 + + makeDoc := func(char string, len int) bson.D { + return bson.D{{"_id", docID}, {"str", strings.Repeat(char, len)}} + } + + smallDoc := testutil.MustMarshal(makeDoc("a", 1)) + suite.T().Logf("small size: %v", len(smallDoc)) + maxBSONSize := 16 * 1024 * 1024 + + maxStringLen := maxBSONSize - len(smallDoc) - 1 + + db := suite.srcMongoClient.Database(suite.DBNameForTest()) + suite.Require().NoError(db.CreateCollection(ctx, "mystuff")) + + verifier := suite.BuildVerifier() + verifierRunner := RunVerifierCheck(suite.Context(), suite.T(), verifier) + suite.Require().NoError(verifierRunner.AwaitGenerationEnd()) + + coll := db.Collection("mystuff") + _, err := coll.InsertOne( + ctx, + makeDoc("a", maxStringLen), + ) + suite.Require().NoError(err, "should insert") + + updated, err := coll.UpdateByID( + ctx, + docID, + bson.D{ + {"$set", bson.D{ + // smallDoc happens to be the minimum length to subtract + // in order to satisfy the server’s requirements on + // document sizes in updates. + {"str", strings.Repeat("b", maxStringLen-len(smallDoc))}, + }}, + }, + ) + suite.Require().NoError(err, "should update") + suite.Require().EqualValues(1, updated.ModifiedCount) + + replaced, err := coll.ReplaceOne( + ctx, + bson.D{{"_id", docID}}, + makeDoc("c", maxStringLen-len(smallDoc)), + ) + suite.Require().NoError(err, "should replace") + suite.Require().EqualValues(1, replaced.ModifiedCount) + + suite.Require().NoError(verifier.WritesOff(ctx)) + suite.Require().NoError(verifierRunner.Await()) +} diff --git a/internal/verifier/migration_verifier_test.go b/internal/verifier/migration_verifier_test.go index af18ffdf..6c83e3db 100644 --- a/internal/verifier/migration_verifier_test.go +++ b/internal/verifier/migration_verifier_test.go @@ -1536,7 +1536,7 @@ func (suite *IntegrationTestSuite) TestBackgroundInIndexSpec() { verifier.SetNamespaceMap() runner := RunVerifierCheck(ctx, suite.T(), verifier) - runner.AwaitGenerationEnd() + suite.Require().NoError(runner.AwaitGenerationEnd()) status, err := verifier.GetVerificationStatus() suite.Require().NoError(err)