
Commit d5aceba

craig[bot] committed with asg0451, jbowens, and dhartunian
150666: changefeedccl: add sink backpressure time metric r=aerfrei a=asg0451

Add a new histogram metric `changefeed.sink_backpressure_nanos` that measures time spent waiting for quota when emitting to the sink. This provides visibility into downstream sink backpressure affecting changefeed performance.

The metric is recorded per changefeed scope in batching_sink when AdmitRequest returns ErrNotEnoughQuota, measuring the time from when we start waiting for quota until it becomes available.

Includes a unit test that exercises the metric under induced backpressure.

Fixes: #148417

Release note (enterprise change): Added the changefeed.sink_backpressure_nanos metric to track time spent waiting for quota when emitting to the sink.

150861: storage: use AssertionFailedf for panics r=RaduBerinde a=jbowens

Update a few panics that threw a simple string to instead throw an error constructed through AssertionFailedf. Where applicable, this error is propagated through a return value instead.

This is motivated by #150058, an instance where the panic string gets redacted, requiring a bit more work to identify the origin of the panic (by mapping the line number onto the appropriate commit SHA).

Epic: none
Release note: none

150863: CLAUDE: use verbose output in filtered tests r=rickystewart a=dhartunian

I observed a `./dev test pkg -f=x` scenario where no test matched, and false success was declared.

Release note: None

Co-authored-by: Miles Frankel <[email protected]>
Co-authored-by: Jackson Owens <[email protected]>
Co-authored-by: David Hartunian <[email protected]>
4 parents: 837a2e3 + 59514d2 + b868ee0 + c1eb031 · commit d5aceba

18 files changed: 293 additions & 90 deletions

CLAUDE.md

Lines changed: 7 additions & 3 deletions
@@ -23,15 +23,19 @@ CockroachDB is a distributed SQL database written in Go, built with Bazel and ma
 
 **Testing:**
 ```bash
-./dev test pkg/sql               # Run unit tests for SQL package
-./dev test pkg/sql -f=TestParse  # Run specific test pattern
+./dev test pkg/sql                  # Run unit tests for SQL package
+./dev test pkg/sql -f=TestParse -v  # Run specific test pattern.
 ./dev test pkg/sql --race        # Run with race detection
 ./dev test pkg/sql --stress      # Run repeatedly until failure
 ./dev testlogic                  # Run all SQL logic tests
 ./dev testlogic ccl              # Run enterprise logic tests
 ./dev testlogic --files='prepare|fk'  # Run specific test files
 ```
 
+Note that when filtering tests via `-f` to include the `-v` flag which
+will warn you in the output if your filter didn't match anything. Look
+for `testing: warning: no tests to run` in the output.
+
 **Code Generation and Linting:**
 ```bash
 ./dev generate  # Generate all code (protobuf, parsers, etc.)
@@ -129,4 +133,4 @@ Always run `./dev generate` after modifying `.proto` files, SQL grammar, or opti
 - Require the user to specify an epic number (or None) which should be included at the bottom of the commit record following "Epic:".
 - Prefix the subject line with the package in which the bulk of the changes occur.
 - For multi-commit PRs, summarize each commit in the PR record.
-- Do not include a test plan unless explicitly asked by the user.
+- Do not include a test plan unless explicitly asked by the user.
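To illustrate the note added above: when a filter matches nothing, zero tests execute and the run can still be reported as successful, so the warning in the verbose output is the only signal. A minimal sketch, with a hypothetical package and test name that are not part of this commit:

```go
package example

import "testing"

// With a filter that matches nothing, e.g. `go test -run NoSuchTest -v`
// (or `./dev test pkg -f=NoSuchTest -v`), no tests execute and the run can
// still read as a success; per the note above, check the verbose output
// for "testing: warning: no tests to run".
func TestExample(t *testing.T) {
	t.Log("runs only when the -run/-f filter matches TestExample")
}
```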

docs/generated/metrics/metrics.yaml

Lines changed: 8 additions & 0 deletions
@@ -1530,6 +1530,14 @@ layers:
     unit: NANOSECONDS
     aggregation: AVG
     derivative: NON_NEGATIVE_DERIVATIVE
+  - name: changefeed.sink_backpressure_nanos
+    exported_name: changefeed_sink_backpressure_nanos
+    description: Time spent waiting for quota when emitting to the sink (back-pressure). Only populated for sinks using the batching_sink wrapper. As of writing, this includes Kafka (v2), Pub/Sub (v2), and Webhook (v2).
+    y_axis_label: Nanoseconds
+    type: HISTOGRAM
+    unit: NANOSECONDS
+    aggregation: AVG
+    derivative: NONE
   - name: changefeed.sink_batch_hist_nanos
     exported_name: changefeed_sink_batch_hist_nanos
     description: Time spent batched in the sink buffer before being flushed and acknowledged

pkg/ccl/changefeedccl/batching_sink.go

Lines changed: 6 additions & 0 deletions
@@ -409,6 +409,8 @@ func (s *batchingSink) runBatchingWorker(ctx context.Context) {
 
 		req, send, err := ioEmitter.AdmitRequest(ctx, batchBuffer)
 		if errors.Is(err, ErrNotEnoughQuota) {
+			waitStart := timeutil.Now()
+
 			// Quota can only be freed by consuming a result.
 			select {
 			case <-ctx.Done():
@@ -428,8 +430,12 @@ func (s *batchingSink) runBatchingWorker(ctx context.Context) {
 			} else if err != nil {
 				return err
 			}
+
+			s.metrics.recordSinkBackpressure(timeutil.Since(waitStart))
 		} else if err != nil {
 			return err
+		} else {
+			s.metrics.recordSinkBackpressure(0)
 		}
 
 		// The request was admitted, it must be sent. There are no concurrent requests being sent which

pkg/ccl/changefeedccl/changefeed_test.go

Lines changed: 53 additions & 0 deletions
@@ -11424,6 +11424,59 @@ func TestParallelIOMetrics(t *testing.T) {
 	cdcTest(t, testFn, feedTestForceSink("pubsub"))
 }
 
+// TestSinkBackpressureMetric tests that the sink backpressure metric is recorded
+// when quota limits are hit.
+func TestSinkBackpressureMetric(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+
+	testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
+		registry := s.Server.JobRegistry().(*jobs.Registry)
+		metrics := registry.MetricsStruct().Changefeed.(*Metrics).AggMetrics
+
+		db := sqlutils.MakeSQLRunner(s.DB)
+		db.Exec(t, `SET CLUSTER SETTING changefeed.sink_io_workers = 1`)
+		db.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY)`)
+
+		ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
+		defer cancel()
+
+		g := ctxgroup.WithContext(ctx)
+		done := make(chan struct{})
+		g.Go(func() error {
+			for {
+				select {
+				case <-ctx.Done():
+					return nil
+				case <-done:
+					return nil
+				default:
+					_, err := s.DB.Exec(`UPSERT INTO foo (a) SELECT * FROM generate_series(1, 10)`)
+					if err != nil {
+						return err
+					}
+				}
+			}
+		})
+
+		foo, err := f.Feed(`CREATE CHANGEFEED FOR TABLE foo WITH pubsub_sink_config='{"Flush": {"Frequency": "100ms"}}'`)
+		require.NoError(t, err)
+
+		testutils.SucceedsSoon(t, func() error {
+			numSamples, sum := metrics.SinkBackpressureNanos.WindowedSnapshot().Total()
+			if numSamples <= 0 && sum <= 0.0 {
+				return errors.Newf("waiting for backpressure nanos: %d %f", numSamples, sum)
+			}
+			return nil
+		})
+
+		close(done)
+		require.NoError(t, g.Wait())
+		require.NoError(t, foo.Close())
+	}
+	cdcTest(t, testFn, feedTestForceSink("pubsub"))
+}
+
 type changefeedLogSpy struct {
 	syncutil.Mutex
 	logs []string

pkg/ccl/changefeedccl/metrics.go

Lines changed: 32 additions & 0 deletions
@@ -40,6 +40,7 @@ const (
 	changefeedIOQueueMaxLatency = 5 * time.Minute
 	admitLatencyMaxValue        = 1 * time.Minute
 	commitLatencyMaxValue       = 10 * time.Minute
+	backpressureMaxValue        = 10 * time.Minute
 	kafkaThrottlingTimeMaxValue = 5 * time.Minute
 )
 
@@ -68,6 +69,7 @@ type AggMetrics struct {
 	ParallelIOResultQueueNanos *aggmetric.AggHistogram
 	ParallelIOInFlightKeys     *aggmetric.AggGauge
 	SinkIOInflight             *aggmetric.AggGauge
+	SinkBackpressureNanos      *aggmetric.AggHistogram
 	CommitLatency              *aggmetric.AggHistogram
 	BackfillCount              *aggmetric.AggGauge
 	BackfillPendingRanges      *aggmetric.AggGauge
@@ -123,6 +125,7 @@ type metricsRecorder interface {
 	recordSizeBasedFlush()
 	newParallelIOMetricsRecorder() parallelIOMetricsRecorder
 	recordSinkIOInflightChange(int64)
+	recordSinkBackpressure(time.Duration)
 	makeCloudstorageFileAllocCallback() func(delta int64)
 	getKafkaThrottlingMetrics(*cluster.Settings) metrics.Histogram
 	netMetrics() *cidr.NetMetrics
@@ -153,6 +156,7 @@ type sliMetrics struct {
 	ParallelIOResultQueueNanos *aggmetric.Histogram
 	ParallelIOInFlightKeys     *aggmetric.Gauge
 	SinkIOInflight             *aggmetric.Gauge
+	SinkBackpressureNanos      *aggmetric.Histogram
 	CommitLatency              *aggmetric.Histogram
 	ErrorRetries               *aggmetric.Counter
 	AdmitLatency               *aggmetric.Histogram
@@ -599,6 +603,14 @@ func (m *sliMetrics) recordSinkIOInflightChange(delta int64) {
 	m.SinkIOInflight.Inc(delta)
 }
 
+func (m *sliMetrics) recordSinkBackpressure(duration time.Duration) {
+	if m == nil {
+		return
+	}
+
+	m.SinkBackpressureNanos.RecordValue(duration.Nanoseconds())
+}
+
 type wrappingCostController struct {
 	ctx   context.Context
 	inner metricsRecorder
@@ -677,6 +689,10 @@ func (w *wrappingCostController) recordSinkIOInflightChange(delta int64) {
 	w.inner.recordSinkIOInflightChange(delta)
 }
 
+func (w *wrappingCostController) recordSinkBackpressure(duration time.Duration) {
+	w.inner.recordSinkBackpressure(duration)
+}
+
 func (w *wrappingCostController) newParallelIOMetricsRecorder() parallelIOMetricsRecorder {
 	return w.inner.newParallelIOMetricsRecorder()
 }
@@ -959,6 +975,14 @@ func newAggregateMetrics(histogramWindow time.Duration, lookup *cidr.Lookup) *Ag
 		Measurement: "Messages",
 		Unit:        metric.Unit_COUNT,
 	}
+	metaChangefeedSinkBackpressureNanos := metric.Metadata{
+		Name: "changefeed.sink_backpressure_nanos",
+		Help: "Time spent waiting for quota when emitting to the sink (back-pressure). " +
+			"Only populated for sinks using the batching_sink wrapper. As of writing, " +
+			"this includes Kafka (v2), Pub/Sub (v2), and Webhook (v2).",
+		Measurement: "Nanoseconds",
+		Unit:        metric.Unit_NANOSECONDS,
+	}
 	metaAggregatorProgress := metric.Metadata{
 		Name: "changefeed.aggregator_progress",
 		Help: "The earliest timestamp up to which any aggregator is guaranteed to have emitted all values for",
@@ -1072,6 +1096,13 @@ func newAggregateMetrics(histogramWindow time.Duration, lookup *cidr.Lookup) *Ag
 		}),
 		ParallelIOInFlightKeys: b.Gauge(metaChangefeedParallelIOInFlightKeys),
 		SinkIOInflight:         b.Gauge(metaChangefeedSinkIOInflight),
+		SinkBackpressureNanos: b.Histogram(metric.HistogramOptions{
+			Metadata:     metaChangefeedSinkBackpressureNanos,
+			Duration:     histogramWindow,
+			MaxVal:       backpressureMaxValue.Nanoseconds(),
+			SigFigs:      2,
+			BucketConfig: metric.ChangefeedBatchLatencyBuckets,
+		}),
 		BatchHistNanos: b.Histogram(metric.HistogramOptions{
 			Metadata: metaChangefeedBatchHistNanos,
 			Duration: histogramWindow,
@@ -1178,6 +1209,7 @@ func (a *AggMetrics) getOrCreateScope(scope string) (*sliMetrics, error) {
 		ParallelIOResultQueueNanos: a.ParallelIOResultQueueNanos.AddChild(scope),
 		ParallelIOInFlightKeys:     a.ParallelIOInFlightKeys.AddChild(scope),
 		SinkIOInflight:             a.SinkIOInflight.AddChild(scope),
+		SinkBackpressureNanos:      a.SinkBackpressureNanos.AddChild(scope),
 		CommitLatency:              a.CommitLatency.AddChild(scope),
 		ErrorRetries:               a.ErrorRetries.AddChild(scope),
 		AdmitLatency:               a.AdmitLatency.AddChild(scope),
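The `if m == nil` guard in `recordSinkBackpressure` relies on the fact that Go methods with pointer receivers can be invoked on a nil pointer. A self-contained sketch of that idiom, with a hypothetical `recorder` type rather than the real `sliMetrics`:

```go
package main

import (
	"fmt"
	"time"
)

type recorder struct{ total time.Duration }

// record is safe to call on a nil *recorder: the method body runs with
// m == nil, and the guard turns the call into a no-op. Callers can then
// skip "are metrics configured?" checks at every call site.
func (m *recorder) record(d time.Duration) {
	if m == nil {
		return
	}
	m.total += d
}

func main() {
	var m *recorder       // e.g. metrics disabled
	m.record(time.Second) // no-op, no panic
	m = &recorder{}
	m.record(time.Second)
	fmt.Println(m.total) // 1s
}
```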

pkg/cmd/roachtest/tests/cdc.go

Lines changed: 63 additions & 0 deletions
@@ -1679,6 +1679,60 @@ highwaterLoop:
 	}
 }
 
+// runCDCWebhookBackpressureMetrics tests that the sink backpressure metric
+// gets populated when using a slow webhook sink.
+func runCDCWebhookBackpressureMetrics(ctx context.Context, t test.Test, c cluster.Cluster) {
+	ct := newCDCTester(ctx, t, c)
+	defer ct.Close()
+
+	ips, err := c.ExternalIP(ctx, t.L(), c.WorkloadNode())
+	if err != nil {
+		t.Fatal(err)
+	}
+	sinkURL := fmt.Sprintf("https://%s:%d", ips[0], debug.WebhookServerPort)
+	sink := &http.Client{Transport: &http.Transport{TLSClientConfig: &tls.Config{InsecureSkipVerify: true}}}
+
+	m := c.NewDeprecatedMonitor(ctx, c.All())
+
+	m.Go(func(ctx context.Context) error {
+		t.L().Printf("starting slow webhook server at %s...", sinkURL)
+		// Use webhook-server-slow with delays to create backpressure
+		return c.RunE(ctx, option.WithNodes(c.WorkloadNode()),
+			"./cockroach workload debug webhook-server-slow 100 5000 5000 5000 5000 5000")
+	})
+	defer func() {
+		_, err := sink.Get(sinkURL + "/exit")
+		t.L().Printf("exiting webhook sink status: %v", err)
+	}()
+
+	db := c.Conn(ctx, t.L(), 1)
+	defer db.Close()
+
+	ct.runTPCCWorkload(tpccArgs{warehouses: 10, duration: "1m"})
+
+	ct.newChangefeed(feedArgs{
+		sinkType:        webhookSink,
+		targets:         allTpccTargets,
+		opts:            map[string]string{"updated": "", "min_checkpoint_frequency": "'1s'", "webhook_sink_config": `'{"Flush": {"Messages": 10, "Frequency": "100ms"}}'`},
+		sinkURIOverride: fmt.Sprintf("webhook-%s/?insecure_tls_skip_verify=true", sinkURL),
+	})
+
+	// Wait a bit for metrics to be recorded
+	time.Sleep(30 * time.Second)
+
+	t.L().Printf("verifying backpressure metric...")
+
+	ct.verifyMetrics(ctx, func(metrics map[string]*prompb.MetricFamily) (ok bool) {
+		for _, m := range metrics {
+			if m.GetName() == "changefeed_sink_backpressure_nanos" {
+				count, sum := m.GetMetric()[0].GetHistogram().GetSampleCount(), m.GetMetric()[0].GetHistogram().GetSampleSum()
+				return count > 0 && sum > 0
+			}
+		}
+		return false
+	})
+}
+
 func runMessageTooLarge(ctx context.Context, t test.Test, c cluster.Cluster) {
 	ct := newCDCTester(ctx, t, c)
 	db := ct.DB()
@@ -2709,6 +2763,15 @@ func registerCDC(r registry.Registry) {
 			ct.waitForWorkload()
 		},
 	})
+	r.Add(registry.TestSpec{
+		Name:             "cdc/webhook-sink-backpressure-metrics",
+		Owner:            `cdc`,
+		Cluster:          r.MakeClusterSpec(4, spec.CPU(4), spec.WorkloadNode()),
+		Leases:           registry.MetamorphicLeases,
+		CompatibleClouds: registry.OnlyGCE,
+		Suites:           registry.Suites(registry.Nightly),
+		Run:              runCDCWebhookBackpressureMetrics,
+	})
 	r.Add(registry.TestSpec{
 		Name:  "cdc/message-too-large-error",
 		Owner: registry.OwnerCDC,
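The verification step boils down to scanning a Prometheus scrape for a histogram with a nonzero sample count and sum. A self-contained sketch of that check, using the prometheus client_model types as a stand-in for the roachtest's `prompb` alias (an assumption; the diff does not show the import):

```go
package main

import (
	"fmt"

	dto "github.com/prometheus/client_model/go"
)

// hasBackpressureSamples mirrors the verifyMetrics callback above: scan the
// scraped metric families for the backpressure histogram and report whether
// it has accumulated any samples.
func hasBackpressureSamples(families map[string]*dto.MetricFamily) bool {
	for _, fam := range families {
		if fam.GetName() == "changefeed_sink_backpressure_nanos" {
			h := fam.GetMetric()[0].GetHistogram()
			return h.GetSampleCount() > 0 && h.GetSampleSum() > 0
		}
	}
	return false
}

func main() {
	fmt.Println(hasBackpressureSamples(nil)) // false: nothing scraped yet
}
```

Requiring both count > 0 and sum > 0 distinguishes real stalls from the zero samples the sink records on uncontended emits.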

pkg/storage/backup_compaction_iterator.go

Lines changed: 1 addition & 1 deletion
@@ -116,7 +116,7 @@ func (f *BackupCompactionIterator) ValueLen() int {
 func (f *BackupCompactionIterator) HasPointAndRange() (bool, bool) {
 	hasPoint, hasRange := f.iter.HasPointAndRange()
 	if hasRange {
-		panic("unexpected range tombstone")
+		panic(errors.AssertionFailedf("unexpected range tombstone"))
 	}
 	return hasPoint, hasRange
 }

pkg/storage/batch.go

Lines changed: 1 addition & 1 deletion
@@ -128,7 +128,7 @@ func (r *BatchReader) EngineKey() (EngineKey, error) {
 func (r *BatchReader) Value() []byte {
 	switch r.kind {
 	case pebble.InternalKeyKindDelete, pebble.InternalKeyKindSingleDelete:
-		panic("cannot call Value on a deletion entry")
+		panic(errors.AssertionFailedf("cannot call Value on a deletion entry"))
 	default:
 		return r.value
 	}
