Skip to content

Commit 59514d2

Browse files
committed
changefeedccl: add sink backpressure time metric
Add a new histogram metric `changefeed.sink_backpressure_nanos` that measures time spent waiting for quota when emitting to the sink. This provides visibility into downstream sink backpressure affecting changefeed performance. The metric is recorded per changefeed scope in batching_sink when AdmitRequest returns ErrNotEnoughQuota, measuring the time from when we start waiting for quota until it becomes available. When a request is admitted without waiting, a zero-duration sample is recorded instead, so the sample count alone does not indicate backpressure. Includes a unit test that exercises the metric under induced backpressure. Fixes: #148417 Release note (enterprise change): Added changefeed.sink_backpressure_nanos metric to track time spent waiting for quota when emitting to the sink.
1 parent 98d9e07 commit 59514d2

File tree

5 files changed

+162
-0
lines changed

5 files changed

+162
-0
lines changed

docs/generated/metrics/metrics.yaml

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1456,6 +1456,14 @@ layers:
14561456
unit: NANOSECONDS
14571457
aggregation: AVG
14581458
derivative: NON_NEGATIVE_DERIVATIVE
1459+
- name: changefeed.sink_backpressure_nanos
1460+
exported_name: changefeed_sink_backpressure_nanos
1461+
description: Time spent waiting for quota when emitting to the sink (back-pressure). Only populated for sinks using the batching_sink wrapper. As of writing, this includes Kafka (v2), Pub/Sub (v2), and Webhook (v2).
1462+
y_axis_label: Nanoseconds
1463+
type: HISTOGRAM
1464+
unit: NANOSECONDS
1465+
aggregation: AVG
1466+
derivative: NONE
14591467
- name: changefeed.sink_batch_hist_nanos
14601468
exported_name: changefeed_sink_batch_hist_nanos
14611469
description: Time spent batched in the sink buffer before being flushed and acknowledged

