Skip to content

Commit c7be0d5

Browse files
committed
execinfrapb: add ChangefeedProgressConfig protobuf
This patch adds a new `ChangefeedProgressConfig` protobuf message, which is being added to changefeed processor specs so that all of a changefeed's processors use the same consistent configuration for progress tracking and recording. Release note: None
1 parent eae530e commit c7be0d5

File tree

8 files changed

+62
-43
lines changed

8 files changed

+62
-43
lines changed

pkg/ccl/changefeedccl/changefeed_dist.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdceval"
1515
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase"
1616
"github.com/cockroachdb/cockroach/pkg/ccl/kvccl/kvfollowerreadsccl"
17+
"github.com/cockroachdb/cockroach/pkg/clusterversion"
1718
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
1819
"github.com/cockroachdb/cockroach/pkg/jobs/jobsprofiler"
1920
"github.com/cockroachdb/cockroach/pkg/keys"
@@ -453,6 +454,14 @@ func makePlan(
453454
maybeCfKnobs.SpanPartitionsCallback(spanPartitions)
454455
}
455456

457+
// Create progress config based on current settings.
458+
var progressConfig *execinfrapb.ChangefeedProgressConfig
459+
if execCtx.ExecCfg().Settings.Version.IsActive(ctx, clusterversion.V25_4) {
460+
progressConfig = &execinfrapb.ChangefeedProgressConfig{
461+
PerTableTracking: changefeedbase.TrackPerTableProgress.Get(sv),
462+
}
463+
}
464+
456465
aggregatorSpecs := make([]*execinfrapb.ChangeAggregatorSpec, len(spanPartitions))
457466
for i, sp := range spanPartitions {
458467
if log.ExpensiveLogEnabled(ctx, 2) {
@@ -475,6 +484,7 @@ func makePlan(
475484
JobID: jobID,
476485
Select: execinfrapb.Expression{Expr: details.Select},
477486
Description: description,
487+
ProgressConfig: progressConfig,
478488
}
479489
}
480490

@@ -489,6 +499,7 @@ func makePlan(
489499
JobID: jobID,
490500
UserProto: execCtx.User().EncodeProto(),
491501
Description: description,
502+
ProgressConfig: progressConfig,
492503
}
493504

494505
if haveKnobs && maybeCfKnobs.OnDistflowSpec != nil {

pkg/ccl/changefeedccl/changefeed_processors.go

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -615,9 +615,13 @@ func (ca *changeAggregator) setupSpansAndFrontier() (spans []roachpb.Span, err e
615615
if err != nil {
616616
return nil, err
617617
}
618+
var perTableTracking bool
619+
if ca.spec.ProgressConfig != nil {
620+
perTableTracking = ca.spec.ProgressConfig.PerTableTracking
621+
}
618622
ca.frontier, err = resolvedspan.NewAggregatorFrontier(
619623
ca.spec.Feed.StatementTime, initialHighWater, ca.FlowCtx.Codec(),
620-
&ca.FlowCtx.Cfg.Settings.SV,
624+
perTableTracking,
621625
spans...)
622626
if err != nil {
623627
return nil, err
@@ -1422,9 +1426,13 @@ func (cf *changeFrontier) Start(ctx context.Context) {
14221426
}
14231427

14241428
// Set up the resolved span frontier.
1429+
var perTableTracking bool
1430+
if cf.spec.ProgressConfig != nil {
1431+
perTableTracking = cf.spec.ProgressConfig.PerTableTracking
1432+
}
14251433
cf.frontier, err = resolvedspan.NewCoordinatorFrontier(
14261434
cf.spec.Feed.StatementTime, initialHighwater, cf.FlowCtx.Codec(),
1427-
&cf.FlowCtx.Cfg.Settings.SV,
1435+
perTableTracking,
14281436
cf.spec.TrackedSpans...)
14291437
if err != nil {
14301438
log.Infof(cf.Ctx(), "change frontier moving to draining due to error setting up frontier: %v", err)

pkg/ccl/changefeedccl/changefeed_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8940,7 +8940,7 @@ func TestChangefeedBackfillCheckpoint(t *testing.T) {
89408940
hlc.Timestamp{},
89418941
hlc.Timestamp{},
89428942
s.Codec,
8943-
&s.Server.ClusterSettings().SV,
8943+
changefeedbase.TrackPerTableProgress.Get(&s.Server.ClusterSettings().SV),
89448944
tableSpan,
89458945
)
89468946
if err != nil {

pkg/ccl/changefeedccl/changefeedbase/settings.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -388,3 +388,16 @@ var UseBareTableNames = settings.RegisterBoolSetting(
388388
"changefeed.bare_table_names.enabled",
389389
"set to true to use bare table names in changefeed topics, false to use quoted table names; default is true",
390390
true)
391+
392+
// TrackPerTableProgress controls whether a changefeed's in-memory frontiers
393+
// should track span progress on a per-table basis (via partitioning into
394+
// one sub-frontier per table). Enabling this is necessary for any other
395+
// per-table progress features (e.g. per-table PTS) to work.
396+
var TrackPerTableProgress = settings.RegisterBoolSetting(
397+
settings.ApplicationLevel,
398+
"changefeed.progress.per_table_tracking.enabled",
399+
"track progress on a per-table basis in-memory; enabling this will enable more "+
400+
"granular saving/restoring of progress, which will reduce duplicates during restarts, "+
401+
"but doing so may incur additional overhead during ordinary changefeed execution",
402+
metamorphic.ConstantWithTestBool("changefeed.progress.per_table_tracking.enabled", true),
403+
)

pkg/ccl/changefeedccl/resolvedspan/BUILD.bazel

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@ go_library(
1616
"//pkg/settings",
1717
"//pkg/sql/catalog/descpb",
1818
"//pkg/util/hlc",
19-
"//pkg/util/metamorphic",
2019
"//pkg/util/span",
2120
"@com_github_cockroachdb_errors//:errors",
2221
"@com_github_cockroachdb_redact//:redact",
@@ -31,7 +30,6 @@ go_test(
3130
"//pkg/jobs/jobspb",
3231
"//pkg/keys",
3332
"//pkg/roachpb",
34-
"//pkg/settings/cluster",
3533
"//pkg/sql/catalog/descpb",
3634
"//pkg/util/hlc",
3735
"//pkg/util/leaktest",

pkg/ccl/changefeedccl/resolvedspan/frontier.go

Lines changed: 6 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -16,22 +16,11 @@ import (
1616
"github.com/cockroachdb/cockroach/pkg/settings"
1717
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
1818
"github.com/cockroachdb/cockroach/pkg/util/hlc"
19-
"github.com/cockroachdb/cockroach/pkg/util/metamorphic"
2019
"github.com/cockroachdb/cockroach/pkg/util/span"
2120
"github.com/cockroachdb/errors"
2221
"github.com/cockroachdb/redact"
2322
)
2423

25-
// FrontierPerTableTracking controls whether the in-memory frontiers changefeeds
26-
// use for tracking span progress should do per-table tracking (via partitioning
27-
// into one sub-frontier per table).
28-
var FrontierPerTableTracking = settings.RegisterBoolSetting(
29-
settings.ApplicationLevel,
30-
"changefeed.frontier.per_table_tracking.enabled",
31-
"track progress on a per-table basis in-memory",
32-
metamorphic.ConstantWithTestBool("changefeed.frontier.per_table_tracking.enabled", true),
33-
)
34-
3524
// AggregatorFrontier wraps a resolvedSpanFrontier with additional
3625
// checks specific to how change aggregators process boundaries.
3726
type AggregatorFrontier struct {
@@ -43,10 +32,10 @@ func NewAggregatorFrontier(
4332
statementTime hlc.Timestamp,
4433
initialHighWater hlc.Timestamp,
4534
decoder TablePrefixDecoder,
46-
sv *settings.Values,
35+
perTableTracking bool,
4736
spans ...roachpb.Span,
4837
) (*AggregatorFrontier, error) {
49-
rsf, err := newResolvedSpanFrontier(statementTime, initialHighWater, decoder, sv, spans...)
38+
rsf, err := newResolvedSpanFrontier(statementTime, initialHighWater, decoder, perTableTracking, spans...)
5039
if err != nil {
5140
return nil, err
5241
}
@@ -92,10 +81,10 @@ func NewCoordinatorFrontier(
9281
statementTime hlc.Timestamp,
9382
initialHighWater hlc.Timestamp,
9483
decoder TablePrefixDecoder,
95-
sv *settings.Values,
84+
perTableTracking bool,
9685
spans ...roachpb.Span,
9786
) (*CoordinatorFrontier, error) {
98-
rsf, err := newResolvedSpanFrontier(statementTime, initialHighWater, decoder, sv, spans...)
87+
rsf, err := newResolvedSpanFrontier(statementTime, initialHighWater, decoder, perTableTracking, spans...)
9988
if err != nil {
10089
return nil, err
10190
}
@@ -230,11 +219,11 @@ func newResolvedSpanFrontier(
230219
statementTime hlc.Timestamp,
231220
initialHighWater hlc.Timestamp,
232221
decoder TablePrefixDecoder,
233-
sv *settings.Values,
222+
perTableTracking bool,
234223
spans ...roachpb.Span,
235224
) (*resolvedSpanFrontier, error) {
236225
sf, err := func() (maybeTablePartitionedFrontier, error) {
237-
if FrontierPerTableTracking.Get(sv) {
226+
if perTableTracking {
238227
return span.NewMultiFrontierAt(newTableIDPartitioner(decoder), initialHighWater, spans...)
239228
}
240229
f, err := span.MakeFrontierAt(initialHighWater, spans...)

pkg/ccl/changefeedccl/resolvedspan/frontier_test.go

Lines changed: 7 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -6,15 +6,13 @@
66
package resolvedspan_test
77

88
import (
9-
"context"
109
"iter"
1110
"testing"
1211

1312
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/resolvedspan"
1413
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
1514
"github.com/cockroachdb/cockroach/pkg/keys"
1615
"github.com/cockroachdb/cockroach/pkg/roachpb"
17-
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
1816
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
1917
"github.com/cockroachdb/cockroach/pkg/util/hlc"
2018
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
@@ -29,16 +27,14 @@ func TestAggregatorFrontier(t *testing.T) {
2927
defer leaktest.AfterTest(t)()
3028
defer log.Scope(t).Close(t)
3129

32-
st := cluster.MakeTestingClusterSettings()
33-
3430
// Create a fresh frontier with no progress.
3531
statementTime := makeTS(10)
3632
var initialHighwater hlc.Timestamp
3733
f, err := resolvedspan.NewAggregatorFrontier(
3834
statementTime,
3935
initialHighwater,
4036
mockDecoder{},
41-
&st.SV,
37+
false, /* perTableTracking */
4238
makeSpan("a", "f"),
4339
)
4440
require.NoError(t, err)
@@ -84,7 +80,7 @@ func TestAggregatorFrontier(t *testing.T) {
8480
statementTime,
8581
initialHighwater,
8682
mockDecoder{},
87-
&st.SV,
83+
false, /* perTableTracking */
8884
makeSpan("a", "f"),
8985
)
9086
require.NoError(t, err)
@@ -103,16 +99,14 @@ func TestCoordinatorFrontier(t *testing.T) {
10399
defer leaktest.AfterTest(t)()
104100
defer log.Scope(t).Close(t)
105101

106-
st := cluster.MakeTestingClusterSettings()
107-
108102
// Create a fresh frontier with no progress.
109103
statementTime := makeTS(10)
110104
var initialHighwater hlc.Timestamp
111105
f, err := resolvedspan.NewCoordinatorFrontier(
112106
statementTime,
113107
initialHighwater,
114108
mockDecoder{},
115-
&st.SV,
109+
false, /* perTableTracking */
116110
makeSpan("a", "f"),
117111
)
118112
require.NoError(t, err)
@@ -161,7 +155,7 @@ func TestCoordinatorFrontier(t *testing.T) {
161155
statementTime,
162156
initialHighwater,
163157
mockDecoder{},
164-
&st.SV,
158+
false, /* perTableTracking */
165159
makeSpan("a", "f"),
166160
)
167161
require.NoError(t, err)
@@ -266,14 +260,12 @@ func TestAggregatorFrontier_ForwardResolvedSpan(t *testing.T) {
266260
defer leaktest.AfterTest(t)()
267261
defer log.Scope(t).Close(t)
268262

269-
st := cluster.MakeTestingClusterSettings()
270-
271263
// Create a fresh frontier with no progress.
272264
f, err := resolvedspan.NewAggregatorFrontier(
273265
hlc.Timestamp{},
274266
hlc.Timestamp{},
275267
mockDecoder{},
276-
&st.SV,
268+
false, /* perTableTracking */
277269
makeSpan("a", "f"),
278270
)
279271
require.NoError(t, err)
@@ -328,12 +320,6 @@ func TestFrontierPerTableResolvedTimestamps(t *testing.T) {
328320
defer leaktest.AfterTest(t)()
329321
defer log.Scope(t).Close(t)
330322

331-
ctx := context.Background()
332-
st := cluster.MakeTestingClusterSettings()
333-
334-
// Explicitly enable per-table tracking for this test.
335-
resolvedspan.FrontierPerTableTracking.Override(ctx, &st.SV, true)
336-
337323
for _, frontierType := range []string{"aggregator", "coordinator"} {
338324
t.Run(frontierType, func(t *testing.T) {
339325
rnd, _ := randutil.NewPseudoRand()
@@ -373,15 +359,15 @@ func TestFrontierPerTableResolvedTimestamps(t *testing.T) {
373359
statementTime,
374360
initialHighWater,
375361
codec,
376-
&st.SV,
362+
true, /* perTableTracking */
377363
tableSpans...,
378364
)
379365
case "coordinator":
380366
return resolvedspan.NewCoordinatorFrontier(
381367
statementTime,
382368
initialHighWater,
383369
codec,
384-
&st.SV,
370+
true, /* perTableTracking */
385371
tableSpans...,
386372
)
387373
default:

pkg/sql/execinfrapb/processors_changefeeds.proto

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,9 @@ message ChangeAggregatorSpec {
6161
// have been resolved to the given timestamp, so it is safe to forward these
6262
// spans to its corresponding timestamps upon resuming.
6363
optional cockroach.sql.jobs.jobspb.TimestampSpansMap span_level_checkpoint = 9;
64+
65+
// ProgressConfig is the configuration for the changefeed's progress tracking.
66+
optional ChangefeedProgressConfig progress_config = 10;
6467
}
6568

6669
// ChangeFrontierSpec is the specification for a processor that receives
@@ -92,4 +95,15 @@ message ChangeFrontierSpec {
9295
// SpanLevelCheckpoint is a map from timestamps to lists of spans that captures
9396
// the changefeed progress and is used to initialize the frontier on resume.
9497
optional cockroach.sql.jobs.jobspb.TimestampSpansMap span_level_checkpoint = 6;
98+
99+
// ProgressConfig is the configuration for the changefeed's progress tracking.
100+
optional ChangefeedProgressConfig progress_config = 7;
101+
}
102+
103+
// ChangefeedProgressConfig is the configuration for the changefeed's progress tracking.
104+
message ChangefeedProgressConfig {
105+
// PerTableTracking configures whether the changefeed should track
106+
// span progress on a per-table basis. If configured, any span frontiers
107+
// created will track spans belonging to different tables separately.
108+
optional bool per_table_tracking = 1 [(gogoproto.nullable) = false];
95109
}

0 commit comments

Comments
 (0)