99 "github.com/10gen/migration-verifier/internal/util"
1010 "github.com/10gen/migration-verifier/mslices"
1111 "github.com/pkg/errors"
12+ "github.com/rs/zerolog"
1213 "github.com/samber/lo"
1314 "github.com/stretchr/testify/require"
1415 "go.mongodb.org/mongo-driver/bson"
@@ -18,25 +19,178 @@ import (
1819 "go.mongodb.org/mongo-driver/mongo/readconcern"
1920)
2021
21- func (suite * IntegrationTestSuite ) TestChangeStreamFilter () {
22+ func (suite * IntegrationTestSuite ) TestChangeStreamFilter_NoNamespaces () {
23+ ctx := suite .Context ()
24+
2225 verifier := suite .BuildVerifier ()
23- suite .Assert ().Contains (
24- verifier .srcChangeStreamReader .GetChangeStreamFilter (),
25- bson.D {
26- {"$match" , bson.D {{"ns.db" , bson.D {{"$ne" , metaDBName }}}}},
27- },
26+
27+ filter := verifier .srcChangeStreamReader .GetChangeStreamFilter ()
28+
29+ _ , err := suite .srcMongoClient .
30+ Database ("realUserDatabase" ).
31+ Collection ("foo" ).
32+ InsertOne (ctx , bson.D {{"id" , 123 }})
33+ suite .Require ().NoError (err )
34+
35+ cs , err := suite .srcMongoClient .Watch (ctx , filter )
36+ suite .Require ().NoError (err )
37+
38+ defer cs .Close (ctx )
39+
40+ dbsToIgnore := []string {
41+ metaDBName ,
42+ "mongosync_reserved_for_internal_use" ,
43+ "mongosync_internal_foo" ,
44+ }
45+ for _ , dbname := range dbsToIgnore {
46+ _ , err := suite .srcMongoClient .
47+ Database (dbname ).
48+ Collection ("foo" ).
49+ InsertOne (ctx , bson.D {})
50+ suite .Require ().NoError (err )
51+ }
52+
53+ _ , err = suite .srcMongoClient .
54+ Database ("realUserDatabase" ).
55+ Collection ("foo" ).
56+ UpdateOne (
57+ ctx ,
58+ bson.D {{"id" , 123 }},
59+ bson.D {{"$set" , bson.D {{"foo" , 123 }}}})
60+ suite .Require ().NoError (err )
61+
62+ timedCtx , cancel := context .WithTimeoutCause (
63+ ctx ,
64+ time .Minute ,
65+ errors .New ("should have gotten an event" ),
2866 )
29- verifier .srcChangeStreamReader .namespaces = []string {"foo.bar" , "foo.baz" , "test.car" , "test.chaz" }
30- suite .Assert ().Contains (
31- verifier .srcChangeStreamReader .GetChangeStreamFilter (),
32- bson.D {{"$match" , bson.D {
33- {"$or" , []bson.D {
34- {{"ns" , bson.D {{"db" , "foo" }, {"coll" , "bar" }}}},
35- {{"ns" , bson.D {{"db" , "foo" }, {"coll" , "baz" }}}},
36- {{"ns" , bson.D {{"db" , "test" }, {"coll" , "car" }}}},
37- {{"ns" , bson.D {{"db" , "test" }, {"coll" , "chaz" }}}},
38- }},
39- }}},
67+ defer cancel ()
68+ cs .Next (timedCtx ) // No need to check this since we check cs.Err()
69+ suite .Require ().NoError (cs .Err ())
70+
71+ event := bson.M {}
72+ suite .Require ().NoError (cs .Decode (& event ))
73+
74+ suite .Assert ().Equal (
75+ "update" ,
76+ event ["operationType" ],
77+ "only the update should show in the change stream (event: %v)" ,
78+ event ,
79+ )
80+ suite .Assert ().NotContains (
81+ event ,
82+ "updateDescription" ,
83+ "update description should be filtered out" ,
84+ )
85+
86+ /*
87+ suite.Assert().Contains(
88+
89+ ,
90+ bson.D{
91+ {"$match", bson.D{{"ns.db", bson.D{{"$ne", metaDBName}}}}},
92+ },
93+
94+ )
95+ verifier.srcChangeStreamReader.namespaces = []string{"foo.bar", "foo.baz", "test.car", "test.chaz"}
96+ suite.Assert().Contains(
97+
98+ verifier.srcChangeStreamReader.GetChangeStreamFilter(),
99+ bson.D{{"$match", bson.D{
100+ {"$or", []bson.D{
101+ {{"ns", bson.D{{"db", "foo"}, {"coll", "bar"}}}},
102+ {{"ns", bson.D{{"db", "foo"}, {"coll", "baz"}}}},
103+ {{"ns", bson.D{{"db", "test"}, {"coll", "car"}}}},
104+ {{"ns", bson.D{{"db", "test"}, {"coll", "chaz"}}}},
105+ }},
106+ }}},
107+
108+ )
109+ */
110+ }
111+
112+ func (suite * IntegrationTestSuite ) TestChangeStreamFilter_WithNamespaces () {
113+ ctx := suite .Context ()
114+
115+ verifier := suite .BuildVerifier ()
116+ verifier .srcChangeStreamReader .namespaces = []string {
117+ "foo.bar" ,
118+ "foo.baz" ,
119+ "test.car" ,
120+ "test.chaz" ,
121+ }
122+
123+ filter := verifier .srcChangeStreamReader .GetChangeStreamFilter ()
124+
125+ cs , err := suite .srcMongoClient .Watch (ctx , filter )
126+ suite .Require ().NoError (err )
127+
128+ defer cs .Close (ctx )
129+
130+ dbsToIgnore := []string {
131+ metaDBName ,
132+ "mongosync_reserved_for_internal_use" ,
133+ "mongosync_internal_foo" ,
134+ }
135+ for _ , dbname := range dbsToIgnore {
136+ _ , err := suite .srcMongoClient .
137+ Database (dbname ).
138+ Collection ("foo" ).
139+ InsertOne (ctx , bson.D {})
140+ suite .Require ().NoError (err )
141+ }
142+
143+ sess , err := suite .srcMongoClient .StartSession ()
144+ suite .Require ().NoError (err )
145+ sctx := mongo .NewSessionContext (ctx , sess )
146+
147+ for _ , ns := range verifier .srcChangeStreamReader .namespaces {
148+ dbAndColl := strings .Split (ns , "." )
149+
150+ _ , err := suite .srcMongoClient .
151+ Database (dbAndColl [0 ]).
152+ Collection (dbAndColl [1 ]).
153+ InsertOne (sctx , bson.D {})
154+ suite .Require ().NoError (err )
155+ }
156+
157+ changeStreamStopTime := sess .OperationTime ()
158+
159+ suite .T ().Logf ("Insert op time: %v" , changeStreamStopTime )
160+
161+ events := []bson.M {}
162+
163+ for {
164+ gotEvent := cs .TryNext (ctx )
165+ suite .Require ().NoError (cs .Err ())
166+ csOpTime , err := extractTimestampFromResumeToken (cs .ResumeToken ())
167+ suite .Require ().NoError (err , "should get timestamp from resume token" )
168+
169+ if gotEvent {
170+ var newEvent bson.M
171+ suite .Require ().NoError (cs .Decode (& newEvent ))
172+ events = append (events , newEvent )
173+ }
174+
175+ suite .T ().Logf ("Change stream op time (got event? %v): %v" , gotEvent , csOpTime )
176+ if csOpTime .After (* changeStreamStopTime ) {
177+ break
178+ }
179+ }
180+
181+ suite .Assert ().Len (
182+ events ,
183+ len (verifier .srcChangeStreamReader .namespaces ),
184+ "should have 1 event per in-filter namespace" ,
185+ )
186+ suite .Assert ().True (
187+ lo .EveryBy (
188+ events ,
189+ func (evt bson.M ) bool {
190+ return evt ["operationType" ] == "insert"
191+ },
192+ ),
193+ "each event should be insert: %v" , events ,
40194 )
41195}
42196
@@ -580,3 +734,49 @@ func (suite *IntegrationTestSuite) TestLargeEvents() {
580734 suite .Require ().NoError (verifier .WritesOff (ctx ))
581735 suite .Require ().NoError (verifierRunner .Await ())
582736}
737+
738+ // TestDropMongosyncDB verifies that writes to Mongosync's
739+ // metadata don’t affect migration-verifier.
740+ func (suite * IntegrationTestSuite ) TestDropMongosyncDB () {
741+ ctx := suite .Context ()
742+
743+ zerolog .SetGlobalLevel (zerolog .TraceLevel )
744+
745+ verifier := suite .BuildVerifier ()
746+
747+ dbs := []string {
748+ "mongosync_reserved_for_internal_use" ,
749+ "mongosync_internal_foo" ,
750+ }
751+
752+ for _ , dbname := range dbs {
753+ suite .Require ().NoError (
754+ suite .dstMongoClient .
755+ Database (dbname ).
756+ CreateCollection (ctx , "foo" ),
757+ )
758+ }
759+
760+ verifier .SetVerifyAll (true )
761+
762+ runner := RunVerifierCheck (ctx , suite .T (), verifier )
763+ suite .Require ().NoError (runner .AwaitGenerationEnd ())
764+
765+ for _ , dbname := range dbs {
766+ _ , err := suite .dstMongoClient .
767+ Database (dbname ).
768+ Collection ("foo" ).
769+ InsertOne (ctx , bson.D {})
770+ suite .Require ().NoError (err )
771+
772+ suite .Require ().NoError (
773+ suite .dstMongoClient .
774+ Database (dbname ).
775+ Drop (ctx ),
776+ )
777+ }
778+
779+ suite .Require ().NoError (verifier .WritesOff (ctx ))
780+
781+ suite .Require ().NoError (runner .Await ())
782+ }
0 commit comments