pkg/ccl/changefeedccl/batching_sink.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -409,6 +409,8 @@ func (s *batchingSink) runBatchingWorker(ctx context.Context) {
409409

410410
req, send, err := ioEmitter.AdmitRequest(ctx, batchBuffer)
411411
if errors.Is(err, ErrNotEnoughQuota) {
412+
waitStart := timeutil.Now()
413+
412414
// Quota can only be freed by consuming a result.
413415
select {
414416
case <-ctx.Done():
@@ -428,8 +430,12 @@ func (s *batchingSink) runBatchingWorker(ctx context.Context) {
428430
} else if err != nil {
429431
return err
430432
}
433+
434+
s.metrics.recordSinkBackpressure(timeutil.Since(waitStart))
431435
} else if err != nil {
432436
return err
437+
} else {
438+
s.metrics.recordSinkBackpressure(0)
433439
}
434440

435441
// The request was admitted, it must be sent. There are no concurrent requests being sent which

pkg/ccl/changefeedccl/changefeed_test.go

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11424,6 +11424,59 @@ func TestParallelIOMetrics(t *testing.T) {
1142411424
cdcTest(t, testFn, feedTestForceSink("pubsub"))
1142511425
}
1142611426

11427+
// TestSinkBackpressureMetric tests that the sink backpressure metric is recorded
11428+
// when quota limits are hit.
11429+
func TestSinkBackpressureMetric(t *testing.T) {
11430+
defer leaktest.AfterTest(t)()
11431+
defer log.Scope(t).Close(t)
11432+
11433+
testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
11434+
registry := s.Server.JobRegistry().(*jobs.Registry)
11435+
metrics := registry.MetricsStruct().Changefeed.(*Metrics).AggMetrics
11436+
11437+
db := sqlutils.MakeSQLRunner(s.DB)
11438+
db.Exec(t, `SET CLUSTER SETTING changefeed.sink_io_workers = 1`)
11439+
db.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY)`)
11440+
11441+
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
11442+
defer cancel()
11443+
11444+
g := ctxgroup.WithContext(ctx)
11445+
done := make(chan struct{})
11446+
g.Go(func() error {
11447+
for {
11448+
select {
11449+
case <-ctx.Done():
11450+
return nil
11451+
case <-done:
11452+
return nil
11453+
default:
11454+
_, err := s.DB.Exec(`UPSERT INTO foo (a) SELECT * FROM generate_series(1, 10)`)
11455+
if err != nil {
11456+
return err
11457+
}
11458+
}
11459+
}
11460+
})
11461+
11462+
foo, err := f.Feed(`CREATE CHANGEFEED FOR TABLE foo WITH pubsub_sink_config='{"Flush": {"Frequency": "100ms"}}'`)
11463+
require.NoError(t, err)
11464+
11465+
testutils.SucceedsSoon(t, func() error {
11466+
numSamples, sum := metrics.SinkBackpressureNanos.WindowedSnapshot().Total()
11467+
if numSamples <= 0 && sum <= 0.0 {
11468+
return errors.Newf("waiting for backpressure nanos: %d %f", numSamples, sum)
11469+
}
11470+
return nil
11471+
})
11472+
11473+
close(done)
11474+
require.NoError(t, g.Wait())
11475+
require.NoError(t, foo.Close())
11476+
}
11477+
cdcTest(t, testFn, feedTestForceSink("pubsub"))
11478+
}
11479+
1142711480
type changefeedLogSpy struct {
1142811481
syncutil.Mutex
1142911482
logs []string

pkg/ccl/changefeedccl/metrics.go

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ const (
4040
changefeedIOQueueMaxLatency = 5 * time.Minute
4141
admitLatencyMaxValue = 1 * time.Minute
4242
commitLatencyMaxValue = 10 * time.Minute
43+
backpressureMaxValue = 10 * time.Minute
4344
kafkaThrottlingTimeMaxValue = 5 * time.Minute
4445
)
4546

@@ -68,6 +69,7 @@ type AggMetrics struct {
6869
ParallelIOResultQueueNanos *aggmetric.AggHistogram
6970
ParallelIOInFlightKeys *aggmetric.AggGauge
7071
SinkIOInflight *aggmetric.AggGauge
72+
SinkBackpressureNanos *aggmetric.AggHistogram
7173
CommitLatency *aggmetric.AggHistogram
7274
BackfillCount *aggmetric.AggGauge
7375
BackfillPendingRanges *aggmetric.AggGauge
@@ -123,6 +125,7 @@ type metricsRecorder interface {
123125
recordSizeBasedFlush()
124126
newParallelIOMetricsRecorder() parallelIOMetricsRecorder
125127
recordSinkIOInflightChange(int64)
128+
recordSinkBackpressure(time.Duration)
126129
makeCloudstorageFileAllocCallback() func(delta int64)
127130
getKafkaThrottlingMetrics(*cluster.Settings) metrics.Histogram
128131
netMetrics() *cidr.NetMetrics
@@ -153,6 +156,7 @@ type sliMetrics struct {
153156
ParallelIOResultQueueNanos *aggmetric.Histogram
154157
ParallelIOInFlightKeys *aggmetric.Gauge
155158
SinkIOInflight *aggmetric.Gauge
159+
SinkBackpressureNanos *aggmetric.Histogram
156160
CommitLatency *aggmetric.Histogram
157161
ErrorRetries *aggmetric.Counter
158162
AdmitLatency *aggmetric.Histogram
@@ -599,6 +603,14 @@ func (m *sliMetrics) recordSinkIOInflightChange(delta int64) {
599603
m.SinkIOInflight.Inc(delta)
600604
}
601605

606+
func (m *sliMetrics) recordSinkBackpressure(duration time.Duration) {
607+
if m == nil {
608+
return
609+
}
610+
611+
m.SinkBackpressureNanos.RecordValue(duration.Nanoseconds())
612+
}
613+
602614
type wrappingCostController struct {
603615
ctx context.Context
604616
inner metricsRecorder
@@ -677,6 +689,10 @@ func (w *wrappingCostController) recordSinkIOInflightChange(delta int64) {
677689
w.inner.recordSinkIOInflightChange(delta)
678690
}
679691

692+
// recordSinkBackpressure forwards the backpressure duration, unchanged, to the
// wrapped metricsRecorder.
func (w *wrappingCostController) recordSinkBackpressure(duration time.Duration) {
693+
w.inner.recordSinkBackpressure(duration)
694+
}
695+
680696
func (w *wrappingCostController) newParallelIOMetricsRecorder() parallelIOMetricsRecorder {
681697
return w.inner.newParallelIOMetricsRecorder()
682698
}
@@ -959,6 +975,14 @@ func newAggregateMetrics(histogramWindow time.Duration, lookup *cidr.Lookup) *Ag
959975
Measurement: "Messages",
960976
Unit: metric.Unit_COUNT,
961977
}
978+
metaChangefeedSinkBackpressureNanos := metric.Metadata{
979+
Name: "changefeed.sink_backpressure_nanos",
980+
Help: "Time spent waiting for quota when emitting to the sink (back-pressure). " +
981+
"Only populated for sinks using the batching_sink wrapper. As of writing, " +
982+
"this includes Kafka (v2), Pub/Sub (v2), and Webhook (v2).",
983+
Measurement: "Nanoseconds",
984+
Unit: metric.Unit_NANOSECONDS,
985+
}
962986
metaAggregatorProgress := metric.Metadata{
963987
Name: "changefeed.aggregator_progress",
964988
Help: "The earliest timestamp up to which any aggregator is guaranteed to have emitted all values for",
@@ -1072,6 +1096,13 @@ func newAggregateMetrics(histogramWindow time.Duration, lookup *cidr.Lookup) *Ag
10721096
}),
10731097
ParallelIOInFlightKeys: b.Gauge(metaChangefeedParallelIOInFlightKeys),
10741098
SinkIOInflight: b.Gauge(metaChangefeedSinkIOInflight),
1099+
SinkBackpressureNanos: b.Histogram(metric.HistogramOptions{
1100+
Metadata: metaChangefeedSinkBackpressureNanos,
1101+
Duration: histogramWindow,
1102+
MaxVal: backpressureMaxValue.Nanoseconds(),
1103+
SigFigs: 2,
1104+
BucketConfig: metric.ChangefeedBatchLatencyBuckets,
1105+
}),
10751106
BatchHistNanos: b.Histogram(metric.HistogramOptions{
10761107
Metadata: metaChangefeedBatchHistNanos,
10771108
Duration: histogramWindow,
@@ -1178,6 +1209,7 @@ func (a *AggMetrics) getOrCreateScope(scope string) (*sliMetrics, error) {
11781209
ParallelIOResultQueueNanos: a.ParallelIOResultQueueNanos.AddChild(scope),
11791210
ParallelIOInFlightKeys: a.ParallelIOInFlightKeys.AddChild(scope),
11801211
SinkIOInflight: a.SinkIOInflight.AddChild(scope),
1212+
SinkBackpressureNanos: a.SinkBackpressureNanos.AddChild(scope),
11811213
CommitLatency: a.CommitLatency.AddChild(scope),
11821214
ErrorRetries: a.ErrorRetries.AddChild(scope),
11831215
AdmitLatency: a.AdmitLatency.AddChild(scope),

pkg/cmd/roachtest/tests/cdc.go

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1679,6 +1679,60 @@ highwaterLoop:
16791679
}
16801680
}
16811681

1682+
// runCDCWebhookBackpressureMetrics tests that the sink backpressure metric
1683+
// gets populated when using a slow webhook sink.
1684+
func runCDCWebhookBackpressureMetrics(ctx context.Context, t test.Test, c cluster.Cluster) {
1685+
ct := newCDCTester(ctx, t, c)
1686+
defer ct.Close()
1687+
1688+
ips, err := c.ExternalIP(ctx, t.L(), c.WorkloadNode())
1689+
if err != nil {
1690+
t.Fatal(err)
1691+
}
1692+
sinkURL := fmt.Sprintf("https://%s:%d", ips[0], debug.WebhookServerPort)
1693+
sink := &http.Client{Transport: &http.Transport{TLSClientConfig: &tls.Config{InsecureSkipVerify: true}}}
1694+
1695+
m := c.NewDeprecatedMonitor(ctx, c.All())
1696+
1697+
m.Go(func(ctx context.Context) error {
1698+
t.L().Printf("starting slow webhook server at %s...", sinkURL)
1699+
// Use webhook-server-slow with delays to create backpressure
1700+
return c.RunE(ctx, option.WithNodes(c.WorkloadNode()),
1701+
"./cockroach workload debug webhook-server-slow 100 5000 5000 5000 5000 5000")
1702+
})
1703+
defer func() {
1704+
_, err := sink.Get(sinkURL + "/exit")
1705+
t.L().Printf("exiting webhook sink status: %v", err)
1706+
}()
1707+
1708+
db := c.Conn(ctx, t.L(), 1)
1709+
defer db.Close()
1710+
1711+
ct.runTPCCWorkload(tpccArgs{warehouses: 10, duration: "1m"})
1712+
1713+
ct.newChangefeed(feedArgs{
1714+
sinkType: webhookSink,
1715+
targets: allTpccTargets,
1716+
opts: map[string]string{"updated": "", "min_checkpoint_frequency": "'1s'", "webhook_sink_config": `'{"Flush": {"Messages": 10, "Frequency": "100ms"}}'`},
1717+
sinkURIOverride: fmt.Sprintf("webhook-%s/?insecure_tls_skip_verify=true", sinkURL),
1718+
})
1719+
1720+
// Wait a bit for metrics to be recorded
1721+
time.Sleep(30 * time.Second)
1722+
1723+
t.L().Printf("verifying backpressure metric...")
1724+
1725+
ct.verifyMetrics(ctx, func(metrics map[string]*prompb.MetricFamily) (ok bool) {
1726+
for _, m := range metrics {
1727+
if m.GetName() == "changefeed_sink_backpressure_nanos" {
1728+
count, sum := m.GetMetric()[0].GetHistogram().GetSampleCount(), m.GetMetric()[0].GetHistogram().GetSampleSum()
1729+
return count > 0 && sum > 0
1730+
}
1731+
}
1732+
return false
1733+
})
1734+
}
1735+
16821736
func runMessageTooLarge(ctx context.Context, t test.Test, c cluster.Cluster) {
16831737
ct := newCDCTester(ctx, t, c)
16841738
db := ct.DB()
@@ -2709,6 +2763,15 @@ func registerCDC(r registry.Registry) {
27092763
ct.waitForWorkload()
27102764
},
27112765
})
2766+
r.Add(registry.TestSpec{
2767+
Name: "cdc/webhook-sink-backpressure-metrics",
2768+
Owner: `cdc`,
2769+
Cluster: r.MakeClusterSpec(4, spec.CPU(4), spec.WorkloadNode()),
2770+
Leases: registry.MetamorphicLeases,
2771+
CompatibleClouds: registry.OnlyGCE,
2772+
Suites: registry.Suites(registry.Nightly),
2773+
Run: runCDCWebhookBackpressureMetrics,
2774+
})
27122775
r.Add(registry.TestSpec{
27132776
Name: "cdc/message-too-large-error",
27142777
Owner: registry.OwnerCDC,

0 commit comments

Comments
 (0)