
Commit 96f6d6b

craig[bot] and aerfrei committed

Merge #148471
148471: changefeedccl: record metrics for pts management r=asg0451,andyyang890 a=aerfrei

Before this commit, we did not measure the performance of protected timestamp creation or management. Since DB-level changefeeds will potentially create many more PTS records per job (one per watched table), we will use these metrics to establish a baseline for how long these operations take and to alert us to issues if the new DB-level feeds overwhelm the PTS record table. We will measure both the speed of creating the initial PTS records, in changefeed.create_pts_hist_nanos, and the speed of advancing the PTS records when the frontier progresses, in changefeed.manage_pts_hist_nanos.

Epic: none
Fixes: #147779
Informs: #147780

Release note: This change adds new metrics, changefeed.create_pts_hist_nanos and changefeed.manage_pts_hist_nanos, to measure the performance of managing protected timestamp records.

Co-authored-by: Aerin Freilich <[email protected]>
2 parents 77f7cea + be8a103 commit 96f6d6b

File tree: 7 files changed (+147, -3 lines)


docs/generated/metrics/metrics.yaml

Lines changed: 25 additions & 1 deletion
@@ -1211,7 +1211,7 @@ layers:
   - name: changefeed.checkpoint_hist_nanos
     exported_name: changefeed_checkpoint_hist_nanos
     description: Time spent checkpointing changefeed progress
-    y_axis_label: Changefeeds
+    y_axis_label: Nanoseconds
     type: HISTOGRAM
     unit: NANOSECONDS
     aggregation: AVG
@@ -1536,6 +1536,30 @@ layers:
     unit: NANOSECONDS
     aggregation: AVG
     derivative: NONE
+  - name: changefeed.stage.pts.create.latency
+    exported_name: changefeed_stage_pts_create_latency
+    description: 'Latency of the changefeed stage: Time spent creating protected timestamp records on changefeed creation'
+    y_axis_label: Latency
+    type: HISTOGRAM
+    unit: NANOSECONDS
+    aggregation: AVG
+    derivative: NONE
+  - name: changefeed.stage.pts.manage.latency
+    exported_name: changefeed_stage_pts_manage_latency
+    description: 'Latency of the changefeed stage: Time spent successfully managing protected timestamp records on highwater advance, including time spent creating new protected timestamps when needed'
+    y_axis_label: Latency
+    type: HISTOGRAM
+    unit: NANOSECONDS
+    aggregation: AVG
+    derivative: NONE
+  - name: changefeed.stage.pts.manage_error.latency
+    exported_name: changefeed_stage_pts_manage_error_latency
+    description: 'Latency of the changefeed stage: Time spent managing protected timestamp when we eventually error'
+    y_axis_label: Latency
+    type: HISTOGRAM
+    unit: NANOSECONDS
+    aggregation: AVG
+    derivative: NONE
   - name: changefeed.stage.rangefeed_buffer_checkpoint.latency
     exported_name: changefeed_stage_rangefeed_buffer_checkpoint_latency
     description: 'Latency of the changefeed stage: buffering rangefeed checkpoint events'

pkg/ccl/changefeedccl/changefeed_processors.go

Lines changed: 16 additions & 0 deletions
@@ -1928,6 +1928,18 @@ func (cf *changeFrontier) manageProtectedTimestamps(
         return false, nil
     }

+    recordPTSMetricsTime := cf.sliMetrics.Timers.PTSManage.Start()
+    recordPTSMetricsErrorTime := cf.sliMetrics.Timers.PTSManageError.Start()
+    defer func() {
+        if err != nil {
+            recordPTSMetricsErrorTime()
+            return
+        }
+        if updated {
+            recordPTSMetricsTime()
+        }
+    }()
+
     pts := cf.FlowCtx.Cfg.ProtectedTimestampProvider.WithTxn(txn)

     // Create / advance the protected timestamp record to the highwater mark
@@ -1983,6 +1995,10 @@ func (cf *changeFrontier) manageProtectedTimestamps(
         return false, nil
     }

+    if cf.knobs.ManagePTSError != nil {
+        return false, cf.knobs.ManagePTSError()
+    }
+
     log.VEventf(ctx, 2, "updating protected timestamp %v at %v", progress.ProtectedTimestampRecord, highWater)
     return true, pts.UpdateTimestamp(ctx, progress.ProtectedTimestampRecord, highWater)
 }
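
The core of this hunk is the paired-timer pattern: both the success and the error timer start before any work happens, and a single deferred closure decides at return time which histogram receives the sample. Below is a minimal, self-contained sketch of that pattern; the histogram and timer types here are illustrative stand-ins, not the real aggmetric API.

package main

import (
    "errors"
    "fmt"
    "time"
)

// histogram is an illustrative stand-in for an aggmetric histogram; it
// just collects recorded samples.
type histogram struct{ samples []int64 }

func (h *histogram) RecordValue(nanos int64) { h.samples = append(h.samples, nanos) }

// timer mirrors the shape of the changefeed timers: Start captures the
// current time and returns a closure that records the elapsed nanoseconds.
type timer struct{ hist *histogram }

func (t *timer) Start() func() {
    start := time.Now()
    return func() { t.hist.RecordValue(time.Since(start).Nanoseconds()) }
}

// manage mimics manageProtectedTimestamps: both timers start up front, and
// the deferred closure routes the sample to the success histogram or the
// error histogram depending on how the function exits.
func manage(ptsManage, ptsManageError *timer, fail bool) (updated bool, err error) {
    recordTime := ptsManage.Start()
    recordErrorTime := ptsManageError.Start()
    defer func() {
        if err != nil {
            recordErrorTime()
            return
        }
        if updated {
            recordTime()
        }
    }()

    if fail {
        return false, errors.New("injected error")
    }
    return true, nil // pretend the PTS record was advanced
}

func main() {
    ok, bad := &timer{&histogram{}}, &timer{&histogram{}}
    _, _ = manage(ok, bad, false)
    _, _ = manage(ok, bad, true)
    fmt.Println(len(ok.hist.samples), len(bad.hist.samples)) // 1 1
}

Note that nothing is recorded on the early return false, nil paths, which matches the real function: only a successful update or an error produces a sample.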

pkg/ccl/changefeedccl/changefeed_stmt.go

Lines changed: 10 additions & 0 deletions
@@ -302,6 +302,15 @@ func changefeedPlanHook(
     jobID := p.ExecCfg().JobRegistry.MakeJobID()
     jr.JobID = jobID
     {
+        metrics := p.ExecCfg().JobRegistry.MetricsStruct().Changefeed.(*Metrics)
+        scope, _ := opts.GetMetricScope()
+        sliMetrics, err := metrics.getSLIMetrics(scope)
+        if err != nil {
+            return err
+        }
+
+        recordPTSMetricsTime := sliMetrics.Timers.PTSCreate.Start()
+
         var ptr *ptpb.Record
         codec := p.ExecCfg().Codec
         ptr = createProtectedTimestampRecord(
@@ -358,6 +367,7 @@ func changefeedPlanHook(
             }
             return err
         }
+        recordPTSMetricsTime()
     }

     // Start the job.

pkg/ccl/changefeedccl/changefeed_test.go

Lines changed: 80 additions & 1 deletion
@@ -11497,14 +11497,28 @@ func TestChangefeedProtectedTimestampUpdate(t *testing.T) {

     sqlDB.Exec(t, `CREATE TABLE foo (id INT)`)

+    registry := s.Server.JobRegistry().(*jobs.Registry)
+    metrics := registry.MetricsStruct().Changefeed.(*Metrics)
+    createPtsCount, _ := metrics.AggMetrics.Timers.PTSCreate.WindowedSnapshot().Total()
+    managePtsCount, _ := metrics.AggMetrics.Timers.PTSManage.WindowedSnapshot().Total()
+    managePTSErrorCount, _ := metrics.AggMetrics.Timers.PTSManageError.WindowedSnapshot().Total()
+    require.Equal(t, int64(0), createPtsCount)
+    require.Equal(t, int64(0), managePtsCount)
+    require.Equal(t, int64(0), managePTSErrorCount)
+
     createStmt := `CREATE CHANGEFEED FOR foo WITH resolved='10ms', no_initial_scan`
     testFeed := feed(t, f, createStmt)
     defer closeFeed(t, testFeed)

+    createPtsCount, _ = metrics.AggMetrics.Timers.PTSCreate.WindowedSnapshot().Total()
+    managePtsCount, _ = metrics.AggMetrics.Timers.PTSManage.WindowedSnapshot().Total()
+    require.Equal(t, int64(1), createPtsCount)
+    require.Equal(t, int64(0), managePtsCount)
+
     eFeed, ok := testFeed.(cdctest.EnterpriseTestFeed)
     require.True(t, ok)

-    // Wait for the changefeed to checkpoint.
+    // Wait for the changefeed to checkpoint and update PTS at least once.
     var lastHWM hlc.Timestamp
     checkHWM := func() error {
         hwm, err := eFeed.HighWaterMark()
@@ -11549,6 +11563,11 @@ func TestChangefeedProtectedTimestampUpdate(t *testing.T) {
     sqlDB.QueryRow(t, ptsQry).Scan(&ts2)
     require.NoError(t, err)
     require.Less(t, ts, ts2)
+
+    managePtsCount, _ = metrics.AggMetrics.Timers.PTSManage.WindowedSnapshot().Total()
+    managePTSErrorCount, _ = metrics.AggMetrics.Timers.PTSManageError.WindowedSnapshot().Total()
+    require.GreaterOrEqual(t, managePtsCount, int64(2))
+    require.Equal(t, int64(0), managePTSErrorCount)
 }

 withTxnRetries := withArgsFn(func(args *base.TestServerArgs) {
@@ -11562,6 +11581,66 @@ func TestChangefeedProtectedTimestampUpdate(t *testing.T) {
     cdcTest(t, testFn, feedTestForceSink("kafka"), withTxnRetries)
 }

+func TestChangefeedProtectedTimestampUpdateError(t *testing.T) {
+    defer leaktest.AfterTest(t)()
+    defer log.Scope(t).Close(t)
+
+    testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
+        sqlDB := sqlutils.MakeSQLRunner(s.DB)
+        // Checkpoint and trigger potential protected timestamp updates frequently.
+        // Make the protected timestamp lag long enough that it shouldn't be
+        // immediately updated after a restart.
+        changefeedbase.SpanCheckpointInterval.Override(
+            context.Background(), &s.Server.ClusterSettings().SV, 10*time.Millisecond)
+        changefeedbase.ProtectTimestampInterval.Override(
+            context.Background(), &s.Server.ClusterSettings().SV, 10*time.Millisecond)
+        changefeedbase.ProtectTimestampLag.Override(
+            context.Background(), &s.Server.ClusterSettings().SV, 10*time.Hour)
+
+        sqlDB.Exec(t, `CREATE TABLE foo (id INT)`)
+
+        registry := s.Server.JobRegistry().(*jobs.Registry)
+        metrics := registry.MetricsStruct().Changefeed.(*Metrics)
+        createPtsCount, _ := metrics.AggMetrics.Timers.PTSCreate.WindowedSnapshot().Total()
+        managePtsCount, _ := metrics.AggMetrics.Timers.PTSManage.WindowedSnapshot().Total()
+        managePTSErrorCount, _ := metrics.AggMetrics.Timers.PTSManageError.WindowedSnapshot().Total()
+        require.Equal(t, int64(0), createPtsCount)
+        require.Equal(t, int64(0), managePtsCount)
+        require.Equal(t, int64(0), managePTSErrorCount)
+
+        knobs := s.TestingKnobs.
+            DistSQL.(*execinfra.TestingKnobs).
+            Changefeed.(*TestingKnobs)
+
+        knobs.ManagePTSError = func() error {
+            return errors.New("test error")
+        }
+
+        createStmt := `CREATE CHANGEFEED FOR foo WITH resolved='10ms', no_initial_scan`
+        testFeed := feed(t, f, createStmt)
+        defer closeFeed(t, testFeed)
+
+        createPtsCount, _ = metrics.AggMetrics.Timers.PTSCreate.WindowedSnapshot().Total()
+        require.Equal(t, int64(1), createPtsCount)
+        managePTSErrorCount, _ = metrics.AggMetrics.Timers.PTSManageError.WindowedSnapshot().Total()
+        require.Equal(t, int64(0), managePTSErrorCount)
+
+        // Lower the PTS lag to trigger a PTS update.
+        changefeedbase.ProtectTimestampLag.Override(
+            context.Background(), &s.Server.ClusterSettings().SV, 10*time.Millisecond)
+
+        testutils.SucceedsSoon(t, func() error {
+            managePTSErrorCount, _ = metrics.AggMetrics.Timers.PTSManageError.WindowedSnapshot().Total()
+            if managePTSErrorCount > 0 {
+                fmt.Println("manage protected timestamps test: manage pts error count", managePTSErrorCount)
+                return nil
+            }
+            return errors.New("waiting for manage pts error")
+        })
+    }
+    cdcTest(t, testFn, feedTestForceSink("kafka"))
+}
+
 func TestCDCQuerySelectSingleRow(t *testing.T) {
     defer leaktest.AfterTest(t)()
     defer log.Scope(t).Close(t)
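
Both tests poll for the asynchronous frontier to record a sample rather than asserting immediately. A rough self-contained sketch of the retry-until-nil contract that testutils.SucceedsSoon provides (the timeout and backoff values here are assumptions, not the real helper's):

package main

import (
    "errors"
    "fmt"
    "sync/atomic"
    "time"
)

// succeedsSoon retries fn with a growing backoff until it returns nil or
// the timeout elapses. This approximates testutils.SucceedsSoon.
func succeedsSoon(timeout time.Duration, fn func() error) error {
    deadline := time.Now().Add(timeout)
    wait := time.Millisecond
    for {
        if err := fn(); err == nil {
            return nil
        } else if time.Now().After(deadline) {
            return fmt.Errorf("condition never met: %w", err)
        }
        time.Sleep(wait)
        if wait < 100*time.Millisecond {
            wait *= 2
        }
    }
}

func main() {
    // Stand-in for the frontier asynchronously bumping PTSManageError.
    var errorCount atomic.Int64
    go func() {
        time.Sleep(10 * time.Millisecond)
        errorCount.Store(1)
    }()

    err := succeedsSoon(time.Second, func() error {
        if errorCount.Load() > 0 {
            return nil
        }
        return errors.New("waiting for manage pts error")
    })
    fmt.Println(err) // <nil>
}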

pkg/ccl/changefeedccl/metrics.go

Lines changed: 1 addition & 1 deletion
@@ -731,7 +731,7 @@ var (
     metaChangefeedCheckpointHistNanos = metric.Metadata{
         Name:        "changefeed.checkpoint_hist_nanos",
         Help:        "Time spent checkpointing changefeed progress",
-        Measurement: "Changefeeds",
+        Measurement: "Nanoseconds",
         Unit:        metric.Unit_NANOSECONDS,
     }
pkg/ccl/changefeedccl/testing_knobs.go

Lines changed: 3 additions & 0 deletions
@@ -90,6 +90,9 @@ type TestingKnobs struct {
     // its PTS record to include all required targets.
     PreservePTSTargets func() bool

+    // ManagePTSError is used to return an error when managing protected timestamps.
+    ManagePTSError func() error
+
     // PulsarClientSkipCreation skips creating the sink client when
     // dialing.
     PulsarClientSkipCreation bool
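
This follows the usual testing-knob shape: a nilable function field that the production path consults right at the point a test may want to fail, so no test-only branches leak into normal operation. A minimal sketch of the shape, with hypothetical frontier/managePTS names standing in for the real types:

package main

import (
    "errors"
    "fmt"
)

// TestingKnobs holds optional hooks; a nil field means no injection.
type TestingKnobs struct {
    ManagePTSError func() error
}

// frontier and managePTS are hypothetical stand-ins for the changefeed
// frontier and manageProtectedTimestamps.
type frontier struct{ knobs TestingKnobs }

func (f *frontier) managePTS() (updated bool, err error) {
    // Consult the knob at exactly the point a test wants the operation
    // to fail, mirroring the check added above.
    if f.knobs.ManagePTSError != nil {
        return false, f.knobs.ManagePTSError()
    }
    return true, nil // normal path: advance the PTS record
}

func main() {
    f := &frontier{}
    f.knobs.ManagePTSError = func() error { return errors.New("test error") }
    _, err := f.managePTS()
    fmt.Println(err) // test error
}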

pkg/ccl/changefeedccl/timers/timers.go

Lines changed: 12 additions & 0 deletions
@@ -23,6 +23,9 @@ type Timers struct {
     KVFeedBuffer              *aggmetric.AggHistogram
     RangefeedBufferValue      *aggmetric.AggHistogram
     RangefeedBufferCheckpoint *aggmetric.AggHistogram
+    PTSManage                 *aggmetric.AggHistogram
+    PTSManageError            *aggmetric.AggHistogram
+    PTSCreate                 *aggmetric.AggHistogram
 }

 func (*Timers) MetricStruct() {}
@@ -54,6 +57,9 @@ func New(histogramWindow time.Duration) *Timers {
         KVFeedBuffer:              b.Histogram(histogramOptsFor("changefeed.stage.kv_feed_buffer.latency", "Latency of the changefeed stage: waiting to buffer kv events")),
         RangefeedBufferValue:      b.Histogram(histogramOptsFor("changefeed.stage.rangefeed_buffer_value.latency", "Latency of the changefeed stage: buffering rangefeed value events")),
         RangefeedBufferCheckpoint: b.Histogram(histogramOptsFor("changefeed.stage.rangefeed_buffer_checkpoint.latency", "Latency of the changefeed stage: buffering rangefeed checkpoint events")),
+        PTSManage:                 b.Histogram(histogramOptsFor("changefeed.stage.pts.manage.latency", "Latency of the changefeed stage: Time spent successfully managing protected timestamp records on highwater advance, including time spent creating new protected timestamps when needed")),
+        PTSManageError:            b.Histogram(histogramOptsFor("changefeed.stage.pts.manage_error.latency", "Latency of the changefeed stage: Time spent managing protected timestamp when we eventually error")),
+        PTSCreate:                 b.Histogram(histogramOptsFor("changefeed.stage.pts.create.latency", "Latency of the changefeed stage: Time spent creating protected timestamp records on changefeed creation")),
     }
 }
@@ -67,6 +73,9 @@ func (ts *Timers) GetOrCreateScopedTimers(scope string) *ScopedTimers {
         KVFeedBuffer:              &timer{ts.KVFeedBuffer.AddChild(scope)},
         RangefeedBufferValue:      &timer{ts.RangefeedBufferValue.AddChild(scope)},
         RangefeedBufferCheckpoint: &timer{ts.RangefeedBufferCheckpoint.AddChild(scope)},
+        PTSManage:                 &timer{ts.PTSManage.AddChild(scope)},
+        PTSManageError:            &timer{ts.PTSManageError.AddChild(scope)},
+        PTSCreate:                 &timer{ts.PTSCreate.AddChild(scope)},
     }
 }
@@ -77,6 +86,9 @@ type ScopedTimers struct {
     DownstreamClientSend      *timer
     KVFeedWaitForTableEvent   *timer
     KVFeedBuffer              *timer
+    PTSCreate                 *timer
+    PTSManage                 *timer
+    PTSManageError            *timer
     RangefeedBufferValue      *timer
     RangefeedBufferCheckpoint *timer
 }
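
The Agg prefix is what makes the per-changefeed scoping work: each AggHistogram acts as a parent that aggregates across scopes, and AddChild(scope) yields a labeled child for one changefeed's metrics scope, so both per-feed latencies and a cluster-wide aggregate are available. A simplified, self-contained sketch of that parent/child idea (the real aggmetric API differs; in particular, get-or-create caching is handled by GetOrCreateScopedTimers in the real code, not by AddChild itself):

package main

import "fmt"

// child records samples for a single metrics scope.
type child struct{ samples []int64 }

func (c *child) RecordValue(v int64) { c.samples = append(c.samples, v) }

// aggHistogram is a simplified stand-in for aggmetric.AggHistogram: one
// child per scope, plus an aggregate view across all of them.
type aggHistogram struct{ children map[string]*child }

func newAggHistogram() *aggHistogram {
    return &aggHistogram{children: map[string]*child{}}
}

// addChild returns the child for a scope, creating it on first use.
func (a *aggHistogram) addChild(scope string) *child {
    c, ok := a.children[scope]
    if !ok {
        c = &child{}
        a.children[scope] = c
    }
    return c
}

// totalCount aggregates the sample count across every scope, loosely
// analogous to what the parent histogram exports cluster-wide.
func (a *aggHistogram) totalCount() int {
    n := 0
    for _, c := range a.children {
        n += len(c.samples)
    }
    return n
}

func main() {
    ptsCreate := newAggHistogram()
    ptsCreate.addChild("feed-a").RecordValue(120)
    ptsCreate.addChild("feed-b").RecordValue(300)
    fmt.Println(ptsCreate.totalCount()) // 2
}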
