Skip to content

Commit e424812

Browse files
committed
Merge branch 'felipe_log_lag_redux' into REP-5358-add-retry-description
2 parents ac170b4 + 90f7502 commit e424812

File tree

2 files changed

+80
-41
lines changed

2 files changed

+80
-41
lines changed

internal/verifier/change_stream.go

Lines changed: 35 additions & 12 deletions
Original file line number | Diff line number | Diff line change
@@ -9,6 +9,8 @@ import (
99
"github.com/10gen/migration-verifier/internal/logger"
1010
"github.com/10gen/migration-verifier/internal/retry"
1111
"github.com/10gen/migration-verifier/internal/util"
12+
"github.com/10gen/migration-verifier/msync"
13+
"github.com/10gen/migration-verifier/option"
1214
"github.com/pkg/errors"
1315
"github.com/rs/zerolog"
1416
"github.com/samber/mo"
@@ -71,6 +73,8 @@ type ChangeStreamReader struct {
7173
doneChan chan struct{}
7274

7375
startAtTs *primitive.Timestamp
76+
77+
lag *msync.TypedAtomic[option.Option[time.Duration]]
7478
}
7579

7680
func (verifier *Verifier) initializeChangeStreamReaders() {
@@ -86,6 +90,7 @@ func (verifier *Verifier) initializeChangeStreamReaders() {
8690
writesOffTsChan: make(chan primitive.Timestamp),
8791
errChan: make(chan error),
8892
doneChan: make(chan struct{}),
93+
lag: msync.NewTypedAtomic(option.None[time.Duration]()),
8994
}
9095
verifier.dstChangeStreamReader = &ChangeStreamReader{
9196
readerType: dst,
@@ -99,6 +104,7 @@ func (verifier *Verifier) initializeChangeStreamReaders() {
99104
writesOffTsChan: make(chan primitive.Timestamp),
100105
errChan: make(chan error),
101106
doneChan: make(chan struct{}),
107+
lag: msync.NewTypedAtomic(option.None[time.Duration]()),
102108
}
103109
}
104110

@@ -257,6 +263,7 @@ func (csr *ChangeStreamReader) readAndHandleOneChangeEventBatch(
257263
ctx context.Context,
258264
ri *retry.FuncInfo,
259265
cs *mongo.ChangeStream,
266+
sess mongo.Session,
260267
) error {
261268
eventsRead := 0
262269
var changeEventBatch []ParsedEvent
@@ -298,6 +305,17 @@ func (csr *ChangeStreamReader) readAndHandleOneChangeEventBatch(
298305
return nil
299306
}
300307

308+
var curTs primitive.Timestamp
309+
curTs, err := extractTimestampFromResumeToken(cs.ResumeToken())
310+
if err == nil {
311+
lagSecs := curTs.T - sess.OperationTime().T
312+
csr.lag.Store(option.Some(time.Second * time.Duration(lagSecs)))
313+
} else {
314+
csr.logger.Warn().
315+
Err(err).
316+
Msgf("Failed to extract timestamp from %s's resume token to compute change stream lag.", csr)
317+
}
318+
301319
csr.changeEventBatchChan <- changeEventBatch
302320
return nil
303321
}
@@ -306,6 +324,7 @@ func (csr *ChangeStreamReader) iterateChangeStream(
306324
ctx context.Context,
307325
ri *retry.FuncInfo,
308326
cs *mongo.ChangeStream,
327+
sess mongo.Session,
309328
) error {
310329
var lastPersistedTime time.Time
311330

@@ -363,15 +382,15 @@ func (csr *ChangeStreamReader) iterateChangeStream(
363382
break
364383
}
365384

366-
err = csr.readAndHandleOneChangeEventBatch(ctx, ri, cs)
385+
err = csr.readAndHandleOneChangeEventBatch(ctx, ri, cs, sess)
367386

368387
if err != nil {
369388
return err
370389
}
371390
}
372391

373392
default:
374-
err = csr.readAndHandleOneChangeEventBatch(ctx, ri, cs)
393+
err = csr.readAndHandleOneChangeEventBatch(ctx, ri, cs, sess)
375394

376395
if err == nil {
377396
err = persistResumeTokenIfNeeded()
@@ -408,7 +427,7 @@ func (csr *ChangeStreamReader) iterateChangeStream(
408427

409428
func (csr *ChangeStreamReader) createChangeStream(
410429
ctx context.Context,
411-
) (*mongo.ChangeStream, primitive.Timestamp, error) {
430+
) (*mongo.ChangeStream, mongo.Session, primitive.Timestamp, error) {
412431
pipeline := csr.GetChangeStreamFilter()
413432
opts := options.ChangeStream().
414433
SetMaxAwaitTime(1 * time.Second).
@@ -420,7 +439,7 @@ func (csr *ChangeStreamReader) createChangeStream(
420439

421440
savedResumeToken, err := csr.loadChangeStreamResumeToken(ctx)
422441
if err != nil {
423-
return nil, primitive.Timestamp{}, errors.Wrap(err, "failed to load persisted change stream resume token")
442+
return nil, nil, primitive.Timestamp{}, errors.Wrap(err, "failed to load persisted change stream resume token")
424443
}
425444

426445
csStartLogEvent := csr.logger.Info()
@@ -447,37 +466,37 @@ func (csr *ChangeStreamReader) createChangeStream(
447466

448467
sess, err := csr.watcherClient.StartSession()
449468
if err != nil {
450-
return nil, primitive.Timestamp{}, errors.Wrap(err, "failed to start session")
469+
return nil, nil, primitive.Timestamp{}, errors.Wrap(err, "failed to start session")
451470
}
452471
sctx := mongo.NewSessionContext(ctx, sess)
453472
changeStream, err := csr.watcherClient.Watch(sctx, pipeline, opts)
454473
if err != nil {
455-
return nil, primitive.Timestamp{}, errors.Wrap(err, "failed to open change stream")
474+
return nil, nil, primitive.Timestamp{}, errors.Wrap(err, "failed to open change stream")
456475
}
457476

458477
err = csr.persistChangeStreamResumeToken(ctx, changeStream)
459478
if err != nil {
460-
return nil, primitive.Timestamp{}, err
479+
return nil, nil, primitive.Timestamp{}, err
461480
}
462481

463482
startTs, err := extractTimestampFromResumeToken(changeStream.ResumeToken())
464483
if err != nil {
465-
return nil, primitive.Timestamp{}, errors.Wrap(err, "failed to extract timestamp from change stream's resume token")
484+
return nil, nil, primitive.Timestamp{}, errors.Wrap(err, "failed to extract timestamp from change stream's resume token")
466485
}
467486

468487
// With sharded clusters the resume token might lead the cluster time
469488
// by 1 increment. In that case we need the actual cluster time;
470489
// otherwise we will get errors.
471490
clusterTime, err := getClusterTimeFromSession(sess)
472491
if err != nil {
473-
return nil, primitive.Timestamp{}, errors.Wrap(err, "failed to read cluster time from session")
492+
return nil, nil, primitive.Timestamp{}, errors.Wrap(err, "failed to read cluster time from session")
474493
}
475494

476495
if startTs.After(clusterTime) {
477496
startTs = clusterTime
478497
}
479498

480-
return changeStream, startTs, nil
499+
return changeStream, sess, startTs, nil
481500
}
482501

483502
// StartChangeStream starts the change stream.
@@ -499,7 +518,7 @@ func (csr *ChangeStreamReader) StartChangeStream(ctx context.Context) error {
499518

500519
err := retryer.WithCallback(
501520
func(ctx context.Context, ri *retry.FuncInfo) error {
502-
changeStream, startTs, err := csr.createChangeStream(ctx)
521+
changeStream, sess, startTs, err := csr.createChangeStream(ctx)
503522
if err != nil {
504523
if parentThreadWaiting {
505524
initialCreateResultChan <- mo.Err[primitive.Timestamp](err)
@@ -517,7 +536,7 @@ func (csr *ChangeStreamReader) StartChangeStream(ctx context.Context) error {
517536
parentThreadWaiting = false
518537
}
519538

520-
return csr.iterateChangeStream(ctx, ri, changeStream)
539+
return csr.iterateChangeStream(ctx, ri, changeStream, sess)
521540
},
522541
"running %s", csr,
523542
).Run(ctx, csr.logger)
@@ -544,6 +563,10 @@ func (csr *ChangeStreamReader) StartChangeStream(ctx context.Context) error {
544563
return nil
545564
}
546565

566+
func (csr *ChangeStreamReader) GetLag() option.Option[time.Duration] {
567+
return csr.lag.Load()
568+
}
569+
547570
func addTimestampToLogEvent(ts primitive.Timestamp, event *zerolog.Event) *zerolog.Event {
548571
return event.
549572
Interface("timestamp", ts).

internal/verifier/summary.go

Lines changed: 45 additions & 29 deletions
Original file line number | Diff line number | Diff line change
@@ -399,43 +399,59 @@ func (verifier *Verifier) printChangeEventStatistics(builder *strings.Builder) {
399399

400400
builder.WriteString(fmt.Sprintf("\nChange events this generation: %s\n", eventsDescr))
401401

402-
if totalEvents == 0 {
403-
return
404-
}
402+
if totalEvents > 0 {
405403

406-
reverseSortedNamespaces := maps.Keys(nsTotals)
407-
sort.Slice(
408-
reverseSortedNamespaces,
409-
func(i, j int) bool {
410-
return nsTotals[reverseSortedNamespaces[i]] > nsTotals[reverseSortedNamespaces[j]]
411-
},
412-
)
404+
reverseSortedNamespaces := maps.Keys(nsTotals)
405+
sort.Slice(
406+
reverseSortedNamespaces,
407+
func(i, j int) bool {
408+
return nsTotals[reverseSortedNamespaces[i]] > nsTotals[reverseSortedNamespaces[j]]
409+
},
410+
)
413411

414-
// Only report the busiest namespaces.
415-
if len(reverseSortedNamespaces) > changeEventsTableMaxSize {
416-
reverseSortedNamespaces = reverseSortedNamespaces[:changeEventsTableMaxSize]
417-
}
412+
// Only report the busiest namespaces.
413+
if len(reverseSortedNamespaces) > changeEventsTableMaxSize {
414+
reverseSortedNamespaces = reverseSortedNamespaces[:changeEventsTableMaxSize]
415+
}
418416

419-
table := tablewriter.NewWriter(builder)
420-
table.SetHeader([]string{"Namespace", "Insert", "Update", "Replace", "Delete", "Total"})
417+
table := tablewriter.NewWriter(builder)
418+
table.SetHeader([]string{"Namespace", "Insert", "Update", "Replace", "Delete", "Total"})
419+
420+
for _, ns := range reverseSortedNamespaces {
421+
curNsStats := nsStats[ns]
422+
423+
table.Append(
424+
append(
425+
[]string{ns},
426+
strconv.Itoa(curNsStats.Insert),
427+
strconv.Itoa(curNsStats.Update),
428+
strconv.Itoa(curNsStats.Replace),
429+
strconv.Itoa(curNsStats.Delete),
430+
strconv.Itoa(curNsStats.Total()),
431+
),
432+
)
433+
}
421434

422-
for _, ns := range reverseSortedNamespaces {
423-
curNsStats := nsStats[ns]
435+
builder.WriteString("\nMost frequently-changing namespaces:\n")
436+
table.Render()
437+
}
424438

425-
table.Append(
426-
append(
427-
[]string{ns},
428-
strconv.Itoa(curNsStats.Insert),
429-
strconv.Itoa(curNsStats.Update),
430-
strconv.Itoa(curNsStats.Replace),
431-
strconv.Itoa(curNsStats.Delete),
432-
strconv.Itoa(curNsStats.Total()),
433-
),
439+
srcLag, hasSrcLag := verifier.srcChangeStreamReader.GetLag().Get()
440+
if hasSrcLag {
441+
builder.WriteString(
442+
fmt.Sprintf("\nSource change stream lag: %s\n", reportutils.DurationToHMS(srcLag)),
434443
)
435444
}
436445

437-
builder.WriteString("\nMost frequently-changing namespaces:\n")
438-
table.Render()
446+
dstLag, hasDstLag := verifier.dstChangeStreamReader.GetLag().Get()
447+
if hasDstLag {
448+
if !hasSrcLag {
449+
builder.WriteString("\n")
450+
}
451+
builder.WriteString(
452+
fmt.Sprintf("Destination change stream lag: %s\n", reportutils.DurationToHMS(dstLag)),
453+
)
454+
}
439455
}
440456

441457
func (verifier *Verifier) printWorkerStatus(builder *strings.Builder) {

0 commit comments

Comments
 (0)