88 "github.com/10gen/migration-verifier/internal/keystring"
99 "github.com/10gen/migration-verifier/internal/retry"
1010 "github.com/10gen/migration-verifier/internal/util"
11+ "github.com/10gen/migration-verifier/option"
1112 "github.com/pkg/errors"
1213 "github.com/rs/zerolog"
1314 "github.com/samber/mo"
@@ -138,14 +139,14 @@ func (verifier *Verifier) GetChangeStreamFilter() []bson.D {
138139// is unideal but shouldn’t impede correctness since post-writesOff events
139140// shouldn’t really happen anyway by definition.
140141func (verifier * Verifier ) readAndHandleOneChangeEventBatch (
141- ctx context. Context ,
142+ sctx mongo. SessionContext ,
142143 cs * mongo.ChangeStream ,
143144) error {
144145 eventsRead := 0
145146 var changeEventBatch []ParsedEvent
146147
147148 for hasEventInBatch := true ; hasEventInBatch ; hasEventInBatch = cs .RemainingBatchLength () > 0 {
148- gotEvent := cs .TryNext (ctx )
149+ gotEvent := cs .TryNext (sctx )
149150
150151 if cs .Err () != nil {
151152 return errors .Wrap (cs .Err (), "change stream iteration failed" )
@@ -170,7 +171,15 @@ func (verifier *Verifier) readAndHandleOneChangeEventBatch(
170171 return nil
171172 }
172173
173- err := verifier .HandleChangeStreamEvents (ctx , changeEventBatch )
174+ // Update the change stream lag, which we’ll display in the logs.
175+ lag , err := computeChangeStreamLag (sctx , cs )
176+ if err == nil {
177+ verifier .changeStreamLag .Store (option .Some (lag ))
178+ } else {
179+ verifier .logger .Debug ().Err (err ).Msg ("Failed to compute change stream lag." )
180+ }
181+
182+ err = verifier .HandleChangeStreamEvents (sctx , changeEventBatch )
174183 if err != nil {
175184 return errors .Wrap (err , "failed to handle change events" )
176185 }
@@ -435,6 +444,20 @@ func (verifier *Verifier) loadChangeStreamResumeToken(ctx context.Context) (bson
435444 return token , err
436445}
437446
447+ func computeChangeStreamLag (
448+ sctx mongo.SessionContext ,
449+ cs * mongo.ChangeStream ,
450+ ) (time.Duration , error ) {
451+ token := cs .ResumeToken ()
452+ ts , err := extractTimestampFromResumeToken (token )
453+ if err != nil {
454+ return 0 , errors .Wrap (err , "failed to extract timestamp from change stream resume token" )
455+ }
456+
457+ optime := sctx .OperationTime ()
458+ return time .Second * time .Duration (optime .T - ts .T ), nil
459+ }
460+
438461func (verifier * Verifier ) persistChangeStreamResumeToken (sctx mongo.SessionContext , cs * mongo.ChangeStream ) error {
439462 token := cs .ResumeToken ()
440463
@@ -453,10 +476,6 @@ func (verifier *Verifier) persistChangeStreamResumeToken(sctx mongo.SessionConte
453476
454477 if err == nil {
455478 logEvent = addTimestampToLogEvent (ts , logEvent )
456-
457- optime := sctx .OperationTime ()
458- lag := time .Second * time .Duration (optime .T - ts .T )
459- logEvent = logEvent .Stringer ("lagAmount" , lag )
460479 } else {
461480 verifier .logger .Warn ().Err (err ).
462481 Msg ("failed to extract resume token timestamp" )
0 commit comments