Skip to content

Commit 034a4cb

Browse files
craig[bot]andyyang890rafissyuzefovich
committed
151615: execinfrapb: add ChangefeedProgressConfig protobuf r=aerfrei,asg0451 a=andyyang890 This patch adds a new `ChangefeedProgressConfig` protobuf message, which is being added to changefeed processor specs so that all of a changefeed's processors use the same consistent configuration for progress tracking and recording. Fixes #151613 Release note: None 151675: tree: fix formatting of ON COMMIT PRESERVE ROWS r=rafiss a=rafiss Formatting for this clause was added in d80409c, but that had a mistake since the locality clause should always be formatted after the ON COMMIT clause. informs #149949 Release note: None 151679: keyside: temporarily don't return assertion error for LTREE r=yuzefovich a=yuzefovich We merged new LTREE datum type only with value encoding, yet this type can be key-encodable (but that is not implemented). In such case, in test builds we return an assertion error for missing key encoding, and many of our tests fail if they see an assertion error. To silence this expected failure mode we add an exception for LTREE to result in a regular error for now, which I think should cover most test failures we've seen. Fixes: #151614. Fixes: #151659. Release note: None Co-authored-by: Andy Yang <[email protected]> Co-authored-by: Rafi Shamim <[email protected]> Co-authored-by: Yahor Yuzefovich <[email protected]>
4 parents 072f468 + c7be0d5 + d015119 + 2bee861 commit 034a4cb

File tree

12 files changed

+90
-50
lines changed

12 files changed

+90
-50
lines changed

pkg/ccl/changefeedccl/changefeed_dist.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdceval"
1515
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase"
1616
"github.com/cockroachdb/cockroach/pkg/ccl/kvccl/kvfollowerreadsccl"
17+
"github.com/cockroachdb/cockroach/pkg/clusterversion"
1718
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
1819
"github.com/cockroachdb/cockroach/pkg/jobs/jobsprofiler"
1920
"github.com/cockroachdb/cockroach/pkg/keys"
@@ -453,6 +454,14 @@ func makePlan(
453454
maybeCfKnobs.SpanPartitionsCallback(spanPartitions)
454455
}
455456

