Skip to content

Commit 41229d1

Browse files
committed
Fix flapping TestStartAtTimeNoChanges.
1 parent be2638e commit 41229d1

File tree

2 files changed

+47
-34
lines changed

2 files changed

+47
-34
lines changed

internal/verifier/change_stream.go

Lines changed: 13 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,8 @@ const (
4747
metadataChangeStreamCollectionName = "changeStream"
4848
)
4949

50+
var maxChangeStreamAwaitTime = time.Second
51+
5052
type UnknownEventError struct {
5153
Event *ParsedEvent
5254
}
@@ -293,6 +295,9 @@ func (csr *ChangeStreamReader) readAndHandleOneChangeEventBatch(
293295
(csr.lastChangeEventTime == nil ||
294296
csr.lastChangeEventTime.Before(*changeEventBatch[eventsRead].ClusterTime)) {
295297
csr.lastChangeEventTime = changeEventBatch[eventsRead].ClusterTime
298+
csr.logger.Trace().
299+
Interface("event", changeEventBatch[eventsRead]).
300+
Msg("Updated lastChangeEventTime.")
296301
}
297302

298303
eventsRead++
@@ -431,7 +436,7 @@ func (csr *ChangeStreamReader) createChangeStream(
431436
) (*mongo.ChangeStream, mongo.Session, primitive.Timestamp, error) {
432437
pipeline := csr.GetChangeStreamFilter()
433438
opts := options.ChangeStream().
434-
SetMaxAwaitTime(1 * time.Second).
439+
SetMaxAwaitTime(maxChangeStreamAwaitTime).
435440
SetFullDocument(options.UpdateLookup)
436441

437442
if csr.clusterInfo.VersionArray[0] >= 6 {
@@ -488,11 +493,17 @@ func (csr *ChangeStreamReader) createChangeStream(
488493
// With sharded clusters the resume token might lead the cluster time
489494
// by 1 increment. In that case we need the actual cluster time;
490495
// otherwise we will get errors.
491-
clusterTime, err := getClusterTimeFromSession(sess)
496+
clusterTime, err := util.GetClusterTimeFromSession(sess)
492497
if err != nil {
493498
return nil, nil, primitive.Timestamp{}, errors.Wrap(err, "failed to read cluster time from session")
494499
}
495500

501+
csr.logger.Debug().
502+
Interface("resumeTokenTimestamp", startTs).
503+
Interface("clusterTime", clusterTime).
504+
Stringer("changeStreamReader", csr).
505+
Msg("Using earlier time as start timestamp.")
506+
496507
if startTs.After(clusterTime) {
497508
startTs = clusterTime
498509
}
@@ -659,19 +670,3 @@ func extractTimestampFromResumeToken(resumeToken bson.Raw) (primitive.Timestamp,
659670

660671
return resumeTokenTime, nil
661672
}
662-
663-
func getClusterTimeFromSession(sess mongo.Session) (primitive.Timestamp, error) {
664-
ctStruct := struct {
665-
ClusterTime struct {
666-
ClusterTime primitive.Timestamp `bson:"clusterTime"`
667-
} `bson:"$clusterTime"`
668-
}{}
669-
670-
clusterTimeRaw := sess.ClusterTime()
671-
err := bson.Unmarshal(sess.ClusterTime(), &ctStruct)
672-
if err != nil {
673-
return primitive.Timestamp{}, errors.Wrapf(err, "failed to find clusterTime in session cluster time document (%v)", clusterTimeRaw)
674-
}
675-
676-
return ctStruct.ClusterTime.ClusterTime, nil
677-
}

internal/verifier/change_stream_test.go

Lines changed: 34 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -283,7 +283,7 @@ func (suite *IntegrationTestSuite) getClusterTime(ctx context.Context, client *m
283283
sctx := mongo.NewSessionContext(ctx, sess)
284284
suite.Require().NoError(sess.Client().Ping(sctx, nil))
285285

286-
newTime, err := getClusterTimeFromSession(sess)
286+
newTime, err := util.GetClusterTimeFromSession(sess)
287287
suite.Require().NoError(err, "should fetch cluster time")
288288

289289
return newTime
@@ -306,21 +306,39 @@ func (suite *IntegrationTestSuite) fetchVerifierRechecks(ctx context.Context, ve
306306
func (suite *IntegrationTestSuite) TestStartAtTimeNoChanges() {
307307
zerolog.SetGlobalLevel(zerolog.TraceLevel)
308308

309-
verifier := suite.BuildVerifier()
310-
ctx := suite.Context()
311-
sess, err := suite.srcMongoClient.StartSession()
312-
suite.Require().NoError(err)
313-
sctx := mongo.NewSessionContext(ctx, sess)
314-
_, err = suite.srcMongoClient.Database("testDb").Collection("testColl").InsertOne(
315-
sctx, bson.D{{"_id", 0}})
316-
suite.Require().NoError(err)
317-
origStartTs := sess.OperationTime()
318-
suite.Require().NotNil(origStartTs)
319-
suite.startSrcChangeStreamReaderAndHandler(ctx, verifier)
320-
suite.Require().Equal(verifier.srcChangeStreamReader.startAtTs, origStartTs)
321-
verifier.srcChangeStreamReader.writesOffTs.Set(*origStartTs)
322-
<-verifier.srcChangeStreamReader.doneChan
323-
suite.Require().Equal(verifier.srcChangeStreamReader.startAtTs, origStartTs)
309+
// Each of these takes ~1s, so don’t do too many of them.
310+
for range 5 {
311+
verifier := suite.BuildVerifier()
312+
ctx := suite.Context()
313+
sess, err := suite.srcMongoClient.StartSession()
314+
suite.Require().NoError(err)
315+
sctx := mongo.NewSessionContext(ctx, sess)
316+
_, err = suite.srcMongoClient.Database("testDb").Collection("testColl").InsertOne(
317+
sctx, bson.D{})
318+
suite.Require().NoError(err, "should insert doc")
319+
320+
insertTs, err := util.GetClusterTimeFromSession(sess)
321+
suite.Require().NoError(err, "should get cluster time")
322+
323+
suite.startSrcChangeStreamReaderAndHandler(ctx, verifier)
324+
325+
startAtTs := verifier.srcChangeStreamReader.startAtTs
326+
suite.Require().NotNil(startAtTs)
327+
328+
suite.Require().False(
329+
startAtTs.After(insertTs),
330+
"change stream should start no later than the last operation",
331+
)
332+
333+
verifier.srcChangeStreamReader.writesOffTs.Set(insertTs)
334+
335+
<-verifier.srcChangeStreamReader.doneChan
336+
337+
suite.Require().False(
338+
verifier.srcChangeStreamReader.startAtTs.Before(*startAtTs),
339+
"new startAtTs should be no earlier than last one",
340+
)
341+
}
324342
}
325343

326344
func (suite *IntegrationTestSuite) TestStartAtTimeWithChanges() {

0 commit comments

Comments
 (0)