Skip to content

Commit 80ad49e

Browse files
craig[bot] and andyyang890 committed
Merge #152234
152234: changefeedccl: decouple checkpoint restoration from kv feed r=aerfrei a=andyyang890 This patch decouples the restoration of span-level checkpoints from the kv feed to clarify package boundaries and ensure restoration only happens once at the processor level. Additionally, the upcoming work to restore per-table resolved timestamps will leverage this change. Fixes #152107 Release note: None Co-authored-by: Andy Yang <[email protected]>
2 parents 97dbae0 + 5aa32d6 commit 80ad49e

File tree

5 files changed

+115
-81
lines changed

5 files changed

+115
-81
lines changed

pkg/ccl/changefeedccl/changefeed_processors.go

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525
"github.com/cockroachdb/cockroach/pkg/clusterversion"
2626
"github.com/cockroachdb/cockroach/pkg/jobs"
2727
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
28+
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord"
2829
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts"
2930
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts"
3031
"github.com/cockroachdb/cockroach/pkg/roachpb"
@@ -562,18 +563,28 @@ func (ca *changeAggregator) makeKVFeedCfg(
562563
return kvfeed.Config{}, err
563564
}
564565

566+
// Create the initial span-timestamp pairs from the frontier
567+
// (which already has checkpoint info restored).
568+
var initialSpanTimePairs []kvcoord.SpanTimePair
569+
for sp, ts := range ca.frontier.Entries() {
570+
initialSpanTimePairs = append(initialSpanTimePairs, kvcoord.SpanTimePair{
571+
Span: sp,
572+
StartAfter: ts,
573+
})
574+
}
575+
565576
return kvfeed.Config{
566577
Writer: buf,
567578
Settings: cfg.Settings,
568579
DB: cfg.DB.KV(),
569580
Codec: cfg.Codec,
570581
Clock: cfg.DB.KV().Clock(),
571582
Spans: spans,
572-
SpanLevelCheckpoint: ca.spec.SpanLevelCheckpoint,
573583
Targets: ca.targets,
574584
Metrics: &ca.metrics.KVFeedMetrics,
575585
MM: memMon,
576586
InitialHighWater: initialHighWater,
587+
InitialSpanTimePairs: initialSpanTimePairs,
577588
EndTime: config.EndTime,
578589
WithDiff: filters.WithDiff,
579590
WithFiltering: filters.WithFiltering,

pkg/ccl/changefeedccl/changefeed_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3103,7 +3103,7 @@ func TestChangefeedSchemaChangeBackfillCheckpoint(t *testing.T) {
31033103
require.NoError(t, jobFeed.Pause())
31043104

31053105
// Verify that none of the resolved spans after resume were checkpointed.
3106-
t.Logf("Table Span: %s, Second Checkpoint: %v, Resolved Spans: %v", tableSpan, secondCheckpoint, resolved)
3106+
t.Logf("Table Span: %s, Second Checkpoint: %v, Resolved Spans: %v", tableSpan, secondCheckpoint.Slice(), resolved)
31073107
for _, sp := range resolved {
31083108
require.Falsef(t, !sp.Equal(tableSpan) && secondCheckpoint.Contains(sp.Key), "span should not have been resolved: %s", sp)
31093109
}

pkg/ccl/changefeedccl/kvfeed/BUILD.bazel

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@ go_library(
1212
visibility = ["//visibility:public"],
1313
deps = [
1414
"//pkg/ccl/changefeedccl/changefeedbase",
15-
"//pkg/ccl/changefeedccl/checkpoint",
1615
"//pkg/ccl/changefeedccl/kvevent",
1716
"//pkg/ccl/changefeedccl/schemafeed",
1817
"//pkg/ccl/changefeedccl/timers",

pkg/ccl/changefeedccl/kvfeed/kv_feed.go

Lines changed: 57 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@ import (
1515
"time"
1616

1717
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase"
18-
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/checkpoint"
1918
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/kvevent"
2019
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/schemafeed"
2120
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/timers"
@@ -55,21 +54,20 @@ type MonitoringConfig struct {
5554

5655
// Config configures a kvfeed.
5756
type Config struct {
58-
Settings *cluster.Settings
59-
DB *kv.DB
60-
Codec keys.SQLCodec
61-
Clock *hlc.Clock
62-
Spans []roachpb.Span
63-
SpanLevelCheckpoint *jobspb.TimestampSpansMap
64-
Targets changefeedbase.Targets
65-
Writer kvevent.Writer
66-
Metrics *kvevent.Metrics
67-
MonitoringCfg MonitoringConfig
68-
MM *mon.BytesMonitor
69-
WithDiff bool
70-
SchemaChangeEvents changefeedbase.SchemaChangeEventClass
71-
SchemaChangePolicy changefeedbase.SchemaChangePolicy
72-
SchemaFeed schemafeed.SchemaFeed
57+
Settings *cluster.Settings
58+
DB *kv.DB
59+
Codec keys.SQLCodec
60+
Clock *hlc.Clock
61+
Spans []roachpb.Span
62+
Targets changefeedbase.Targets
63+
Writer kvevent.Writer
64+
Metrics *kvevent.Metrics
65+
MonitoringCfg MonitoringConfig
66+
MM *mon.BytesMonitor
67+
WithDiff bool
68+
SchemaChangeEvents changefeedbase.SchemaChangeEventClass
69+
SchemaChangePolicy changefeedbase.SchemaChangePolicy
70+
SchemaFeed schemafeed.SchemaFeed
7371

7472
// If true, the feed will begin with a dump of data at exactly the
7573
// InitialHighWater. This is a peculiar behavior. In general the
@@ -78,9 +76,14 @@ type Config struct {
7876
NeedsInitialScan bool
7977

8078
// InitialHighWater is the timestamp after which new events are guaranteed to
81-
// be produced.
79+
// be produced. For initial scans, this is the scan time.
8280
InitialHighWater hlc.Timestamp
8381

82+
// InitialSpanTimePairs contains pairs of spans and their initial resolved
83+
// timestamps. The timestamps could be lower than InitialHighWater if the
84+
// initial scan has not yet been completed, in which case they must be empty.
85+
InitialSpanTimePairs []kvcoord.SpanTimePair
86+
8487
// If the end time is set, the changefeed will run until the frontier
8588
// progresses past the end time. Once the frontier has progressed past the end
8689
// time, the changefeed job will end with a successful status.
@@ -130,12 +133,12 @@ func Run(ctx context.Context, cfg Config) error {
130133

131134
g := ctxgroup.WithContext(ctx)
132135
f := newKVFeed(
133-
cfg.Writer, cfg.Spans, cfg.SpanLevelCheckpoint,
136+
cfg.Writer, cfg.Spans,
134137
cfg.SchemaChangeEvents, cfg.SchemaChangePolicy,
135138
cfg.NeedsInitialScan, cfg.WithDiff, cfg.WithFiltering,
136139
cfg.WithFrontierQuantize,
137140
cfg.ConsumerID,
138-
cfg.InitialHighWater, cfg.EndTime,
141+
cfg.InitialHighWater, cfg.InitialSpanTimePairs, cfg.EndTime,
139142
cfg.Codec,
140143
cfg.SchemaFeed,
141144
sc, pff, bf, cfg.Targets, cfg.ScopedTimers, cfg.Knobs)
@@ -253,13 +256,13 @@ func (e schemaChangeDetectedError) Error() string {
253256

254257
type kvFeed struct {
255258
spans []roachpb.Span
256-
spanLevelCheckpoint *jobspb.TimestampSpansMap
257259
withFrontierQuantize time.Duration
258260
withDiff bool
259261
withFiltering bool
260262
withInitialBackfill bool
261263
consumerID int64
262264
initialHighWater hlc.Timestamp
265+
initialSpanTimePairs []kvcoord.SpanTimePair
263266
endTime hlc.Timestamp
264267
writer kvevent.Writer
265268
codec keys.SQLCodec
@@ -284,13 +287,13 @@ type kvFeed struct {
284287
func newKVFeed(
285288
writer kvevent.Writer,
286289
spans []roachpb.Span,
287-
spanLevelCheckpoint *jobspb.TimestampSpansMap,
288290
schemaChangeEvents changefeedbase.SchemaChangeEventClass,
289291
schemaChangePolicy changefeedbase.SchemaChangePolicy,
290292
withInitialBackfill, withDiff, withFiltering bool,
291293
withFrontierQuantize time.Duration,
292294
consumerID int64,
293295
initialHighWater hlc.Timestamp,
296+
initialSpanTimePairs []kvcoord.SpanTimePair,
294297
endTime hlc.Timestamp,
295298
codec keys.SQLCodec,
296299
tf schemafeed.SchemaFeed,
@@ -304,7 +307,7 @@ func newKVFeed(
304307
return &kvFeed{
305308
writer: writer,
306309
spans: spans,
307-
spanLevelCheckpoint: spanLevelCheckpoint,
310+
initialSpanTimePairs: initialSpanTimePairs,
308311
withInitialBackfill: withInitialBackfill,
309312
withDiff: withDiff,
310313
withFiltering: withFiltering,
@@ -344,21 +347,28 @@ func (f *kvFeed) run(ctx context.Context) (err error) {
344347
return nil
345348
}
346349

347-
// Frontier initialized to initialHighwater timestamp which
348-
// represents the point in time at or before which we know
349-
// we've seen all events or is the initial starting time of the feed.
350-
rangeFeedResumeFrontier, err := span.MakeFrontierAt(f.initialHighWater, f.spans...)
350+
// Use the initial span-time pairs to restore progress.
351+
rangeFeedResumeFrontier, err := span.MakeFrontier(f.spans...)
351352
if err != nil {
352353
return err
353354
}
355+
for _, stp := range f.initialSpanTimePairs {
356+
if stp.StartAfter.IsSet() && stp.StartAfter.Compare(f.initialHighWater) < 0 {
357+
return errors.AssertionFailedf("initial span time pair with non-empty timestamp "+
358+
"earlier than initial highwater %v: %v", f.initialHighWater, stp)
359+
}
360+
if _, err := rangeFeedResumeFrontier.Forward(stp.Span, stp.StartAfter); err != nil {
361+
return err
362+
}
363+
}
354364
rangeFeedResumeFrontier = span.MakeConcurrentFrontier(rangeFeedResumeFrontier)
355365
defer rangeFeedResumeFrontier.Release()
356366

357367
for i := 0; ; i++ {
358368
initialScan := i == 0
359369
initialScanOnly := f.endTime == f.initialHighWater
360370

361-
scannedSpans, scannedTS, err := f.scanIfShould(ctx, initialScan, initialScanOnly, rangeFeedResumeFrontier.Frontier())
371+
scannedSpans, scannedTS, err := f.scanIfShould(ctx, initialScan, initialScanOnly, rangeFeedResumeFrontier)
362372
if err != nil {
363373
return err
364374
}
@@ -388,11 +398,6 @@ func (f *kvFeed) run(ctx context.Context) (err error) {
388398
return err
389399
}
390400

391-
// Clear out checkpoint after the initial scan or rangefeed.
392-
if initialScan {
393-
f.spanLevelCheckpoint = nil
394-
}
395-
396401
boundaryTS := rangeFeedResumeFrontier.Frontier()
397402
schemaChangeTS := boundaryTS.Next()
398403
boundaryType := jobspb.ResolvedSpan_BACKFILL
@@ -461,19 +466,6 @@ func isPrimaryKeyChange(
461466
return isPrimaryIndexChange, isPrimaryIndexChange && hasNoColumnChanges
462467
}
463468

464-
// filterCheckpointSpans filters spans which have already been completed,
465-
// and returns the list of spans that still need to be done.
466-
func filterCheckpointSpans(
467-
spans []roachpb.Span, checkpoint *jobspb.TimestampSpansMap,
468-
) []roachpb.Span {
469-
var sg roachpb.SpanGroup
470-
sg.Add(spans...)
471-
for _, sp := range checkpoint.All() {
472-
sg.Sub(sp...)
473-
}
474-
return sg.Slice()
475-
}
476-
477469
// scanIfShould performs a scan of KV pairs in watched span if
478470
// - this is the initial scan, or
479471
// - table schema is changed (a column is added/dropped) and a re-scan is needed.
@@ -486,12 +478,18 @@ func filterCheckpointSpans(
486478
// `highWater`, read from resumeFrontier.Frontier(), is the largest timestamp at or below which we know all events in
487479
// watched span have been seen (i.e. frontier.smallestTS).
488480
func (f *kvFeed) scanIfShould(
489-
ctx context.Context, initialScan bool, initialScanOnly bool, highWater hlc.Timestamp,
481+
ctx context.Context, initialScan bool, initialScanOnly bool, resumeFrontier span.Frontier,
490482
) ([]roachpb.Span, hlc.Timestamp, error) {
491483
ctx, sp := tracing.ChildSpan(ctx, "changefeed.kvfeed.scan_if_should")
492484
defer sp.Finish()
493485

494-
scanTime := highWater.Next()
486+
highWater := resumeFrontier.Frontier()
487+
scanTime := func() hlc.Timestamp {
488+
if highWater.IsEmpty() {
489+
return f.initialHighWater
490+
}
491+
return highWater.Next()
492+
}()
495493

496494
events, err := f.tableFeed.Peek(ctx, scanTime)
497495
if err != nil {
@@ -504,7 +502,6 @@ func (f *kvFeed) scanIfShould(
504502
isInitialScan := initialScan && f.withInitialBackfill
505503
var spansToScan []roachpb.Span
506504
if isInitialScan {
507-
scanTime = highWater
508505
spansToScan = f.spans
509506
} else if len(events) > 0 {
510507
// Only backfill for the tables which have events which may not be all
@@ -547,10 +544,17 @@ func (f *kvFeed) scanIfShould(
547544
return spansToScan, scanTime, nil
548545
}
549546

550-
// If we have initial checkpoint information specified, filter out
551-
// spans which we no longer need to scan.
552-
spansToBackfill := filterCheckpointSpans(spansToScan, f.spanLevelCheckpoint)
553-
if len(spansToBackfill) == 0 {
547+
// Filter out any spans that have already been backfilled.
548+
var spansToBackfill roachpb.SpanGroup
549+
spansToBackfill.Add(spansToScan...)
550+
for sp, ts := range resumeFrontier.Entries() {
551+
if scanTime.LessEq(ts) {
552+
spansToBackfill.Sub(sp)
553+
}
554+
}
555+
556+
// Skip scan if there are no spans to scan.
557+
if spansToBackfill.Len() == 0 {
554558
return spansToScan, scanTime, nil
555559
}
556560

@@ -563,7 +567,7 @@ func (f *kvFeed) scanIfShould(
563567
boundaryType = jobspb.ResolvedSpan_EXIT
564568
}
565569
if err := f.scanner.Scan(ctx, f.writer, scanConfig{
566-
Spans: spansToBackfill,
570+
Spans: spansToBackfill.Slice(),
567571
Timestamp: scanTime,
568572
WithDiff: !isInitialScan && f.withDiff,
569573
Knobs: f.knobs,
@@ -599,13 +603,6 @@ func (f *kvFeed) runUntilTableEvent(ctx context.Context, resumeFrontier span.Fro
599603
err = errors.CombineErrors(err, memBuf.CloseWithReason(ctx, err))
600604
}()
601605

602-
// We have catchup scan checkpoint. Advance frontier.
603-
if f.spanLevelCheckpoint != nil {
604-
if err := checkpoint.Restore(resumeFrontier, f.spanLevelCheckpoint); err != nil {
605-
return err
606-
}
607-
}
608-
609606
var stps []kvcoord.SpanTimePair
610607
for s, ts := range resumeFrontier.Entries() {
611608
stps = append(stps, kvcoord.SpanTimePair{Span: s, StartAfter: ts})

0 commit comments

Comments
 (0)