Skip to content

Commit 25d06df

Browse files
committed
Add a calculation of change stream lag to the log.
This provides an easy, at-a-glace indication of the amount of time between the change stream’s resume token and the cluster time.
1 parent 1348a60 commit 25d06df

File tree

1 file changed

+25
-19
lines changed

1 file changed

+25
-19
lines changed

internal/verifier/change_stream.go

Lines changed: 25 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -178,15 +178,15 @@ func (verifier *Verifier) readAndHandleOneChangeEventBatch(
178178
return nil
179179
}
180180

181-
func (verifier *Verifier) iterateChangeStream(ctx context.Context, cs *mongo.ChangeStream) error {
181+
func (verifier *Verifier) iterateChangeStream(sctx mongo.SessionContext, cs *mongo.ChangeStream) error {
182182
var lastPersistedTime time.Time
183183

184184
persistResumeTokenIfNeeded := func() error {
185185
if time.Since(lastPersistedTime) <= minChangeStreamPersistInterval {
186186
return nil
187187
}
188188

189-
err := verifier.persistChangeStreamResumeToken(ctx, cs)
189+
err := verifier.persistChangeStreamResumeToken(sctx, cs)
190190
if err == nil {
191191
lastPersistedTime = time.Now()
192192
}
@@ -201,8 +201,8 @@ func (verifier *Verifier) iterateChangeStream(ctx context.Context, cs *mongo.Cha
201201
select {
202202

203203
// If the context is canceled, return immmediately.
204-
case <-ctx.Done():
205-
return ctx.Err()
204+
case <-sctx.Done():
205+
return sctx.Err()
206206

207207
// If the changeStreamEnderChan has a message, the user has indicated that
208208
// source writes are ended. This means we should exit rather than continue
@@ -234,15 +234,15 @@ func (verifier *Verifier) iterateChangeStream(ctx context.Context, cs *mongo.Cha
234234
break
235235
}
236236

237-
err = verifier.readAndHandleOneChangeEventBatch(ctx, cs)
237+
err = verifier.readAndHandleOneChangeEventBatch(sctx, cs)
238238

239239
if err != nil {
240240
return err
241241
}
242242
}
243243

244244
default:
245-
err = verifier.readAndHandleOneChangeEventBatch(ctx, cs)
245+
err = verifier.readAndHandleOneChangeEventBatch(sctx, cs)
246246

247247
if err == nil {
248248
err = persistResumeTokenIfNeeded()
@@ -280,14 +280,14 @@ func (verifier *Verifier) iterateChangeStream(ctx context.Context, cs *mongo.Cha
280280
}
281281

282282
func (verifier *Verifier) createChangeStream(
283-
ctx context.Context,
283+
sctx mongo.SessionContext,
284284
) (*mongo.ChangeStream, primitive.Timestamp, error) {
285285
pipeline := verifier.GetChangeStreamFilter()
286286
opts := options.ChangeStream().
287287
SetMaxAwaitTime(1 * time.Second).
288288
SetFullDocument(options.UpdateLookup)
289289

290-
savedResumeToken, err := verifier.loadChangeStreamResumeToken(ctx)
290+
savedResumeToken, err := verifier.loadChangeStreamResumeToken(sctx)
291291
if err != nil {
292292
return nil, primitive.Timestamp{}, errors.Wrap(err, "failed to load persisted change stream resume token")
293293
}
@@ -314,17 +314,12 @@ func (verifier *Verifier) createChangeStream(
314314
csStartLogEvent.Msg("Starting change stream from current source cluster time.")
315315
}
316316

317-
sess, err := verifier.srcClient.StartSession()
318-
if err != nil {
319-
return nil, primitive.Timestamp{}, errors.Wrap(err, "failed to start session")
320-
}
321-
sctx := mongo.NewSessionContext(ctx, sess)
322317
srcChangeStream, err := verifier.srcClient.Watch(sctx, pipeline, opts)
323318
if err != nil {
324319
return nil, primitive.Timestamp{}, errors.Wrap(err, "failed to open change stream")
325320
}
326321

327-
err = verifier.persistChangeStreamResumeToken(ctx, srcChangeStream)
322+
err = verifier.persistChangeStreamResumeToken(sctx, srcChangeStream)
328323
if err != nil {
329324
return nil, primitive.Timestamp{}, err
330325
}
@@ -337,7 +332,7 @@ func (verifier *Verifier) createChangeStream(
337332
// With sharded clusters the resume token might lead the cluster time
338333
// by 1 increment. In that case we need the actual cluster time;
339334
// otherwise we will get errors.
340-
clusterTime, err := getClusterTimeFromSession(sess)
335+
clusterTime, err := getClusterTimeFromSession(sctx)
341336
if err != nil {
342337
return nil, primitive.Timestamp{}, errors.Wrap(err, "failed to read cluster time from session")
343338
}
@@ -365,7 +360,14 @@ func (verifier *Verifier) StartChangeStream(ctx context.Context) error {
365360
ctx,
366361
verifier.logger,
367362
func(i *retry.Info) error {
368-
srcChangeStream, startTs, err := verifier.createChangeStream(ctx)
363+
sess, err := verifier.srcClient.StartSession()
364+
if err != nil {
365+
return errors.Wrap(err, "failed to start change stream session")
366+
}
367+
368+
sctx := mongo.NewSessionContext(ctx, sess)
369+
370+
srcChangeStream, startTs, err := verifier.createChangeStream(sctx)
369371
if err != nil {
370372
return err
371373
}
@@ -378,7 +380,7 @@ func (verifier *Verifier) StartChangeStream(ctx context.Context) error {
378380
parentThreadWaiting = false
379381
}
380382

381-
return verifier.iterateChangeStream(ctx, srcChangeStream)
383+
return verifier.iterateChangeStream(sctx, srcChangeStream)
382384
},
383385
)
384386

@@ -433,12 +435,12 @@ func (verifier *Verifier) loadChangeStreamResumeToken(ctx context.Context) (bson
433435
return token, err
434436
}
435437

436-
func (verifier *Verifier) persistChangeStreamResumeToken(ctx context.Context, cs *mongo.ChangeStream) error {
438+
func (verifier *Verifier) persistChangeStreamResumeToken(sctx mongo.SessionContext, cs *mongo.ChangeStream) error {
437439
token := cs.ResumeToken()
438440

439441
coll := verifier.getChangeStreamMetadataCollection()
440442
_, err := coll.ReplaceOne(
441-
ctx,
443+
sctx,
442444
bson.D{{"_id", "resumeToken"}},
443445
token,
444446
options.Replace().SetUpsert(true),
@@ -451,6 +453,10 @@ func (verifier *Verifier) persistChangeStreamResumeToken(ctx context.Context, cs
451453

452454
if err == nil {
453455
logEvent = addTimestampToLogEvent(ts, logEvent)
456+
457+
optime := sctx.OperationTime()
458+
lag := time.Second * time.Duration(optime.T-ts.T)
459+
logEvent = logEvent.Stringer("lagAmount", lag)
454460
} else {
455461
verifier.logger.Warn().Err(err).
456462
Msg("failed to extract resume token timestamp")

0 commit comments

Comments
 (0)