
Commit cbea4d2

craig[bot], KeithCh, yuzefovich, stevendanna, and rickystewart committed
154552: changefeedccl: improve parallel io metrics r=log-head,asg0451 a=KeithCh

**changefeedccl: improve parallel io metrics**

Rename the function parameter used to update the pending rows metric, and that metric's y-axis label, to be more accurate.

Release note (ops change): Fix the changefeed.parallel_io_pending_rows metric y-axis label to match the metric's definition.

Fixes: #147625

---

**changefeedccl: add parallel io workers metric**

Add a gauge metric to track the number of workers in ParallelIO.

Release note (ops change): Add the metric changefeed.parallel_io_workers to track the number of workers in ParallelIO.

Resolves: #147625

154651: opt/bench: improve BenchmarkEndToEnd for INSERTs r=yuzefovich a=yuzefovich

In `BenchmarkEndToEnd` we have 3 bench cases that run INSERT statements. Previously, we always used the same placeholder values, which forced us to do TRUNCATE TABLE after _every_ iteration, and that TRUNCATE was included in the operation time. We recently saw a supposed regression on this benchmark because the performance of TRUNCATE has regressed.

In my initial approach I tried simply stopping and starting the timer around the TRUNCATE, but that made the benchmark extremely long. (Timer operations require a stop-the-world pause, and since the time to perform TRUNCATE was no longer included in the benchmark time, every single iteration seemed very short, so we'd do thousands of iterations with the default `bench-time=1s`, and truncating the table would stretch the run to about 1 minute.)

To work around this, I refactored the three INSERT queries to generate slightly different arguments for each iteration so that we don't get PK duplicates, and then moved the TRUNCATE outside the benchmark loop (and also excluded it from the timer). Now these benchmark cases truly measure what they were supposed to. (A sketch of this pattern follows the commit header below.)

Fixes: #154597.

Release note: None

154750: dbconsole: custom metrics update when units change r=dhartunian a=stevendanna

This fixes a long-standing bug in which changing the axis units fails to update the graph unless you also make some other change.

This PR was generated by Claude Code. I asked it to write a test and it produced something with enough mocks that I wasn't sure of its value. I have manually tested this and confirmed that custom graphs are updated immediately when the axis units are changed.

Epic: none

Release note: None

154771: workflows: change name of provider r=rail a=rickystewart

Part of: DEVINFHD-1916

Co-authored-by: Keith Chow <[email protected]>
Co-authored-by: Yahor Yuzefovich <[email protected]>
Co-authored-by: Steven Danna <[email protected]>
Co-authored-by: Ricky Stewart <[email protected]>
5 parents 61a9d7d + e92277e + 978dc8d + 9bb8477 + dc7ef4e commit cbea4d2
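
For illustration, here is a minimal Go sketch of the benchmark pattern described in the #154651 commit message above: derive unique placeholder arguments from the iteration counter so repeated INSERTs never collide on the primary key, and run TRUNCATE once, outside the timed loop. This is not the actual BenchmarkEndToEnd code; the table name `kv`, the `*sql.DB` handle, and `benchmarkInsertUniqueArgs` are hypothetical stand-ins.

package bench

import (
	"database/sql"
	"testing"
)

// benchmarkInsertUniqueArgs illustrates the pattern from #154651: derive a
// unique primary key from the iteration counter so the same prepared INSERT
// can run on every iteration without PK collisions, and keep TRUNCATE (and
// any other cleanup) outside the timed loop.
//
// The table name "kv" and the *sql.DB handle are hypothetical; the real
// benchmark lives in pkg/sql/opt/bench.
func benchmarkInsertUniqueArgs(b *testing.B, db *sql.DB) {
	insertStmt, err := db.Prepare(`INSERT INTO kv (k, v) VALUES ($1, $2)`)
	if err != nil {
		b.Fatal(err)
	}
	defer insertStmt.Close()

	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		// Unique key per iteration: no duplicates, so no per-iteration TRUNCATE.
		if _, err := insertStmt.Exec(i, i*10); err != nil {
			b.Fatal(err)
		}
	}
	b.StopTimer()

	// Cleanup runs once, after the loop, and is excluded from the timer.
	if _, err := db.Exec(`TRUNCATE TABLE kv`); err != nil {
		b.Fatal(err)
	}
}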

File tree

7 files changed: +457 −209 lines


.github/workflows/pr-analyzer-threestage.yml

Lines changed: 1 addition & 1 deletion
@@ -23,7 +23,7 @@ jobs:
         with:
           project_id: 'vertex-model-runners'
           service_account: '[email protected]'
-          workload_identity_provider: 'projects/72497726731/locations/global/workloadIdentityPools/ai-review/providers/github'
+          workload_identity_provider: 'projects/72497726731/locations/global/workloadIdentityPools/ai-review/providers/ai-review'
 
       - name: Stage 1 - Initial Bug Screening
         id: stage1

docs/generated/metrics/metrics.yaml

Lines changed: 9 additions & 1 deletion
@@ -1780,7 +1780,7 @@ layers:
   - name: changefeed.parallel_io_pending_rows
     exported_name: changefeed_parallel_io_pending_rows
     description: Number of rows which are blocked from being sent due to conflicting in-flight keys
-    y_axis_label: Keys
+    y_axis_label: Messages
     type: GAUGE
     unit: COUNT
     aggregation: AVG
@@ -1801,6 +1801,14 @@ layers:
     unit: NANOSECONDS
     aggregation: AVG
     derivative: NONE
+  - name: changefeed.parallel_io_workers
+    exported_name: changefeed_parallel_io_workers
+    description: The number of workers in the ParallelIO
+    y_axis_label: Workers
+    type: GAUGE
+    unit: COUNT
+    aggregation: AVG
+    derivative: NONE
   - name: changefeed.progress_skew.span
     exported_name: changefeed_progress_skew_span
     description: The time difference between the fastest and slowest span's resolved timestamp

pkg/ccl/changefeedccl/changefeed_test.go

Lines changed: 16 additions & 2 deletions
@@ -11712,9 +11712,9 @@ func TestParallelIOMetrics(t *testing.T) {
 	testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
 		registry := s.Server.JobRegistry().(*jobs.Registry)
 		metrics := registry.MetricsStruct().Changefeed.(*Metrics).AggMetrics
-
+		numWorkers := 1
 		db := sqlutils.MakeSQLRunner(s.DB)
-		db.Exec(t, `SET CLUSTER SETTING changefeed.sink_io_workers = 1`)
+		db.Exec(t, fmt.Sprintf(`SET CLUSTER SETTING changefeed.sink_io_workers = %d`, numWorkers))
 		db.Exec(t, `
 		CREATE TABLE foo (a INT PRIMARY KEY);
 		`)
@@ -11743,6 +11743,7 @@ func TestParallelIOMetrics(t *testing.T) {
 		// Set the frequency to 1s. The default frequency at the time of writing is
 		foo, err := f.Feed("CREATE CHANGEFEED FOR TABLE foo WITH pubsub_sink_config=" +
 			"'{\"Flush\": {\"Frequency\": \"100ms\"}}'")
+		defer closeFeed(t, foo)
 		require.NoError(t, err)
 
 		testutils.SucceedsSoon(t, func() error {
@@ -11775,6 +11776,19 @@ func TestParallelIOMetrics(t *testing.T) {
 			}
 			return nil
 		})
+
+		assert.Equal(t, int64(numWorkers), metrics.ParallelIOWorkers.Value())
+		jobFeed := foo.(cdctest.EnterpriseTestFeed)
+		require.NoError(t, jobFeed.Pause())
+		db.Exec(t, fmt.Sprintf(`SET CLUSTER SETTING changefeed.sink_io_workers = %d`, numWorkers+1))
+		require.NoError(t, jobFeed.Resume())
+		testutils.SucceedsSoon(t, func() error {
+			if metrics.ParallelIOWorkers.Value() != int64(numWorkers+1) {
+				return errors.Newf("waiting for workers: %d", metrics.ParallelIOWorkers.Value())
+			}
+			return nil
+		})
+
 		close(done)
 		require.NoError(t, g.Wait())
 		require.NoError(t, foo.Close())

pkg/ccl/changefeedccl/metrics.go

Lines changed: 26 additions & 3 deletions
@@ -69,6 +69,7 @@ type AggMetrics struct {
 	ParallelIOPendingRows      *aggmetric.AggGauge
 	ParallelIOResultQueueNanos *aggmetric.AggHistogram
 	ParallelIOInFlightKeys     *aggmetric.AggGauge
+	ParallelIOWorkers          *aggmetric.AggGauge
 	SinkIOInflight             *aggmetric.AggGauge
 	SinkBackpressureNanos      *aggmetric.AggHistogram
 	CommitLatency              *aggmetric.AggHistogram
@@ -128,6 +129,7 @@ type metricsRecorder interface {
 	recordSizeBasedFlush()
 	newParallelIOMetricsRecorder() parallelIOMetricsRecorder
 	recordSinkIOInflightChange(int64)
+	recordParallelIOWorkers(int64)
 	recordSinkBackpressure(time.Duration)
 	makeCloudstorageFileAllocCallback() func(delta int64)
 	getKafkaThrottlingMetrics(*cluster.Settings) metrics.Histogram
@@ -158,6 +160,7 @@ type sliMetrics struct {
 	ParallelIOPendingRows      *aggmetric.Gauge
 	ParallelIOResultQueueNanos *aggmetric.Histogram
 	ParallelIOInFlightKeys     *aggmetric.Gauge
+	ParallelIOWorkers          *aggmetric.Gauge
 	SinkIOInflight             *aggmetric.Gauge
 	SinkBackpressureNanos      *aggmetric.Histogram
 	CommitLatency              *aggmetric.Histogram
@@ -554,8 +557,8 @@ func (k *kafkaHistogramAdapter) Variance() (_ float64) {
 }
 
 type parallelIOMetricsRecorder interface {
-	recordPendingQueuePush(numKeys int64)
-	recordPendingQueuePop(numKeys int64, latency time.Duration)
+	recordPendingQueuePush(numMessages int64)
+	recordPendingQueuePop(numMessages int64, latency time.Duration)
 	recordResultQueueLatency(latency time.Duration)
 	setInFlightKeys(n int64)
 }
@@ -626,6 +629,14 @@ func (m *sliMetrics) recordSinkIOInflightChange(delta int64) {
 	m.SinkIOInflight.Inc(delta)
 }
 
+func (m *sliMetrics) recordParallelIOWorkers(n int64) {
+	if m == nil {
+		return
+	}
+
+	m.ParallelIOWorkers.Update(n)
+}
+
 func (m *sliMetrics) recordSinkBackpressure(duration time.Duration) {
 	if m == nil {
 		return
@@ -712,6 +723,10 @@ func (w *wrappingCostController) recordSinkIOInflightChange(delta int64) {
 	w.inner.recordSinkIOInflightChange(delta)
 }
 
+func (w *wrappingCostController) recordParallelIOWorkers(n int64) {
+	w.inner.recordParallelIOWorkers(n)
+}
+
 func (w *wrappingCostController) recordSinkBackpressure(duration time.Duration) {
 	w.inner.recordSinkBackpressure(duration)
 }
@@ -976,7 +991,7 @@ func newAggregateMetrics(histogramWindow time.Duration, lookup *cidr.Lookup) *Ag
 	metaChangefeedParallelIOPendingRows := metric.Metadata{
 		Name:        "changefeed.parallel_io_pending_rows",
 		Help:        "Number of rows which are blocked from being sent due to conflicting in-flight keys",
-		Measurement: "Keys",
+		Measurement: "Messages",
 		Unit:        metric.Unit_COUNT,
 	}
 	metaChangefeedParallelIOResultQueueNanos := metric.Metadata{
@@ -992,6 +1007,12 @@ func newAggregateMetrics(histogramWindow time.Duration, lookup *cidr.Lookup) *Ag
 		Measurement: "Keys",
 		Unit:        metric.Unit_COUNT,
 	}
+	metaChangefeedParallelIOWorkers := metric.Metadata{
+		Name:        "changefeed.parallel_io_workers",
+		Help:        "The number of workers in the ParallelIO",
+		Measurement: "Workers",
+		Unit:        metric.Unit_COUNT,
+	}
 	metaChangefeedSinkIOInflight := metric.Metadata{
 		Name:        "changefeed.sink_io_inflight",
 		Help:        "The number of keys currently inflight as IO requests being sent to the sink",
@@ -1138,6 +1159,7 @@ func newAggregateMetrics(histogramWindow time.Duration, lookup *cidr.Lookup) *Ag
 			SigFigs:      2,
 			BucketConfig: metric.ChangefeedBatchLatencyBuckets,
 		}),
+		ParallelIOWorkers: b.Gauge(metaChangefeedParallelIOWorkers),
 		BatchHistNanos: b.Histogram(metric.HistogramOptions{
 			Metadata: metaChangefeedBatchHistNanos,
 			Duration: histogramWindow,
@@ -1245,6 +1267,7 @@ func (a *AggMetrics) getOrCreateScope(scope string) (*sliMetrics, error) {
 		ParallelIOPendingRows:      a.ParallelIOPendingRows.AddChild(scope),
 		ParallelIOResultQueueNanos: a.ParallelIOResultQueueNanos.AddChild(scope),
 		ParallelIOInFlightKeys:     a.ParallelIOInFlightKeys.AddChild(scope),
+		ParallelIOWorkers:          a.ParallelIOWorkers.AddChild(scope),
 		SinkIOInflight:             a.SinkIOInflight.AddChild(scope),
 		SinkBackpressureNanos:      a.SinkBackpressureNanos.AddChild(scope),
 		CommitLatency:              a.CommitLatency.AddChild(scope),
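
As an aside, the metrics.go diff above follows a small, repeating pattern: a gauge field on the per-scope metrics struct, a nil-receiver-safe recorder method that updates it, and a wrapper that forwards to an inner recorder. The following stand-alone Go sketch illustrates that pattern under simplified assumptions; simpleGauge, scopedMetrics, and wrappingRecorder are hypothetical stand-ins, not the aggmetric API used in CockroachDB.

package main

import (
	"fmt"
	"sync/atomic"
)

// simpleGauge is a stand-in for an aggregated gauge: last write wins.
type simpleGauge struct{ v atomic.Int64 }

func (g *simpleGauge) Update(n int64) { g.v.Store(n) }
func (g *simpleGauge) Value() int64   { return g.v.Load() }

// scopedMetrics plays the role of sliMetrics in the diff above.
type scopedMetrics struct {
	ParallelIOWorkers simpleGauge
}

// recordParallelIOWorkers mirrors the nil-receiver-safe recorder added in the
// diff: callers never have to check whether metrics are configured.
func (m *scopedMetrics) recordParallelIOWorkers(n int64) {
	if m == nil {
		return
	}
	m.ParallelIOWorkers.Update(n)
}

// wrappingRecorder plays the role of wrappingCostController: it simply
// forwards to the inner recorder.
type wrappingRecorder struct{ inner *scopedMetrics }

func (w *wrappingRecorder) recordParallelIOWorkers(n int64) {
	w.inner.recordParallelIOWorkers(n)
}

func main() {
	m := &scopedMetrics{}
	w := &wrappingRecorder{inner: m}
	// Set once when the worker pool is constructed, as NewParallelIO does below.
	w.recordParallelIOWorkers(4)
	fmt.Println(m.ParallelIOWorkers.Value()) // 4
}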

pkg/ccl/changefeedccl/parallel_io.go

Lines changed: 1 addition & 1 deletion
@@ -100,7 +100,7 @@ func NewParallelIO(
 	wg.GoCtx(func(ctx context.Context) error {
 		return io.processIO(ctx, numWorkers)
 	})
-
+	io.metrics.recordParallelIOWorkers(int64(numWorkers))
 	return io
 }
 
