Commit be8a103
changefeedccl: record metrics for pts management
Before this commit, we did not measure the performance of protected
timestamp (PTS) creation or management. Since DB-level changefeeds will
potentially create many more PTS records per job (one per watched table),
we will use these metrics to establish a baseline for how long these
operations take and to alert us to issues if the new DB-level feeds
overwhelm the PTS record table.

We measure the time to create the initial PTS records in
changefeed.stage.pts.create.latency and the time to advance the PTS
records when the frontier progresses in changefeed.stage.pts.manage.latency.
We also include changefeed.stage.pts.manage_error.latency for cases where
PTS management eventually errors.

Epic: none
Fixes: #147779
Informs: #147780

Release note (general): This change adds new metrics
(changefeed.stage.pts.create.latency, changefeed.stage.pts.manage.latency,
changefeed.stage.pts.manage_error.latency) to measure the performance of
managing protected timestamp records.
1 parent fd18c1b commit be8a103
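For context on how these timers are consumed: at each call site in the diffs below, Start() captures the current time and returns a closure that records the elapsed duration into the stage's latency histogram when invoked. The timer implementation itself is not part of this commit, so the following is only a minimal stand-in sketch of the closure pattern visible at the call sites (the stageTimer type and its print-based recorder are hypothetical, not the real aggmetric-backed implementation):

package main

import (
	"fmt"
	"time"
)

// stageTimer is a stand-in for a changefeed stage timer: Start() captures
// the current time and returns a closure that records the elapsed duration
// when called. The real timers record into an aggmetric.AggHistogram such
// as changefeed.stage.pts.create.latency; here we just print.
type stageTimer struct {
	name string
}

func (t stageTimer) Start() func() {
	start := time.Now()
	return func() {
		fmt.Printf("%s: %v\n", t.name, time.Since(start))
	}
}

func main() {
	ptsCreate := stageTimer{name: "changefeed.stage.pts.create.latency"}
	finish := ptsCreate.Start()
	time.Sleep(10 * time.Millisecond) // stand-in for creating the PTS record
	finish()                          // a sample is recorded only if this runs
}

The useful property of this pattern is that nothing is recorded unless the returned closure runs, which is what lets the callers below decide after the fact whether the success or the error histogram should receive the sample.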

File tree: 7 files changed, +147 -3 lines

docs/generated/metrics/metrics.yaml

Lines changed: 25 additions & 1 deletion
@@ -1211,7 +1211,7 @@ layers:
   - name: changefeed.checkpoint_hist_nanos
     exported_name: changefeed_checkpoint_hist_nanos
     description: Time spent checkpointing changefeed progress
-    y_axis_label: Changefeeds
+    y_axis_label: Nanoseconds
     type: HISTOGRAM
     unit: NANOSECONDS
     aggregation: AVG
@@ -1536,6 +1536,30 @@ layers:
     unit: NANOSECONDS
     aggregation: AVG
     derivative: NONE
+  - name: changefeed.stage.pts.create.latency
+    exported_name: changefeed_stage_pts_create_latency
+    description: 'Latency of the changefeed stage: Time spent creating protected timestamp records on changefeed creation'
+    y_axis_label: Latency
+    type: HISTOGRAM
+    unit: NANOSECONDS
+    aggregation: AVG
+    derivative: NONE
+  - name: changefeed.stage.pts.manage.latency
+    exported_name: changefeed_stage_pts_manage_latency
+    description: 'Latency of the changefeed stage: Time spent successfully managing protected timestamp records on highwater advance, including time spent creating new protected timestamps when needed'
+    y_axis_label: Latency
+    type: HISTOGRAM
+    unit: NANOSECONDS
+    aggregation: AVG
+    derivative: NONE
+  - name: changefeed.stage.pts.manage_error.latency
+    exported_name: changefeed_stage_pts_manage_error_latency
+    description: 'Latency of the changefeed stage: Time spent managing protected timestamp when we eventually error'
+    y_axis_label: Latency
+    type: HISTOGRAM
+    unit: NANOSECONDS
+    aggregation: AVG
+    derivative: NONE
   - name: changefeed.stage.rangefeed_buffer_checkpoint.latency
     exported_name: changefeed_stage_rangefeed_buffer_checkpoint_latency
     description: 'Latency of the changefeed stage: buffering rangefeed checkpoint events'

pkg/ccl/changefeedccl/changefeed_processors.go

Lines changed: 16 additions & 0 deletions
@@ -1919,6 +1919,18 @@ func (cf *changeFrontier) manageProtectedTimestamps(
 		return false, nil
 	}
 
+	recordPTSMetricsTime := cf.sliMetrics.Timers.PTSManage.Start()
+	recordPTSMetricsErrorTime := cf.sliMetrics.Timers.PTSManageError.Start()
+	defer func() {
+		if err != nil {
+			recordPTSMetricsErrorTime()
+			return
+		}
+		if updated {
+			recordPTSMetricsTime()
+		}
+	}()
+
 	pts := cf.FlowCtx.Cfg.ProtectedTimestampProvider.WithTxn(txn)
 
 	// Create / advance the protected timestamp record to the highwater mark
@@ -1974,6 +1986,10 @@ func (cf *changeFrontier) manageProtectedTimestamps(
 		return false, nil
 	}
 
+	if cf.knobs.ManagePTSError != nil {
+		return false, cf.knobs.ManagePTSError()
+	}
+
 	log.VEventf(ctx, 2, "updating protected timestamp %v at %v", progress.ProtectedTimestampRecord, highWater)
 	return true, pts.UpdateTimestamp(ctx, progress.ProtectedTimestampRecord, highWater)
 }
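A subtlety in the hunk above: both the success and the error timer are started before any work happens, and a deferred closure inspects the function's named return values (updated, err) to decide which histogram, if either, receives a sample. Below is a self-contained sketch of that routing; startTimer and managePTS are illustrative names standing in for the real timer and for manageProtectedTimestamps:

package main

import (
	"errors"
	"fmt"
	"time"
)

// startTimer mimics Timers.PTSManage.Start(): it returns a closure that
// records the elapsed time when invoked. The real closure records into a
// histogram; this one prints.
func startTimer(name string) func() {
	start := time.Now()
	return func() { fmt.Printf("%s: %v\n", name, time.Since(start)) }
}

// managePTS mirrors the defer pattern above: the deferred closure reads the
// named return values to record exactly one of the success or error
// histograms, or neither if nothing was updated.
func managePTS(inject error) (updated bool, err error) {
	recordManage := startTimer("changefeed.stage.pts.manage.latency")
	recordManageError := startTimer("changefeed.stage.pts.manage_error.latency")
	defer func() {
		if err != nil {
			recordManageError()
			return
		}
		if updated {
			recordManage()
		}
	}()

	if inject != nil {
		return false, inject // error path: only manage_error is recorded
	}
	return true, nil // success path: only manage is recorded
}

func main() {
	_, _ = managePTS(nil)
	_, _ = managePTS(errors.New("injected"))
}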

pkg/ccl/changefeedccl/changefeed_stmt.go

Lines changed: 10 additions & 0 deletions
@@ -302,6 +302,15 @@ func changefeedPlanHook(
 	jobID := p.ExecCfg().JobRegistry.MakeJobID()
 	jr.JobID = jobID
 	{
+		metrics := p.ExecCfg().JobRegistry.MetricsStruct().Changefeed.(*Metrics)
+		scope, _ := opts.GetMetricScope()
+		sliMetrics, err := metrics.getSLIMetrics(scope)
+		if err != nil {
+			return err
+		}
+
+		recordPTSMetricsTime := sliMetrics.Timers.PTSCreate.Start()
+
 		var ptr *ptpb.Record
 		codec := p.ExecCfg().Codec
 		ptr = createProtectedTimestampRecord(
@@ -358,6 +367,7 @@ func changefeedPlanHook(
 			}
 			return err
 		}
+		recordPTSMetricsTime()
 	}
 
 	// Start the job.

pkg/ccl/changefeedccl/changefeed_test.go

Lines changed: 80 additions & 1 deletion
@@ -11474,14 +11474,28 @@ func TestChangefeedProtectedTimestampUpdate(t *testing.T) {
 
 		sqlDB.Exec(t, `CREATE TABLE foo (id INT)`)
 
+		registry := s.Server.JobRegistry().(*jobs.Registry)
+		metrics := registry.MetricsStruct().Changefeed.(*Metrics)
+		createPtsCount, _ := metrics.AggMetrics.Timers.PTSCreate.WindowedSnapshot().Total()
+		managePtsCount, _ := metrics.AggMetrics.Timers.PTSManage.WindowedSnapshot().Total()
+		managePTSErrorCount, _ := metrics.AggMetrics.Timers.PTSManageError.WindowedSnapshot().Total()
+		require.Equal(t, int64(0), createPtsCount)
+		require.Equal(t, int64(0), managePtsCount)
+		require.Equal(t, int64(0), managePTSErrorCount)
+
 		createStmt := `CREATE CHANGEFEED FOR foo WITH resolved='10ms', no_initial_scan`
 		testFeed := feed(t, f, createStmt)
 		defer closeFeed(t, testFeed)
 
+		createPtsCount, _ = metrics.AggMetrics.Timers.PTSCreate.WindowedSnapshot().Total()
+		managePtsCount, _ = metrics.AggMetrics.Timers.PTSManage.WindowedSnapshot().Total()
+		require.Equal(t, int64(1), createPtsCount)
+		require.Equal(t, int64(0), managePtsCount)
+
 		eFeed, ok := testFeed.(cdctest.EnterpriseTestFeed)
 		require.True(t, ok)
 
-		// Wait for the changefeed to checkpoint.
+		// Wait for the changefeed to checkpoint and update PTS at least once.
 		var lastHWM hlc.Timestamp
 		checkHWM := func() error {
 			hwm, err := eFeed.HighWaterMark()
@@ -11526,6 +11540,11 @@ func TestChangefeedProtectedTimestampUpdate(t *testing.T) {
 		sqlDB.QueryRow(t, ptsQry).Scan(&ts2)
 		require.NoError(t, err)
 		require.Less(t, ts, ts2)
+
+		managePtsCount, _ = metrics.AggMetrics.Timers.PTSManage.WindowedSnapshot().Total()
+		managePTSErrorCount, _ = metrics.AggMetrics.Timers.PTSManageError.WindowedSnapshot().Total()
+		require.GreaterOrEqual(t, managePtsCount, int64(2))
+		require.Equal(t, int64(0), managePTSErrorCount)
 	}
 
 	withTxnRetries := withArgsFn(func(args *base.TestServerArgs) {
@@ -11539,6 +11558,66 @@ func TestChangefeedProtectedTimestampUpdate(t *testing.T) {
 	cdcTest(t, testFn, feedTestForceSink("kafka"), withTxnRetries)
 }
 
+func TestChangefeedProtectedTimestampUpdateError(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+
+	testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
+		sqlDB := sqlutils.MakeSQLRunner(s.DB)
+		// Checkpoint and trigger potential protected timestamp updates frequently.
+		// Make the protected timestamp lag long enough that it shouldn't be
+		// immediately updated after a restart.
+		changefeedbase.SpanCheckpointInterval.Override(
+			context.Background(), &s.Server.ClusterSettings().SV, 10*time.Millisecond)
+		changefeedbase.ProtectTimestampInterval.Override(
+			context.Background(), &s.Server.ClusterSettings().SV, 10*time.Millisecond)
+		changefeedbase.ProtectTimestampLag.Override(
+			context.Background(), &s.Server.ClusterSettings().SV, 10*time.Hour)
+
+		sqlDB.Exec(t, `CREATE TABLE foo (id INT)`)
+
+		registry := s.Server.JobRegistry().(*jobs.Registry)
+		metrics := registry.MetricsStruct().Changefeed.(*Metrics)
+		createPtsCount, _ := metrics.AggMetrics.Timers.PTSCreate.WindowedSnapshot().Total()
+		managePtsCount, _ := metrics.AggMetrics.Timers.PTSManage.WindowedSnapshot().Total()
+		managePTSErrorCount, _ := metrics.AggMetrics.Timers.PTSManageError.WindowedSnapshot().Total()
+		require.Equal(t, int64(0), createPtsCount)
+		require.Equal(t, int64(0), managePtsCount)
+		require.Equal(t, int64(0), managePTSErrorCount)
+
+		knobs := s.TestingKnobs.
+			DistSQL.(*execinfra.TestingKnobs).
+			Changefeed.(*TestingKnobs)
+
+		knobs.ManagePTSError = func() error {
+			return errors.New("test error")
+		}
+
+		createStmt := `CREATE CHANGEFEED FOR foo WITH resolved='10ms', no_initial_scan`
+		testFeed := feed(t, f, createStmt)
+		defer closeFeed(t, testFeed)
+
+		createPtsCount, _ = metrics.AggMetrics.Timers.PTSCreate.WindowedSnapshot().Total()
+		require.Equal(t, int64(1), createPtsCount)
+		managePTSErrorCount, _ = metrics.AggMetrics.Timers.PTSManageError.WindowedSnapshot().Total()
+		require.Equal(t, int64(0), managePTSErrorCount)
+
+		// Lower the PTS lag to trigger a PTS update.
+		changefeedbase.ProtectTimestampLag.Override(
+			context.Background(), &s.Server.ClusterSettings().SV, 10*time.Millisecond)
+
+		testutils.SucceedsSoon(t, func() error {
+			managePTSErrorCount, _ = metrics.AggMetrics.Timers.PTSManageError.WindowedSnapshot().Total()
+			if managePTSErrorCount > 0 {
+				fmt.Println("manage protected timestamps test: manage pts error count", managePTSErrorCount)
+				return nil
+			}
+			return errors.New("waiting for manage pts error")
+		})
+	}
+	cdcTest(t, testFn, feedTestForceSink("kafka"))
+}
+
 func TestCDCQuerySelectSingleRow(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	defer log.Scope(t).Close(t)

pkg/ccl/changefeedccl/metrics.go

Lines changed: 1 addition & 1 deletion
@@ -731,7 +731,7 @@ var (
 	metaChangefeedCheckpointHistNanos = metric.Metadata{
 		Name:        "changefeed.checkpoint_hist_nanos",
 		Help:        "Time spent checkpointing changefeed progress",
-		Measurement: "Changefeeds",
+		Measurement: "Nanoseconds",
 		Unit:        metric.Unit_NANOSECONDS,
 	}

pkg/ccl/changefeedccl/testing_knobs.go

Lines changed: 3 additions & 0 deletions
@@ -90,6 +90,9 @@ type TestingKnobs struct {
 	// its PTS record to include all required targets.
 	PreservePTSTargets func() bool
 
+	// ManagePTSError is used to return an error when managing protected timestamps.
+	ManagePTSError func() error
+
 	// PulsarClientSkipCreation skips creating the sink client when
 	// dialing.
 	PulsarClientSkipCreation bool

pkg/ccl/changefeedccl/timers/timers.go

Lines changed: 12 additions & 0 deletions
@@ -23,6 +23,9 @@ type Timers struct {
 	KVFeedBuffer              *aggmetric.AggHistogram
 	RangefeedBufferValue      *aggmetric.AggHistogram
 	RangefeedBufferCheckpoint *aggmetric.AggHistogram
+	PTSManage                 *aggmetric.AggHistogram
+	PTSManageError            *aggmetric.AggHistogram
+	PTSCreate                 *aggmetric.AggHistogram
 }
 
 func (*Timers) MetricStruct() {}
@@ -54,6 +57,9 @@ func New(histogramWindow time.Duration) *Timers {
 		KVFeedBuffer:              b.Histogram(histogramOptsFor("changefeed.stage.kv_feed_buffer.latency", "Latency of the changefeed stage: waiting to buffer kv events")),
 		RangefeedBufferValue:      b.Histogram(histogramOptsFor("changefeed.stage.rangefeed_buffer_value.latency", "Latency of the changefeed stage: buffering rangefeed value events")),
 		RangefeedBufferCheckpoint: b.Histogram(histogramOptsFor("changefeed.stage.rangefeed_buffer_checkpoint.latency", "Latency of the changefeed stage: buffering rangefeed checkpoint events")),
+		PTSManage:                 b.Histogram(histogramOptsFor("changefeed.stage.pts.manage.latency", "Latency of the changefeed stage: Time spent successfully managing protected timestamp records on highwater advance, including time spent creating new protected timestamps when needed")),
+		PTSManageError:            b.Histogram(histogramOptsFor("changefeed.stage.pts.manage_error.latency", "Latency of the changefeed stage: Time spent managing protected timestamp when we eventually error")),
+		PTSCreate:                 b.Histogram(histogramOptsFor("changefeed.stage.pts.create.latency", "Latency of the changefeed stage: Time spent creating protected timestamp records on changefeed creation")),
 	}
 }
@@ -67,6 +73,9 @@ func (ts *Timers) GetOrCreateScopedTimers(scope string) *ScopedTimers {
 		KVFeedBuffer:              &timer{ts.KVFeedBuffer.AddChild(scope)},
 		RangefeedBufferValue:      &timer{ts.RangefeedBufferValue.AddChild(scope)},
 		RangefeedBufferCheckpoint: &timer{ts.RangefeedBufferCheckpoint.AddChild(scope)},
+		PTSManage:                 &timer{ts.PTSManage.AddChild(scope)},
+		PTSManageError:            &timer{ts.PTSManageError.AddChild(scope)},
+		PTSCreate:                 &timer{ts.PTSCreate.AddChild(scope)},
 	}
 }
@@ -77,6 +86,9 @@ type ScopedTimers struct {
 	DownstreamClientSend      *timer
 	KVFeedWaitForTableEvent   *timer
 	KVFeedBuffer              *timer
+	PTSCreate                 *timer
+	PTSManage                 *timer
+	PTSManageError            *timer
 	RangefeedBufferValue      *timer
 	RangefeedBufferCheckpoint *timer
 }
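The timer wrapper used in GetOrCreateScopedTimers is not part of this diff. As a hedged sketch of the shape the code suggests, each stage has one parent AggHistogram, and AddChild(scope) gives every SLI metrics scope its own child series under the same exported metric. The map-backed stand-in below illustrates that parent/child attribution without the real aggmetric machinery (aggTimer and its fields are hypothetical):

package main

import (
	"fmt"
	"sync"
	"time"
)

// aggTimer is a stand-in for an aggmetric.AggHistogram-backed stage timer:
// one parent per stage, with samples attributed per scope so that scoped
// changefeed metrics report independently while the parent aggregates them.
type aggTimer struct {
	mu      sync.Mutex
	name    string
	byScope map[string][]time.Duration
}

func newAggTimer(name string) *aggTimer {
	return &aggTimer{name: name, byScope: map[string][]time.Duration{}}
}

// scoped mirrors GetOrCreateScopedTimers(scope): it returns a Start func
// whose recorded samples are attributed to the given scope.
func (a *aggTimer) scoped(scope string) func() func() {
	return func() func() {
		start := time.Now()
		return func() {
			a.mu.Lock()
			defer a.mu.Unlock()
			a.byScope[scope] = append(a.byScope[scope], time.Since(start))
		}
	}
}

func main() {
	ptsManage := newAggTimer("changefeed.stage.pts.manage.latency")
	start := ptsManage.scoped("feed-scope-1")
	record := start() // begin timing one PTS management pass
	time.Sleep(5 * time.Millisecond)
	record() // attribute the sample to "feed-scope-1"
	fmt.Printf("%s: %v\n", ptsManage.name, ptsManage.byScope)
}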
