@@ -15,7 +15,6 @@ import (
15
15
"time"
16
16
17
17
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase"
18
- "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/checkpoint"
19
18
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/kvevent"
20
19
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/schemafeed"
21
20
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/timers"
@@ -55,21 +54,20 @@ type MonitoringConfig struct {
55
54
56
55
// Config configures a kvfeed.
57
56
type Config struct {
58
- Settings * cluster.Settings
59
- DB * kv.DB
60
- Codec keys.SQLCodec
61
- Clock * hlc.Clock
62
- Spans []roachpb.Span
63
- SpanLevelCheckpoint * jobspb.TimestampSpansMap
64
- Targets changefeedbase.Targets
65
- Writer kvevent.Writer
66
- Metrics * kvevent.Metrics
67
- MonitoringCfg MonitoringConfig
68
- MM * mon.BytesMonitor
69
- WithDiff bool
70
- SchemaChangeEvents changefeedbase.SchemaChangeEventClass
71
- SchemaChangePolicy changefeedbase.SchemaChangePolicy
72
- SchemaFeed schemafeed.SchemaFeed
57
+ Settings * cluster.Settings
58
+ DB * kv.DB
59
+ Codec keys.SQLCodec
60
+ Clock * hlc.Clock
61
+ Spans []roachpb.Span
62
+ Targets changefeedbase.Targets
63
+ Writer kvevent.Writer
64
+ Metrics * kvevent.Metrics
65
+ MonitoringCfg MonitoringConfig
66
+ MM * mon.BytesMonitor
67
+ WithDiff bool
68
+ SchemaChangeEvents changefeedbase.SchemaChangeEventClass
69
+ SchemaChangePolicy changefeedbase.SchemaChangePolicy
70
+ SchemaFeed schemafeed.SchemaFeed
73
71
74
72
// If true, the feed will begin with a dump of data at exactly the
75
73
// InitialHighWater. This is a peculiar behavior. In general the
@@ -78,9 +76,14 @@ type Config struct {
78
76
NeedsInitialScan bool
79
77
80
78
// InitialHighWater is the timestamp after which new events are guaranteed to
81
- // be produced.
79
+ // be produced. For initial scans, this is the scan time.
82
80
InitialHighWater hlc.Timestamp
83
81
82
+ // InitialSpanTimePairs contains pairs of spans and their initial resolved
83
+ // timestamps. A timestamp may be empty (and so lower than InitialHighWater) if the
84
+ // initial scan has not yet been completed; a non-empty one must not precede it.
85
+ InitialSpanTimePairs []kvcoord.SpanTimePair
86
+
84
87
// If the end time is set, the changefeed will run until the frontier
85
88
// progresses past the end time. Once the frontier has progressed past the end
86
89
// time, the changefeed job will end with a successful status.
@@ -130,12 +133,12 @@ func Run(ctx context.Context, cfg Config) error {
130
133
131
134
g := ctxgroup .WithContext (ctx )
132
135
f := newKVFeed (
133
- cfg .Writer , cfg .Spans , cfg . SpanLevelCheckpoint ,
136
+ cfg .Writer , cfg .Spans ,
134
137
cfg .SchemaChangeEvents , cfg .SchemaChangePolicy ,
135
138
cfg .NeedsInitialScan , cfg .WithDiff , cfg .WithFiltering ,
136
139
cfg .WithFrontierQuantize ,
137
140
cfg .ConsumerID ,
138
- cfg .InitialHighWater , cfg .EndTime ,
141
+ cfg .InitialHighWater , cfg .InitialSpanTimePairs , cfg . EndTime ,
139
142
cfg .Codec ,
140
143
cfg .SchemaFeed ,
141
144
sc , pff , bf , cfg .Targets , cfg .ScopedTimers , cfg .Knobs )
@@ -253,13 +256,13 @@ func (e schemaChangeDetectedError) Error() string {
253
256
254
257
type kvFeed struct {
255
258
spans []roachpb.Span
256
- spanLevelCheckpoint * jobspb.TimestampSpansMap
257
259
withFrontierQuantize time.Duration
258
260
withDiff bool
259
261
withFiltering bool
260
262
withInitialBackfill bool
261
263
consumerID int64
262
264
initialHighWater hlc.Timestamp
265
+ initialSpanTimePairs []kvcoord.SpanTimePair
263
266
endTime hlc.Timestamp
264
267
writer kvevent.Writer
265
268
codec keys.SQLCodec
@@ -284,13 +287,13 @@ type kvFeed struct {
284
287
func newKVFeed (
285
288
writer kvevent.Writer ,
286
289
spans []roachpb.Span ,
287
- spanLevelCheckpoint * jobspb.TimestampSpansMap ,
288
290
schemaChangeEvents changefeedbase.SchemaChangeEventClass ,
289
291
schemaChangePolicy changefeedbase.SchemaChangePolicy ,
290
292
withInitialBackfill , withDiff , withFiltering bool ,
291
293
withFrontierQuantize time.Duration ,
292
294
consumerID int64 ,
293
295
initialHighWater hlc.Timestamp ,
296
+ initialSpanTimePairs []kvcoord.SpanTimePair ,
294
297
endTime hlc.Timestamp ,
295
298
codec keys.SQLCodec ,
296
299
tf schemafeed.SchemaFeed ,
@@ -304,7 +307,7 @@ func newKVFeed(
304
307
return & kvFeed {
305
308
writer : writer ,
306
309
spans : spans ,
307
- spanLevelCheckpoint : spanLevelCheckpoint ,
310
+ initialSpanTimePairs : initialSpanTimePairs ,
308
311
withInitialBackfill : withInitialBackfill ,
309
312
withDiff : withDiff ,
310
313
withFiltering : withFiltering ,
@@ -344,21 +347,28 @@ func (f *kvFeed) run(ctx context.Context) (err error) {
344
347
return nil
345
348
}
346
349
347
- // Frontier initialized to initialHighwater timestamp which
348
- // represents the point in time at or before which we know
349
- // we've seen all events or is the initial starting time of the feed.
350
- rangeFeedResumeFrontier , err := span .MakeFrontierAt (f .initialHighWater , f .spans ... )
350
+ // Use the initial span-time pairs to restore progress.
351
+ rangeFeedResumeFrontier , err := span .MakeFrontier (f .spans ... )
351
352
if err != nil {
352
353
return err
353
354
}
355
+ for _ , stp := range f .initialSpanTimePairs {
356
+ if stp .StartAfter .IsSet () && stp .StartAfter .Compare (f .initialHighWater ) < 0 {
357
+ return errors .AssertionFailedf ("initial span time pair with non-empty timestamp " +
358
+ "earlier than initial highwater %v: %v" , f .initialHighWater , stp )
359
+ }
360
+ if _ , err := rangeFeedResumeFrontier .Forward (stp .Span , stp .StartAfter ); err != nil {
361
+ return err
362
+ }
363
+ }
354
364
rangeFeedResumeFrontier = span .MakeConcurrentFrontier (rangeFeedResumeFrontier )
355
365
defer rangeFeedResumeFrontier .Release ()
356
366
357
367
for i := 0 ; ; i ++ {
358
368
initialScan := i == 0
359
369
initialScanOnly := f .endTime == f .initialHighWater
360
370
361
- scannedSpans , scannedTS , err := f .scanIfShould (ctx , initialScan , initialScanOnly , rangeFeedResumeFrontier . Frontier () )
371
+ scannedSpans , scannedTS , err := f .scanIfShould (ctx , initialScan , initialScanOnly , rangeFeedResumeFrontier )
362
372
if err != nil {
363
373
return err
364
374
}
@@ -388,11 +398,6 @@ func (f *kvFeed) run(ctx context.Context) (err error) {
388
398
return err
389
399
}
390
400
391
- // Clear out checkpoint after the initial scan or rangefeed.
392
- if initialScan {
393
- f .spanLevelCheckpoint = nil
394
- }
395
-
396
401
boundaryTS := rangeFeedResumeFrontier .Frontier ()
397
402
schemaChangeTS := boundaryTS .Next ()
398
403
boundaryType := jobspb .ResolvedSpan_BACKFILL
@@ -461,19 +466,6 @@ func isPrimaryKeyChange(
461
466
return isPrimaryIndexChange , isPrimaryIndexChange && hasNoColumnChanges
462
467
}
463
468
464
- // filterCheckpointSpans filters spans which have already been completed,
465
- // and returns the list of spans that still need to be done.
466
- func filterCheckpointSpans (
467
- spans []roachpb.Span , checkpoint * jobspb.TimestampSpansMap ,
468
- ) []roachpb.Span {
469
- var sg roachpb.SpanGroup
470
- sg .Add (spans ... )
471
- for _ , sp := range checkpoint .All () {
472
- sg .Sub (sp ... )
473
- }
474
- return sg .Slice ()
475
- }
476
-
477
469
// scanIfShould performs a scan of KV pairs in watched span if
478
470
// - this is the initial scan, or
479
471
// - table schema is changed (a column is added/dropped) and a re-scan is needed.
@@ -486,12 +478,18 @@ func filterCheckpointSpans(
486
478
// `resumeFrontier` holds the per-span resolved timestamps; `highWater`, its Frontier(), is the
487
479
// largest timestamp at or below which we know all events in watched span have been seen (i.e. frontier.smallestTS).
488
480
func (f * kvFeed ) scanIfShould (
489
- ctx context.Context , initialScan bool , initialScanOnly bool , highWater hlc. Timestamp ,
481
+ ctx context.Context , initialScan bool , initialScanOnly bool , resumeFrontier span. Frontier ,
490
482
) ([]roachpb.Span , hlc.Timestamp , error ) {
491
483
ctx , sp := tracing .ChildSpan (ctx , "changefeed.kvfeed.scan_if_should" )
492
484
defer sp .Finish ()
493
485
494
- scanTime := highWater .Next ()
486
+ highWater := resumeFrontier .Frontier ()
487
+ scanTime := func () hlc.Timestamp {
488
+ if highWater .IsEmpty () {
489
+ return f .initialHighWater
490
+ }
491
+ return highWater .Next ()
492
+ }()
495
493
496
494
events , err := f .tableFeed .Peek (ctx , scanTime )
497
495
if err != nil {
@@ -504,7 +502,6 @@ func (f *kvFeed) scanIfShould(
504
502
isInitialScan := initialScan && f .withInitialBackfill
505
503
var spansToScan []roachpb.Span
506
504
if isInitialScan {
507
- scanTime = highWater
508
505
spansToScan = f .spans
509
506
} else if len (events ) > 0 {
510
507
// Only backfill for the tables which have events which may not be all
@@ -547,10 +544,17 @@ func (f *kvFeed) scanIfShould(
547
544
return spansToScan , scanTime , nil
548
545
}
549
546
550
- // If we have initial checkpoint information specified, filter out
551
- // spans which we no longer need to scan.
552
- spansToBackfill := filterCheckpointSpans (spansToScan , f .spanLevelCheckpoint )
553
- if len (spansToBackfill ) == 0 {
547
+ // Filter out any spans that have already been backfilled.
548
+ var spansToBackfill roachpb.SpanGroup
549
+ spansToBackfill .Add (spansToScan ... )
550
+ for sp , ts := range resumeFrontier .Entries () {
551
+ if scanTime .LessEq (ts ) {
552
+ spansToBackfill .Sub (sp )
553
+ }
554
+ }
555
+
556
+ // Skip scan if there are no spans to scan.
557
+ if spansToBackfill .Len () == 0 {
554
558
return spansToScan , scanTime , nil
555
559
}
556
560
@@ -563,7 +567,7 @@ func (f *kvFeed) scanIfShould(
563
567
boundaryType = jobspb .ResolvedSpan_EXIT
564
568
}
565
569
if err := f .scanner .Scan (ctx , f .writer , scanConfig {
566
- Spans : spansToBackfill ,
570
+ Spans : spansToBackfill . Slice () ,
567
571
Timestamp : scanTime ,
568
572
WithDiff : ! isInitialScan && f .withDiff ,
569
573
Knobs : f .knobs ,
@@ -599,13 +603,6 @@ func (f *kvFeed) runUntilTableEvent(ctx context.Context, resumeFrontier span.Fro
599
603
err = errors .CombineErrors (err , memBuf .CloseWithReason (ctx , err ))
600
604
}()
601
605
602
- // We have catchup scan checkpoint. Advance frontier.
603
- if f .spanLevelCheckpoint != nil {
604
- if err := checkpoint .Restore (resumeFrontier , f .spanLevelCheckpoint ); err != nil {
605
- return err
606
- }
607
- }
608
-
609
606
var stps []kvcoord.SpanTimePair
610
607
for s , ts := range resumeFrontier .Entries () {
611
608
stps = append (stps , kvcoord.SpanTimePair {Span : s , StartAfter : ts })
0 commit comments