Skip to content

Commit 3c76919

Browse files
craig[bot]andyyang890
andcommitted
Merge #155273
155273: changefeedccl: fix progress skew metrics computation r=log-head,asg0451 a=andyyang890 This patch fixes a bug where the progress skew metrics could show incorrect data because the computation previously relied on comparing the fastest span/table timestamps with the checkpointed timestamp (overall resolved timestamp) and these two pieces of information weren't kept in sync. Fixes #155083 Release note: None Co-authored-by: Andy Yang <[email protected]>
2 parents 3a51a14 + f196a0e commit 3c76919

File tree

6 files changed

+94
-47
lines changed

6 files changed

+94
-47
lines changed

pkg/ccl/changefeedccl/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,7 @@ go_library(
146146
"//pkg/util/httputil",
147147
"//pkg/util/humanizeutil",
148148
"//pkg/util/intsets",
149+
"//pkg/util/iterutil",
149150
"//pkg/util/json",
150151
"//pkg/util/log",
151152
"//pkg/util/log/channel",

pkg/ccl/changefeedccl/changefeed_processors.go

Lines changed: 21 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1783,7 +1783,7 @@ func (cf *changeFrontier) forwardFrontier(resolved jobspb.ResolvedSpan) error {
17831783
// The feed's checkpoint is tracked in a map which is used to inform the
17841784
// checkpoint_progress metric which will return the lowest timestamp across
17851785
// all feeds in the scope.
1786-
cf.sliMetrics.setCheckpoint(cf.sliMetricsID, cf.frontier.Frontier())
1786+
cf.sliMetrics.setCheckpoint(cf.sliMetricsID, newResolved)
17871787

17881788
return cf.maybeEmitResolved(cf.Ctx(), newResolved)
17891789
}
@@ -2226,16 +2226,29 @@ func (cf *changeFrontier) maybeEmitResolved(ctx context.Context, newResolved hlc
22262226

22272227
// updateProgressSkewMetrics updates the progress skew metrics.
22282228
func (cf *changeFrontier) updateProgressSkewMetrics() {
2229-
maxSpanTS := cf.frontier.LatestTS()
2230-
maxTableTS := cf.frontier.Frontier()
2231-
for _, f := range cf.frontier.Frontiers() {
2232-
tableTS := f.Frontier()
2233-
if tableTS.After(maxTableTS) {
2234-
maxTableTS = tableTS
2229+
fastestSpanTS := cf.frontier.LatestTS()
2230+
fastestTableTS := func() hlc.Timestamp {
2231+
var maxTS hlc.Timestamp
2232+
for _, f := range cf.frontier.Frontiers() {
2233+
if f.Frontier().After(maxTS) {
2234+
maxTS = f.Frontier()
2235+
}
2236+
}
2237+
return maxTS
2238+
}()
2239+
2240+
slowestTS := cf.frontier.Frontier()
2241+
var spanSkew, tableSkew int64
2242+
if slowestTS.IsSet() {
2243+
if fastestSpanTS.IsSet() {
2244+
spanSkew = fastestSpanTS.WallTime - slowestTS.WallTime
2245+
}
2246+
if fastestTableTS.IsSet() {
2247+
tableSkew = fastestTableTS.WallTime - slowestTS.WallTime
22352248
}
22362249
}
22372250

2238-
cf.sliMetrics.setFastestTS(cf.sliMetricsID, maxSpanTS, maxTableTS)
2251+
cf.sliMetrics.setProgressSkew(cf.sliMetricsID, spanSkew, tableSkew)
22392252
}
22402253

22412254
func frontierIsBehind(frontier hlc.Timestamp, sv *settings.Values) bool {

pkg/ccl/changefeedccl/changefeed_progress_test.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -273,8 +273,7 @@ WITH no_initial_scan, min_checkpoint_frequency='1s', resolved, metrics_label='%s
273273
}
274274
if !perTableTracking {
275275
if tableSkew != 0 {
276-
// TODO(#155083): Return an error here.
277-
return nil
276+
return errors.Newf("expected table skew to be 0, got %d", tableSkew)
278277
}
279278
return nil
280279
}

pkg/ccl/changefeedccl/metrics.go

Lines changed: 27 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,9 @@
66
package changefeedccl
77

88
import (
9+
"cmp"
910
"context"
11+
"maps"
1012
"slices"
1113
"strings"
1214
"sync/atomic"
@@ -25,6 +27,7 @@ import (
2527
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
2628
"github.com/cockroachdb/cockroach/pkg/util/cidr"
2729
"github.com/cockroachdb/cockroach/pkg/util/hlc"
30+
"github.com/cockroachdb/cockroach/pkg/util/iterutil"
2831
"github.com/cockroachdb/cockroach/pkg/util/log/logcrash"
2932
"github.com/cockroachdb/cockroach/pkg/util/metric"
3033
"github.com/cockroachdb/cockroach/pkg/util/metric/aggmetric"
@@ -188,11 +191,11 @@ type sliMetrics struct {
188191

189192
mu struct {
190193
syncutil.Mutex
191-
id int64
192-
resolved map[int64]hlc.Timestamp
193-
checkpoint map[int64]hlc.Timestamp
194-
fastestSpan map[int64]hlc.Timestamp
195-
fastestTable map[int64]hlc.Timestamp
194+
id int64
195+
resolved map[int64]hlc.Timestamp
196+
checkpoint map[int64]hlc.Timestamp
197+
spanSkew map[int64]int64
198+
tableSkew map[int64]int64
196199
}
197200
NetMetrics *cidr.NetMetrics
198201

@@ -206,8 +209,8 @@ func (m *sliMetrics) closeId(id int64) {
206209
defer m.mu.Unlock()
207210
delete(m.mu.checkpoint, id)
208211
delete(m.mu.resolved, id)
209-
delete(m.mu.fastestSpan, id)
210-
delete(m.mu.fastestTable, id)
212+
delete(m.mu.spanSkew, id)
213+
delete(m.mu.tableSkew, id)
211214
}
212215

213216
// setResolved writes a resolved timestamp entry for the given id.
@@ -228,15 +231,15 @@ func (m *sliMetrics) setCheckpoint(id int64, ts hlc.Timestamp) {
228231
}
229232
}
230233

231-
// setFastestTS saves the fastest span/table timestamps for a given id.
232-
func (m *sliMetrics) setFastestTS(id int64, spanTS hlc.Timestamp, tableTS hlc.Timestamp) {
234+
// setProgressSkew saves the span skew/table skew for a given ID.
235+
func (m *sliMetrics) setProgressSkew(id int64, spanSkew int64, tableSkew int64) {
233236
m.mu.Lock()
234237
defer m.mu.Unlock()
235-
if _, ok := m.mu.fastestSpan[id]; ok {
236-
m.mu.fastestSpan[id] = spanTS
238+
if _, ok := m.mu.spanSkew[id]; ok {
239+
m.mu.spanSkew[id] = spanSkew
237240
}
238-
if _, ok := m.mu.fastestTable[id]; ok {
239-
m.mu.fastestTable[id] = tableTS
241+
if _, ok := m.mu.tableSkew[id]; ok {
242+
m.mu.tableSkew[id] = tableSkew
240243
}
241244
}
242245

@@ -249,8 +252,8 @@ func (m *sliMetrics) claimId() int64 {
249252
// ignored until a nonzero timestamp is written.
250253
m.mu.checkpoint[id] = hlc.Timestamp{}
251254
m.mu.resolved[id] = hlc.Timestamp{}
252-
m.mu.fastestSpan[id] = hlc.Timestamp{}
253-
m.mu.fastestTable[id] = hlc.Timestamp{}
255+
m.mu.spanSkew[id] = 0
256+
m.mu.tableSkew[id] = 0
254257
m.mu.id++
255258
return id
256259
}
@@ -1296,8 +1299,8 @@ func (a *AggMetrics) getOrCreateScope(scope string) (*sliMetrics, error) {
12961299
}
12971300
sm.mu.resolved = make(map[int64]hlc.Timestamp)
12981301
sm.mu.checkpoint = make(map[int64]hlc.Timestamp)
1299-
sm.mu.fastestSpan = make(map[int64]hlc.Timestamp)
1300-
sm.mu.fastestTable = make(map[int64]hlc.Timestamp)
1302+
sm.mu.spanSkew = make(map[int64]int64)
1303+
sm.mu.tableSkew = make(map[int64]int64)
13011304
sm.mu.id = 1 // start the first id at 1 so we can detect intiialization
13021305

13031306
minTimestampGetter := func(m map[int64]hlc.Timestamp) func() int64 {
@@ -1328,34 +1331,21 @@ func (a *AggMetrics) getOrCreateScope(scope string) (*sliMetrics, error) {
13281331
}
13291332
}
13301333

1331-
maxTimestampSkewGetter := func(
1332-
base map[int64]hlc.Timestamp, ahead map[int64]hlc.Timestamp,
1333-
) func() int64 {
1334+
maxTimestampSkewGetter := func(m map[int64]int64) func() int64 {
13341335
return func() int64 {
13351336
sm.mu.Lock()
13361337
defer sm.mu.Unlock()
1337-
var maxSkew int64
1338-
for id, b := range base {
1339-
a := ahead[id]
1340-
if a.IsEmpty() || b.IsEmpty() {
1341-
continue
1342-
}
1343-
skew := a.WallTime - b.WallTime
1344-
if skew > maxSkew {
1345-
maxSkew = skew
1346-
}
1347-
}
1348-
return maxSkew
1338+
return iterutil.MaxFunc(maps.Values(m), cmp.Compare)
13491339
}
13501340
}
13511341

13521342
sm.AggregatorProgress = a.AggregatorProgress.AddFunctionalChild(minTimestampGetter(sm.mu.resolved), scope)
13531343
sm.CheckpointProgress = a.CheckpointProgress.AddFunctionalChild(minTimestampGetter(sm.mu.checkpoint), scope)
13541344
sm.MaxBehindNanos = a.MaxBehindNanos.AddFunctionalChild(maxBehindNanosGetter(sm.mu.resolved), scope)
13551345
sm.SpanProgressSkew = a.SpanProgressSkew.AddFunctionalChild(
1356-
maxTimestampSkewGetter(sm.mu.checkpoint, sm.mu.fastestSpan), scope)
1346+
maxTimestampSkewGetter(sm.mu.spanSkew), scope)
13571347
sm.TableProgressSkew = a.TableProgressSkew.AddFunctionalChild(
1358-
maxTimestampSkewGetter(sm.mu.checkpoint, sm.mu.fastestTable), scope)
1348+
maxTimestampSkewGetter(sm.mu.tableSkew), scope)
13591349

13601350
a.mu.sliMetrics[scope] = sm
13611351
return sm, nil
@@ -1364,7 +1354,7 @@ func (a *AggMetrics) getOrCreateScope(scope string) (*sliMetrics, error) {
13641354
// getLaggingRangesCallback returns a function which can be called to update the
13651355
// lagging ranges metric. It should be called with the current number of lagging
13661356
// ranges.
1367-
func (s *sliMetrics) getLaggingRangesCallback() func(lagging int64, total int64) {
1357+
func (m *sliMetrics) getLaggingRangesCallback() func(lagging int64, total int64) {
13681358
// Because this gauge is shared between changefeeds in the same metrics scope,
13691359
// we must instead modify it using `Inc` and `Dec` (as opposed to `Update`) to
13701360
// ensure values written by others are not overwritten. The code below is used
@@ -1388,10 +1378,10 @@ func (s *sliMetrics) getLaggingRangesCallback() func(lagging int64, total int64)
13881378
last.Lock()
13891379
defer last.Unlock()
13901380

1391-
s.LaggingRanges.Dec(last.lagging - lagging)
1381+
m.LaggingRanges.Dec(last.lagging - lagging)
13921382
last.lagging = lagging
13931383

1394-
s.TotalRanges.Dec(last.total - total)
1384+
m.TotalRanges.Dec(last.total - total)
13951385
last.total = total
13961386
}
13971387
}

pkg/util/iterutil/iterutil.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,3 +78,15 @@ func MinFunc[E any](seq iter.Seq[E], cmp func(E, E) int) E {
7878
}
7979
return m
8080
}
81+
82+
// MaxFunc returns the maximum element in seq, using cmp to compare elements.
83+
// If seq has no values, the zero value is returned.
84+
func MaxFunc[E any](seq iter.Seq[E], cmp func(E, E) int) E {
85+
var m E
86+
for i, v := range Enumerate(seq) {
87+
if i == 0 || cmp(v, m) > 0 {
88+
m = v
89+
}
90+
}
91+
return m
92+
}

pkg/util/iterutil/iterutil_test.go

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,3 +95,35 @@ func TestMinFunc(t *testing.T) {
9595
})
9696
}
9797
}
98+
99+
func TestMaxFunc(t *testing.T) {
100+
intCmp := func(a, b int) int {
101+
return a - b
102+
}
103+
104+
for name, tc := range map[string]struct {
105+
input []int
106+
}{
107+
"empty": {
108+
input: nil,
109+
},
110+
"one element": {
111+
input: []int{1},
112+
},
113+
"multiple elements": {
114+
input: []int{1, 3, 2},
115+
},
116+
"multiple elements with zero value": {
117+
input: []int{1, 0, 3, 2},
118+
},
119+
} {
120+
t.Run(name, func(t *testing.T) {
121+
m := iterutil.MaxFunc(slices.Values(tc.input), intCmp)
122+
if len(tc.input) == 0 {
123+
require.Equal(t, 0, m)
124+
} else {
125+
require.Equal(t, slices.MaxFunc(tc.input, intCmp), m)
126+
}
127+
})
128+
}
129+
}

0 commit comments

Comments
 (0)