Skip to content

Commit fcb677f

Browse files
committed
REP-5317 Ignore mongosync dbs in change stream.
This fixes an oversight from PR #53. It also “upgrades” the test for the change stream filter to focus on behavior rather than implementation.
1 parent 0d41595 commit fcb677f

File tree

2 files changed

+221
-18
lines changed

2 files changed

+221
-18
lines changed

internal/verifier/change_stream.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -213,7 +213,10 @@ func (csr *ChangeStreamReader) GetChangeStreamFilter() (pipeline mongo.Pipeline)
213213
if len(csr.namespaces) == 0 {
214214
pipeline = mongo.Pipeline{
215215
{{"$match", bson.D{
216-
{"ns.db", bson.D{{"$ne", csr.metaDB.Name()}}},
216+
{"ns.db", bson.D{{"$nin", bson.A{
217+
primitive.Regex{Pattern: MongosyncMetaDBsPattern},
218+
csr.metaDB.Name(),
219+
}}}},
217220
}}},
218221
}
219222
} else {

internal/verifier/change_stream_test.go

Lines changed: 217 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"github.com/10gen/migration-verifier/internal/util"
1010
"github.com/10gen/migration-verifier/mslices"
1111
"github.com/pkg/errors"
12+
"github.com/rs/zerolog"
1213
"github.com/samber/lo"
1314
"github.com/stretchr/testify/require"
1415
"go.mongodb.org/mongo-driver/bson"
@@ -18,25 +19,178 @@ import (
1819
"go.mongodb.org/mongo-driver/mongo/readconcern"
1920
)
2021

21-
func (suite *IntegrationTestSuite) TestChangeStreamFilter() {
22+
func (suite *IntegrationTestSuite) TestChangeStreamFilter_NoNamespaces() {
23+
ctx := suite.Context()
24+
2225
verifier := suite.BuildVerifier()
23-
suite.Assert().Contains(
24-
verifier.srcChangeStreamReader.GetChangeStreamFilter(),
25-
bson.D{
26-
{"$match", bson.D{{"ns.db", bson.D{{"$ne", metaDBName}}}}},
27-
},
26+
27+
filter := verifier.srcChangeStreamReader.GetChangeStreamFilter()
28+
29+
_, err := suite.srcMongoClient.
30+
Database("realUserDatabase").
31+
Collection("foo").
32+
InsertOne(ctx, bson.D{{"id", 123}})
33+
suite.Require().NoError(err)
34+
35+
cs, err := suite.srcMongoClient.Watch(ctx, filter)
36+
suite.Require().NoError(err)
37+
38+
defer cs.Close(ctx)
39+
40+
dbsToIgnore := []string{
41+
metaDBName,
42+
"mongosync_reserved_for_internal_use",
43+
"mongosync_internal_foo",
44+
}
45+
for _, dbname := range dbsToIgnore {
46+
_, err := suite.srcMongoClient.
47+
Database(dbname).
48+
Collection("foo").
49+
InsertOne(ctx, bson.D{})
50+
suite.Require().NoError(err)
51+
}
52+
53+
_, err = suite.srcMongoClient.
54+
Database("realUserDatabase").
55+
Collection("foo").
56+
UpdateOne(
57+
ctx,
58+
bson.D{{"id", 123}},
59+
bson.D{{"$set", bson.D{{"foo", 123}}}})
60+
suite.Require().NoError(err)
61+
62+
timedCtx, cancel := context.WithTimeoutCause(
63+
ctx,
64+
time.Minute,
65+
errors.New("should have gotten an event"),
2866
)
29-
verifier.srcChangeStreamReader.namespaces = []string{"foo.bar", "foo.baz", "test.car", "test.chaz"}
30-
suite.Assert().Contains(
31-
verifier.srcChangeStreamReader.GetChangeStreamFilter(),
32-
bson.D{{"$match", bson.D{
33-
{"$or", []bson.D{
34-
{{"ns", bson.D{{"db", "foo"}, {"coll", "bar"}}}},
35-
{{"ns", bson.D{{"db", "foo"}, {"coll", "baz"}}}},
36-
{{"ns", bson.D{{"db", "test"}, {"coll", "car"}}}},
37-
{{"ns", bson.D{{"db", "test"}, {"coll", "chaz"}}}},
38-
}},
39-
}}},
67+
defer cancel()
68+
cs.Next(timedCtx) // No need to check this since we check cs.Err()
69+
suite.Require().NoError(cs.Err())
70+
71+
event := bson.M{}
72+
suite.Require().NoError(cs.Decode(&event))
73+
74+
suite.Assert().Equal(
75+
"update",
76+
event["operationType"],
77+
"only the update should show in the change stream (event: %v)",
78+
event,
79+
)
80+
suite.Assert().NotContains(
81+
event,
82+
"updateDescription",
83+
"update description should be filtered out",
84+
)
85+
86+
/*
87+
suite.Assert().Contains(
88+
89+
,
90+
bson.D{
91+
{"$match", bson.D{{"ns.db", bson.D{{"$ne", metaDBName}}}}},
92+
},
93+
94+
)
95+
verifier.srcChangeStreamReader.namespaces = []string{"foo.bar", "foo.baz", "test.car", "test.chaz"}
96+
suite.Assert().Contains(
97+
98+
verifier.srcChangeStreamReader.GetChangeStreamFilter(),
99+
bson.D{{"$match", bson.D{
100+
{"$or", []bson.D{
101+
{{"ns", bson.D{{"db", "foo"}, {"coll", "bar"}}}},
102+
{{"ns", bson.D{{"db", "foo"}, {"coll", "baz"}}}},
103+
{{"ns", bson.D{{"db", "test"}, {"coll", "car"}}}},
104+
{{"ns", bson.D{{"db", "test"}, {"coll", "chaz"}}}},
105+
}},
106+
}}},
107+
108+
)
109+
*/
110+
}
111+
112+
func (suite *IntegrationTestSuite) TestChangeStreamFilter_WithNamespaces() {
113+
ctx := suite.Context()
114+
115+
verifier := suite.BuildVerifier()
116+
verifier.srcChangeStreamReader.namespaces = []string{
117+
"foo.bar",
118+
"foo.baz",
119+
"test.car",
120+
"test.chaz",
121+
}
122+
123+
filter := verifier.srcChangeStreamReader.GetChangeStreamFilter()
124+
125+
cs, err := suite.srcMongoClient.Watch(ctx, filter)
126+
suite.Require().NoError(err)
127+
128+
defer cs.Close(ctx)
129+
130+
dbsToIgnore := []string{
131+
metaDBName,
132+
"mongosync_reserved_for_internal_use",
133+
"mongosync_internal_foo",
134+
}
135+
for _, dbname := range dbsToIgnore {
136+
_, err := suite.srcMongoClient.
137+
Database(dbname).
138+
Collection("foo").
139+
InsertOne(ctx, bson.D{})
140+
suite.Require().NoError(err)
141+
}
142+
143+
sess, err := suite.srcMongoClient.StartSession()
144+
suite.Require().NoError(err)
145+
sctx := mongo.NewSessionContext(ctx, sess)
146+
147+
for _, ns := range verifier.srcChangeStreamReader.namespaces {
148+
dbAndColl := strings.Split(ns, ".")
149+
150+
_, err := suite.srcMongoClient.
151+
Database(dbAndColl[0]).
152+
Collection(dbAndColl[1]).
153+
InsertOne(sctx, bson.D{})
154+
suite.Require().NoError(err)
155+
}
156+
157+
changeStreamStopTime := sess.OperationTime()
158+
159+
suite.T().Logf("Insert op time: %v", changeStreamStopTime)
160+
161+
events := []bson.M{}
162+
163+
for {
164+
gotEvent := cs.TryNext(ctx)
165+
suite.Require().NoError(cs.Err())
166+
csOpTime, err := extractTimestampFromResumeToken(cs.ResumeToken())
167+
suite.Require().NoError(err, "should get timestamp from resume token")
168+
169+
if gotEvent {
170+
var newEvent bson.M
171+
suite.Require().NoError(cs.Decode(&newEvent))
172+
events = append(events, newEvent)
173+
}
174+
175+
suite.T().Logf("Change stream op time (got event? %v): %v", gotEvent, csOpTime)
176+
if csOpTime.After(*changeStreamStopTime) {
177+
break
178+
}
179+
}
180+
181+
suite.Assert().Len(
182+
events,
183+
len(verifier.srcChangeStreamReader.namespaces),
184+
"should have 1 event per in-filter namespace",
185+
)
186+
suite.Assert().True(
187+
lo.EveryBy(
188+
events,
189+
func(evt bson.M) bool {
190+
return evt["operationType"] == "insert"
191+
},
192+
),
193+
"each event should be insert: %v", events,
40194
)
41195
}
42196

@@ -582,3 +736,49 @@ func (suite *IntegrationTestSuite) TestLargeEvents() {
582736
suite.Require().NoError(verifier.WritesOff(ctx))
583737
suite.Require().NoError(verifierRunner.Await())
584738
}
739+
740+
// TestDropMongosyncDB verifies that writes to Mongosync's
741+
// metadata don’t affect migration-verifier.
742+
func (suite *IntegrationTestSuite) TestDropMongosyncDB() {
743+
ctx := suite.Context()
744+
745+
zerolog.SetGlobalLevel(zerolog.TraceLevel)
746+
747+
verifier := suite.BuildVerifier()
748+
749+
dbs := []string{
750+
"mongosync_reserved_for_internal_use",
751+
"mongosync_internal_foo",
752+
}
753+
754+
for _, dbname := range dbs {
755+
suite.Require().NoError(
756+
suite.dstMongoClient.
757+
Database(dbname).
758+
CreateCollection(ctx, "foo"),
759+
)
760+
}
761+
762+
verifier.SetVerifyAll(true)
763+
764+
runner := RunVerifierCheck(ctx, suite.T(), verifier)
765+
suite.Require().NoError(runner.AwaitGenerationEnd())
766+
767+
for _, dbname := range dbs {
768+
_, err := suite.dstMongoClient.
769+
Database(dbname).
770+
Collection("foo").
771+
InsertOne(ctx, bson.D{})
772+
suite.Require().NoError(err)
773+
774+
suite.Require().NoError(
775+
suite.dstMongoClient.
776+
Database(dbname).
777+
Drop(ctx),
778+
)
779+
}
780+
781+
suite.Require().NoError(verifier.WritesOff(ctx))
782+
783+
suite.Require().NoError(runner.Await())
784+
}

0 commit comments

Comments
 (0)