Skip to content

Commit 3004e1d

Browse files
authored
Merge pull request #79 from FGasper/felipe_fix_lag_calculation
The previous logic (PR #74) was backwards, which caused uint overflow. This adds a test that usually passes without this change but should occasionally fail. (It should always pass, though, with this change.)
2 parents 19ad17a + 07effe5 commit 3004e1d

File tree

2 files changed

+54
-3
lines changed

2 files changed

+54
-3
lines changed

internal/verifier/change_stream.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -319,10 +319,10 @@ func (csr *ChangeStreamReader) readAndHandleOneChangeEventBatch(
319319
Msg("Updated lastChangeEventTime.")
320320
}
321321

322-
var curTs primitive.Timestamp
323-
curTs, err := extractTimestampFromResumeToken(cs.ResumeToken())
322+
var tokenTs primitive.Timestamp
323+
tokenTs, err := extractTimestampFromResumeToken(cs.ResumeToken())
324324
if err == nil {
325-
lagSecs := curTs.T - sess.OperationTime().T
325+
lagSecs := int64(sess.OperationTime().T) - int64(tokenTs.T)
326326
csr.lag.Store(option.Some(time.Second * time.Duration(lagSecs)))
327327
} else {
328328
csr.logger.Warn().

internal/verifier/change_stream_test.go

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -303,6 +303,57 @@ func (suite *IntegrationTestSuite) fetchVerifierRechecks(ctx context.Context, ve
303303
return recheckDocs
304304
}
305305

306+
func (suite *IntegrationTestSuite) TestChangeStreamLag() {
307+
zerolog.SetGlobalLevel(zerolog.TraceLevel)
308+
309+
ctx := suite.Context()
310+
311+
db := suite.srcMongoClient.
312+
Database(suite.DBNameForTest())
313+
314+
suite.Require().NoError(
315+
db.CreateCollection(ctx, "mycoll"),
316+
)
317+
318+
verifier := suite.BuildVerifier()
319+
320+
verifier.SetSrcNamespaces([]string{db.Name() + ".mycoll"})
321+
verifier.SetDstNamespaces([]string{db.Name() + ".mycoll"})
322+
verifier.SetNamespaceMap()
323+
324+
verifierRunner := RunVerifierCheck(ctx, suite.T(), verifier)
325+
suite.Require().NoError(
326+
verifierRunner.AwaitGenerationEnd(),
327+
)
328+
329+
_, err := db.Collection("mycoll").InsertOne(ctx, bson.D{})
330+
suite.Require().NoError(err)
331+
332+
// On sharded clusters sometimes the event hasn’t shown yet.
333+
suite.Require().Eventually(
334+
func() bool {
335+
suite.Require().NoError(
336+
verifierRunner.StartNextGeneration(),
337+
)
338+
suite.Require().NoError(
339+
verifierRunner.AwaitGenerationEnd(),
340+
)
341+
342+
return verifier.srcChangeStreamReader.GetLag().IsSome()
343+
},
344+
time.Minute,
345+
100*time.Millisecond,
346+
)
347+
348+
// NB: The lag will include whatever time elapsed above before
349+
// verifier read the event, so it can be several seconds.
350+
suite.Assert().Less(
351+
verifier.srcChangeStreamReader.GetLag().MustGet(),
352+
10*time.Minute,
353+
"verifier lag is as expected",
354+
)
355+
}
356+
306357
func (suite *IntegrationTestSuite) TestStartAtTimeNoChanges() {
307358
zerolog.SetGlobalLevel(zerolog.TraceLevel)
308359

0 commit comments

Comments
 (0)