457+
// Create progress config based on current settings.
458+
var progressConfig *execinfrapb.ChangefeedProgressConfig
459+
if execCtx.ExecCfg().Settings.Version.IsActive(ctx, clusterversion.V25_4) {
460+
progressConfig = &execinfrapb.ChangefeedProgressConfig{
461+
PerTableTracking: changefeedbase.TrackPerTableProgress.Get(sv),
462+
}
463+
}
464+
456465
aggregatorSpecs := make([]*execinfrapb.ChangeAggregatorSpec, len(spanPartitions))
457466
for i, sp := range spanPartitions {
458467
if log.ExpensiveLogEnabled(ctx, 2) {
@@ -475,6 +484,7 @@ func makePlan(
475484
JobID: jobID,
476485
Select: execinfrapb.Expression{Expr: details.Select},
477486
Description: description,
487+
ProgressConfig: progressConfig,
478488
}
479489
}
480490

@@ -489,6 +499,7 @@ func makePlan(
489499
JobID: jobID,
490500
UserProto: execCtx.User().EncodeProto(),
491501
Description: description,
502+
ProgressConfig: progressConfig,
492503
}
493504

494505
if haveKnobs && maybeCfKnobs.OnDistflowSpec != nil {

pkg/ccl/changefeedccl/changefeed_processors.go

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -615,9 +615,13 @@ func (ca *changeAggregator) setupSpansAndFrontier() (spans []roachpb.Span, err e
615615
if err != nil {
616616
return nil, err
617617
}
618+
var perTableTracking bool
619+
if ca.spec.ProgressConfig != nil {
620+
perTableTracking = ca.spec.ProgressConfig.PerTableTracking
621+
}
618622
ca.frontier, err = resolvedspan.NewAggregatorFrontier(
619623
ca.spec.Feed.StatementTime, initialHighWater, ca.FlowCtx.Codec(),
620-
&ca.FlowCtx.Cfg.Settings.SV,
624+
perTableTracking,
621625
spans...)
622626
if err != nil {
623627
return nil, err
@@ -1422,9 +1426,13 @@ func (cf *changeFrontier) Start(ctx context.Context) {
14221426
}
14231427

14241428
// Set up the resolved span frontier.
1429+
var perTableTracking bool
1430+
if cf.spec.ProgressConfig != nil {
1431+
perTableTracking = cf.spec.ProgressConfig.PerTableTracking
1432+
}
14251433
cf.frontier, err = resolvedspan.NewCoordinatorFrontier(
14261434
cf.spec.Feed.StatementTime, initialHighwater, cf.FlowCtx.Codec(),
1427-
&cf.FlowCtx.Cfg.Settings.SV,
1435+
perTableTracking,
14281436
cf.spec.TrackedSpans...)
14291437
if err != nil {
14301438
log.Infof(cf.Ctx(), "change frontier moving to draining due to error setting up frontier: %v", err)

pkg/ccl/changefeedccl/changefeed_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8940,7 +8940,7 @@ func TestChangefeedBackfillCheckpoint(t *testing.T) {
89408940
hlc.Timestamp{},
89418941
hlc.Timestamp{},
89428942
s.Codec,
8943-
&s.Server.ClusterSettings().SV,
8943+
changefeedbase.TrackPerTableProgress.Get(&s.Server.ClusterSettings().SV),
89448944
tableSpan,
89458945
)
89468946
if err != nil {

pkg/ccl/changefeedccl/changefeedbase/settings.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -388,3 +388,16 @@ var UseBareTableNames = settings.RegisterBoolSetting(
388388
"changefeed.bare_table_names.enabled",
389389
"set to true to use bare table names in changefeed topics, false to use quoted table names; default is true",
390390
true)
391+
392+
// TrackPerTableProgress controls whether a changefeed's in-memory frontiers
393+
// should track span progress on a per-table basis (via partitioning into
394+
// one sub-frontier per table). Enabling this is necessary for any other
395+
// per-table progress features (e.g. per-table PTS) to work.
396+
var TrackPerTableProgress = settings.RegisterBoolSetting(
397+
settings.ApplicationLevel,
398+
"changefeed.progress.per_table_tracking.enabled",
399+
"track progress on a per-table basis in-memory; enabling this will enable more "+
400+
"granular saving/restoring of progress, which will reduce duplicates during restarts, "+
401+
"but doing so may incur additional overhead during ordinary changefeed execution",
402+
metamorphic.ConstantWithTestBool("changefeed.progress.per_table_tracking.enabled", true),
403+
)

pkg/ccl/changefeedccl/resolvedspan/BUILD.bazel

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@ go_library(
1616
"//pkg/settings",
1717
"//pkg/sql/catalog/descpb",
1818
"//pkg/util/hlc",
19-
"//pkg/util/metamorphic",
2019
"//pkg/util/span",
2120
"@com_github_cockroachdb_errors//:errors",
2221
"@com_github_cockroachdb_redact//:redact",
@@ -31,7 +30,6 @@ go_test(
3130
"//pkg/jobs/jobspb",
3231
"//pkg/keys",
3332
"//pkg/roachpb",
34-
"//pkg/settings/cluster",
3533
"//pkg/sql/catalog/descpb",
3634
"//pkg/util/hlc",
3735
"//pkg/util/leaktest",

pkg/ccl/changefeedccl/resolvedspan/frontier.go

Lines changed: 6 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -16,22 +16,11 @@ import (
1616
"github.com/cockroachdb/cockroach/pkg/settings"
1717
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
1818
"github.com/cockroachdb/cockroach/pkg/util/hlc"
19-
"github.com/cockroachdb/cockroach/pkg/util/metamorphic"
2019
"github.com/cockroachdb/cockroach/pkg/util/span"
2120
"github.com/cockroachdb/errors"
2221
"github.com/cockroachdb/redact"
2322
)
2423

25-
// FrontierPerTableTracking controls whether the in-memory frontiers changefeeds
26-
// use for tracking span progress should do per-table tracking (via partitioning
27-
// into one sub-frontier per table).
28-
var FrontierPerTableTracking = settings.RegisterBoolSetting(
29-
settings.ApplicationLevel,
30-
"changefeed.frontier.per_table_tracking.enabled",
31-
"track progress on a per-table basis in-memory",
32-
metamorphic.ConstantWithTestBool("changefeed.frontier.per_table_tracking.enabled", true),
33-
)
34-
3524
// AggregatorFrontier wraps a resolvedSpanFrontier with additional
3625
// checks specific to how change aggregators process boundaries.
3726
type AggregatorFrontier struct {
@@ -43,10 +32,10 @@ func NewAggregatorFrontier(
4332
statementTime hlc.Timestamp,
4433
initialHighWater hlc.Timestamp,
4534
decoder TablePrefixDecoder,
46-
sv *settings.Values,
35+
perTableTracking bool,
4736
spans ...roachpb.Span,
4837
) (*AggregatorFrontier, error) {
49-
rsf, err := newResolvedSpanFrontier(statementTime, initialHighWater, decoder, sv, spans...)
38+
rsf, err := newResolvedSpanFrontier(statementTime, initialHighWater, decoder, perTableTracking, spans...)
5039
if err != nil {
5140
return nil, err
5241
}
@@ -92,10 +81,10 @@ func NewCoordinatorFrontier(
9281
statementTime hlc.Timestamp,
9382
initialHighWater hlc.Timestamp,
9483
decoder TablePrefixDecoder,
95-
sv *settings.Values,
84+
perTableTracking bool,
9685
spans ...roachpb.Span,
9786
) (*CoordinatorFrontier, error) {
98-
rsf, err := newResolvedSpanFrontier(statementTime, initialHighWater, decoder, sv, spans...)
87+
rsf, err := newResolvedSpanFrontier(statementTime, initialHighWater, decoder, perTableTracking, spans...)
9988
if err != nil {
10089
return nil, err
10190
}
@@ -230,11 +219,11 @@ func newResolvedSpanFrontier(
230219
statementTime hlc.Timestamp,
231220
initialHighWater hlc.Timestamp,
232221
decoder TablePrefixDecoder,
233-
sv *settings.Values,
222+
perTableTracking bool,
234223
spans ...roachpb.Span,
235224
) (*resolvedSpanFrontier, error) {
236225
sf, err := func() (maybeTablePartitionedFrontier, error) {
237-
if FrontierPerTableTracking.Get(sv) {
226+
if perTableTracking {
238227
return span.NewMultiFrontierAt(newTableIDPartitioner(decoder), initialHighWater, spans...)
239228
}
240229
f, err := span.MakeFrontierAt(initialHighWater, spans...)

pkg/ccl/changefeedccl/resolvedspan/frontier_test.go

Lines changed: 7 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -6,15 +6,13 @@
66
package resolvedspan_test
77

88
import (
9-
"context"
109
"iter"
1110
"testing"
1211

1312
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/resolvedspan"
1413
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
1514
"github.com/cockroachdb/cockroach/pkg/keys"
1615
"github.com/cockroachdb/cockroach/pkg/roachpb"
17-
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
1816
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
1917
"github.com/cockroachdb/cockroach/pkg/util/hlc"
2018
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
@@ -29,16 +27,14 @@ func TestAggregatorFrontier(t *testing.T) {
2927
defer leaktest.AfterTest(t)()
3028
defer log.Scope(t).Close(t)
3129

32-
st := cluster.MakeTestingClusterSettings()
33-
3430
// Create a fresh frontier with no progress.
3531
statementTime := makeTS(10)
3632
var initialHighwater hlc.Timestamp
3733
f, err := resolvedspan.NewAggregatorFrontier(
3834
statementTime,
3935
initialHighwater,
4036
mockDecoder{},
41-
&st.SV,
37+
false, /* perTableTracking */
4238
makeSpan("a", "f"),
4339
)
4440
require.NoError(t, err)
@@ -84,7 +80,7 @@ func TestAggregatorFrontier(t *testing.T) {
8480
statementTime,
8581
initialHighwater,
8682
mockDecoder{},
87-
&st.SV,
83+
false, /* perTableTracking */
8884
makeSpan("a", "f"),
8985
)
9086
require.NoError(t, err)
@@ -103,16 +99,14 @@ func TestCoordinatorFrontier(t *testing.T) {
10399
defer leaktest.AfterTest(t)()
104100
defer log.Scope(t).Close(t)
105101

106-
st := cluster.MakeTestingClusterSettings()
107-
108102
// Create a fresh frontier with no progress.
109103
statementTime := makeTS(10)
110104
var initialHighwater hlc.Timestamp
111105
f, err := resolvedspan.NewCoordinatorFrontier(
112106
statementTime,
113107
initialHighwater,
114108
mockDecoder{},
115-
&st.SV,
109+
false, /* perTableTracking */
116110
makeSpan("a", "f"),
117111
)
118112
require.NoError(t, err)
@@ -161,7 +155,7 @@ func TestCoordinatorFrontier(t *testing.T) {
161155
statementTime,
162156
initialHighwater,
163157
mockDecoder{},
164-
&st.SV,
158+
false, /* perTableTracking */
165159
makeSpan("a", "f"),
166160
)
167161
require.NoError(t, err)
@@ -266,14 +260,12 @@ func TestAggregatorFrontier_ForwardResolvedSpan(t *testing.T) {
266260
defer leaktest.AfterTest(t)()
267261
defer log.Scope(t).Close(t)
268262

269-
st := cluster.MakeTestingClusterSettings()
270-
271263
// Create a fresh frontier with no progress.
272264
f, err := resolvedspan.NewAggregatorFrontier(
273265
hlc.Timestamp{},
274266
hlc.Timestamp{},
275267
mockDecoder{},
276-
&st.SV,
268+
false, /* perTableTracking */
277269
makeSpan("a", "f"),
278270
)
279271
require.NoError(t, err)
@@ -328,12 +320,6 @@ func TestFrontierPerTableResolvedTimestamps(t *testing.T) {
328320
defer leaktest.AfterTest(t)()
329321
defer log.Scope(t).Close(t)
330322

331-
ctx := context.Background()
332-
st := cluster.MakeTestingClusterSettings()
333-
334-
// Explicitly enable per-table tracking for this test.
335-
resolvedspan.FrontierPerTableTracking.Override(ctx, &st.SV, true)
336-
337323
for _, frontierType := range []string{"aggregator", "coordinator"} {
338324
t.Run(frontierType, func(t *testing.T) {
339325
rnd, _ := randutil.NewPseudoRand()
@@ -373,15 +359,15 @@ func TestFrontierPerTableResolvedTimestamps(t *testing.T) {
373359
statementTime,
374360
initialHighWater,
375361
codec,
376-
&st.SV,
362+
true, /* perTableTracking */
377363
tableSpans...,
378364
)
379365
case "coordinator":
380366
return resolvedspan.NewCoordinatorFrontier(
381367
statementTime,
382368
initialHighWater,
383369
codec,
384-
&st.SV,
370+
true, /* perTableTracking */
385371
tableSpans...,
386372
)
387373
default:

pkg/sql/execinfrapb/processors_changefeeds.proto

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,9 @@ message ChangeAggregatorSpec {
6161
// have been resolved to the given timestamp, so it is safe to forward these
6262
// spans to its corresponding timestamps upon resuming.
6363
optional cockroach.sql.jobs.jobspb.TimestampSpansMap span_level_checkpoint = 9;
64+
65+
// ProgressConfig is the configuration for the changefeed's progress tracking.
66+
optional ChangefeedProgressConfig progress_config = 10;
6467
}
6568

6669
// ChangeFrontierSpec is the specification for a processor that receives
@@ -92,4 +95,15 @@ message ChangeFrontierSpec {
9295
// SpanLevelCheckpoint is a map from timestamps to lists of spans that captures
9396
// the changefeed progress and is used to initialize the frontier on resume.
9497
optional cockroach.sql.jobs.jobspb.TimestampSpansMap span_level_checkpoint = 6;
98+
99+
// ProgressConfig is the configuration for the changefeed's progress tracking.
100+
optional ChangefeedProgressConfig progress_config = 7;
101+
}
102+
103+
// ChangefeedProgressConfig is the configuration for the changefeed's progress tracking.
104+
message ChangefeedProgressConfig {
105+
// PerTableTracking configures whether the changefeed should track
106+
// span progress on a per-table basis. If configured, any span frontiers
107+
// created will track spans belonging to different tables separately.
108+
optional bool per_table_tracking = 1 [(gogoproto.nullable) = false];
95109
}

pkg/sql/parser/testdata/create_table

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2687,3 +2687,19 @@ CREATE TABLE t (l LTREE) -- normalized!
26872687
CREATE TABLE t (l LTREE) -- fully parenthesized
26882688
CREATE TABLE t (l LTREE) -- literals removed
26892689
CREATE TABLE _ (_ LTREE) -- identifiers removed
2690+
2691+
parse
2692+
CREATE TABLE tbl (a INT PRIMARY KEY) ON COMMIT PRESERVE ROWS LOCALITY REGIONAL BY TABLE IN PRIMARY REGION
2693+
----
2694+
CREATE TABLE tbl (a INT8 PRIMARY KEY) ON COMMIT PRESERVE ROWS LOCALITY REGIONAL BY TABLE IN PRIMARY REGION -- normalized!
2695+
CREATE TABLE tbl (a INT8 PRIMARY KEY) ON COMMIT PRESERVE ROWS LOCALITY REGIONAL BY TABLE IN PRIMARY REGION -- fully parenthesized
2696+
CREATE TABLE tbl (a INT8 PRIMARY KEY) ON COMMIT PRESERVE ROWS LOCALITY REGIONAL BY TABLE IN PRIMARY REGION -- literals removed
2697+
CREATE TABLE _ (_ INT8 PRIMARY KEY) ON COMMIT PRESERVE ROWS LOCALITY REGIONAL BY TABLE IN PRIMARY REGION -- identifiers removed
2698+
2699+
error
2700+
CREATE TABLE tbl AS (SELECT * FROM t) ON COMMIT PRESERVE ROWS LOCALITY REGIONAL BY TABLE IN PRIMARY REGION
2701+
----
2702+
at or near "locality": syntax error
2703+
DETAIL: source SQL:
2704+
CREATE TABLE tbl AS (SELECT * FROM t) ON COMMIT PRESERVE ROWS LOCALITY REGIONAL BY TABLE IN PRIMARY REGION
2705+
^

pkg/sql/rowenc/keyside/encode.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -183,6 +183,11 @@ func Encode(b []byte, val tree.Datum, dir encoding.Direction) ([]byte, error) {
183183
return encodeJSONKey(b, t, dir)
184184
}
185185
if buildutil.CrdbTestBuild {
186+
if _, isLTree := val.(*tree.DLTree); isLTree {
187+
// TODO(paulniziolek): remove this exception once key encoding is
188+
// added.
189+
return nil, errors.Newf("LTREE key encoding is not implemented yet")
190+
}
186191
return nil, errors.AssertionFailedf("unable to encode table key: %T", val)
187192
}
188193
return nil, errors.Errorf("unable to encode table key: %T", val)

0 commit comments

Comments
 (0)