Skip to content

Commit c4c7a66

Browse files
committed
Update change_stream_test.go
1 parent b201a20 commit c4c7a66

File tree

1 file changed

+9
-15
lines changed

1 file changed

+9
-15
lines changed

internal/verifier/change_stream_test.go

Lines changed: 9 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -199,38 +199,32 @@ func (suite *MultiSourceVersionTestSuite) TestNoStartAtTime() {
199199
suite.Require().LessOrEqual(origStartTs.Compare(*verifier.srcStartAtTs), 0)
200200
}
201201

202-
func (suite *MultiSourceVersionTestSuite) TestWithChangeStreamBatching() {
202+
func (suite *MultiSourceVersionTestSuite) TestWithChangeEventsBatching() {
203203
verifier := buildVerifier(suite.T(), suite.srcMongoInstance, suite.dstMongoInstance, suite.metaMongoInstance)
204204
ctx, cancel := context.WithCancel(context.Background())
205205
defer cancel()
206206

207-
batchSize := int32(2)
207+
batchSize := int32(3)
208208
verifier.StartChangeStream(ctx, &batchSize)
209209

210-
_, err := suite.srcMongoClient.Database("testDb").Collection("testColl").InsertOne(ctx, bson.D{{"_id", 1}})
210+
_, err := suite.srcMongoClient.Database("testDb").Collection("testColl1").InsertOne(ctx, bson.D{{"_id", 1}})
211+
suite.Require().NoError(err)
212+
_, err = suite.srcMongoClient.Database("testDb").Collection("testColl1").InsertOne(ctx, bson.D{{"_id", 2}})
211213
suite.Require().NoError(err)
212214

213-
require.Eventually(
214-
suite.T(),
215-
func() bool {
216-
return len(verifier.changeEventRecheckBuf["testDB.testColl"]) == 1
217-
},
218-
time.Minute,
219-
500*time.Millisecond,
220-
"the verifier should buffer a recheck doc",
221-
)
222-
suite.Require().Empty(suite.fetchVerifierRechecks(ctx, verifier))
223-
224-
_, err = suite.srcMongoClient.Database("testDb").Collection("testColl").InsertOne(ctx, bson.D{{"_id", 2}})
215+
_, err = suite.srcMongoClient.Database("testDb").Collection("testColl2").InsertOne(ctx, bson.D{{"_id", 1}})
225216
suite.Require().NoError(err)
226217

227218
require.Eventually(
228219
suite.T(),
229220
func() bool {
221+
// There should be 2 recheck docs due to batching, one for each namespace.
230222
return len(suite.fetchVerifierRechecks(ctx, verifier)) == 2
231223
},
232224
time.Minute,
233225
500*time.Millisecond,
234226
"the verifier should flush a recheck doc after a batch",
235227
)
228+
suite.Require().Empty(verifier.changeEventRecheckBuf["testDB.testColl1"])
229+
suite.Require().Empty(verifier.changeEventRecheckBuf["testDB.testColl2"])
236230
}

0 commit comments

Comments
 (0)