Commit e92277e

changefeedccl: add parallel io workers metric
Add a gauge metric to track the number of workers in ParallelIO.

Release note (ops change): Add metric changefeed.parallel_io_workers to track the number of workers in ParallelIO.
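For operators, the simplest way to confirm the new gauge is exposed is to read it over SQL. The snippet below is an illustrative sketch, not part of this change: it assumes a local insecure cluster on the default SQL port, the crdb_internal.node_metrics virtual table, and the lib/pq driver.

package main

import (
	"database/sql"
	"fmt"
	"log"

	_ "github.com/lib/pq" // any Postgres-wire-compatible driver works; lib/pq is one choice
)

func main() {
	// Assumes a local, insecure CockroachDB node listening on the default SQL port.
	db, err := sql.Open("postgres",
		"postgresql://root@localhost:26257/defaultdb?sslmode=disable")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// crdb_internal.node_metrics reports per-node metric values by internal name.
	var workers float64
	if err := db.QueryRow(
		`SELECT value FROM crdb_internal.node_metrics WHERE name = $1`,
		"changefeed.parallel_io_workers",
	).Scan(&workers); err != nil {
		log.Fatal(err)
	}
	fmt.Printf("changefeed.parallel_io_workers = %v\n", workers)
}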
1 parent bfef620 commit e92277e

File tree

4 files changed: +49 -4 lines changed

docs/generated/metrics/metrics.yaml

Lines changed: 9 additions & 1 deletion
@@ -1780,7 +1780,7 @@ layers:
   - name: changefeed.parallel_io_pending_rows
     exported_name: changefeed_parallel_io_pending_rows
     description: Number of rows which are blocked from being sent due to conflicting in-flight keys
-    y_axis_label: Keys
+    y_axis_label: Messages
     type: GAUGE
     unit: COUNT
     aggregation: AVG
@@ -1801,6 +1801,14 @@ layers:
     unit: NANOSECONDS
     aggregation: AVG
     derivative: NONE
+  - name: changefeed.parallel_io_workers
+    exported_name: changefeed_parallel_io_workers
+    description: The number of workers in the ParallelIO
+    y_axis_label: Workers
+    type: GAUGE
+    unit: COUNT
+    aggregation: AVG
+    derivative: NONE
   - name: changefeed.progress_skew.span
     exported_name: changefeed_progress_skew_span
     description: The time difference between the fastest and slowest span's resolved timestamp

pkg/ccl/changefeedccl/changefeed_test.go

Lines changed: 16 additions & 2 deletions
@@ -11712,9 +11712,9 @@ func TestParallelIOMetrics(t *testing.T) {
 	testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
 		registry := s.Server.JobRegistry().(*jobs.Registry)
 		metrics := registry.MetricsStruct().Changefeed.(*Metrics).AggMetrics
-
+		numWorkers := 1
 		db := sqlutils.MakeSQLRunner(s.DB)
-		db.Exec(t, `SET CLUSTER SETTING changefeed.sink_io_workers = 1`)
+		db.Exec(t, fmt.Sprintf(`SET CLUSTER SETTING changefeed.sink_io_workers = %d`, numWorkers))
 		db.Exec(t, `
		CREATE TABLE foo (a INT PRIMARY KEY);
		`)
@@ -11743,6 +11743,7 @@ func TestParallelIOMetrics(t *testing.T) {
 		// Set the frequency to 1s. The default frequency at the time of writing is
 		foo, err := f.Feed("CREATE CHANGEFEED FOR TABLE foo WITH pubsub_sink_config=" +
 			"'{\"Flush\": {\"Frequency\": \"100ms\"}}'")
+		defer closeFeed(t, foo)
 		require.NoError(t, err)

 		testutils.SucceedsSoon(t, func() error {
@@ -11775,6 +11776,19 @@ func TestParallelIOMetrics(t *testing.T) {
 			}
 			return nil
 		})
+
+		assert.Equal(t, int64(numWorkers), metrics.ParallelIOWorkers.Value())
+		jobFeed := foo.(cdctest.EnterpriseTestFeed)
+		require.NoError(t, jobFeed.Pause())
+		db.Exec(t, fmt.Sprintf(`SET CLUSTER SETTING changefeed.sink_io_workers = %d`, numWorkers+1))
+		require.NoError(t, jobFeed.Resume())
+		testutils.SucceedsSoon(t, func() error {
+			if metrics.ParallelIOWorkers.Value() != int64(numWorkers+1) {
+				return errors.Newf("waiting for workers: %d", metrics.ParallelIOWorkers.Value())
+			}
+			return nil
+		})
+
 		close(done)
 		require.NoError(t, g.Wait())
 		require.NoError(t, foo.Close())

pkg/ccl/changefeedccl/metrics.go

Lines changed: 23 additions & 0 deletions
@@ -69,6 +69,7 @@ type AggMetrics struct {
 	ParallelIOPendingRows      *aggmetric.AggGauge
 	ParallelIOResultQueueNanos *aggmetric.AggHistogram
 	ParallelIOInFlightKeys     *aggmetric.AggGauge
+	ParallelIOWorkers          *aggmetric.AggGauge
 	SinkIOInflight             *aggmetric.AggGauge
 	SinkBackpressureNanos      *aggmetric.AggHistogram
 	CommitLatency              *aggmetric.AggHistogram
@@ -128,6 +129,7 @@ type metricsRecorder interface {
 	recordSizeBasedFlush()
 	newParallelIOMetricsRecorder() parallelIOMetricsRecorder
 	recordSinkIOInflightChange(int64)
+	recordParallelIOWorkers(int64)
 	recordSinkBackpressure(time.Duration)
 	makeCloudstorageFileAllocCallback() func(delta int64)
 	getKafkaThrottlingMetrics(*cluster.Settings) metrics.Histogram
@@ -158,6 +160,7 @@ type sliMetrics struct {
 	ParallelIOPendingRows      *aggmetric.Gauge
 	ParallelIOResultQueueNanos *aggmetric.Histogram
 	ParallelIOInFlightKeys     *aggmetric.Gauge
+	ParallelIOWorkers          *aggmetric.Gauge
 	SinkIOInflight             *aggmetric.Gauge
 	SinkBackpressureNanos      *aggmetric.Histogram
 	CommitLatency              *aggmetric.Histogram
@@ -626,6 +629,14 @@ func (m *sliMetrics) recordSinkIOInflightChange(delta int64) {
 	m.SinkIOInflight.Inc(delta)
 }

+func (m *sliMetrics) recordParallelIOWorkers(n int64) {
+	if m == nil {
+		return
+	}
+
+	m.ParallelIOWorkers.Update(n)
+}
+
 func (m *sliMetrics) recordSinkBackpressure(duration time.Duration) {
 	if m == nil {
 		return
@@ -712,6 +723,10 @@ func (w *wrappingCostController) recordSinkIOInflightChange(delta int64) {
 	w.inner.recordSinkIOInflightChange(delta)
 }

+func (w *wrappingCostController) recordParallelIOWorkers(n int64) {
+	w.inner.recordParallelIOWorkers(n)
+}
+
 func (w *wrappingCostController) recordSinkBackpressure(duration time.Duration) {
 	w.inner.recordSinkBackpressure(duration)
 }
@@ -992,6 +1007,12 @@ func newAggregateMetrics(histogramWindow time.Duration, lookup *cidr.Lookup) *Ag
 		Measurement: "Keys",
 		Unit:        metric.Unit_COUNT,
 	}
+	metaChangefeedParallelIOWorkers := metric.Metadata{
+		Name:        "changefeed.parallel_io_workers",
+		Help:        "The number of workers in the ParallelIO",
+		Measurement: "Workers",
+		Unit:        metric.Unit_COUNT,
+	}
 	metaChangefeedSinkIOInflight := metric.Metadata{
 		Name:        "changefeed.sink_io_inflight",
 		Help:        "The number of keys currently inflight as IO requests being sent to the sink",
@@ -1138,6 +1159,7 @@ func newAggregateMetrics(histogramWindow time.Duration, lookup *cidr.Lookup) *Ag
 			SigFigs:      2,
 			BucketConfig: metric.ChangefeedBatchLatencyBuckets,
 		}),
+		ParallelIOWorkers: b.Gauge(metaChangefeedParallelIOWorkers),
 		BatchHistNanos: b.Histogram(metric.HistogramOptions{
 			Metadata: metaChangefeedBatchHistNanos,
 			Duration: histogramWindow,
@@ -1245,6 +1267,7 @@ func (a *AggMetrics) getOrCreateScope(scope string) (*sliMetrics, error) {
 		ParallelIOPendingRows:      a.ParallelIOPendingRows.AddChild(scope),
 		ParallelIOResultQueueNanos: a.ParallelIOResultQueueNanos.AddChild(scope),
 		ParallelIOInFlightKeys:     a.ParallelIOInFlightKeys.AddChild(scope),
+		ParallelIOWorkers:          a.ParallelIOWorkers.AddChild(scope),
 		SinkIOInflight:             a.SinkIOInflight.AddChild(scope),
 		SinkBackpressureNanos:      a.SinkBackpressureNanos.AddChild(scope),
 		CommitLatency:              a.CommitLatency.AddChild(scope),

pkg/ccl/changefeedccl/parallel_io.go

Lines changed: 1 addition & 1 deletion
@@ -100,7 +100,7 @@ func NewParallelIO(
 	wg.GoCtx(func(ctx context.Context) error {
 		return io.processIO(ctx, numWorkers)
 	})
-
+	io.metrics.recordParallelIOWorkers(int64(numWorkers))
 	return io
 }

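The gauge is set once, when NewParallelIO constructs the worker pool, which is why the test above pauses and resumes the changefeed to observe a new value after changing changefeed.sink_io_workers. Below is a minimal standalone sketch of that record-at-construction pattern; the gauge and pool types are hypothetical stand-ins rather than the aggmetric API used in the real code.

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// gauge is a stand-in for the aggmetric gauge used by the changefeed metrics.
type gauge struct{ v atomic.Int64 }

func (g *gauge) Update(n int64) { g.v.Store(n) }
func (g *gauge) Value() int64   { return g.v.Load() }

// pool mirrors the shape of the change: the worker count is recorded into the
// gauge once, at construction, rather than as individual workers start or stop.
type pool struct {
	wg   sync.WaitGroup
	work chan func()
}

func newPool(numWorkers int, workers *gauge) *pool {
	p := &pool{work: make(chan func())}
	for i := 0; i < numWorkers; i++ {
		p.wg.Add(1)
		go func() {
			defer p.wg.Done()
			for fn := range p.work {
				fn()
			}
		}()
	}
	// Record the configured worker count, as NewParallelIO now does.
	workers.Update(int64(numWorkers))
	return p
}

func (p *pool) close() { close(p.work); p.wg.Wait() }

func main() {
	var g gauge
	p := newPool(4, &g)
	defer p.close()
	fmt.Println("workers gauge:", g.Value()) // prints 4
}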