From 4618e0c8a6a389317a4cc6c394827b94e257a0a4 Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Mon, 25 Nov 2024 15:45:52 -0500 Subject: [PATCH 1/4] handle large docs --- internal/verifier/change_stream.go | 39 +++++++++++++---- internal/verifier/change_stream_test.go | 58 +++++++++++++++++++++++++ 2 files changed, 88 insertions(+), 9 deletions(-) diff --git a/internal/verifier/change_stream.go b/internal/verifier/change_stream.go index 8aea43f9..3178e4f1 100644 --- a/internal/verifier/change_stream.go +++ b/internal/verifier/change_stream.go @@ -116,17 +116,38 @@ func (verifier *Verifier) HandleChangeStreamEvents(ctx context.Context, batch [] // and omit fullDocument, but $bsonSize was new in MongoDB 4.4, and we still // want to verify migrations from 4.2. fullDocument is unlikely to be a // bottleneck anyway. -func (verifier *Verifier) GetChangeStreamFilter() []bson.D { +func (verifier *Verifier) GetChangeStreamFilter() (pipeline mongo.Pipeline) { + if len(verifier.srcNamespaces) == 0 { - return []bson.D{{bson.E{"$match", bson.D{{"ns.db", bson.D{{"$ne", verifier.metaDBName}}}}}}} - } - filter := bson.A{} - for _, ns := range verifier.srcNamespaces { - db, coll := SplitNamespace(ns) - filter = append(filter, bson.D{{"ns", bson.D{{"db", db}, {"coll", coll}}}}) + pipeline = mongo.Pipeline{ + {{"$match", bson.D{ + {"ns.db", bson.D{{"$ne", verifier.metaDBName}}}, + }}}, + } + } else { + filter := []bson.D{} + for _, ns := range verifier.srcNamespaces { + db, coll := SplitNamespace(ns) + filter = append(filter, bson.D{ + {"ns", bson.D{ + {"db", db}, + {"coll", coll}, + }}, + }) + } + pipeline = []bson.D{ + {{"$match", bson.D{{"$or", filter}}}}, + } } - stage := bson.D{{"$match", bson.D{{"$or", filter}}}} - return []bson.D{stage} + + return append( + pipeline, + bson.D{ + {"$unset", []string{ + "updateDescription", + }}, + }, + ) } // This function reads a single `getMore` response into a slice. diff --git a/internal/verifier/change_stream_test.go b/internal/verifier/change_stream_test.go index 71fc326f..9038db6d 100644 --- a/internal/verifier/change_stream_test.go +++ b/internal/verifier/change_stream_test.go @@ -2,9 +2,11 @@ package verifier import ( "context" + "strings" "testing" "time" + "github.com/10gen/migration-verifier/internal/testutil" "github.com/10gen/migration-verifier/internal/util" "github.com/10gen/migration-verifier/mslices" "github.com/pkg/errors" @@ -445,3 +447,59 @@ func (suite *IntegrationTestSuite) TestCreateForbidden() { suite.Require().ErrorAs(err, &eventErr) suite.Assert().Equal("create", eventErr.Event.OpType) } + +func (suite *IntegrationTestSuite) TestLargeEvents() { + ctx := suite.Context() + + docID := 123 + + makeDoc := func(char string, len int) bson.D { + return bson.D{{"_id", docID}, {"str", strings.Repeat(char, len)}} + } + + smallDoc := testutil.MustMarshal(makeDoc("a", 1)) + suite.T().Logf("small size: %v", len(smallDoc)) + maxBSONSize := 16 * 1024 * 1024 + + maxStringLen := maxBSONSize - len(smallDoc) - 1 + + db := suite.srcMongoClient.Database(suite.DBNameForTest()) + suite.Require().NoError(db.CreateCollection(ctx, "mystuff")) + + verifier := suite.BuildVerifier() + verifierRunner := RunVerifierCheck(suite.Context(), suite.T(), verifier) + verifierRunner.AwaitGenerationEnd() + + coll := db.Collection("mystuff") + _, err := coll.InsertOne( + ctx, + makeDoc("a", maxStringLen), + ) + suite.Require().NoError(err, "should insert") + + updated, err := coll.UpdateByID( + ctx, + docID, + bson.D{ + {"$set", bson.D{ + // smallDoc happens to be the minimum length to subtract + // in order to satisfy the server’s requirements on + // document sizes in updates. + {"str", strings.Repeat("b", maxStringLen-len(smallDoc))}, + }}, + }, + ) + suite.Require().NoError(err, "should update") + suite.Require().EqualValues(1, updated.ModifiedCount) + + replaced, err := coll.ReplaceOne( + ctx, + bson.D{{"_id", docID}}, + makeDoc("c", maxStringLen-len(smallDoc)), + ) + suite.Require().NoError(err, "should replace") + suite.Require().EqualValues(1, replaced.ModifiedCount) + + suite.Require().NoError(verifier.WritesOff(ctx)) + suite.Require().NoError(verifierRunner.Await()) +} From b7424bf9649ba25a475511149b67031c0a304dc8 Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Mon, 25 Nov 2024 15:46:50 -0500 Subject: [PATCH 2/4] check errors --- internal/verifier/change_stream_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/verifier/change_stream_test.go b/internal/verifier/change_stream_test.go index d25992a4..7a5e8d34 100644 --- a/internal/verifier/change_stream_test.go +++ b/internal/verifier/change_stream_test.go @@ -468,7 +468,7 @@ func (suite *IntegrationTestSuite) TestLargeEvents() { verifier := suite.BuildVerifier() verifierRunner := RunVerifierCheck(suite.Context(), suite.T(), verifier) - verifierRunner.AwaitGenerationEnd() + suite.Require().NoError(verifierRunner.AwaitGenerationEnd()) coll := db.Collection("mystuff") _, err := coll.InsertOne( From 9e74cc106cb566468e63c70b610d415763ba9981 Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Mon, 25 Nov 2024 15:51:22 -0500 Subject: [PATCH 3/4] pipeline consistency --- internal/verifier/change_stream.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/verifier/change_stream.go b/internal/verifier/change_stream.go index 3178e4f1..483af491 100644 --- a/internal/verifier/change_stream.go +++ b/internal/verifier/change_stream.go @@ -135,7 +135,7 @@ func (verifier *Verifier) GetChangeStreamFilter() (pipeline mongo.Pipeline) { }}, }) } - pipeline = []bson.D{ + pipeline = mongo.Pipeline{ {{"$match", bson.D{{"$or", filter}}}}, } } From 3a54eacbffd241dd0576325a7a0603f6a2c6735c Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Mon, 25 Nov 2024 16:06:20 -0500 Subject: [PATCH 4/4] fix tests --- internal/verifier/change_stream_test.go | 26 ++++++++++++-------- internal/verifier/migration_verifier_test.go | 2 +- 2 files changed, 17 insertions(+), 11 deletions(-) diff --git a/internal/verifier/change_stream_test.go b/internal/verifier/change_stream_test.go index 7a5e8d34..14990cb8 100644 --- a/internal/verifier/change_stream_test.go +++ b/internal/verifier/change_stream_test.go @@ -11,6 +11,7 @@ import ( "github.com/10gen/migration-verifier/mslices" "github.com/pkg/errors" "github.com/samber/lo" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/bson/primitive" @@ -22,19 +23,24 @@ import ( func TestChangeStreamFilter(t *testing.T) { verifier := Verifier{} verifier.SetMetaDBName("metadb") - require.Equal(t, []bson.D{{{"$match", bson.D{{"ns.db", bson.D{{"$ne", "metadb"}}}}}}}, - verifier.GetChangeStreamFilter()) + assert.Contains(t, + verifier.GetChangeStreamFilter(), + bson.D{ + {"$match", bson.D{{"ns.db", bson.D{{"$ne", "metadb"}}}}}, + }, + ) verifier.srcNamespaces = []string{"foo.bar", "foo.baz", "test.car", "test.chaz"} - require.Equal(t, []bson.D{ - {{"$match", bson.D{ - {"$or", bson.A{ - bson.D{{"ns", bson.D{{"db", "foo"}, {"coll", "bar"}}}}, - bson.D{{"ns", bson.D{{"db", "foo"}, {"coll", "baz"}}}}, - bson.D{{"ns", bson.D{{"db", "test"}, {"coll", "car"}}}}, - bson.D{{"ns", bson.D{{"db", "test"}, {"coll", "chaz"}}}}, + assert.Contains(t, + verifier.GetChangeStreamFilter(), + bson.D{{"$match", bson.D{ + {"$or", []bson.D{ + {{"ns", bson.D{{"db", "foo"}, {"coll", "bar"}}}}, + {{"ns", bson.D{{"db", "foo"}, {"coll", "baz"}}}}, + {{"ns", bson.D{{"db", "test"}, {"coll", "car"}}}}, + {{"ns", bson.D{{"db", "test"}, {"coll", "chaz"}}}}, }}, }}}, - }, verifier.GetChangeStreamFilter()) + ) } // TestChangeStreamResumability creates a verifier, starts its change stream, diff --git a/internal/verifier/migration_verifier_test.go b/internal/verifier/migration_verifier_test.go index af18ffdf..6c83e3db 100644 --- a/internal/verifier/migration_verifier_test.go +++ b/internal/verifier/migration_verifier_test.go @@ -1536,7 +1536,7 @@ func (suite *IntegrationTestSuite) TestBackgroundInIndexSpec() { verifier.SetNamespaceMap() runner := RunVerifierCheck(ctx, suite.T(), verifier) - runner.AwaitGenerationEnd() + suite.Require().NoError(runner.AwaitGenerationEnd()) status, err := verifier.GetVerificationStatus() suite.Require().NoError(err)