@@ -33,9 +33,9 @@ func TestChangeStreamFilter(t *testing.T) {
3333// TestChangeStreamResumability creates a verifier, starts its change stream,
3434// terminates that verifier, updates the source cluster, starts a new
3535// verifier with change stream, and confirms that things look as they should.
36- func (suite * MultiSourceVersionTestSuite ) TestChangeStreamResumability () {
36+ func (suite * IntegrationTestSuite ) TestChangeStreamResumability () {
3737 func () {
38- verifier1 := buildVerifier ( suite .T (), suite . srcMongoInstance , suite . dstMongoInstance , suite . metaMongoInstance )
38+ verifier1 := suite .BuildVerifier ( )
3939 ctx , cancel := context .WithCancel (context .Background ())
4040 defer cancel ()
4141 err := verifier1 .StartChangeStream (ctx )
@@ -46,15 +46,15 @@ func (suite *MultiSourceVersionTestSuite) TestChangeStreamResumability() {
4646 defer cancel ()
4747
4848 _ , err := suite .srcMongoClient .
49- Database ("testDb" ).
49+ Database (suite . DBNameForTest () ).
5050 Collection ("testColl" ).
5151 InsertOne (
5252 ctx ,
5353 bson.D {{"_id" , "heyhey" }},
5454 )
5555 suite .Require ().NoError (err )
5656
57- verifier2 := buildVerifier ( suite .T (), suite . srcMongoInstance , suite . dstMongoInstance , suite . metaMongoInstance )
57+ verifier2 := suite .BuildVerifier ( )
5858
5959 suite .Require ().Empty (
6060 suite .fetchVerifierRechecks (ctx , verifier2 ),
@@ -89,7 +89,7 @@ func (suite *MultiSourceVersionTestSuite) TestChangeStreamResumability() {
8989
9090 suite .Assert ().Equal (
9191 bson.M {
92- "db" : "testDb" ,
92+ "db" : suite . DBNameForTest () ,
9393 "coll" : "testColl" ,
9494 "generation" : int32 (0 ),
9595 "docID" : "heyhey" ,
@@ -99,7 +99,7 @@ func (suite *MultiSourceVersionTestSuite) TestChangeStreamResumability() {
9999 )
100100}
101101
102- func (suite * MultiSourceVersionTestSuite ) getClusterTime (ctx context.Context , client * mongo.Client ) primitive.Timestamp {
102+ func (suite * IntegrationTestSuite ) getClusterTime (ctx context.Context , client * mongo.Client ) primitive.Timestamp {
103103 sess , err := client .StartSession ()
104104 suite .Require ().NoError (err , "should start session" )
105105
@@ -112,7 +112,7 @@ func (suite *MultiSourceVersionTestSuite) getClusterTime(ctx context.Context, cl
112112 return newTime
113113}
114114
115- func (suite * MultiSourceVersionTestSuite ) fetchVerifierRechecks (ctx context.Context , verifier * Verifier ) []bson.M {
115+ func (suite * IntegrationTestSuite ) fetchVerifierRechecks (ctx context.Context , verifier * Verifier ) []bson.M {
116116 recheckDocs := []bson.M {}
117117
118118 recheckColl := verifier .verificationDatabase ().Collection (recheckQueue )
@@ -126,8 +126,8 @@ func (suite *MultiSourceVersionTestSuite) fetchVerifierRechecks(ctx context.Cont
126126 return recheckDocs
127127}
128128
129- func (suite * MultiSourceVersionTestSuite ) TestStartAtTimeNoChanges () {
130- verifier := buildVerifier ( suite .T (), suite . srcMongoInstance , suite . dstMongoInstance , suite . metaMongoInstance )
129+ func (suite * IntegrationTestSuite ) TestStartAtTimeNoChanges () {
130+ verifier := suite .BuildVerifier ( )
131131 ctx , cancel := context .WithCancel (context .Background ())
132132 defer cancel ()
133133 sess , err := suite .srcMongoClient .StartSession ()
@@ -146,8 +146,12 @@ func (suite *MultiSourceVersionTestSuite) TestStartAtTimeNoChanges() {
146146 suite .Require ().Equal (verifier .srcStartAtTs , origStartTs )
147147}
148148
149- func (suite * MultiSourceVersionTestSuite ) TestStartAtTimeWithChanges () {
150- verifier := buildVerifier (suite .T (), suite .srcMongoInstance , suite .dstMongoInstance , suite .metaMongoInstance )
149+ func (suite * IntegrationTestSuite ) TestStartAtTimeWithChanges () {
150+ if suite .GetSrcTopology () == TopologySharded {
151+ suite .T ().Skip ("Skipping pending REP-5299." )
152+ }
153+
154+ verifier := suite .BuildVerifier ()
151155 ctx , cancel := context .WithCancel (context .Background ())
152156 defer cancel ()
153157 sess , err := suite .srcMongoClient .StartSession ()
@@ -156,11 +160,13 @@ func (suite *MultiSourceVersionTestSuite) TestStartAtTimeWithChanges() {
156160 _ , err = suite .srcMongoClient .Database ("testDb" ).Collection ("testColl" ).InsertOne (
157161 sctx , bson.D {{"_id" , 0 }})
158162 suite .Require ().NoError (err )
159- origStartTs := sess .OperationTime ()
160- suite .Require ().NotNil (origStartTs )
163+
164+ origSessionTime := sess .OperationTime ()
165+ suite .Require ().NotNil (origSessionTime )
161166 err = verifier .StartChangeStream (ctx )
162167 suite .Require ().NoError (err )
163- suite .Require ().Equal (verifier .srcStartAtTs , origStartTs )
168+ suite .Require ().Equal (verifier .srcStartAtTs , origSessionTime )
169+
164170 _ , err = suite .srcMongoClient .Database ("testDb" ).Collection ("testColl" ).InsertOne (
165171 sctx , bson.D {{"_id" , 1 }})
166172 suite .Require ().NoError (err )
@@ -173,16 +179,26 @@ func (suite *MultiSourceVersionTestSuite) TestStartAtTimeWithChanges() {
173179 _ , err = suite .srcMongoClient .Database ("testDb" ).Collection ("testColl" ).DeleteOne (
174180 sctx , bson.D {{"_id" , 1 }})
175181 suite .Require ().NoError (err )
176- newStartTs := sess .OperationTime ()
177- suite .Require ().NotNil (newStartTs )
178- suite .Require ().Negative (origStartTs .Compare (* newStartTs ))
182+
183+ postEventsSessionTime := sess .OperationTime ()
184+ suite .Require ().NotNil (postEventsSessionTime )
185+ suite .Require ().Negative (
186+ origSessionTime .Compare (* postEventsSessionTime ),
187+ "session time after events should exceed the original" ,
188+ )
189+
179190 verifier .changeStreamEnderChan <- struct {}{}
180191 <- verifier .changeStreamDoneChan
181- suite .Require ().Equal (verifier .srcStartAtTs , newStartTs )
192+
193+ suite .Assert ().GreaterOrEqual (
194+ verifier .srcStartAtTs .Compare (* postEventsSessionTime ),
195+ 0 ,
196+ "verifier.srcStartAtTs should now meet or exceed our session timestamp" ,
197+ )
182198}
183199
184- func (suite * MultiSourceVersionTestSuite ) TestNoStartAtTime () {
185- verifier := buildVerifier ( suite .T (), suite . srcMongoInstance , suite . dstMongoInstance , suite . metaMongoInstance )
200+ func (suite * IntegrationTestSuite ) TestNoStartAtTime () {
201+ verifier := suite .BuildVerifier ( )
186202 ctx , cancel := context .WithCancel (context .Background ())
187203 defer cancel ()
188204 sess , err := suite .srcMongoClient .StartSession ()
@@ -199,8 +215,9 @@ func (suite *MultiSourceVersionTestSuite) TestNoStartAtTime() {
199215 suite .Require ().LessOrEqual (origStartTs .Compare (* verifier .srcStartAtTs ), 0 )
200216}
201217
202- func (suite * MultiSourceVersionTestSuite ) TestWithChangeEventsBatching () {
203- verifier := buildVerifier (suite .T (), suite .srcMongoInstance , suite .dstMongoInstance , suite .metaMongoInstance )
218+ func (suite * IntegrationTestSuite ) TestWithChangeEventsBatching () {
219+ verifier := suite .BuildVerifier ()
220+
204221 ctx , cancel := context .WithCancel (context .Background ())
205222 defer cancel ()
206223
0 commit comments