Skip to content

Commit 8140003

Browse files
committed
Fix change stream not to miss last single event.
1 parent a85f5a6 commit 8140003

File tree

2 files changed

+10
-3
lines changed

2 files changed

+10
-3
lines changed

internal/verifier/change_stream.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -226,7 +226,7 @@ func (verifier *Verifier) iterateChangeStream(ctx context.Context, cs *mongo.Cha
226226
break
227227
}
228228

229-
if curTs == writesOffTs || curTs.After(writesOffTs) {
229+
if curTs.After(writesOffTs) {
230230
verifier.logger.Debug().
231231
Interface("currentTimestamp", curTs).
232232
Interface("writesOffTimestamp", writesOffTs).

internal/verifier/change_stream_test.go

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -241,7 +241,15 @@ func (suite *IntegrationTestSuite) TestWithChangeEventsBatching() {
241241
)
242242
}
243243

244-
func (suite *IntegrationTestSuite) TestEventBeforeWritesOff() {
244+
func (suite *IntegrationTestSuite) TestManyInsertsBeforeWritesOff() {
245+
suite.testInsertsBeforeWritesOff(10_000)
246+
}
247+
248+
func (suite *IntegrationTestSuite) TestOneInsertBeforeWritesOff() {
249+
suite.testInsertsBeforeWritesOff(1)
250+
}
251+
252+
func (suite *IntegrationTestSuite) testInsertsBeforeWritesOff(docsCount int) {
245253
ctx := suite.Context()
246254

247255
verifier := suite.BuildVerifier()
@@ -258,7 +266,6 @@ func (suite *IntegrationTestSuite) TestEventBeforeWritesOff() {
258266
// wait for generation 0 to end
259267
verifierRunner.AwaitGenerationEnd()
260268

261-
docsCount := 10_000
262269
docs := lo.RepeatBy(docsCount, func(_ int) bson.D { return bson.D{} })
263270
_, err := coll.InsertMany(
264271
ctx,

0 commit comments

Comments
 (0)