Skip to content

Commit e8b4330

Browse files
committed
Merge branch 'main' into REP-5274-index-comparison
2 parents 24a404a + a4086df commit e8b4330

File tree

2 files changed

+17
-4
lines changed

2 files changed

+17
-4
lines changed

internal/verifier/change_stream.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -226,7 +226,7 @@ func (verifier *Verifier) iterateChangeStream(ctx context.Context, cs *mongo.Cha
226226
break
227227
}
228228

229-
if curTs == writesOffTs || curTs.After(writesOffTs) {
229+
if curTs.After(writesOffTs) {
230230
verifier.logger.Debug().
231231
Interface("currentTimestamp", curTs).
232232
Interface("writesOffTimestamp", writesOffTs).

internal/verifier/change_stream_test.go

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -162,7 +162,13 @@ func (suite *IntegrationTestSuite) TestStartAtTimeWithChanges() {
162162
suite.Require().NotNil(origSessionTime)
163163
err = verifier.StartChangeStream(ctx)
164164
suite.Require().NoError(err)
165-
suite.Require().Equal(verifier.srcStartAtTs, origSessionTime)
165+
166+
// srcStartAtTs derives from the change stream’s resume token, which can
167+
// postdate our session time but should not precede it.
168+
suite.Require().False(
169+
verifier.srcStartAtTs.Before(*origSessionTime),
170+
"srcStartAtTs should be >= the insert’s optime",
171+
)
166172

167173
_, err = suite.srcMongoClient.Database("testDb").Collection("testColl").InsertOne(
168174
sctx, bson.D{{"_id", 1}})
@@ -241,7 +247,15 @@ func (suite *IntegrationTestSuite) TestWithChangeEventsBatching() {
241247
)
242248
}
243249

244-
func (suite *IntegrationTestSuite) TestEventBeforeWritesOff() {
250+
func (suite *IntegrationTestSuite) TestManyInsertsBeforeWritesOff() {
251+
suite.testInsertsBeforeWritesOff(10_000)
252+
}
253+
254+
func (suite *IntegrationTestSuite) TestOneInsertBeforeWritesOff() {
255+
suite.testInsertsBeforeWritesOff(1)
256+
}
257+
258+
func (suite *IntegrationTestSuite) testInsertsBeforeWritesOff(docsCount int) {
245259
ctx := suite.Context()
246260

247261
verifier := suite.BuildVerifier()
@@ -258,7 +272,6 @@ func (suite *IntegrationTestSuite) TestEventBeforeWritesOff() {
258272
// wait for generation 0 to end
259273
verifierRunner.AwaitGenerationEnd()
260274

261-
docsCount := 10_000
262275
docs := lo.RepeatBy(docsCount, func(_ int) bson.D { return bson.D{} })
263276
_, err := coll.InsertMany(
264277
ctx,

0 commit comments

Comments
 (0)