@@ -17,7 +17,7 @@ func TestChangeStreamFilter(t *testing.T) {
1717 verifier := Verifier {}
1818 verifier .SetMetaDBName ("metadb" )
1919 require .Equal (t , []bson.D {{{"$match" , bson.D {{"ns.db" , bson.D {{"$ne" , "metadb" }}}}}}},
20- verifier .GetChangeStreamFilter (verifier . srcNamespaces ))
20+ verifier .srcChangeStreamReader . GetChangeStreamFilter ())
2121 verifier .srcNamespaces = []string {"foo.bar" , "foo.baz" , "test.car" , "test.chaz" }
2222 require .Equal (t , []bson.D {
2323 {{"$match" , bson.D {
@@ -28,7 +28,7 @@ func TestChangeStreamFilter(t *testing.T) {
2828 bson.D {{"ns" , bson.D {{"db" , "test" }, {"coll" , "chaz" }}}},
2929 }},
3030 }}},
31- }, verifier .GetChangeStreamFilter (verifier . srcNamespaces ))
31+ }, verifier .srcChangeStreamReader . GetChangeStreamFilter ())
3232}
3333
3434// TestChangeStreamResumability creates a verifier, starts its change stream,
@@ -39,7 +39,7 @@ func (suite *IntegrationTestSuite) TestChangeStreamResumability() {
3939 verifier1 := suite .BuildVerifier ()
4040 ctx , cancel := context .WithCancel (context .Background ())
4141 defer cancel ()
42- err := verifier1 .StartChangeStream (ctx )
42+ err := verifier1 .srcChangeStreamReader . StartChangeStream (ctx )
4343 suite .Require ().NoError (err )
4444 }()
4545
@@ -64,13 +64,13 @@ func (suite *IntegrationTestSuite) TestChangeStreamResumability() {
6464
6565 newTime := suite .getClusterTime (ctx , suite .srcMongoClient )
6666
67- err = verifier2 .StartChangeStream (ctx )
67+ err = verifier2 .srcChangeStreamReader . StartChangeStream (ctx )
6868 suite .Require ().NoError (err )
6969
70- suite .Require ().NotNil (verifier2 .srcStartAtTs )
70+ suite .Require ().NotNil (verifier2 .srcChangeStreamReader . startAtTs )
7171
7272 suite .Assert ().False (
73- verifier2 .srcStartAtTs .After (newTime ),
73+ verifier2 .srcChangeStreamReader . startAtTs .After (newTime ),
7474 "verifier2's change stream should be no later than this new session" ,
7575 )
7676
@@ -139,12 +139,12 @@ func (suite *IntegrationTestSuite) TestStartAtTimeNoChanges() {
139139 suite .Require ().NoError (err )
140140 origStartTs := sess .OperationTime ()
141141 suite .Require ().NotNil (origStartTs )
142- err = verifier .StartChangeStream (ctx )
142+ err = verifier .srcChangeStreamReader . StartChangeStream (ctx )
143143 suite .Require ().NoError (err )
144- suite .Require ().Equal (verifier .srcStartAtTs , origStartTs )
145- verifier .changeStreamWritesOffTsChan <- * origStartTs
146- <- verifier .changeStreamDoneChan
147- suite .Require ().Equal (verifier .srcStartAtTs , origStartTs )
144+ suite .Require ().Equal (verifier .srcChangeStreamReader . startAtTs , origStartTs )
145+ verifier .srcChangeStreamReader . ChangeStreamWritesOffTsChan <- * origStartTs
146+ <- verifier .srcChangeStreamReader . ChangeStreamDoneChan
147+ suite .Require ().Equal (verifier .srcChangeStreamReader . startAtTs , origStartTs )
148148}
149149
150150func (suite * IntegrationTestSuite ) TestStartAtTimeWithChanges () {
@@ -160,9 +160,9 @@ func (suite *IntegrationTestSuite) TestStartAtTimeWithChanges() {
160160
161161 origSessionTime := sess .OperationTime ()
162162 suite .Require ().NotNil (origSessionTime )
163- err = verifier .StartChangeStream (ctx )
163+ err = verifier .srcChangeStreamReader . StartChangeStream (ctx )
164164 suite .Require ().NoError (err )
165- suite .Require ().Equal (verifier .srcStartAtTs , origSessionTime )
165+ suite .Require ().Equal (verifier .srcChangeStreamReader . startAtTs , origSessionTime )
166166
167167 _ , err = suite .srcMongoClient .Database ("testDb" ).Collection ("testColl" ).InsertOne (
168168 sctx , bson.D {{"_id" , 1 }})
@@ -184,12 +184,12 @@ func (suite *IntegrationTestSuite) TestStartAtTimeWithChanges() {
184184 "session time after events should exceed the original" ,
185185 )
186186
187- verifier .changeStreamWritesOffTsChan <- * postEventsSessionTime
188- <- verifier .changeStreamDoneChan
187+ verifier .srcChangeStreamReader . ChangeStreamWritesOffTsChan <- * postEventsSessionTime
188+ <- verifier .srcChangeStreamReader . ChangeStreamDoneChan
189189
190190 suite .Assert ().Equal (
191191 * postEventsSessionTime ,
192- * verifier .srcStartAtTs ,
192+ * verifier .srcChangeStreamReader . startAtTs ,
193193 "verifier.srcStartAtTs should now be our session timestamp" ,
194194 )
195195}
@@ -206,10 +206,10 @@ func (suite *IntegrationTestSuite) TestNoStartAtTime() {
206206 suite .Require ().NoError (err )
207207 origStartTs := sess .OperationTime ()
208208 suite .Require ().NotNil (origStartTs )
209- err = verifier .StartChangeStream (ctx )
209+ err = verifier .srcChangeStreamReader . StartChangeStream (ctx )
210210 suite .Require ().NoError (err )
211- suite .Require ().NotNil (verifier .srcStartAtTs )
212- suite .Require ().LessOrEqual (origStartTs .Compare (* verifier .srcStartAtTs ), 0 )
211+ suite .Require ().NotNil (verifier .srcChangeStreamReader . startAtTs )
212+ suite .Require ().LessOrEqual (origStartTs .Compare (* verifier .srcChangeStreamReader . startAtTs ), 0 )
213213}
214214
215215func (suite * IntegrationTestSuite ) TestWithChangeEventsBatching () {
@@ -218,7 +218,7 @@ func (suite *IntegrationTestSuite) TestWithChangeEventsBatching() {
218218 ctx , cancel := context .WithCancel (context .Background ())
219219 defer cancel ()
220220
221- suite .Require ().NoError (verifier .StartChangeStream (ctx ))
221+ suite .Require ().NoError (verifier .srcChangeStreamReader . StartChangeStream (ctx ))
222222
223223 _ , err := suite .srcMongoClient .Database ("testDb" ).Collection ("testColl1" ).InsertOne (ctx , bson.D {{"_id" , 1 }})
224224 suite .Require ().NoError (err )
0 commit comments