Skip to content

Commit 65ff191

Browse files
committed
changefeedccl/resolvedspan: integrate span.MultiFrontier into frontiers
This patch integrates `span.MultiFrontier`s into the resolved span frontiers to give changefeed processors easy access to per-table resolved timestamps. Release note: None
1 parent 73a66d2 commit 65ff191

File tree

7 files changed

+190
-145
lines changed

7 files changed

+190
-145
lines changed

pkg/ccl/changefeedccl/BUILD.bazel

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -196,7 +196,6 @@ go_test(
196196
srcs = [
197197
"alter_changefeed_test.go",
198198
"changefeed_dist_test.go",
199-
"changefeed_processors_test.go",
200199
"changefeed_stmt_test.go",
201200
"changefeed_test.go",
202201
"csv_test.go",

pkg/ccl/changefeedccl/changefeed_processors.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -615,7 +615,8 @@ func (ca *changeAggregator) setupSpansAndFrontier() (spans []roachpb.Span, err e
615615
if err != nil {
616616
return nil, err
617617
}
618-
ca.frontier, err = resolvedspan.NewAggregatorFrontier(ca.spec.Feed.StatementTime, initialHighWater, spans...)
618+
ca.frontier, err = resolvedspan.NewAggregatorFrontier(
619+
ca.spec.Feed.StatementTime, initialHighWater, ca.FlowCtx.Codec(), spans...)
619620
if err != nil {
620621
return nil, err
621622
}
@@ -1419,7 +1420,8 @@ func (cf *changeFrontier) Start(ctx context.Context) {
14191420
}
14201421

14211422
// Set up the resolved span frontier.
1422-
cf.frontier, err = resolvedspan.NewCoordinatorFrontier(cf.spec.Feed.StatementTime, initialHighwater, cf.spec.TrackedSpans...)
1423+
cf.frontier, err = resolvedspan.NewCoordinatorFrontier(
1424+
cf.spec.Feed.StatementTime, initialHighwater, cf.FlowCtx.Codec(), cf.spec.TrackedSpans...)
14231425
if err != nil {
14241426
log.Infof(cf.Ctx(), "change frontier moving to draining due to error setting up frontier: %v", err)
14251427
cf.MoveToDraining(err)

pkg/ccl/changefeedccl/changefeed_processors_test.go

Lines changed: 0 additions & 127 deletions
This file was deleted.

pkg/ccl/changefeedccl/changefeed_test.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8936,7 +8936,12 @@ func TestChangefeedBackfillCheckpoint(t *testing.T) {
89368936

89378937
// Verify that the resumed job has restored the progress from the checkpoint
89388938
// to the change frontier.
8939-
expectedFrontier, err := span.MakeFrontier(tableSpan)
8939+
expectedFrontier, err := resolvedspan.NewCoordinatorFrontier(
8940+
hlc.Timestamp{},
8941+
hlc.Timestamp{},
8942+
s.Codec,
8943+
tableSpan,
8944+
)
89408945
if err != nil {
89418946
t.Fatal(err)
89428947
}

pkg/ccl/changefeedccl/resolvedspan/BUILD.bazel

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ go_library(
1414
"//pkg/jobs/jobspb",
1515
"//pkg/roachpb",
1616
"//pkg/settings",
17+
"//pkg/sql/catalog/descpb",
1718
"//pkg/util/hlc",
1819
"//pkg/util/span",
1920
"@com_github_cockroachdb_errors//:errors",
@@ -27,10 +28,14 @@ go_test(
2728
deps = [
2829
":resolvedspan",
2930
"//pkg/jobs/jobspb",
31+
"//pkg/keys",
3032
"//pkg/roachpb",
33+
"//pkg/sql/catalog/descpb",
3134
"//pkg/util/hlc",
3235
"//pkg/util/leaktest",
3336
"//pkg/util/log",
37+
"//pkg/util/randutil",
38+
"//pkg/util/span",
3439
"@com_github_cockroachdb_errors//:errors",
3540
"@com_github_stretchr_testify//require",
3641
],

pkg/ccl/changefeedccl/resolvedspan/frontier.go

Lines changed: 42 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
1515
"github.com/cockroachdb/cockroach/pkg/roachpb"
1616
"github.com/cockroachdb/cockroach/pkg/settings"
17+
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
1718
"github.com/cockroachdb/cockroach/pkg/util/hlc"
1819
"github.com/cockroachdb/cockroach/pkg/util/span"
1920
"github.com/cockroachdb/errors"
@@ -28,9 +29,12 @@ type AggregatorFrontier struct {
2829

2930
// NewAggregatorFrontier returns a new AggregatorFrontier.
3031
func NewAggregatorFrontier(
31-
statementTime hlc.Timestamp, initialHighWater hlc.Timestamp, spans ...roachpb.Span,
32+
statementTime hlc.Timestamp,
33+
initialHighWater hlc.Timestamp,
34+
decoder TablePrefixDecoder,
35+
spans ...roachpb.Span,
3236
) (*AggregatorFrontier, error) {
33-
rsf, err := newResolvedSpanFrontier(statementTime, initialHighWater, spans...)
37+
rsf, err := newResolvedSpanFrontier(statementTime, initialHighWater, decoder, spans...)
3438
if err != nil {
3539
return nil, err
3640
}
@@ -73,9 +77,12 @@ type CoordinatorFrontier struct {
7377

7478
// NewCoordinatorFrontier returns a new CoordinatorFrontier.
7579
func NewCoordinatorFrontier(
76-
statementTime hlc.Timestamp, initialHighWater hlc.Timestamp, spans ...roachpb.Span,
80+
statementTime hlc.Timestamp,
81+
initialHighWater hlc.Timestamp,
82+
decoder TablePrefixDecoder,
83+
spans ...roachpb.Span,
7784
) (*CoordinatorFrontier, error) {
78-
rsf, err := newResolvedSpanFrontier(statementTime, initialHighWater, spans...)
85+
rsf, err := newResolvedSpanFrontier(statementTime, initialHighWater, decoder, spans...)
7986
if err != nil {
8087
return nil, err
8188
}
@@ -184,15 +191,11 @@ func (f *CoordinatorFrontier) MakeCheckpoint(
184191
return checkpoint.Make(f.Frontier(), f.Entries(), maxBytes, metrics)
185192
}
186193

187-
// spanFrontier is a type alias to make it possible to embed and forward calls
188-
// (e.g. Frontier()) to the underlying span.Frontier.
189-
type spanFrontier = span.Frontier
190-
191194
// resolvedSpanFrontier wraps a spanFrontier with additional bookkeeping fields
192195
// used to track resolved spans for a changefeed and methods for computing
193196
// lagging and checkpoint spans.
194197
type resolvedSpanFrontier struct {
195-
spanFrontier
198+
*span.MultiFrontier[descpb.ID]
196199

197200
// statementTime is the statement time of the changefeed.
198201
statementTime hlc.Timestamp
@@ -211,15 +214,17 @@ type resolvedSpanFrontier struct {
211214

212215
// newResolvedSpanFrontier returns a new resolvedSpanFrontier.
213216
func newResolvedSpanFrontier(
214-
statementTime hlc.Timestamp, initialHighWater hlc.Timestamp, spans ...roachpb.Span,
217+
statementTime hlc.Timestamp,
218+
initialHighWater hlc.Timestamp,
219+
decoder TablePrefixDecoder,
220+
spans ...roachpb.Span,
215221
) (*resolvedSpanFrontier, error) {
216-
sf, err := span.MakeFrontierAt(initialHighWater, spans...)
222+
sf, err := span.NewMultiFrontierAt(newTableIDPartitioner(decoder), initialHighWater, spans...)
217223
if err != nil {
218224
return nil, err
219225
}
220-
sf = span.MakeConcurrentFrontier(sf)
221226
return &resolvedSpanFrontier{
222-
spanFrontier: sf,
227+
MultiFrontier: sf,
223228
statementTime: statementTime,
224229
initialHighWater: initialHighWater,
225230
}, nil
@@ -340,7 +345,7 @@ func (f *resolvedSpanFrontier) HasLaggingSpans(sv *settings.Values) bool {
340345
// All returns an iterator over the resolved spans in the frontier.
341346
func (f *resolvedSpanFrontier) All() iter.Seq[jobspb.ResolvedSpan] {
342347
return func(yield func(jobspb.ResolvedSpan) bool) {
343-
for sp, ts := range f.spanFrontier.Entries() {
348+
for sp, ts := range f.Entries() {
344349
var boundaryType jobspb.ResolvedSpan_BoundaryType
345350
if ok, bt := f.boundary.At(ts); ok {
346351
boundaryType = bt
@@ -411,3 +416,26 @@ func (b *resolvedSpanBoundary) Forward(newBoundary resolvedSpanBoundary) bool {
411416
func (b *resolvedSpanBoundary) SafeFormat(s redact.SafePrinter, _ rune) {
412417
s.Printf("%v boundary (%v)", b.typ, b.ts)
413418
}
419+
420+
// A TablePrefixDecoder decodes table prefixes from keys.
421+
// The production implementation is keys.SQLCodec.
422+
type TablePrefixDecoder interface {
423+
DecodeTablePrefix(key roachpb.Key) ([]byte, uint32, error)
424+
}
425+
426+
func newTableIDPartitioner(decoder TablePrefixDecoder) span.PartitionerFunc[descpb.ID] {
427+
return func(sp roachpb.Span) (descpb.ID, error) {
428+
_, startKeyTableID, err := decoder.DecodeTablePrefix(sp.Key)
429+
if err != nil {
430+
return 0, err
431+
}
432+
_, endKeyTableID, err := decoder.DecodeTablePrefix(sp.EndKey)
433+
if err != nil {
434+
return 0, err
435+
}
436+
if startKeyTableID != endKeyTableID {
437+
return 0, errors.AssertionFailedf("span encompassing multiple tables: %s", sp)
438+
}
439+
return descpb.ID(startKeyTableID), nil
440+
}
441+
}

0 commit comments

Comments
 (0)