Skip to content

Commit c75f1e2

Browse files
craig[bot]andyyang890
andcommitted
Merge #152643
152643: changefeedccl/resolvedspan: update table ID partitioner r=log-head,asg0451 a=andyyang890 This patch updates the table ID partitioner so that it's able to handle spans where the end key is the next table's start key (e.g. full table spans like `/Table/1{0-1}`). Part of #148119 Release note: None Co-authored-by: Andy Yang <[email protected]>
2 parents 7b65b17 + dae1d2a commit c75f1e2

File tree

3 files changed

+122
-24
lines changed

3 files changed

+122
-24
lines changed

pkg/ccl/changefeedccl/resolvedspan/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ go_test(
3131
"//pkg/keys",
3232
"//pkg/roachpb",
3333
"//pkg/sql/catalog/descpb",
34+
"//pkg/testutils",
3435
"//pkg/util/hlc",
3536
"//pkg/util/leaktest",
3637
"//pkg/util/log",

pkg/ccl/changefeedccl/resolvedspan/frontier.go

Lines changed: 15 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -31,11 +31,11 @@ type AggregatorFrontier struct {
3131
func NewAggregatorFrontier(
3232
statementTime hlc.Timestamp,
3333
initialHighWater hlc.Timestamp,
34-
decoder TablePrefixDecoder,
34+
codec TableCodec,
3535
perTableTracking bool,
3636
spans ...roachpb.Span,
3737
) (*AggregatorFrontier, error) {
38-
rsf, err := newResolvedSpanFrontier(statementTime, initialHighWater, decoder, perTableTracking, spans...)
38+
rsf, err := newResolvedSpanFrontier(statementTime, initialHighWater, codec, perTableTracking, spans...)
3939
if err != nil {
4040
return nil, err
4141
}
@@ -80,11 +80,11 @@ type CoordinatorFrontier struct {
8080
func NewCoordinatorFrontier(
8181
statementTime hlc.Timestamp,
8282
initialHighWater hlc.Timestamp,
83-
decoder TablePrefixDecoder,
83+
codec TableCodec,
8484
perTableTracking bool,
8585
spans ...roachpb.Span,
8686
) (*CoordinatorFrontier, error) {
87-
rsf, err := newResolvedSpanFrontier(statementTime, initialHighWater, decoder, perTableTracking, spans...)
87+
rsf, err := newResolvedSpanFrontier(statementTime, initialHighWater, codec, perTableTracking, spans...)
8888
if err != nil {
8989
return nil, err
9090
}
@@ -218,13 +218,13 @@ type resolvedSpanFrontier struct {
218218
func newResolvedSpanFrontier(
219219
statementTime hlc.Timestamp,
220220
initialHighWater hlc.Timestamp,
221-
decoder TablePrefixDecoder,
221+
codec TableCodec,
222222
perTableTracking bool,
223223
spans ...roachpb.Span,
224224
) (*resolvedSpanFrontier, error) {
225225
sf, err := func() (maybeTablePartitionedFrontier, error) {
226226
if perTableTracking {
227-
return span.NewMultiFrontierAt(newTableIDPartitioner(decoder), initialHighWater, spans...)
227+
return span.NewMultiFrontierAt(newTableIDPartitioner(codec), initialHighWater, spans...)
228228
}
229229
f, err := span.MakeFrontierAt(initialHighWater, spans...)
230230
if err != nil {
@@ -462,23 +462,22 @@ func (f notTablePartitionedFrontier) Frontiers() iter.Seq2[descpb.ID, span.ReadO
462462
}
463463
}
464464

465-
// A TablePrefixDecoder decodes table prefixes from keys.
465+
// A TableCodec does table-related decoding/encoding.
466466
// The production implementation is keys.SQLCodec.
467-
type TablePrefixDecoder interface {
467+
type TableCodec interface {
468468
DecodeTablePrefix(key roachpb.Key) ([]byte, uint32, error)
469+
TableSpan(tableID uint32) roachpb.Span
469470
}
470471

471-
func newTableIDPartitioner(decoder TablePrefixDecoder) span.PartitionerFunc[descpb.ID] {
472+
func newTableIDPartitioner(codec TableCodec) span.PartitionerFunc[descpb.ID] {
472473
return func(sp roachpb.Span) (descpb.ID, error) {
473-
_, startKeyTableID, err := decoder.DecodeTablePrefix(sp.Key)
474+
_, startKeyTableID, err := codec.DecodeTablePrefix(sp.Key)
474475
if err != nil {
475-
return 0, err
476+
return 0, errors.Wrapf(err, "error decoding start key in %v", sp)
476477
}
477-
_, endKeyTableID, err := decoder.DecodeTablePrefix(sp.EndKey)
478-
if err != nil {
479-
return 0, err
480-
}
481-
if startKeyTableID != endKeyTableID {
478+
// Reject any spans that cross table boundaries.
479+
tableSpan := codec.TableSpan(startKeyTableID)
480+
if !tableSpan.Contains(sp) {
482481
return 0, errors.AssertionFailedf("span encompassing multiple tables: %s", sp)
483482
}
484483
return descpb.ID(startKeyTableID), nil

pkg/ccl/changefeedccl/resolvedspan/frontier_test.go

Lines changed: 106 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414
"github.com/cockroachdb/cockroach/pkg/keys"
1515
"github.com/cockroachdb/cockroach/pkg/roachpb"
1616
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
17+
"github.com/cockroachdb/cockroach/pkg/testutils"
1718
"github.com/cockroachdb/cockroach/pkg/util/hlc"
1819
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
1920
"github.com/cockroachdb/cockroach/pkg/util/log"
@@ -33,7 +34,7 @@ func TestAggregatorFrontier(t *testing.T) {
3334
f, err := resolvedspan.NewAggregatorFrontier(
3435
statementTime,
3536
initialHighwater,
36-
mockDecoder{},
37+
mockCodec{},
3738
false, /* perTableTracking */
3839
makeSpan("a", "f"),
3940
)
@@ -79,7 +80,7 @@ func TestAggregatorFrontier(t *testing.T) {
7980
f, err = resolvedspan.NewAggregatorFrontier(
8081
statementTime,
8182
initialHighwater,
82-
mockDecoder{},
83+
mockCodec{},
8384
false, /* perTableTracking */
8485
makeSpan("a", "f"),
8586
)
@@ -105,7 +106,7 @@ func TestCoordinatorFrontier(t *testing.T) {
105106
f, err := resolvedspan.NewCoordinatorFrontier(
106107
statementTime,
107108
initialHighwater,
108-
mockDecoder{},
109+
mockCodec{},
109110
false, /* perTableTracking */
110111
makeSpan("a", "f"),
111112
)
@@ -154,7 +155,7 @@ func TestCoordinatorFrontier(t *testing.T) {
154155
f, err = resolvedspan.NewCoordinatorFrontier(
155156
statementTime,
156157
initialHighwater,
157-
mockDecoder{},
158+
mockCodec{},
158159
false, /* perTableTracking */
159160
makeSpan("a", "f"),
160161
)
@@ -264,7 +265,7 @@ func TestAggregatorFrontier_ForwardResolvedSpan(t *testing.T) {
264265
f, err := resolvedspan.NewAggregatorFrontier(
265266
hlc.Timestamp{},
266267
hlc.Timestamp{},
267-
mockDecoder{},
268+
mockCodec{},
268269
false, /* perTableTracking */
269270
makeSpan("a", "f"),
270271
)
@@ -308,14 +309,28 @@ func TestAggregatorFrontier_ForwardResolvedSpan(t *testing.T) {
308309
})
309310
}
310311

311-
// mockDecoder is a simple TablePrefixDecoder for testing
312+
// mockCodec is a simple TableCodec for testing
312313
// that treats all keys as table ID 1.
313-
type mockDecoder struct{}
314+
type mockCodec struct{}
314315

315-
func (mockDecoder) DecodeTablePrefix(key roachpb.Key) ([]byte, uint32, error) {
316+
var _ resolvedspan.TableCodec = mockCodec{}
317+
318+
// DecodeTablePrefix implements TableCodec.
319+
func (mockCodec) DecodeTablePrefix(key roachpb.Key) ([]byte, uint32, error) {
316320
return key, 1, nil
317321
}
318322

323+
// TableSpan implements TableCodec.
324+
func (mockCodec) TableSpan(tableID uint32) roachpb.Span {
325+
if tableID == 1 {
326+
// Since the mock codec treats all keys as belonging to table ID 1,
327+
// we return the everything span so that all keys will be considered
328+
// a part of the table.
329+
return keys.EverythingSpan
330+
}
331+
panic("mock codec only handles table ID 1")
332+
}
333+
319334
func TestFrontierPerTableResolvedTimestamps(t *testing.T) {
320335
defer leaktest.AfterTest(t)()
321336
defer log.Scope(t).Close(t)
@@ -335,6 +350,10 @@ func TestFrontierPerTableResolvedTimestamps(t *testing.T) {
335350

336351
// Helper to create spans for tables.
337352
tableSpan := func(tableID uint32) roachpb.Span {
353+
// Randomly choose either the full table span or an index span.
354+
if rnd.Float64() < 0.5 {
355+
return codec.TableSpan(tableID)
356+
}
338357
prefix := codec.IndexPrefix(tableID, 1 /* indexID */)
339358
return roachpb.Span{
340359
Key: prefix,
@@ -432,3 +451,82 @@ func TestFrontierPerTableResolvedTimestamps(t *testing.T) {
432451
})
433452
}
434453
}
454+
455+
func TestFrontierForwardFullTableSpan(t *testing.T) {
456+
defer leaktest.AfterTest(t)()
457+
defer log.Scope(t).Close(t)
458+
459+
testutils.RunValues(t, "frontier type", []string{"aggregator", "coordinator"},
460+
func(t *testing.T, frontierType string) {
461+
rnd, _ := randutil.NewPseudoRand()
462+
463+
// Randomly use either the system codec or a tenant codec.
464+
codec := func() keys.SQLCodec {
465+
if rnd.Float64() < 0.5 {
466+
tenantID := roachpb.MustMakeTenantID(uint64(1 + rnd.Intn(10)))
467+
return keys.MakeSQLCodec(tenantID)
468+
}
469+
return keys.SystemSQLCodec
470+
}()
471+
472+
key := func(base roachpb.Key, suffix string) roachpb.Key {
473+
result := make([]byte, len(base)+len(suffix))
474+
copy(result, base)
475+
copy(result[len(base):], suffix)
476+
return result
477+
}
478+
479+
// Create spans for tables 109 and 110.
480+
// These table IDs were specifically chosen because 109->110
481+
// is the boundary for when table IDs go from single-byte to
482+
// multi-byte encodings.
483+
table109Span := codec.TableSpan(109)
484+
table110Span := codec.TableSpan(110)
485+
require.True(t, len(table110Span.Key) == len(table109Span.Key)+1)
486+
487+
// Create several subspans within each table.
488+
tableSpans := []roachpb.Span{
489+
{Key: key(table109Span.Key, "a"), EndKey: key(table109Span.Key, "c")},
490+
{Key: key(table109Span.Key, "e"), EndKey: key(table109Span.Key, "g")},
491+
{Key: key(table110Span.Key, "b"), EndKey: key(table110Span.Key, "d")},
492+
{Key: key(table110Span.Key, "f"), EndKey: key(table110Span.Key, "k")},
493+
}
494+
495+
statementTime := makeTS(5)
496+
var initialHighWater hlc.Timestamp
497+
498+
f, err := func() (span.Frontier, error) {
499+
switch frontierType {
500+
case "aggregator":
501+
return resolvedspan.NewAggregatorFrontier(
502+
statementTime,
503+
initialHighWater,
504+
codec,
505+
true, /* perTableTracking */
506+
tableSpans...,
507+
)
508+
case "coordinator":
509+
return resolvedspan.NewCoordinatorFrontier(
510+
statementTime,
511+
initialHighWater,
512+
codec,
513+
true, /* perTableTracking */
514+
tableSpans...,
515+
)
516+
default:
517+
t.Fatalf("unknown frontier type: %s", frontierType)
518+
}
519+
panic("unreachable")
520+
}()
521+
require.NoError(t, err)
522+
require.Equal(t, initialHighWater, f.Frontier())
523+
524+
// Forward both tables to timestamp 20.
525+
targetTimestamp := makeTS(20)
526+
for _, tableSpan := range []roachpb.Span{table109Span, table110Span} {
527+
_, err := f.Forward(tableSpan, targetTimestamp)
528+
require.NoError(t, err)
529+
}
530+
require.Equal(t, targetTimestamp, f.Frontier())
531+
})
532+
}

0 commit comments

Comments
 (0)