Skip to content

Commit 0867a03

Browse files
committed
changefeedccl/resolvedspan: add feature flag for per-table tracking
This patch adds a new cluster setting named `changefeed.frontier.per_table_tracking.enabled` to feature flag enabling per-table tracking in the resolved span frontiers. Release note: None
1 parent 65ff191 commit 0867a03

File tree

5 files changed

+93
-9
lines changed

5 files changed

+93
-9
lines changed

pkg/ccl/changefeedccl/changefeed_processors.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -616,7 +616,9 @@ func (ca *changeAggregator) setupSpansAndFrontier() (spans []roachpb.Span, err e
616616
return nil, err
617617
}
618618
ca.frontier, err = resolvedspan.NewAggregatorFrontier(
619-
ca.spec.Feed.StatementTime, initialHighWater, ca.FlowCtx.Codec(), spans...)
619+
ca.spec.Feed.StatementTime, initialHighWater, ca.FlowCtx.Codec(),
620+
&ca.FlowCtx.Cfg.Settings.SV,
621+
spans...)
620622
if err != nil {
621623
return nil, err
622624
}
@@ -1421,7 +1423,9 @@ func (cf *changeFrontier) Start(ctx context.Context) {
14211423

14221424
// Set up the resolved span frontier.
14231425
cf.frontier, err = resolvedspan.NewCoordinatorFrontier(
1424-
cf.spec.Feed.StatementTime, initialHighwater, cf.FlowCtx.Codec(), cf.spec.TrackedSpans...)
1426+
cf.spec.Feed.StatementTime, initialHighwater, cf.FlowCtx.Codec(),
1427+
&cf.FlowCtx.Cfg.Settings.SV,
1428+
cf.spec.TrackedSpans...)
14251429
if err != nil {
14261430
log.Infof(cf.Ctx(), "change frontier moving to draining due to error setting up frontier: %v", err)
14271431
cf.MoveToDraining(err)

pkg/ccl/changefeedccl/changefeed_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8940,6 +8940,7 @@ func TestChangefeedBackfillCheckpoint(t *testing.T) {
89408940
hlc.Timestamp{},
89418941
hlc.Timestamp{},
89428942
s.Codec,
8943+
&s.Server.ClusterSettings().SV,
89438944
tableSpan,
89448945
)
89458946
if err != nil {

pkg/ccl/changefeedccl/resolvedspan/BUILD.bazel

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ go_library(
1616
"//pkg/settings",
1717
"//pkg/sql/catalog/descpb",
1818
"//pkg/util/hlc",
19+
"//pkg/util/metamorphic",
1920
"//pkg/util/span",
2021
"@com_github_cockroachdb_errors//:errors",
2122
"@com_github_cockroachdb_redact//:redact",
@@ -30,6 +31,7 @@ go_test(
3031
"//pkg/jobs/jobspb",
3132
"//pkg/keys",
3233
"//pkg/roachpb",
34+
"//pkg/settings/cluster",
3335
"//pkg/sql/catalog/descpb",
3436
"//pkg/util/hlc",
3537
"//pkg/util/leaktest",

pkg/ccl/changefeedccl/resolvedspan/frontier.go

Lines changed: 63 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,22 @@ import (
1616
"github.com/cockroachdb/cockroach/pkg/settings"
1717
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
1818
"github.com/cockroachdb/cockroach/pkg/util/hlc"
19+
"github.com/cockroachdb/cockroach/pkg/util/metamorphic"
1920
"github.com/cockroachdb/cockroach/pkg/util/span"
2021
"github.com/cockroachdb/errors"
2122
"github.com/cockroachdb/redact"
2223
)
2324

25+
// FrontierPerTableTracking controls whether the in-memory frontiers changefeeds
26+
// use for tracking span progress should do per-table tracking (via partitioning
27+
// into one sub-frontier per table).
28+
var FrontierPerTableTracking = settings.RegisterBoolSetting(
29+
settings.ApplicationLevel,
30+
"changefeed.frontier.per_table_tracking.enabled",
31+
"track progress on a per-table basis in-memory",
32+
metamorphic.ConstantWithTestBool("changefeed.frontier.per_table_tracking.enabled", true),
33+
)
34+
2435
// AggregatorFrontier wraps a resolvedSpanFrontier with additional
2536
// checks specific to how change aggregators process boundaries.
2637
type AggregatorFrontier struct {
@@ -32,9 +43,10 @@ func NewAggregatorFrontier(
3243
statementTime hlc.Timestamp,
3344
initialHighWater hlc.Timestamp,
3445
decoder TablePrefixDecoder,
46+
sv *settings.Values,
3547
spans ...roachpb.Span,
3648
) (*AggregatorFrontier, error) {
37-
rsf, err := newResolvedSpanFrontier(statementTime, initialHighWater, decoder, spans...)
49+
rsf, err := newResolvedSpanFrontier(statementTime, initialHighWater, decoder, sv, spans...)
3850
if err != nil {
3951
return nil, err
4052
}
@@ -80,9 +92,10 @@ func NewCoordinatorFrontier(
8092
statementTime hlc.Timestamp,
8193
initialHighWater hlc.Timestamp,
8294
decoder TablePrefixDecoder,
95+
sv *settings.Values,
8396
spans ...roachpb.Span,
8497
) (*CoordinatorFrontier, error) {
85-
rsf, err := newResolvedSpanFrontier(statementTime, initialHighWater, decoder, spans...)
98+
rsf, err := newResolvedSpanFrontier(statementTime, initialHighWater, decoder, sv, spans...)
8699
if err != nil {
87100
return nil, err
88101
}
@@ -195,7 +208,7 @@ func (f *CoordinatorFrontier) MakeCheckpoint(
195208
// used to track resolved spans for a changefeed and methods for computing
196209
// lagging and checkpoint spans.
197210
type resolvedSpanFrontier struct {
198-
*span.MultiFrontier[descpb.ID]
211+
maybeTablePartitionedFrontier
199212

200213
// statementTime is the statement time of the changefeed.
201214
statementTime hlc.Timestamp
@@ -217,16 +230,26 @@ func newResolvedSpanFrontier(
217230
statementTime hlc.Timestamp,
218231
initialHighWater hlc.Timestamp,
219232
decoder TablePrefixDecoder,
233+
sv *settings.Values,
220234
spans ...roachpb.Span,
221235
) (*resolvedSpanFrontier, error) {
222-
sf, err := span.NewMultiFrontierAt(newTableIDPartitioner(decoder), initialHighWater, spans...)
236+
sf, err := func() (maybeTablePartitionedFrontier, error) {
237+
if FrontierPerTableTracking.Get(sv) {
238+
return span.NewMultiFrontierAt(newTableIDPartitioner(decoder), initialHighWater, spans...)
239+
}
240+
f, err := span.MakeFrontierAt(initialHighWater, spans...)
241+
if err != nil {
242+
return nil, err
243+
}
244+
return notTablePartitionedFrontier{spanFrontier: span.MakeConcurrentFrontier(f)}, nil
245+
}()
223246
if err != nil {
224247
return nil, err
225248
}
226249
return &resolvedSpanFrontier{
227-
MultiFrontier: sf,
228-
statementTime: statementTime,
229-
initialHighWater: initialHighWater,
250+
maybeTablePartitionedFrontier: sf,
251+
statementTime: statementTime,
252+
initialHighWater: initialHighWater,
230253
}, nil
231254
}
232255

@@ -417,6 +440,39 @@ func (b *resolvedSpanBoundary) SafeFormat(s redact.SafePrinter, _ rune) {
417440
s.Printf("%v boundary (%v)", b.typ, b.ts)
418441
}
419442

443+
// A maybeTablePartitionedFrontier is a frontier that might be tracking
444+
// spans in per-table sub-frontiers.
445+
type maybeTablePartitionedFrontier interface {
446+
span.Frontier
447+
448+
// Frontiers returns an iterator over the table ID and sub-frontiers
449+
// being tracked by the frontier. If the frontier is not tracking
450+
// on a per-table basis, the iterator will return a single frontier
451+
// with an ID of 0.
452+
Frontiers() iter.Seq2[descpb.ID, span.Frontier]
453+
}
454+
455+
var _ maybeTablePartitionedFrontier = (*span.MultiFrontier[descpb.ID])(nil)
456+
457+
// spanFrontier is a type alias to make it possible to embed and forward calls
458+
// (e.g. Frontier()) to the underlying span.Frontier.
459+
type spanFrontier = span.Frontier
460+
461+
// notTablePartitionedFrontier is a frontier that does not track spans on
462+
// a per-table basis.
463+
type notTablePartitionedFrontier struct {
464+
spanFrontier
465+
}
466+
467+
var _ maybeTablePartitionedFrontier = notTablePartitionedFrontier{}
468+
469+
// Frontiers implements maybeTablePartitionedFrontier.
470+
func (f notTablePartitionedFrontier) Frontiers() iter.Seq2[descpb.ID, span.Frontier] {
471+
return func(yield func(descpb.ID, span.Frontier) bool) {
472+
yield(0, f.spanFrontier)
473+
}
474+
}
475+
420476
// A TablePrefixDecoder decodes table prefixes from keys.
421477
// The production implementation is keys.SQLCodec.
422478
type TablePrefixDecoder interface {

pkg/ccl/changefeedccl/resolvedspan/frontier_test.go

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,15 @@
66
package resolvedspan_test
77

88
import (
9+
"context"
910
"iter"
1011
"testing"
1112

1213
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/resolvedspan"
1314
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
1415
"github.com/cockroachdb/cockroach/pkg/keys"
1516
"github.com/cockroachdb/cockroach/pkg/roachpb"
17+
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
1618
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
1719
"github.com/cockroachdb/cockroach/pkg/util/hlc"
1820
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
@@ -27,13 +29,16 @@ func TestAggregatorFrontier(t *testing.T) {
2729
defer leaktest.AfterTest(t)()
2830
defer log.Scope(t).Close(t)
2931

32+
st := cluster.MakeTestingClusterSettings()
33+
3034
// Create a fresh frontier with no progress.
3135
statementTime := makeTS(10)
3236
var initialHighwater hlc.Timestamp
3337
f, err := resolvedspan.NewAggregatorFrontier(
3438
statementTime,
3539
initialHighwater,
3640
mockDecoder{},
41+
&st.SV,
3742
makeSpan("a", "f"),
3843
)
3944
require.NoError(t, err)
@@ -79,6 +84,7 @@ func TestAggregatorFrontier(t *testing.T) {
7984
statementTime,
8085
initialHighwater,
8186
mockDecoder{},
87+
&st.SV,
8288
makeSpan("a", "f"),
8389
)
8490
require.NoError(t, err)
@@ -97,13 +103,16 @@ func TestCoordinatorFrontier(t *testing.T) {
97103
defer leaktest.AfterTest(t)()
98104
defer log.Scope(t).Close(t)
99105

106+
st := cluster.MakeTestingClusterSettings()
107+
100108
// Create a fresh frontier with no progress.
101109
statementTime := makeTS(10)
102110
var initialHighwater hlc.Timestamp
103111
f, err := resolvedspan.NewCoordinatorFrontier(
104112
statementTime,
105113
initialHighwater,
106114
mockDecoder{},
115+
&st.SV,
107116
makeSpan("a", "f"),
108117
)
109118
require.NoError(t, err)
@@ -152,6 +161,7 @@ func TestCoordinatorFrontier(t *testing.T) {
152161
statementTime,
153162
initialHighwater,
154163
mockDecoder{},
164+
&st.SV,
155165
makeSpan("a", "f"),
156166
)
157167
require.NoError(t, err)
@@ -256,11 +266,14 @@ func TestAggregatorFrontier_ForwardResolvedSpan(t *testing.T) {
256266
defer leaktest.AfterTest(t)()
257267
defer log.Scope(t).Close(t)
258268

269+
st := cluster.MakeTestingClusterSettings()
270+
259271
// Create a fresh frontier with no progress.
260272
f, err := resolvedspan.NewAggregatorFrontier(
261273
hlc.Timestamp{},
262274
hlc.Timestamp{},
263275
mockDecoder{},
276+
&st.SV,
264277
makeSpan("a", "f"),
265278
)
266279
require.NoError(t, err)
@@ -315,6 +328,12 @@ func TestFrontierPerTableResolvedTimestamps(t *testing.T) {
315328
defer leaktest.AfterTest(t)()
316329
defer log.Scope(t).Close(t)
317330

331+
ctx := context.Background()
332+
st := cluster.MakeTestingClusterSettings()
333+
334+
// Explicitly enable per-table tracking for this test.
335+
resolvedspan.FrontierPerTableTracking.Override(ctx, &st.SV, true)
336+
318337
for _, frontierType := range []string{"aggregator", "coordinator"} {
319338
t.Run(frontierType, func(t *testing.T) {
320339
rnd, _ := randutil.NewPseudoRand()
@@ -354,13 +373,15 @@ func TestFrontierPerTableResolvedTimestamps(t *testing.T) {
354373
statementTime,
355374
initialHighWater,
356375
codec,
376+
&st.SV,
357377
tableSpans...,
358378
)
359379
case "coordinator":
360380
return resolvedspan.NewCoordinatorFrontier(
361381
statementTime,
362382
initialHighWater,
363383
codec,
384+
&st.SV,
364385
tableSpans...,
365386
)
366387
default:

0 commit comments

Comments
 (0)