
Commit 07f1db1

serprex and ilidemi authored
pg/mysql cdc: report fetched bytes metric with interval (#3403)
@ilidemi noticed 10% of cpu usage in pg/mysql was attributed to metrics. Reuse the pattern used by qrep, where we add to atomic counters and regularly empty those counters into the metric; mongo cdc already does this.

---------

Co-authored-by: Ilia Demianenko <[email protected]>
1 parent d07a9de commit 07f1db1
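To make the change concrete, here is a minimal sketch of the pattern the commit message describes: accumulate into a local atomic counter on the hot path, then drain it into the OpenTelemetry counter on a timer and once more on exit. The metric name, meter, and loop shape below are illustrative assumptions, not the actual PeerDB code.

```go
// Sketch only: assumed names and structure, not the PeerDB implementation.
package example

import (
	"context"
	"sync/atomic"
	"time"

	"go.opentelemetry.io/otel"
)

func pullLoop(ctx context.Context, events <-chan []byte) error {
	// Hypothetical OTel counter standing in for FetchedBytesCounter.
	counter, err := otel.Meter("example").Int64Counter("fetched_bytes")
	if err != nil {
		return err
	}

	var fetchedBytes atomic.Int64
	flush := func() {
		// Swap(0) drains the local counter, so each byte is reported exactly once.
		counter.Add(ctx, fetchedBytes.Swap(0))
	}
	defer flush() // report whatever remains when the loop exits

	ticker := time.NewTicker(time.Minute)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-ticker.C:
			flush() // cheap: one metrics API call per minute
		case ev, ok := <-events:
			if !ok {
				return nil
			}
			// Hot path: a single atomic add per event instead of a metrics API call.
			fetchedBytes.Add(int64(len(ev)))
		}
	}
}
```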

2 files changed: +39 −18 lines changed


flow/connectors/mysql/cdc.go
20 additions, 6 deletions

@@ -10,6 +10,7 @@ import (
 	"slices"
 	"strconv"
 	"strings"
+	"sync/atomic"
 	"time"
 
 	"github.com/go-mysql-org/go-mysql/mysql"
@@ -326,19 +327,31 @@ func (c *MySqlConnector) PullRecords(
 	var updatedOffset string
 	var inTx bool
 	var recordCount uint32
-	var bytesRead int
+
 	// set when a tx is preventing us from respecting the timeout, immediately exit after we see inTx false
 	var overtime bool
+	var fetchedBytes, totalFetchedBytes, allFetchedBytes atomic.Int64
 	defer func() {
 		if recordCount == 0 {
 			req.RecordStream.SignalAsEmpty()
 		}
 		c.logger.Info("[mysql] PullRecords finished streaming",
 			slog.Uint64("records", uint64(recordCount)),
-			slog.Int("bytes", bytesRead),
+			slog.Int64("bytes", totalFetchedBytes.Load()),
 			slog.Int("channelLen", req.RecordStream.ChannelLen()))
 	}()
 
+	defer func() {
+		otelManager.Metrics.FetchedBytesCounter.Add(ctx, fetchedBytes.Swap(0))
+		otelManager.Metrics.AllFetchedBytesCounter.Add(ctx, allFetchedBytes.Swap(0))
+	}()
+	shutdown := shared.Interval(ctx, time.Minute, func() {
+		otelManager.Metrics.FetchedBytesCounter.Add(ctx, fetchedBytes.Swap(0))
+		otelManager.Metrics.AllFetchedBytesCounter.Add(ctx, allFetchedBytes.Swap(0))
+		c.logger.Info("pulling records", slog.Int64("bytes", totalFetchedBytes.Load()))
+	})
+	defer shutdown()
+
 	timeoutCtx, cancelTimeout := context.WithTimeout(ctx, time.Hour)
 	//nolint:gocritic // cancelTimeout is rebound, do not defer cancelTimeout()
 	defer func() {
@@ -358,7 +371,7 @@ func (c *MySqlConnector) PullRecords(
 		if recordCount%50000 == 0 {
 			c.logger.Info("[mysql] PullRecords streaming",
 				slog.Uint64("records", uint64(recordCount)),
-				slog.Int("bytes", bytesRead),
+				slog.Int64("bytes", totalFetchedBytes.Load()),
 				slog.Int("channelLen", req.RecordStream.ChannelLen()),
 				slog.Bool("inTx", inTx),
 				slog.Bool("overtime", overtime))
@@ -397,7 +410,7 @@ func (c *MySqlConnector) PullRecords(
 		} else if inTx {
 			c.logger.Info("[mysql] timeout reached, but still in transaction, waiting for inTx false",
 				slog.Uint64("records", uint64(recordCount)),
-				slog.Int("bytes", bytesRead),
+				slog.Int64("bytes", totalFetchedBytes.Load()),
 				slog.Int("channelLen", req.RecordStream.ChannelLen()))
 			// reset timeoutCtx to a low value and wait for inTx to become false
 			cancelTimeout()
@@ -415,7 +428,7 @@ func (c *MySqlConnector) PullRecords(
 			return err
 		}
 
-		otelManager.Metrics.AllFetchedBytesCounter.Add(ctx, int64(len(event.RawData)))
+		allFetchedBytes.Add(int64(len(event.RawData)))
 
 		switch ev := event.Event.(type) {
 		case *replication.GTIDEvent:
@@ -473,7 +486,8 @@ func (c *MySqlConnector) PullRecords(
 			schema := req.TableNameSchemaMapping[destinationTableName]
 			if schema != nil {
 				otelManager.Metrics.FetchedBytesCounter.Add(ctx, int64(len(event.RawData)))
-				bytesRead += len(event.RawData)
+				fetchedBytes.Add(int64(len(event.RawData)))
+				totalFetchedBytes.Add(int64(len(event.RawData)))
 				inTx = true
 				enumMap := ev.Table.EnumStrValueMap()
 				setMap := ev.Table.SetStrValueMap()
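The MySQL flush path above leans on shared.Interval to run the flush callback once a minute until the returned shutdown function is called. A helper with that shape might look roughly like the sketch below; this is an assumption about its behavior, not the actual PeerDB implementation.

```go
// Assumed shape of an interval helper: run fn every `every` until ctx is
// cancelled or the returned stop function is called. Illustrative only.
package example

import (
	"context"
	"sync"
	"time"
)

func Interval(ctx context.Context, every time.Duration, fn func()) func() {
	done := make(chan struct{})
	go func() {
		ticker := time.NewTicker(every)
		defer ticker.Stop()
		for {
			select {
			case <-ctx.Done():
				return
			case <-done:
				return
			case <-ticker.C:
				fn()
			}
		}
	}()
	var once sync.Once
	return func() { once.Do(func() { close(done) }) } // safe to call more than once
}
```

Combined with the deferred flush in the diff, a helper like this drains the local counters at least once a minute while pulling and once more on exit, so fetched bytes are not lost between reporting intervals.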

flow/connectors/postgres/cdc.go
19 additions, 12 deletions

@@ -440,16 +440,23 @@ func PullCdcRecords[Items model.Items](
 
 	logger.Info("pulling records start")
 
+	var fetchedBytes, totalFetchedBytes, allFetchedBytes atomic.Int64
+	defer func() {
+		p.otelManager.Metrics.FetchedBytesCounter.Add(ctx, fetchedBytes.Swap(0))
+		p.otelManager.Metrics.AllFetchedBytesCounter.Add(ctx, allFetchedBytes.Swap(0))
+	}()
 	shutdown := shared.Interval(ctx, time.Minute, func() {
-		logger.Info("pulling records", slog.Int("records", cdcRecordsStorage.Len()))
+		p.otelManager.Metrics.FetchedBytesCounter.Add(ctx, fetchedBytes.Swap(0))
+		p.otelManager.Metrics.AllFetchedBytesCounter.Add(ctx, allFetchedBytes.Swap(0))
+		logger.Info("pulling records", slog.Int("records", cdcRecordsStorage.Len()),
+			slog.Int64("bytes", totalFetchedBytes.Load()))
 	})
 	defer shutdown()
 
 	nextStandbyMessageDeadline := time.Now().Add(req.IdleTimeout)
 
 	pkmRequiresResponse := false
 	waitingForCommit := false
-	fetchedBytes := 0
 
 	addRecordWithKey := func(key model.TableWithPkey, rec model.Record[Items]) error {
 		if err := cdcRecordsStorage.Set(logger, key, rec); err != nil {
@@ -467,7 +474,7 @@ func PullCdcRecords[Items model.Items](
 		if cdcRecordsStorage.Len()%50000 == 0 {
 			logger.Info("pulling records",
 				slog.Int("records", cdcRecordsStorage.Len()),
-				slog.Int("bytes", fetchedBytes),
+				slog.Int64("bytes", totalFetchedBytes.Load()),
 				slog.Int("channelLen", records.ChannelLen()),
 				slog.Bool("waitingForCommit", waitingForCommit))
 		}
@@ -497,7 +504,7 @@ func PullCdcRecords[Items model.Items](
 		if time.Since(standByLastLogged) > 10*time.Second {
 			logger.Info("Sent Standby status message",
 				slog.Int("records", cdcRecordsStorage.Len()),
-				slog.Int("bytes", fetchedBytes),
+				slog.Int64("bytes", totalFetchedBytes.Load()),
 				slog.Int("channelLen", records.ChannelLen()),
 				slog.Bool("waitingForCommit", waitingForCommit))
 			standByLastLogged = time.Now()
@@ -508,15 +515,15 @@ func PullCdcRecords[Items model.Items](
 		if cdcRecordsStorage.Len() >= int(req.MaxBatchSize) {
 			logger.Info("batch filled, returning currently accumulated records",
 				slog.Int("records", cdcRecordsStorage.Len()),
-				slog.Int("bytes", fetchedBytes),
+				slog.Int64("bytes", totalFetchedBytes.Load()),
 				slog.Int("channelLen", records.ChannelLen()))
 			return nil
 		}
 
 		if waitingForCommit {
 			logger.Info("commit received, returning currently accumulated records",
 				slog.Int("records", cdcRecordsStorage.Len()),
-				slog.Int("bytes", fetchedBytes),
+				slog.Int64("bytes", totalFetchedBytes.Load()),
 				slog.Int("channelLen", records.ChannelLen()))
 			return nil
 		}
@@ -530,13 +537,13 @@ func PullCdcRecords[Items model.Items](
 		if p.commitLock == nil {
 			logger.Info("no commit lock, returning currently accumulated records",
 				slog.Int("records", cdcRecordsStorage.Len()),
-				slog.Int("bytes", fetchedBytes),
+				slog.Int64("bytes", totalFetchedBytes.Load()),
 				slog.Int("channelLen", records.ChannelLen()))
 			return nil
 		} else {
 			logger.Info("commit lock, waiting for commit to return records",
 				slog.Int("records", cdcRecordsStorage.Len()),
-				slog.Int("bytes", fetchedBytes),
+				slog.Int64("bytes", totalFetchedBytes.Load()),
 				slog.Int("channelLen", records.ChannelLen()))
 			waitingForCommit = true
 		}
@@ -568,7 +575,7 @@ func PullCdcRecords[Items model.Items](
 			if pgconn.Timeout(err) {
 				logger.Info("Stand-by deadline reached, returning currently accumulated records",
 					slog.Int("records", cdcRecordsStorage.Len()),
-					slog.Int("bytes", fetchedBytes),
+					slog.Int64("bytes", totalFetchedBytes.Load()),
 					slog.Int("channelLen", records.ChannelLen()))
 				return nil
 			} else {
@@ -580,7 +587,7 @@ func PullCdcRecords[Items model.Items](
 		case *pgproto3.ErrorResponse:
 			return shared.LogError(logger, exceptions.NewPostgresWalError(errors.New("received error response"), msg))
 		case *pgproto3.CopyData:
-			p.otelManager.Metrics.AllFetchedBytesCounter.Add(ctx, int64(len(msg.Data)))
+			allFetchedBytes.Add(int64(len(msg.Data)))
 			switch msg.Data[0] {
 			case pglogrepl.PrimaryKeepaliveMessageByteID:
 				pkm, err := pglogrepl.ParsePrimaryKeepaliveMessage(msg.Data[1:])
@@ -615,8 +622,8 @@ func PullCdcRecords[Items model.Items](
 		}
 
 		if rec != nil {
-			p.otelManager.Metrics.FetchedBytesCounter.Add(ctx, int64(len(msg.Data)))
-			fetchedBytes += len(msg.Data)
+			fetchedBytes.Add(int64(len(msg.Data)))
+			totalFetchedBytes.Add(int64(len(msg.Data)))
 			tableName := rec.GetDestinationTableName()
 			switch r := rec.(type) {
 			case *model.UpdateRecord[Items]:
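Both connectors now keep two kinds of counters: fetchedBytes and allFetchedBytes are drained into the OTel counters via Swap(0), while totalFetchedBytes is never reset and is only read with Load() so log lines can report a running total for the whole pull. A simplified illustration of that split follows; the names mirror the diff, but the standalone functions and the counter parameter are assumptions for brevity.

```go
// Simplified illustration of the drained-vs-running-total split. Assumed
// helper names; not the PeerDB code.
package example

import (
	"context"
	"sync/atomic"

	"go.opentelemetry.io/otel/metric"
)

var fetchedBytes, totalFetchedBytes atomic.Int64

// onRecord is the hot path: one atomic add per counter, no metrics API call.
func onRecord(data []byte) {
	fetchedBytes.Add(int64(len(data)))      // drained by the periodic and deferred flushes
	totalFetchedBytes.Add(int64(len(data))) // never reset; read with Load() for log lines
}

// flush runs once a minute and once on exit. Swap(0) returns the bytes
// accumulated since the last flush and resets the counter, so repeated
// flushes never double-report.
func flush(ctx context.Context, counter metric.Int64Counter) {
	counter.Add(ctx, fetchedBytes.Swap(0))
}
```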
