99 "github.com/10gen/migration-verifier/internal/logger"
1010 "github.com/10gen/migration-verifier/internal/retry"
1111 "github.com/10gen/migration-verifier/internal/util"
12+ "github.com/10gen/migration-verifier/msync"
13+ "github.com/10gen/migration-verifier/option"
1214 "github.com/pkg/errors"
1315 "github.com/rs/zerolog"
1416 "github.com/samber/mo"
@@ -71,6 +73,8 @@ type ChangeStreamReader struct {
7173 doneChan chan struct {}
7274
7375 startAtTs * primitive.Timestamp
76+
77+ lag * msync.TypedAtomic [option.Option [time.Duration ]]
7478}
7579
7680func (verifier * Verifier ) initializeChangeStreamReaders () {
@@ -86,6 +90,7 @@ func (verifier *Verifier) initializeChangeStreamReaders() {
8690 writesOffTsChan : make (chan primitive.Timestamp ),
8791 errChan : make (chan error ),
8892 doneChan : make (chan struct {}),
93+ lag : msync .NewTypedAtomic (option .None [time.Duration ]()),
8994 }
9095 verifier .dstChangeStreamReader = & ChangeStreamReader {
9196 readerType : dst ,
@@ -99,6 +104,7 @@ func (verifier *Verifier) initializeChangeStreamReaders() {
99104 writesOffTsChan : make (chan primitive.Timestamp ),
100105 errChan : make (chan error ),
101106 doneChan : make (chan struct {}),
107+ lag : msync .NewTypedAtomic (option .None [time.Duration ]()),
102108 }
103109}
104110
@@ -257,6 +263,7 @@ func (csr *ChangeStreamReader) readAndHandleOneChangeEventBatch(
257263 ctx context.Context ,
258264 ri * retry.FuncInfo ,
259265 cs * mongo.ChangeStream ,
266+ sess mongo.Session ,
260267) error {
261268 eventsRead := 0
262269 var changeEventBatch []ParsedEvent
@@ -298,6 +305,17 @@ func (csr *ChangeStreamReader) readAndHandleOneChangeEventBatch(
298305 return nil
299306 }
300307
308+ var curTs primitive.Timestamp
309+ curTs , err := extractTimestampFromResumeToken (cs .ResumeToken ())
310+ if err == nil {
311+ lagSecs := curTs .T - sess .OperationTime ().T
312+ csr .lag .Store (option .Some (time .Second * time .Duration (lagSecs )))
313+ } else {
314+ csr .logger .Warn ().
315+ Err (err ).
316+ Msgf ("Failed to extract timestamp from %s's resume token to compute change stream lag." , csr )
317+ }
318+
301319 csr .changeEventBatchChan <- changeEventBatch
302320 return nil
303321}
@@ -306,6 +324,7 @@ func (csr *ChangeStreamReader) iterateChangeStream(
306324 ctx context.Context ,
307325 ri * retry.FuncInfo ,
308326 cs * mongo.ChangeStream ,
327+ sess mongo.Session ,
309328) error {
310329 var lastPersistedTime time.Time
311330
@@ -363,15 +382,15 @@ func (csr *ChangeStreamReader) iterateChangeStream(
363382 break
364383 }
365384
366- err = csr .readAndHandleOneChangeEventBatch (ctx , ri , cs )
385+ err = csr .readAndHandleOneChangeEventBatch (ctx , ri , cs , sess )
367386
368387 if err != nil {
369388 return err
370389 }
371390 }
372391
373392 default :
374- err = csr .readAndHandleOneChangeEventBatch (ctx , ri , cs )
393+ err = csr .readAndHandleOneChangeEventBatch (ctx , ri , cs , sess )
375394
376395 if err == nil {
377396 err = persistResumeTokenIfNeeded ()
@@ -408,7 +427,7 @@ func (csr *ChangeStreamReader) iterateChangeStream(
408427
409428func (csr * ChangeStreamReader ) createChangeStream (
410429 ctx context.Context ,
411- ) (* mongo.ChangeStream , primitive.Timestamp , error ) {
430+ ) (* mongo.ChangeStream , mongo. Session , primitive.Timestamp , error ) {
412431 pipeline := csr .GetChangeStreamFilter ()
413432 opts := options .ChangeStream ().
414433 SetMaxAwaitTime (1 * time .Second ).
@@ -420,7 +439,7 @@ func (csr *ChangeStreamReader) createChangeStream(
420439
421440 savedResumeToken , err := csr .loadChangeStreamResumeToken (ctx )
422441 if err != nil {
423- return nil , primitive.Timestamp {}, errors .Wrap (err , "failed to load persisted change stream resume token" )
442+ return nil , nil , primitive.Timestamp {}, errors .Wrap (err , "failed to load persisted change stream resume token" )
424443 }
425444
426445 csStartLogEvent := csr .logger .Info ()
@@ -447,37 +466,37 @@ func (csr *ChangeStreamReader) createChangeStream(
447466
448467 sess , err := csr .watcherClient .StartSession ()
449468 if err != nil {
450- return nil , primitive.Timestamp {}, errors .Wrap (err , "failed to start session" )
469+ return nil , nil , primitive.Timestamp {}, errors .Wrap (err , "failed to start session" )
451470 }
452471 sctx := mongo .NewSessionContext (ctx , sess )
453472 changeStream , err := csr .watcherClient .Watch (sctx , pipeline , opts )
454473 if err != nil {
455- return nil , primitive.Timestamp {}, errors .Wrap (err , "failed to open change stream" )
474+ return nil , nil , primitive.Timestamp {}, errors .Wrap (err , "failed to open change stream" )
456475 }
457476
458477 err = csr .persistChangeStreamResumeToken (ctx , changeStream )
459478 if err != nil {
460- return nil , primitive.Timestamp {}, err
479+ return nil , nil , primitive.Timestamp {}, err
461480 }
462481
463482 startTs , err := extractTimestampFromResumeToken (changeStream .ResumeToken ())
464483 if err != nil {
465- return nil , primitive.Timestamp {}, errors .Wrap (err , "failed to extract timestamp from change stream's resume token" )
484+ return nil , nil , primitive.Timestamp {}, errors .Wrap (err , "failed to extract timestamp from change stream's resume token" )
466485 }
467486
468487 // With sharded clusters the resume token might lead the cluster time
469488 // by 1 increment. In that case we need the actual cluster time;
470489 // otherwise we will get errors.
471490 clusterTime , err := getClusterTimeFromSession (sess )
472491 if err != nil {
473- return nil , primitive.Timestamp {}, errors .Wrap (err , "failed to read cluster time from session" )
492+ return nil , nil , primitive.Timestamp {}, errors .Wrap (err , "failed to read cluster time from session" )
474493 }
475494
476495 if startTs .After (clusterTime ) {
477496 startTs = clusterTime
478497 }
479498
480- return changeStream , startTs , nil
499+ return changeStream , sess , startTs , nil
481500}
482501
483502// StartChangeStream starts the change stream.
@@ -499,7 +518,7 @@ func (csr *ChangeStreamReader) StartChangeStream(ctx context.Context) error {
499518
500519 err := retryer .WithCallback (
501520 func (ctx context.Context , ri * retry.FuncInfo ) error {
502- changeStream , startTs , err := csr .createChangeStream (ctx )
521+ changeStream , sess , startTs , err := csr .createChangeStream (ctx )
503522 if err != nil {
504523 if parentThreadWaiting {
505524 initialCreateResultChan <- mo.Err [primitive.Timestamp ](err )
@@ -517,7 +536,7 @@ func (csr *ChangeStreamReader) StartChangeStream(ctx context.Context) error {
517536 parentThreadWaiting = false
518537 }
519538
520- return csr .iterateChangeStream (ctx , ri , changeStream )
539+ return csr .iterateChangeStream (ctx , ri , changeStream , sess )
521540 },
522541 "running %s" , csr ,
523542 ).Run (ctx , csr .logger )
@@ -544,6 +563,10 @@ func (csr *ChangeStreamReader) StartChangeStream(ctx context.Context) error {
544563 return nil
545564}
546565
566+ func (csr * ChangeStreamReader ) GetLag () option.Option [time.Duration ] {
567+ return csr .lag .Load ()
568+ }
569+
547570func addTimestampToLogEvent (ts primitive.Timestamp , event * zerolog.Event ) * zerolog.Event {
548571 return event .
549572 Interface ("timestamp" , ts ).
0 commit comments