Skip to content

Commit 348d004

Browse files
committed
REP-5317 Ignore mongosync dbs in change stream.
This fixes an oversight from PR mongodb-labs#53.
1 parent 016fdbb commit 348d004

File tree

2 files changed

+218
-18
lines changed

2 files changed

+218
-18
lines changed

internal/verifier/change_stream.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -213,7 +213,10 @@ func (csr *ChangeStreamReader) GetChangeStreamFilter() (pipeline mongo.Pipeline)
213213
if len(csr.namespaces) == 0 {
214214
pipeline = mongo.Pipeline{
215215
{{"$match", bson.D{
216-
{"ns.db", bson.D{{"$ne", csr.metaDB.Name()}}},
216+
{"ns.db", bson.D{{"$nin", bson.A{
217+
primitive.Regex{Pattern: MongosyncMetaDBsPattern},
218+
csr.metaDB.Name(),
219+
}}}},
217220
}}},
218221
}
219222
} else {

internal/verifier/change_stream_test.go

Lines changed: 214 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"github.com/10gen/migration-verifier/internal/util"
1010
"github.com/10gen/migration-verifier/mslices"
1111
"github.com/pkg/errors"
12+
"github.com/rs/zerolog"
1213
"github.com/samber/lo"
1314
"github.com/stretchr/testify/require"
1415
"go.mongodb.org/mongo-driver/bson"
@@ -18,25 +19,175 @@ import (
1819
"go.mongodb.org/mongo-driver/mongo/readconcern"
1920
)
2021

21-
func (suite *IntegrationTestSuite) TestChangeStreamFilter() {
22+
func (suite *IntegrationTestSuite) TestChangeStreamFilter_NoNamespaces() {
23+
ctx := suite.Context()
24+
2225
verifier := suite.BuildVerifier()
23-
suite.Assert().Contains(
24-
verifier.srcChangeStreamReader.GetChangeStreamFilter(),
25-
bson.D{
26-
{"$match", bson.D{{"ns.db", bson.D{{"$ne", metaDBName}}}}},
27-
},
26+
27+
filter := verifier.srcChangeStreamReader.GetChangeStreamFilter()
28+
29+
_, err := suite.srcMongoClient.
30+
Database("realUserDatabase").
31+
Collection("foo").
32+
InsertOne(ctx, bson.D{{"id", 123}})
33+
suite.Require().NoError(err)
34+
35+
cs, err := suite.srcMongoClient.Watch(ctx, filter)
36+
suite.Require().NoError(err)
37+
38+
defer cs.Close(ctx)
39+
40+
dbsToIgnore := []string{
41+
metaDBName,
42+
"mongosync_reserved_for_internal_use",
43+
"mongosync_internal_foo",
44+
}
45+
for _, dbname := range dbsToIgnore {
46+
_, err := suite.srcMongoClient.
47+
Database(dbname).
48+
Collection("foo").
49+
InsertOne(ctx, bson.D{})
50+
suite.Require().NoError(err)
51+
}
52+
53+
_, err = suite.srcMongoClient.
54+
Database("realUserDatabase").
55+
Collection("foo").
56+
UpdateOne(
57+
ctx,
58+
bson.D{{"id", 123}},
59+
bson.D{{"$set", bson.D{{"foo", 123}}}})
60+
suite.Require().NoError(err)
61+
62+
timedCtx, cancel := context.WithTimeoutCause(
63+
ctx,
64+
time.Minute,
65+
errors.New("should have gotten an event"),
2866
)
29-
verifier.srcChangeStreamReader.namespaces = []string{"foo.bar", "foo.baz", "test.car", "test.chaz"}
30-
suite.Assert().Contains(
31-
verifier.srcChangeStreamReader.GetChangeStreamFilter(),
32-
bson.D{{"$match", bson.D{
33-
{"$or", []bson.D{
34-
{{"ns", bson.D{{"db", "foo"}, {"coll", "bar"}}}},
35-
{{"ns", bson.D{{"db", "foo"}, {"coll", "baz"}}}},
36-
{{"ns", bson.D{{"db", "test"}, {"coll", "car"}}}},
37-
{{"ns", bson.D{{"db", "test"}, {"coll", "chaz"}}}},
38-
}},
39-
}}},
67+
defer cancel()
68+
cs.Next(timedCtx) // No need to check this since we check cs.Err()
69+
suite.Require().NoError(cs.Err())
70+
71+
event := bson.M{}
72+
suite.Require().NoError(cs.Decode(&event))
73+
74+
suite.Assert().Equal(
75+
"update",
76+
event["operationType"],
77+
"only the update should show in the change stream (event: %v)",
78+
event,
79+
)
80+
suite.Assert().NotContains(
81+
event,
82+
"updateDescription",
83+
"update description should be filtered out",
84+
)
85+
86+
/*
87+
suite.Assert().Contains(
88+
89+
,
90+
bson.D{
91+
{"$match", bson.D{{"ns.db", bson.D{{"$ne", metaDBName}}}}},
92+
},
93+
94+
)
95+
verifier.srcChangeStreamReader.namespaces = []string{"foo.bar", "foo.baz", "test.car", "test.chaz"}
96+
suite.Assert().Contains(
97+
98+
verifier.srcChangeStreamReader.GetChangeStreamFilter(),
99+
bson.D{{"$match", bson.D{
100+
{"$or", []bson.D{
101+
{{"ns", bson.D{{"db", "foo"}, {"coll", "bar"}}}},
102+
{{"ns", bson.D{{"db", "foo"}, {"coll", "baz"}}}},
103+
{{"ns", bson.D{{"db", "test"}, {"coll", "car"}}}},
104+
{{"ns", bson.D{{"db", "test"}, {"coll", "chaz"}}}},
105+
}},
106+
}}},
107+
108+
)
109+
*/
110+
}
111+
112+
func (suite *IntegrationTestSuite) TestChangeStreamFilter_WithNamespaces() {
113+
ctx := suite.Context()
114+
115+
verifier := suite.BuildVerifier()
116+
verifier.srcChangeStreamReader.namespaces = []string{
117+
"foo.bar",
118+
"foo.baz",
119+
"test.car",
120+
"test.chaz",
121+
}
122+
123+
filter := verifier.srcChangeStreamReader.GetChangeStreamFilter()
124+
125+
cs, err := suite.srcMongoClient.Watch(ctx, filter)
126+
suite.Require().NoError(err)
127+
128+
defer cs.Close(ctx)
129+
130+
dbsToIgnore := []string{
131+
metaDBName,
132+
"mongosync_reserved_for_internal_use",
133+
"mongosync_internal_foo",
134+
}
135+
for _, dbname := range dbsToIgnore {
136+
_, err := suite.srcMongoClient.
137+
Database(dbname).
138+
Collection("foo").
139+
InsertOne(ctx, bson.D{})
140+
suite.Require().NoError(err)
141+
}
142+
143+
sess, err := suite.srcMongoClient.StartSession()
144+
suite.Require().NoError(err)
145+
sctx := mongo.NewSessionContext(ctx, sess)
146+
147+
for _, ns := range verifier.srcChangeStreamReader.namespaces {
148+
dbAndColl := strings.Split(ns, ".")
149+
150+
_, err := suite.srcMongoClient.
151+
Database(dbAndColl[0]).
152+
Collection(dbAndColl[1]).
153+
InsertOne(sctx, bson.D{})
154+
suite.Require().NoError(err)
155+
}
156+
157+
changeStreamStopTime := sess.OperationTime()
158+
159+
events := []bson.M{}
160+
161+
for {
162+
gotEvent := cs.TryNext(ctx)
163+
suite.Require().NoError(cs.Err())
164+
csOpTime, err := extractTimestampFromResumeToken(cs.ResumeToken())
165+
suite.Require().NoError(err, "should get timestamp from resume token")
166+
167+
if csOpTime.After(*changeStreamStopTime) {
168+
break
169+
}
170+
171+
if gotEvent {
172+
var newEvent bson.M
173+
suite.Require().NoError(cs.Decode(&newEvent))
174+
events = append(events, newEvent)
175+
}
176+
}
177+
178+
suite.Assert().Len(
179+
events,
180+
len(verifier.srcChangeStreamReader.namespaces),
181+
"should have 1 event per in-filter namespace",
182+
)
183+
suite.Assert().True(
184+
lo.EveryBy(
185+
events,
186+
func(evt bson.M) bool {
187+
return evt["operationType"] == "insert"
188+
},
189+
),
190+
"each event should be insert: %v", events,
40191
)
41192
}
42193

@@ -580,3 +731,49 @@ func (suite *IntegrationTestSuite) TestLargeEvents() {
580731
suite.Require().NoError(verifier.WritesOff(ctx))
581732
suite.Require().NoError(verifierRunner.Await())
582733
}
734+
735+
// TestDropMongosyncDB verifies that writes to Mongosync's
736+
// metadata don’t affect migration-verifier.
737+
func (suite *IntegrationTestSuite) TestDropMongosyncDB() {
738+
ctx := suite.Context()
739+
740+
zerolog.SetGlobalLevel(zerolog.TraceLevel)
741+
742+
verifier := suite.BuildVerifier()
743+
744+
dbs := []string{
745+
"mongosync_reserved_for_internal_use",
746+
"mongosync_internal_foo",
747+
}
748+
749+
for _, dbname := range dbs {
750+
suite.Require().NoError(
751+
suite.dstMongoClient.
752+
Database(dbname).
753+
CreateCollection(ctx, "foo"),
754+
)
755+
}
756+
757+
verifier.SetVerifyAll(true)
758+
759+
runner := RunVerifierCheck(ctx, suite.T(), verifier)
760+
suite.Require().NoError(runner.AwaitGenerationEnd())
761+
762+
for _, dbname := range dbs {
763+
_, err := suite.dstMongoClient.
764+
Database(dbname).
765+
Collection("foo").
766+
InsertOne(ctx, bson.D{})
767+
suite.Require().NoError(err)
768+
769+
suite.Require().NoError(
770+
suite.dstMongoClient.
771+
Database(dbname).
772+
Drop(ctx),
773+
)
774+
}
775+
776+
suite.Require().NoError(verifier.WritesOff(ctx))
777+
778+
suite.Require().NoError(runner.Await())
779+
}

0 commit comments

Comments
 (0)