Commit 378e7c3

craig[bot] and andyyang890 committed
Merge #153491
153491: changefeedccl: periodically write entire span frontier to job_info r=KeithCh a=andyyang890

**changefeedccl: periodically write entire span frontier to job_info**

Fixes #153200

Release note (performance improvement): Changefeeds will now periodically persist their entire span frontiers so that fewer duplicates will need to be emitted during restarts. The default persistence interval is 30s, but this can be configured with the `changefeed.progress.frontier_persistence.interval` cluster setting.

---

**roachtest: add benchmark for changefeed frontier persistence**

Fixes #153202

Release note: None

Co-authored-by: Andy Yang <[email protected]>
2 parents 43e23aa + 8fdaf89 commit 378e7c3
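Note on the setting named in the release note: the snippet below sketches how `changefeed.progress.frontier_persistence.interval` can be inspected or overridden programmatically. It assumes the stock cluster-settings accessors (`Get`, `Override`) and is an illustration rather than code from this commit; operators would normally use `SET CLUSTER SETTING`, as the new test in this commit does.

// Sketch: reading and overriding the frontier persistence interval.
// Assumes the standard cluster-settings accessors; not part of this commit.
package main

import (
    "context"
    "fmt"
    "time"

    "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase"
    "github.com/cockroachdb/cockroach/pkg/settings/cluster"
)

func main() {
    ctx := context.Background()
    st := cluster.MakeTestingClusterSettings()

    // Default registered by this commit: 30s.
    fmt.Println(changefeedbase.FrontierPersistenceInterval.Get(&st.SV))

    // Programmatic override (useful in tests); operators would instead run:
    //   SET CLUSTER SETTING changefeed.progress.frontier_persistence.interval = '1m';
    changefeedbase.FrontierPersistenceInterval.Override(ctx, &st.SV, time.Minute)
    fmt.Println(changefeedbase.FrontierPersistenceInterval.Get(&st.SV))
}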

File tree
9 files changed: +320 -17 lines


docs/generated/metrics/metrics.yaml
Lines changed: 8 additions & 0 deletions

@@ -1837,6 +1837,14 @@ layers:
     unit: NANOSECONDS
     aggregation: AVG
     derivative: NONE
+  - name: changefeed.stage.frontier_persistence.latency
+    exported_name: changefeed_stage_frontier_persistence_latency
+    description: 'Latency of the changefeed stage: persisting frontier to job info'
+    y_axis_label: Latency
+    type: HISTOGRAM
+    unit: NANOSECONDS
+    aggregation: AVG
+    derivative: NONE
   - name: changefeed.stage.kv_feed_buffer.latency
     exported_name: changefeed_stage_kv_feed_buffer_latency
     description: 'Latency of the changefeed stage: waiting to buffer kv events'

docs/generated/settings/settings-for-tenants.txt
Lines changed: 1 addition & 0 deletions

@@ -22,6 +22,7 @@ changefeed.kafka_v2_error_details.enabled boolean true if enabled, Kafka v2 sink
 changefeed.memory.per_changefeed_limit byte size 512 MiB controls amount of data that can be buffered per changefeed application
 changefeed.resolved_timestamp.min_update_interval (alias: changefeed.min_highwater_advance) duration 0s minimum amount of time that must have elapsed since the last time a changefeed's resolved timestamp was updated before it is eligible to be updated again; default of 0 means no minimum interval is enforced but updating will still be limited by the average time it takes to checkpoint progress application
 changefeed.node_throttle_config string specifies node level throttling configuration for all changefeeeds application
+changefeed.progress.frontier_persistence.interval duration 30s minimum amount of time that must elapse before a changefeed will persist its entire span frontier again application
 changefeed.protect_timestamp.max_age duration 96h0m0s fail the changefeed if the protected timestamp age exceeds this threshold; 0 disables expiration application
 changefeed.protect_timestamp_interval duration 10m0s controls how often the changefeed forwards its protected timestamp to the resolved timestamp application
 changefeed.schema_feed.read_with_priority_after duration 1m0s retry with high priority if we were not able to read descriptors for too long; 0 disables application

docs/generated/settings/settings.html
Lines changed: 1 addition & 0 deletions

@@ -27,6 +27,7 @@
 <tr><td><div id="setting-changefeed-memory-per-changefeed-limit" class="anchored"><code>changefeed.memory.per_changefeed_limit</code></div></td><td>byte size</td><td><code>512 MiB</code></td><td>controls amount of data that can be buffered per changefeed</td><td>Basic/Standard/Advanced/Self-Hosted</td></tr>
 <tr><td><div id="setting-changefeed-min-highwater-advance" class="anchored"><code>changefeed.resolved_timestamp.min_update_interval<br />(alias: changefeed.min_highwater_advance)</code></div></td><td>duration</td><td><code>0s</code></td><td>minimum amount of time that must have elapsed since the last time a changefeed&#39;s resolved timestamp was updated before it is eligible to be updated again; default of 0 means no minimum interval is enforced but updating will still be limited by the average time it takes to checkpoint progress</td><td>Basic/Standard/Advanced/Self-Hosted</td></tr>
 <tr><td><div id="setting-changefeed-node-throttle-config" class="anchored"><code>changefeed.node_throttle_config</code></div></td><td>string</td><td><code></code></td><td>specifies node level throttling configuration for all changefeeeds</td><td>Basic/Standard/Advanced/Self-Hosted</td></tr>
+<tr><td><div id="setting-changefeed-progress-frontier-persistence-interval" class="anchored"><code>changefeed.progress.frontier_persistence.interval</code></div></td><td>duration</td><td><code>30s</code></td><td>minimum amount of time that must elapse before a changefeed will persist its entire span frontier again</td><td>Basic/Standard/Advanced/Self-Hosted</td></tr>
 <tr><td><div id="setting-changefeed-protect-timestamp-max-age" class="anchored"><code>changefeed.protect_timestamp.max_age</code></div></td><td>duration</td><td><code>96h0m0s</code></td><td>fail the changefeed if the protected timestamp age exceeds this threshold; 0 disables expiration</td><td>Basic/Standard/Advanced/Self-Hosted</td></tr>
 <tr><td><div id="setting-changefeed-protect-timestamp-interval" class="anchored"><code>changefeed.protect_timestamp_interval</code></div></td><td>duration</td><td><code>10m0s</code></td><td>controls how often the changefeed forwards its protected timestamp to the resolved timestamp</td><td>Basic/Standard/Advanced/Self-Hosted</td></tr>
 <tr><td><div id="setting-changefeed-schema-feed-read-with-priority-after" class="anchored"><code>changefeed.schema_feed.read_with_priority_after</code></div></td><td>duration</td><td><code>1m0s</code></td><td>retry with high priority if we were not able to read descriptors for too long; 0 disables</td><td>Basic/Standard/Advanced/Self-Hosted</td></tr>

pkg/ccl/changefeedccl/BUILD.bazel
Lines changed: 4 additions & 0 deletions

@@ -74,6 +74,7 @@ go_library(
     "//pkg/docs",
     "//pkg/featureflag",
     "//pkg/jobs",
+    "//pkg/jobs/jobfrontier",
     "//pkg/jobs/jobsauth",
     "//pkg/jobs/jobspb",
     "//pkg/jobs/jobsprofiler",
@@ -127,6 +128,7 @@ go_library(
     "//pkg/sql/sessiondatapb",
     "//pkg/sql/syntheticprivilege",
     "//pkg/sql/types",
+    "//pkg/util",
     "//pkg/util/admission",
     "//pkg/util/admission/admissionpb",
     "//pkg/util/bufalloc",
@@ -201,6 +203,7 @@ go_test(
     "alter_changefeed_test.go",
     "changefeed_dist_test.go",
     "changefeed_job_info_test.go",
+    "changefeed_progress_test.go",
     "changefeed_stmt_test.go",
     "changefeed_test.go",
     "csv_test.go",
@@ -262,6 +265,7 @@ go_test(
     "//pkg/geo/geopb",
     "//pkg/internal/sqlsmith",
     "//pkg/jobs",
+    "//pkg/jobs/jobfrontier",
     "//pkg/jobs/jobspb",
     "//pkg/jobs/jobstest",
     "//pkg/keys",

pkg/ccl/changefeedccl/changefeed_processors.go
Lines changed: 112 additions & 13 deletions

@@ -24,6 +24,7 @@ import (
     "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/schemafeed"
     "github.com/cockroachdb/cockroach/pkg/clusterversion"
     "github.com/cockroachdb/cockroach/pkg/jobs"
+    "github.com/cockroachdb/cockroach/pkg/jobs/jobfrontier"
     "github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
     "github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord"
     "github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts"
@@ -41,6 +42,7 @@ import (
     "github.com/cockroachdb/cockroach/pkg/sql/rowenc"
     "github.com/cockroachdb/cockroach/pkg/sql/sem/eval"
     "github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
+    "github.com/cockroachdb/cockroach/pkg/util"
     bulkutil "github.com/cockroachdb/cockroach/pkg/util/bulk"
     "github.com/cockroachdb/cockroach/pkg/util/cancelchecker"
     "github.com/cockroachdb/cockroach/pkg/util/envutil"
@@ -1055,6 +1057,9 @@ type changeFrontier struct {
     // record was updated to the frontier's highwater mark
     lastProtectedTimestampUpdate time.Time

+    // frontierPersistenceLimiter is a rate limiter for persisting the span frontier.
+    frontierPersistenceLimiter *saveRateLimiter
+
     // js, if non-nil, is called to checkpoint the changefeed's
     // progress in the corresponding system job entry.
     js *jobState
@@ -1302,6 +1307,9 @@ func newChangeFrontierProcessor(
         cf.freqEmitResolved = emitNoResolved
     }

+    cf.frontierPersistenceLimiter = newSaveRateLimiter(
+        "frontier" /* name */, changefeedbase.FrontierPersistenceInterval)
+
     encodingOpts, err := opts.GetEncodingOptions()
     if err != nil {
         return nil, err
@@ -1803,33 +1811,33 @@ func (cf *changeFrontier) maybeCheckpointJob(
             return false, nil
         }
         checkpointStart := timeutil.Now()
-        updated, err := cf.checkpointJobProgress(ctx, cf.frontier.Frontier(), checkpoint, cf.evalCtx.Settings.Version)
-        if err != nil {
+        if err := cf.checkpointJobProgress(ctx, cf.frontier.Frontier(), checkpoint); err != nil {
            return false, err
        }
        cf.js.checkpointCompleted(ctx, timeutil.Since(checkpointStart))
-        return updated, nil
    }

-    return false, nil
+    if err := cf.maybePersistFrontier(ctx); err != nil {
+        return false, err
+    }
+
+    // TODO(#153462): Determine if this return value should return true
+    // only if the highwater was updated.
+    return updateCheckpoint || updateHighWater, nil
 }

 const changefeedJobProgressTxnName = "changefeed job progress"

 func (cf *changeFrontier) checkpointJobProgress(
-    ctx context.Context,
-    frontier hlc.Timestamp,
-    spanLevelCheckpoint *jobspb.TimestampSpansMap,
-    cv clusterversion.Handle,
-) (bool, error) {
+    ctx context.Context, frontier hlc.Timestamp, spanLevelCheckpoint *jobspb.TimestampSpansMap,
+) error {
     ctx, sp := tracing.ChildSpan(ctx, "changefeed.frontier.checkpoint_job_progress")
     defer sp.Finish()
     defer cf.sliMetrics.Timers.CheckpointJobProgress.Start()()

     if cf.knobs.RaiseRetryableError != nil {
         if err := cf.knobs.RaiseRetryableError(); err != nil {
-            return false, changefeedbase.MarkRetryableError(
-                errors.New("cf.knobs.RaiseRetryableError"))
+            return changefeedbase.MarkRetryableError(errors.New("cf.knobs.RaiseRetryableError"))
         }
     }

@@ -1857,6 +1865,9 @@ func (cf *changeFrontier) checkpointJobProgress(
     changefeedProgress := progress.Details.(*jobspb.Progress_Changefeed).Changefeed
     changefeedProgress.SpanLevelCheckpoint = spanLevelCheckpoint

+    // TODO(#153299): Make sure we only updated per-table PTS if we persisted
+    // the span frontier. We'll probably want to move this code out of
+    // checkpointJobProgress and into maybeCheckpointJob.
     if ptsUpdated, err = cf.manageProtectedTimestamps(ctx, txn, changefeedProgress); err != nil {
         log.Changefeed.Warningf(ctx, "error managing protected timestamp record: %v", err)
         return err
@@ -1884,7 +1895,7 @@

         return nil
     }); err != nil {
-        return false, err
+        return err
     }
     if ptsUpdated {
         cf.lastProtectedTimestampUpdate = timeutil.Now()
@@ -1898,7 +1909,28 @@
     cf.localState.SetHighwater(frontier)
     cf.localState.SetCheckpoint(spanLevelCheckpoint)

-    return true, nil
+    return nil
+}
+
+func (cf *changeFrontier) maybePersistFrontier(ctx context.Context) error {
+    ctx, sp := tracing.ChildSpan(ctx, "changefeed.frontier.maybe_persist_frontier")
+    defer sp.Finish()
+
+    if cf.spec.JobID == 0 ||
+        !cf.evalCtx.Settings.Version.IsActive(ctx, clusterversion.V25_4) ||
+        !cf.frontierPersistenceLimiter.canSave(ctx, &cf.FlowCtx.Cfg.Settings.SV) {
+        return nil
+    }
+
+    timer := cf.sliMetrics.Timers.FrontierPersistence.Start()
+    if err := cf.FlowCtx.Cfg.DB.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
+        return jobfrontier.Store(ctx, txn, cf.spec.JobID, "coordinator", cf.frontier)
+    }); err != nil {
+        return err
+    }
+    persistDuration := timer()
+    cf.frontierPersistenceLimiter.doneSave(persistDuration)
+    return nil
 }

 // manageProtectedTimestamps periodically advances the protected timestamp for
@@ -2315,3 +2347,70 @@ func shouldCountUsageError(err error) bool {
         pgerror.GetPGCode(err) != pgcode.UndefinedTable &&
         status.Code(err) != codes.Canceled
 }
+
+// durationSetting is a duration cluster setting.
+type durationSetting interface {
+    Name() settings.SettingName
+    Get(sv *settings.Values) time.Duration
+}
+
+// saveRateLimiter is a rate limiter for saving a piece of progress.
+// It uses a duration setting as the minimum interval between saves.
+// It also limits saving to not be more frequent than the average
+// duration it takes to save progress.
+type saveRateLimiter struct {
+    name         redact.SafeString
+    saveInterval durationSetting
+    warnEveryN   util.EveryN
+
+    lastSave        time.Time
+    avgSaveDuration time.Duration
+}
+
+// newSaveRateLimiter returns a new saveRateLimiter.
+func newSaveRateLimiter(name redact.SafeString, saveInterval durationSetting) *saveRateLimiter {
+    return &saveRateLimiter{
+        name:         name,
+        saveInterval: saveInterval,
+        warnEveryN:   util.Every(time.Minute),
+    }
+}
+
+// canSave returns whether enough time has passed to save progress again.
+func (l *saveRateLimiter) canSave(ctx context.Context, sv *settings.Values) bool {
+    interval := l.saveInterval.Get(sv)
+    if interval == 0 {
+        return false
+    }
+    now := timeutil.Now()
+    elapsed := now.Sub(l.lastSave)
+    if elapsed < interval {
+        return false
+    }
+    if elapsed < l.avgSaveDuration {
+        if l.warnEveryN.ShouldProcess(now) {
+            log.Changefeed.Warningf(ctx, "cannot save %s even though %s has elapsed "+
+                "since last save and %s is set to %s because average duration to save was %s "+
+                "and further saving is disabled until that much time elapses",
+                l.name, elapsed, l.saveInterval.Name(),
+                interval, l.avgSaveDuration)
+        }
+        return false
+    }
+    return true
+}
+
+// doneSave must be called after each save is completed with the duration
+// it took to save progress.
+func (l *saveRateLimiter) doneSave(saveDuration time.Duration) {
+    l.lastSave = timeutil.Now()
+
+    // Update the average save duration using an exponential moving average.
+    if l.avgSaveDuration == 0 {
+        l.avgSaveDuration = saveDuration
+    } else {
+        const alpha = 0.1
+        l.avgSaveDuration = time.Duration(
+            alpha*float64(saveDuration) + (1-alpha)*float64(l.avgSaveDuration))
+    }
+}
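
The saveRateLimiter added at the end of this file enforces two gates before a save: the configured interval must have elapsed, and at least as much time as the running average save duration must have elapsed, so slow job_info writes push the next write further out. The standalone program below is a simplified model of that policy with hypothetical names and a fixed interval in place of the cluster setting; it is an illustration, not the production type.

// Simplified, standalone model of the saveRateLimiter policy: a save is
// allowed only if (a) the configured interval has elapsed since the last
// save and (b) at least the average save duration has elapsed. Names and
// types here are illustrative, not the ones in changefeed_processors.go.
package main

import (
    "fmt"
    "time"
)

type rateLimiterModel struct {
    interval        time.Duration // stands in for the cluster setting
    lastSave        time.Time
    avgSaveDuration time.Duration
}

func (l *rateLimiterModel) canSave(now time.Time) bool {
    if l.interval == 0 {
        return false // a zero interval disables saving entirely
    }
    elapsed := now.Sub(l.lastSave)
    // Both gates must pass: the minimum interval, and the average-duration
    // back-off that keeps saves from outpacing how long they take.
    return elapsed >= l.interval && elapsed >= l.avgSaveDuration
}

func (l *rateLimiterModel) doneSave(now time.Time, took time.Duration) {
    l.lastSave = now
    if l.avgSaveDuration == 0 {
        l.avgSaveDuration = took
        return
    }
    // Exponential moving average with alpha = 0.1, as in the real code.
    const alpha = 0.1
    l.avgSaveDuration = time.Duration(
        alpha*float64(took) + (1-alpha)*float64(l.avgSaveDuration))
}

func main() {
    l := &rateLimiterModel{interval: 30 * time.Second}
    start := time.Unix(0, 0)

    // Attempt a save every 10s for five minutes. Each save "takes" 45s
    // (exaggerated for illustration), so the average-duration gate spaces
    // saves roughly 50s apart instead of every 30s.
    var saveTimes []time.Duration
    for tick := 0; tick <= 30; tick++ {
        now := start.Add(time.Duration(tick) * 10 * time.Second)
        if l.canSave(now) {
            l.doneSave(now, 45*time.Second)
            saveTimes = append(saveTimes, now.Sub(start))
        }
    }
    fmt.Println("saves happened at:", saveTimes)
}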
pkg/ccl/changefeedccl/changefeed_progress_test.go (new file)
Lines changed: 93 additions & 0 deletions

@@ -0,0 +1,93 @@
+// Copyright 2025 The Cockroach Authors.
+//
+// Use of this software is governed by the CockroachDB Software License
+// included in the /LICENSE file.
+
+package changefeedccl
+
+import (
+    "context"
+    "testing"
+    "time"
+
+    "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdctest"
+    "github.com/cockroachdb/cockroach/pkg/jobs"
+    "github.com/cockroachdb/cockroach/pkg/jobs/jobfrontier"
+    "github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
+    "github.com/cockroachdb/cockroach/pkg/sql/isql"
+    "github.com/cockroachdb/cockroach/pkg/testutils"
+    "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
+    "github.com/cockroachdb/cockroach/pkg/util/leaktest"
+    "github.com/cockroachdb/cockroach/pkg/util/log"
+    "github.com/cockroachdb/errors"
+    "github.com/stretchr/testify/require"
+)
+
+// TestChangefeedFrontierPersistence verifies that changefeeds periodically
+// persist their span frontiers to the job info table.
+func TestChangefeedFrontierPersistence(t *testing.T) {
+    defer leaktest.AfterTest(t)()
+    defer log.Scope(t).Close(t)
+
+    testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
+        sqlDB := sqlutils.MakeSQLRunner(s.DB)
+        ctx := context.Background()
+
+        // Set a short interval for frontier persistence.
+        sqlDB.Exec(t, "SET CLUSTER SETTING changefeed.progress.frontier_persistence.interval = '5s'")
+
+        // Get frontier persistence metric.
+        registry := s.Server.JobRegistry().(*jobs.Registry)
+        metric := registry.MetricsStruct().Changefeed.(*Metrics).AggMetrics.Timers.FrontierPersistence
+
+        // Verify metric count starts at zero.
+        initialCount, _ := metric.CumulativeSnapshot().Total()
+        require.Equal(t, int64(0), initialCount)
+
+        // Create a table and insert some data.
+        sqlDB.Exec(t, "CREATE TABLE foo (a INT PRIMARY KEY, b STRING)")
+        sqlDB.Exec(t, "INSERT INTO foo VALUES (1, 'a'), (2, 'b'), (3, 'c')")
+
+        // Start a changefeed.
+        foo := feed(t, f, "CREATE CHANGEFEED FOR foo")
+        defer closeFeed(t, foo)
+
+        // Make sure frontier gets persisted to job_info table.
+        jobID := foo.(cdctest.EnterpriseTestFeed).JobID()
+        testutils.SucceedsSoon(t, func() error {
+            var found bool
+            var allSpans []jobspb.ResolvedSpan
+            if err := s.Server.InternalDB().(isql.DB).Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
+                var err error
+                allSpans, found, err = jobfrontier.GetAllResolvedSpans(ctx, txn, jobID)
+                if err != nil {
+                    return err
+                }
+                return nil
+            }); err != nil {
+                return err
+            }
+            if !found {
+                return errors.Newf("frontier not yet persisted")
+            }
+            t.Logf("found resolved spans in job_info table: %+v", allSpans)
+            return nil
+        })
+
+        // Verify metric count and average latency have sensible values.
+        testutils.SucceedsSoon(t, func() error {
+            metricSnapshot := metric.CumulativeSnapshot()
+            count, _ := metricSnapshot.Total()
+            if count == 0 {
+                return errors.Newf("metrics not yet updated")
+            }
+            avgLatency := time.Duration(metricSnapshot.Mean())
+            t.Logf("frontier persistence metrics - count: %d, avg latency: %s", count, avgLatency)
+            require.Greater(t, count, int64(0))
+            require.Greater(t, avgLatency, time.Duration(0))
+            return nil
+        })
+    }
+
+    cdcTest(t, testFn, feedTestEnterpriseSinks)
+}
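
For reference, the two jobfrontier calls used in this commit form a simple write/read pair: the coordinator stores the whole frontier under a named key, and anything holding the job ID can read every persisted resolved span back. The sketch below strings them together; the signatures, in particular the type of the frontier argument, are inferred from the call sites in this diff, so treat them as assumptions rather than documentation of the jobfrontier package.

// Sketch: the write/read round trip through jobfrontier as exercised by
// this commit. Signatures are inferred from the call sites above and may
// not match the package exactly.
package changefeedsketch

import (
    "context"

    "github.com/cockroachdb/cockroach/pkg/jobs/jobfrontier"
    "github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
    "github.com/cockroachdb/cockroach/pkg/sql/isql"
    "github.com/cockroachdb/cockroach/pkg/util/span"
    "github.com/cockroachdb/errors"
)

func persistAndReadBack(
    ctx context.Context, db isql.DB, jobID jobspb.JobID, frontier span.Frontier,
) ([]jobspb.ResolvedSpan, error) {
    // Write the entire span frontier under the "coordinator" key, as
    // maybePersistFrontier does in changefeed_processors.go.
    if err := db.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
        return jobfrontier.Store(ctx, txn, jobID, "coordinator", frontier)
    }); err != nil {
        return nil, err
    }

    // Read all persisted resolved spans back, as the test above does when
    // checking that persistence happened.
    var spans []jobspb.ResolvedSpan
    err := db.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
        all, found, err := jobfrontier.GetAllResolvedSpans(ctx, txn, jobID)
        if err != nil {
            return err
        }
        if !found {
            return errors.New("no frontier persisted yet")
        }
        spans = all
        return nil
    })
    return spans, err
}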

pkg/ccl/changefeedccl/changefeedbase/settings.go
Lines changed: 12 additions & 0 deletions

@@ -420,3 +420,15 @@ var TrackPerTableProgress = settings.RegisterBoolSetting(
     "but doing so may incur additional overhead during ordinary changefeed execution",
     metamorphic.ConstantWithTestBool("changefeed.progress.per_table_tracking.enabled", true),
 )
+
+// FrontierPersistenceInterval configures the minimum amount of time that must
+// elapse before a changefeed will persist its entire span frontier again.
+var FrontierPersistenceInterval = settings.RegisterDurationSettingWithExplicitUnit(
+    settings.ApplicationLevel,
+    "changefeed.progress.frontier_persistence.interval",
+    "minimum amount of time that must elapse before a changefeed "+
+        "will persist its entire span frontier again",
+    30*time.Second, /* defaultValue */
+    settings.DurationInRange(5*time.Second, 10*time.Minute),
+    settings.WithPublic,
+)
