Skip to content

Commit 6a2ced2

Browse files
authored
Cache multiple change event batches at once (#156)
This lets the verifier read the change stream & persist rechecks concurrently. This also fixes a few minor formatting issues (along with adding the buffer-saturation to the log) and reduces the lag threshold from 5min to 2min. The updated log format is: ``` Source change events this generation: 312,966 total, across 1 namespace(s) Source: 45,395.47 writes per second (lag: 8m 35s; buffer 0% full) ⚠️ Lag is excessive. Verification may fail. See documentation. Destination change events this generation: 258,761 total, across 1 namespace(s) Destination: 40,404.2 writes per second (lag: 7m 40s; buffer 97% full) ⚠️ Lag is excessive. Verification may fail. See documentation. ```
1 parent 31015e5 commit 6a2ced2

File tree

2 files changed

+32
-8
lines changed

2 files changed

+32
-8
lines changed

internal/verifier/change_stream.go

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,9 @@ type ddlEventHandling string
3131
const (
3232
fauxDocSizeForDeleteEvents = 1024
3333

34+
// The number of batches we’ll hold in memory at once.
35+
batchChanBufferSize = 100
36+
3437
onDDLEventAllow ddlEventHandling = "allow"
3538
)
3639

@@ -108,7 +111,7 @@ func (verifier *Verifier) initializeChangeStreamReaders() {
108111
for _, csr := range mslices.Of(srcReader, dstReader) {
109112
csr.logger = verifier.logger
110113
csr.metaDB = verifier.metaClient.Database(verifier.metaDBName)
111-
csr.changeEventBatchChan = make(chan changeEventBatch)
114+
csr.changeEventBatchChan = make(chan changeEventBatch, batchChanBufferSize)
112115
csr.writesOffTs = util.NewEventual[bson.Timestamp]()
113116
csr.readerError = util.NewEventual[error]()
114117
csr.handlerError = util.NewEventual[error]()
@@ -770,10 +773,22 @@ func (csr *ChangeStreamReader) StartChangeStream(ctx context.Context) error {
770773
return nil
771774
}
772775

776+
// GetLag returns the observed change stream lag (i.e., the delta between
777+
// cluster time and the most-recently-seen change event).
773778
func (csr *ChangeStreamReader) GetLag() option.Option[time.Duration] {
774779
return csr.lag.Load()
775780
}
776781

782+
// GetSaturation returns the reader’s internal buffer’s saturation level as
783+
// a fraction. If saturation rises, that means we’re reading events faster than
784+
// we can persist them.
785+
func (csr *ChangeStreamReader) GetSaturation() float64 {
786+
return util.DivideToF64(len(csr.changeEventBatchChan), cap(csr.changeEventBatchChan))
787+
}
788+
789+
// GetEventsPerSecond returns the number of change events per second we’ve been
790+
// seeing “recently”. (See implementation for the actual period over which we
791+
// compile this metric.)
777792
func (csr *ChangeStreamReader) GetEventsPerSecond() option.Option[float64] {
778793
logs := csr.batchSizeHistory.Get()
779794
lastLog, hasLogs := lo.Last(logs)

internal/verifier/summary.go

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,11 @@ import (
2121
"golang.org/x/exp/maps"
2222
)
2323

24-
const changeEventsTableMaxSize = 10
24+
const (
25+
changeEventsTableMaxSize = 10
26+
27+
lagWarnThreshold = 2 * time.Minute
28+
)
2529

2630
// NOTE: Each of the following should print one trailing and one final
2731
// newline.
@@ -516,6 +520,8 @@ func (verifier *Verifier) printMismatchInvestigationNotes(strBuilder *strings.Bu
516520
func (verifier *Verifier) printChangeEventStatistics(builder io.Writer) {
517521
var eventsTable *tablewriter.Table
518522

523+
fmt.Fprint(builder, "\n")
524+
519525
for _, cluster := range []struct {
520526
title string
521527
eventRecorder *EventRecorder
@@ -544,35 +550,38 @@ func (verifier *Verifier) printChangeEventStatistics(builder io.Writer) {
544550
)
545551
}
546552

547-
fmt.Fprintf(builder, "\n%s change events this generation: %s\n", cluster.title, eventsDescr)
553+
fmt.Fprintf(builder, "%s change events this generation: %s\n", cluster.title, eventsDescr)
548554

549555
if eventsPerSec, has := cluster.csReader.GetEventsPerSecond().Get(); has {
550556
var lagNote string
551557

552558
lag, hasLag := cluster.csReader.GetLag().Get()
553559

554560
if hasLag {
555-
lagNote = fmt.Sprintf(" (lag: %s)", reportutils.DurationToHMS(lag))
561+
lagNote = fmt.Sprintf("lag: %s; ", reportutils.DurationToHMS(lag))
556562
}
557563

558564
fmt.Fprintf(
559565
builder,
560-
"%s observed change rate: %s/sec%s",
566+
"%s: %s writes per second (%sbuffer %s%% full)\n",
561567
cluster.title,
562568
reportutils.FmtReal(eventsPerSec),
563569
lagNote,
570+
reportutils.FmtReal(100*cluster.csReader.GetSaturation()),
564571
)
565572

566-
const lagWarnThreshold = 5 * time.Minute
567-
568573
if hasLag && lag > lagWarnThreshold {
569574
fmt.Fprintf(
570575
builder,
571-
"⚠️ Lag is excessive. Verification may fail. See documentation.",
576+
"⚠️ Lag is excessive. Verification may fail. See documentation.\n",
572577
)
573578
}
574579
}
575580

581+
if cluster.csReader == verifier.srcChangeStreamReader {
582+
fmt.Fprint(builder, "\n")
583+
}
584+
576585
// We only print event breakdowns for the source because we assume that
577586
// events on the destination will largely mirror the source’s.
578587
if totalEvents > 0 && cluster.csReader == verifier.srcChangeStreamReader {

0 commit comments

Comments
 (0)