Commit 199c371 (1 parent: f08398d)
changefeedccl: add progress skew metrics

This patch adds two new changefeed metrics for tracking the max skew between a changefeed's slowest and fastest span/table. The metrics are gauge metrics with the names `changefeed.progress_skew.{span,table}`.

Release note (ops change): Two new changefeed metrics for tracking the max skew between a changefeed's slowest and fastest span/table have been added. The metrics are gauge metrics with the names `changefeed.progress_skew.{span,table}`.
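For intuition, here is a minimal, self-contained Go sketch (not part of this commit) of what the new gauges report: for each changefeed in a metrics scope, the gap in nanoseconds between its fastest resolved timestamp (per span or per table) and its slowest, checkpointed one, with the scope's gauge exposing the largest such gap. It mirrors the maxTimestampSkewGetter added in metrics.go below; the `timestamp` type and the literal values are stand-ins for hlc.Timestamp and real wall-clock readings.

package main

import "fmt"

// timestamp is a stand-in for hlc.Timestamp; only the wall-clock
// component matters for the skew gauges.
type timestamp struct{ WallTime int64 }

// maxSkew mirrors the patch's maxTimestampSkewGetter: for every
// changefeed id, subtract the slowest (checkpointed) timestamp from the
// fastest one and report the largest gap seen across the scope.
func maxSkew(slowest, fastest map[int64]timestamp) int64 {
	var maxGap int64
	for id, s := range slowest {
		f := fastest[id]
		if s.WallTime == 0 || f.WallTime == 0 {
			continue // zero timestamps have not been initialized yet
		}
		if gap := f.WallTime - s.WallTime; gap > maxGap {
			maxGap = gap
		}
	}
	return maxGap
}

func main() {
	slowest := map[int64]timestamp{1: {WallTime: 1_000_000_000}}
	fastest := map[int64]timestamp{1: {WallTime: 6_000_000_000}}
	fmt.Println(maxSkew(slowest, fastest)) // 5000000000, i.e. 5s of skew
}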

File tree: 5 files changed, +234 -4 lines

docs/generated/metrics/metrics.yaml

Lines changed: 16 additions & 0 deletions
@@ -1725,6 +1725,22 @@ layers:
     unit: NANOSECONDS
     aggregation: AVG
     derivative: NONE
+  - name: changefeed.progress_skew.span
+    exported_name: changefeed_progress_skew_span
+    description: The time difference between the fastest and slowest span's resolved timestamp
+    y_axis_label: Nanoseconds
+    type: GAUGE
+    unit: NANOSECONDS
+    aggregation: AVG
+    derivative: NONE
+  - name: changefeed.progress_skew.table
+    exported_name: changefeed_progress_skew_table
+    description: The time difference between the fastest and slowest table's resolved timestamp
+    y_axis_label: Nanoseconds
+    type: GAUGE
+    unit: NANOSECONDS
+    aggregation: AVG
+    derivative: NONE
   - name: changefeed.queue_time_nanos
     exported_name: changefeed_queue_time_nanos
     description: Time KV event spent waiting to be processed
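As a consumption note: the exported_name values above are the identifiers under which these gauges surface on each node's Prometheus endpoint, /_status/vars. Below is a minimal sketch, not part of the patch, that prints the two new gauges (values are in nanoseconds); the endpoint path is standard CockroachDB, while the localhost host, default HTTP port 8080, and insecure (plain HTTP) mode are assumptions.

package main

import (
	"bufio"
	"fmt"
	"net/http"
	"strings"
)

func main() {
	// Scrape the node's Prometheus text endpoint.
	resp, err := http.Get("http://localhost:8080/_status/vars")
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	// Print only the two skew gauges added by this commit.
	sc := bufio.NewScanner(resp.Body)
	for sc.Scan() {
		line := sc.Text()
		if strings.HasPrefix(line, "changefeed_progress_skew_span") ||
			strings.HasPrefix(line, "changefeed_progress_skew_table") {
			fmt.Println(line)
		}
	}
	if err := sc.Err(); err != nil {
		panic(err)
	}
}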

pkg/ccl/changefeedccl/changefeed_processors.go

Lines changed: 16 additions & 0 deletions
@@ -1736,6 +1736,8 @@ func (cf *changeFrontier) noteAggregatorProgress(ctx context.Context, d rowenc.E
 		}
 	}
 
+	cf.updateProgressSkewMetrics()
+
 	return nil
 }
 
@@ -2263,6 +2265,20 @@ func (cf *changeFrontier) maybeEmitResolved(ctx context.Context, newResolved hlc
 	return nil
 }
 
+// updateProgressSkewMetrics updates the progress skew metrics.
+func (cf *changeFrontier) updateProgressSkewMetrics() {
+	maxSpanTS := cf.frontier.LatestTS()
+	maxTableTS := cf.frontier.Frontier()
+	for _, f := range cf.frontier.Frontiers() {
+		tableTS := f.Frontier()
+		if tableTS.After(maxTableTS) {
+			maxTableTS = tableTS
+		}
+	}
+
+	cf.sliMetrics.setFastestTS(cf.sliMetricsID, maxSpanTS, maxTableTS)
+}
+
 func frontierIsBehind(frontier hlc.Timestamp, sv *settings.Values) bool {
 	if frontier.IsEmpty() {
 		// During backfills we consider ourselves "behind" for the purposes of

pkg/ccl/changefeedccl/changefeed_progress_test.go

Lines changed: 131 additions & 1 deletion
@@ -7,6 +7,9 @@ package changefeedccl
 
 import (
 	"context"
+	"fmt"
+	"math"
+	"sync/atomic"
 	"testing"
 	"time"
 
@@ -17,6 +20,7 @@
 	"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
 	"github.com/cockroachdb/cockroach/pkg/sql/catalog/desctestutils"
+	"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
 	"github.com/cockroachdb/cockroach/pkg/sql/isql"
 	"github.com/cockroachdb/cockroach/pkg/sql/rowenc"
 	"github.com/cockroachdb/cockroach/pkg/testutils"
@@ -40,7 +44,8 @@ func TestChangefeedFrontierPersistence(t *testing.T) {
 	ctx := context.Background()
 
 	// Set a short interval for frontier persistence.
-	sqlDB.Exec(t, "SET CLUSTER SETTING changefeed.progress.frontier_persistence.interval = '5s'")
+	changefeedbase.FrontierPersistenceInterval.Override(ctx,
+		&s.Server.ClusterSettings().SV, 5*time.Second)
 
 	// Get frontier persistence metric.
 	registry := s.Server.JobRegistry().(*jobs.Registry)
@@ -179,3 +184,128 @@ RETURNING cluster_logical_timestamp()`).Scan(&tsStr)
 
 	cdcTest(t, testFn, feedTestEnterpriseSinks)
 }
+
+func TestChangefeedProgressSkewMetrics(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+
+	testutils.RunTrueAndFalse(t, "per-table tracking", func(t *testing.T, perTableTracking bool) {
+		testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
+			sqlDB := sqlutils.MakeSQLRunner(s.DB)
+			ctx := context.Background()
+
+			// Enable/disable per-table tracking.
+			changefeedbase.TrackPerTableProgress.Override(ctx,
+				&s.Server.ClusterSettings().SV, perTableTracking)
+
+			registry := s.Server.JobRegistry().(*jobs.Registry)
+			aggMetrics := registry.MetricsStruct().Changefeed.(*Metrics).AggMetrics
+			const scope = "skew"
+			scopedMetrics, err := aggMetrics.getOrCreateScope(scope)
+			require.NoError(t, err)
+
+			// Progress skew metrics should start at zero.
+			require.Zero(t, aggMetrics.SpanProgressSkew.Value())
+			require.Zero(t, aggMetrics.TableProgressSkew.Value())
+			require.Zero(t, scopedMetrics.SpanProgressSkew.Value())
+			require.Zero(t, scopedMetrics.TableProgressSkew.Value())
+
+			// Create two tables and insert some initial data.
+			sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY)`)
+			sqlDB.Exec(t, `CREATE TABLE bar (b INT PRIMARY KEY)`)
+			sqlDB.Exec(t, `INSERT INTO foo VALUES (1), (2), (3)`)
+			sqlDB.Exec(t, `INSERT INTO bar VALUES (1), (2), (3)`)
+
+			// Set up testing knobs to block all progress updates for bar.
+			var blockBarProgress atomic.Bool
+			blockBarProgress.Store(true)
+			{
+				barTableSpan := desctestutils.
+					TestingGetPublicTableDescriptor(s.Server.DB(), s.Codec, "d", "bar").
+					PrimaryIndexSpan(s.Codec)
+
+				knobs := s.TestingKnobs.
+					DistSQL.(*execinfra.TestingKnobs).
+					Changefeed.(*TestingKnobs)
+
+				knobs.FilterSpanWithMutation = func(rs *jobspb.ResolvedSpan) (bool, error) {
+					if blockBarProgress.Load() && barTableSpan.Contains(rs.Span) {
+						return true, nil
+					}
+					return false, nil
+				}
+			}
+
+			// Create changefeed for both tables with no initial scan.
+			feed := feed(t, f, fmt.Sprintf(`CREATE CHANGEFEED FOR foo, bar
+				WITH no_initial_scan, min_checkpoint_frequency='1s', resolved, metrics_label='%s'`, scope))
+			defer closeFeed(t, feed)
+
+			assertSpanSkewInRange := func(start int64, end int64) int64 {
+				var spanSkew int64
+				testutils.SucceedsSoon(t, func() error {
+					spanSkew = aggMetrics.SpanProgressSkew.Value()
+					scopedSpanSkew := scopedMetrics.SpanProgressSkew.Value()
+					if spanSkew != scopedSpanSkew {
+						return errors.Newf("aggregate and scoped span skew don't match")
+					}
+					if spanSkew < start {
+						return errors.Newf("expected span skew to be at least %d, got %d", start, spanSkew)
+					}
+					if spanSkew >= end {
+						return errors.Newf("expected span skew to be less than %d, got %d", end, spanSkew)
+					}
+					return nil
+				})
+				return spanSkew
+			}
+			assertTableSkewInRange := func(start int64, end int64) int64 {
+				var tableSkew int64
+				testutils.SucceedsSoon(t, func() error {
+					tableSkew = aggMetrics.TableProgressSkew.Value()
+					scopedTableSkew := scopedMetrics.TableProgressSkew.Value()
+					if tableSkew != scopedTableSkew {
+						return errors.Newf("aggregate and scoped table skew don't match")
+					}
+					if !perTableTracking {
+						if tableSkew != 0 {
+							return errors.Newf("expected table skew to be 0, got %d", tableSkew)
+						}
+						return nil
+					}
+					if tableSkew < start {
+						return errors.Newf("expected table skew to be at least %d, got %d", start, tableSkew)
+					}
+					if tableSkew >= end {
+						return errors.Newf("expected table skew to be less than %d, got %d", end, tableSkew)
+					}
+					return nil
+				})
+				return tableSkew
+			}
+
+			// Verify that progress skew metrics show a non-negligible amount of lag
+			// since bar progress is blocked. Some amount of skew is often unavoidable
+			// due to the fact the aggregator processes the rangefeed checkpoints for
+			// different spans separately and at the time of a flush, may have only
+			// processed a portion of the checkpoints for a specific closed timestamp.
+			// The duration of 5s has been chosen given the default closed timestamp
+			// interval is 3s.
+			startingSpanSkew := assertSpanSkewInRange(int64(5*time.Second), math.MaxInt64)
+			startingTableSkew := assertTableSkewInRange(int64(5*time.Second), math.MaxInt64)
+
+			// Verify that skew continues to increase since bar progress is still blocked.
+			assertSpanSkewInRange(startingSpanSkew+int64(5*time.Second), math.MaxInt64)
+			assertTableSkewInRange(startingTableSkew+int64(5*time.Second), math.MaxInt64)
+
+			// Re-enable progress updates for bar.
+			blockBarProgress.Store(false)
+
+			// Verify that skew drops below the skew observed at the start.
+			assertSpanSkewInRange(0, startingSpanSkew)
+			assertTableSkewInRange(0, startingTableSkew)
+		}
+
+		cdcTest(t, testFn)
+	})
+}

pkg/ccl/changefeedccl/metrics.go

Lines changed: 66 additions & 3 deletions
@@ -88,6 +88,8 @@ type AggMetrics struct {
 	KafkaThrottlingNanos *aggmetric.AggHistogram
 	SinkErrors           *aggmetric.AggCounter
 	MaxBehindNanos       *aggmetric.AggGauge
+	SpanProgressSkew     *aggmetric.AggGauge
+	TableProgressSkew    *aggmetric.AggGauge
 
 	Timers *timers.Timers
 
@@ -175,14 +177,18 @@ type sliMetrics struct {
 	KafkaThrottlingNanos *aggmetric.Histogram
 	SinkErrors           *aggmetric.Counter
 	MaxBehindNanos       *aggmetric.Gauge
+	SpanProgressSkew     *aggmetric.Gauge
+	TableProgressSkew    *aggmetric.Gauge
 
 	Timers *timers.ScopedTimers
 
 	mu struct {
 		syncutil.Mutex
-		id         int64
-		resolved   map[int64]hlc.Timestamp
-		checkpoint map[int64]hlc.Timestamp
+		id           int64
+		resolved     map[int64]hlc.Timestamp
+		checkpoint   map[int64]hlc.Timestamp
+		fastestSpan  map[int64]hlc.Timestamp
+		fastestTable map[int64]hlc.Timestamp
 	}
 	NetMetrics *cidr.NetMetrics
 
@@ -196,6 +202,8 @@ func (m *sliMetrics) closeId(id int64) {
 	defer m.mu.Unlock()
 	delete(m.mu.checkpoint, id)
 	delete(m.mu.resolved, id)
+	delete(m.mu.fastestSpan, id)
+	delete(m.mu.fastestTable, id)
 }
 
 // setResolved writes a resolved timestamp entry for the given id.
@@ -216,6 +224,18 @@ func (m *sliMetrics) setCheckpoint(id int64, ts hlc.Timestamp) {
 	}
 }
 
+// setFastestTS saves the fastest span/table timestamps for a given id.
+func (m *sliMetrics) setFastestTS(id int64, spanTS hlc.Timestamp, tableTS hlc.Timestamp) {
+	m.mu.Lock()
+	defer m.mu.Unlock()
+	if _, ok := m.mu.fastestSpan[id]; ok {
+		m.mu.fastestSpan[id] = spanTS
+	}
+	if _, ok := m.mu.fastestTable[id]; ok {
+		m.mu.fastestTable[id] = tableTS
+	}
+}
+
 // claimId claims a unique ID.
 func (m *sliMetrics) claimId() int64 {
 	m.mu.Lock()
@@ -225,6 +245,8 @@ func (m *sliMetrics) claimId() int64 {
 	// ignored until a nonzero timestamp is written.
 	m.mu.checkpoint[id] = hlc.Timestamp{}
 	m.mu.resolved[id] = hlc.Timestamp{}
+	m.mu.fastestSpan[id] = hlc.Timestamp{}
+	m.mu.fastestTable[id] = hlc.Timestamp{}
 	m.mu.id++
 	return id
 }
@@ -1035,6 +1057,18 @@ func newAggregateMetrics(histogramWindow time.Duration, lookup *cidr.Lookup) *Ag
 		Measurement: "Nanoseconds",
 		Unit:        metric.Unit_NANOSECONDS,
 	}
+	metaChangefeedSpanProgressSkew := metric.Metadata{
+		Name:        "changefeed.progress_skew.span",
+		Help:        "The time difference between the fastest and slowest span's resolved timestamp",
+		Measurement: "Nanoseconds",
+		Unit:        metric.Unit_NANOSECONDS,
+	}
+	metaChangefeedTableProgressSkew := metric.Metadata{
+		Name:        "changefeed.progress_skew.table",
+		Help:        "The time difference between the fastest and slowest table's resolved timestamp",
+		Measurement: "Nanoseconds",
+		Unit:        metric.Unit_NANOSECONDS,
+	}
 
 	functionalGaugeMinFn := func(childValues []int64) int64 {
 		var min int64
@@ -1152,6 +1186,8 @@
 		}),
 		SinkErrors:        b.Counter(metaSinkErrors),
 		MaxBehindNanos:    b.FunctionalGauge(metaChangefeedMaxBehindNanos, functionalGaugeMaxFn),
+		SpanProgressSkew:  b.FunctionalGauge(metaChangefeedSpanProgressSkew, functionalGaugeMaxFn),
+		TableProgressSkew: b.FunctionalGauge(metaChangefeedTableProgressSkew, functionalGaugeMaxFn),
 		Timers:            timers.New(histogramWindow),
 		NetMetrics:        lookup.MakeNetMetrics(metaNetworkBytesOut, metaNetworkBytesIn, "sink"),
 		CheckpointMetrics: checkpoint.NewAggMetrics(b),
@@ -1236,6 +1272,8 @@ func (a *AggMetrics) getOrCreateScope(scope string) (*sliMetrics, error) {
 	}
 	sm.mu.resolved = make(map[int64]hlc.Timestamp)
 	sm.mu.checkpoint = make(map[int64]hlc.Timestamp)
+	sm.mu.fastestSpan = make(map[int64]hlc.Timestamp)
+	sm.mu.fastestTable = make(map[int64]hlc.Timestamp)
 	sm.mu.id = 1 // start the first id at 1 so we can detect intiialization
 
 	minTimestampGetter := func(m map[int64]hlc.Timestamp) func() int64 {
@@ -1266,9 +1304,34 @@
 		}
 	}
 
+	maxTimestampSkewGetter := func(
+		base map[int64]hlc.Timestamp, ahead map[int64]hlc.Timestamp,
+	) func() int64 {
+		return func() int64 {
+			sm.mu.Lock()
+			defer sm.mu.Unlock()
+			var maxSkew int64
+			for id, b := range base {
+				a := ahead[id]
+				if a.IsEmpty() || b.IsEmpty() {
+					continue
+				}
+				skew := a.WallTime - b.WallTime
+				if skew > maxSkew {
+					maxSkew = skew
+				}
+			}
+			return maxSkew
+		}
+	}
+
 	sm.AggregatorProgress = a.AggregatorProgress.AddFunctionalChild(minTimestampGetter(sm.mu.resolved), scope)
 	sm.CheckpointProgress = a.CheckpointProgress.AddFunctionalChild(minTimestampGetter(sm.mu.checkpoint), scope)
 	sm.MaxBehindNanos = a.MaxBehindNanos.AddFunctionalChild(maxBehindNanosGetter(sm.mu.resolved), scope)
+	sm.SpanProgressSkew = a.SpanProgressSkew.AddFunctionalChild(
+		maxTimestampSkewGetter(sm.mu.checkpoint, sm.mu.fastestSpan), scope)
+	sm.TableProgressSkew = a.TableProgressSkew.AddFunctionalChild(
+		maxTimestampSkewGetter(sm.mu.checkpoint, sm.mu.fastestTable), scope)
 
 	a.mu.sliMetrics[scope] = sm
 	return sm, nil

pkg/ccl/changefeedccl/resolvedspan/frontier.go

Lines changed: 5 additions & 0 deletions
@@ -354,6 +354,11 @@ func (f *resolvedSpanFrontier) HasLaggingSpans(sv *settings.Values) bool {
 	return frontier.Add(lagThresholdNanos, 0).Less(f.latestTS)
 }
 
+// LatestTS returns the latest timestamp in the frontier.
+func (f *resolvedSpanFrontier) LatestTS() hlc.Timestamp {
+	return f.latestTS
+}
+
 // All returns an iterator over the resolved spans in the frontier.
 func (f *resolvedSpanFrontier) All() iter.Seq[jobspb.ResolvedSpan] {
 	return func(yield func(jobspb.ResolvedSpan) bool) {
