@@ -13,9 +13,7 @@ import (
 
 	"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdceval"
 	"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase"
-	"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/checkpoint"
 	"github.com/cockroachdb/cockroach/pkg/ccl/kvccl/kvfollowerreadsccl"
-	"github.com/cockroachdb/cockroach/pkg/clusterversion"
 	"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
 	"github.com/cockroachdb/cockroach/pkg/jobs/jobsprofiler"
 	"github.com/cockroachdb/cockroach/pkg/keys"
@@ -260,38 +258,15 @@ func startDistChangefeed(
 
 	dsp := execCtx.DistSQLPlanner()
 
-	//lint:ignore SA1019 deprecated usage
-	var legacyCheckpoint *jobspb.ChangefeedProgress_Checkpoint
-	if progress := localState.progress.GetChangefeed(); progress != nil && progress.Checkpoint != nil {
-		legacyCheckpoint = progress.Checkpoint
-	}
 	var spanLevelCheckpoint *jobspb.TimestampSpansMap
 	if progress := localState.progress.GetChangefeed(); progress != nil && progress.SpanLevelCheckpoint != nil {
 		spanLevelCheckpoint = progress.SpanLevelCheckpoint
-	}
-	if legacyCheckpoint != nil && spanLevelCheckpoint != nil {
-		if legacyCheckpoint.Timestamp.After(spanLevelCheckpoint.MinTimestamp()) {
-			// We should never be writing the legacy checkpoint again once we
-			// start writing the new checkpoint format. If we do, that signals
-			// a missing or incorrect version gate check somewhere.
-			return errors.AssertionFailedf("both legacy and current checkpoint set on " +
-				"changefeed job progress and legacy checkpoint has later timestamp")
-		}
-		// This should always be an assertion failure but unfortunately due to a bug
-		// that was included in earlier versions of 25.2 (#148620), we may fail
-		// to clear the legacy checkpoint when we start writing the new one.
-		// We instead discard the legacy checkpoint here and it will eventually be
-		// cleared once the cluster is running a newer patch release with the fix.
-		if buildutil.CrdbTestBuild {
-			return errors.AssertionFailedf("both legacy and current checkpoint set on " +
-				"changefeed job progress")
+		if log.V(2) {
+			log.Infof(ctx, "span-level checkpoint: %s", spanLevelCheckpoint)
 		}
-		log.Warningf(ctx, "both legacy and current checkpoint set on changefeed job progress; " +
-			"discarding legacy checkpoint")
-		legacyCheckpoint = nil
 	}
 	p, planCtx, err := makePlan(execCtx, jobID, details, description, initialHighWater,
-		trackedSpans, legacyCheckpoint, spanLevelCheckpoint, localState.drainingNodes)(ctx, dsp)
+		trackedSpans, spanLevelCheckpoint, localState.drainingNodes)(ctx, dsp)
 	if err != nil {
 		return err
 	}
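
With this change, the only checkpoint startDistChangefeed reads from job progress is the span-level map from resolved timestamps to spans. As a rough, self-contained sketch of that shape and of the MinTimestamp() comparison the deleted branch performed, here is a small hypothetical example; the timestamp, span, and spanLevelCheckpoint types are stand-ins, not the real hlc.Timestamp, roachpb.Span, or jobspb.TimestampSpansMap:

package main

import (
	"fmt"
	"math"
)

// Hypothetical stand-ins for hlc.Timestamp and roachpb.Span.
type timestamp struct{ wallNanos int64 }

type span struct{ startKey, endKey string }

// spanLevelCheckpoint mirrors the shape used above: each resolved timestamp
// maps to the spans that have already been resolved up to that timestamp.
type spanLevelCheckpoint map[timestamp][]span

// minTimestamp is the analogue of the MinTimestamp() call that the deleted
// legacy-vs-current comparison relied on.
func (c spanLevelCheckpoint) minTimestamp() timestamp {
	min := timestamp{wallNanos: math.MaxInt64}
	for ts := range c {
		if ts.wallNanos < min.wallNanos {
			min = ts
		}
	}
	return min
}

func main() {
	cp := spanLevelCheckpoint{
		{wallNanos: 100}: {{startKey: "a", endKey: "c"}},
		{wallNanos: 200}: {{startKey: "c", endKey: "f"}},
	}
	fmt.Println(cp.minTimestamp().wallNanos) // prints 100
}
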
@@ -410,8 +385,6 @@ func makePlan(
 	description string,
 	initialHighWater hlc.Timestamp,
 	trackedSpans []roachpb.Span,
-	//lint:ignore SA1019 deprecated usage
-	legacyCheckpoint *jobspb.ChangefeedProgress_Checkpoint,
 	spanLevelCheckpoint *jobspb.TimestampSpansMap,
 	drainingNodes []roachpb.NodeID,
 ) func(context.Context, *sql.DistSQLPlanner) (*sql.PhysicalPlan, *sql.PlanningCtx, error) {
@@ -480,62 +453,22 @@ func makePlan(
 			maybeCfKnobs.SpanPartitionsCallback(spanPartitions)
 		}
 
-		// Use the same checkpoint for all aggregators; each aggregator will only look at
-		// spans that are assigned to it.
-		// We could compute per-aggregator checkpoint, but that's probably an overkill.
-		//lint:ignore SA1019 deprecated usage
-		var aggregatorCheckpoint execinfrapb.ChangeAggregatorSpec_Checkpoint
-		var checkpointSpanGroup roachpb.SpanGroup
-
-		if legacyCheckpoint != nil {
-			checkpointSpanGroup.Add(legacyCheckpoint.Spans...)
-			aggregatorCheckpoint.Spans = legacyCheckpoint.Spans
-			aggregatorCheckpoint.Timestamp = legacyCheckpoint.Timestamp
-		}
-		if log.V(2) {
-			log.Infof(ctx, "aggregator checkpoint: %s", aggregatorCheckpoint)
-		}
-
 		aggregatorSpecs := make([]*execinfrapb.ChangeAggregatorSpec, len(spanPartitions))
 		for i, sp := range spanPartitions {
 			if log.ExpensiveLogEnabled(ctx, 2) {
 				log.Infof(ctx, "watched spans for node %d: %v", sp.SQLInstanceID, sp)
 			}
-			watches := make([]execinfrapb.ChangeAggregatorSpec_Watch, len(sp.Spans))
 
-			var initialHighWaterPtr *hlc.Timestamp
+			watches := make([]execinfrapb.ChangeAggregatorSpec_Watch, len(sp.Spans))
 			for watchIdx, nodeSpan := range sp.Spans {
-				if evalCtx.Settings.Version.IsActive(ctx, clusterversion.V25_2) {
-					// If the cluster has been fully upgraded to v25.2, we should populate
-					// the initial highwater of ChangeAggregatorSpec_Watch and leave the
-					// initial resolved of each span empty. We rely on the aggregators to
-					// forward the checkpointed timestamp for every span based on
-					// aggregatorCheckpoint.
-					watches[watchIdx] = execinfrapb.ChangeAggregatorSpec_Watch{
-						Span: nodeSpan,
-					}
-					initialHighWaterPtr = &initialHighWater
-				} else {
-					// If the cluster has not been fully upgraded to v25.2, we should
-					// leave the initial highwater of ChangeAggregatorSpec_Watch as nil.
-					// We rely on this to tell the aggregators to use the initial resolved
-					// timestamp for each span to infer the initial highwater. Read more
-					// from changeAggregator.getInitialHighWaterAndSpans.
-					initialResolved := initialHighWater
-					if checkpointSpanGroup.Encloses(nodeSpan) {
-						initialResolved = legacyCheckpoint.Timestamp
-					}
-					watches[watchIdx] = execinfrapb.ChangeAggregatorSpec_Watch{
-						Span:            nodeSpan,
-						InitialResolved: initialResolved,
-					}
+				watches[watchIdx] = execinfrapb.ChangeAggregatorSpec_Watch{
+					Span: nodeSpan,
 				}
 			}
 
 			aggregatorSpecs[i] = &execinfrapb.ChangeAggregatorSpec{
 				Watches:             watches,
-				Checkpoint:          aggregatorCheckpoint,
-				InitialHighWater:    initialHighWaterPtr,
+				InitialHighWater:    &initialHighWater,
 				SpanLevelCheckpoint: spanLevelCheckpoint,
 				Feed:                details,
 				UserProto:           execCtx.User().EncodeProto(),
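
Each aggregator spec now carries a single InitialHighWater pointer plus the shared SpanLevelCheckpoint instead of a per-span InitialResolved derived from the legacy checkpoint's span group. Below is a minimal sketch of why that is enough, under the assumption (not shown in this file) that the aggregator resumes a span from its checkpointed timestamp when one exists and from the initial highwater otherwise; exact span equality is used for brevity, where real spans would need containment checks:

package main

import "fmt"

// Hypothetical stand-ins; the real resume logic lives in the change
// aggregator, not in the planning code shown in this diff.
type timestamp struct{ wallNanos int64 }

type span struct{ startKey, endKey string }

// resumeTimestampFor returns the timestamp a watched span should resume from:
// the checkpointed timestamp if the span-level checkpoint covers the span,
// and the feed-wide initial highwater otherwise.
func resumeTimestampFor(
	sp span, initialHighWater timestamp, checkpoint map[timestamp][]span,
) timestamp {
	for ts, spans := range checkpoint {
		for _, cs := range spans {
			if cs == sp {
				return ts
			}
		}
	}
	return initialHighWater
}

func main() {
	checkpoint := map[timestamp][]span{
		{wallNanos: 200}: {{startKey: "c", endKey: "f"}},
	}
	hw := timestamp{wallNanos: 100}
	fmt.Println(resumeTimestampFor(span{"a", "c"}, hw, checkpoint)) // {100}
	fmt.Println(resumeTimestampFor(span{"c", "f"}, hw, checkpoint)) // {200}
}
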
@@ -550,17 +483,12 @@ func makePlan(
 		// is created, even if it is paused and unpaused, but #28982 describes some
 		// ways that this might happen in the future.
 		changeFrontierSpec := execinfrapb.ChangeFrontierSpec{
-			TrackedSpans: trackedSpans,
-			Feed:         details,
-			JobID:        jobID,
-			UserProto:    execCtx.User().EncodeProto(),
-			Description:  description,
-		}
-
-		if spanLevelCheckpoint != nil {
-			changeFrontierSpec.SpanLevelCheckpoint = spanLevelCheckpoint
-		} else {
-			changeFrontierSpec.SpanLevelCheckpoint = checkpoint.ConvertFromLegacyCheckpoint(legacyCheckpoint, details.StatementTime, initialHighWater)
+			TrackedSpans:        trackedSpans,
+			SpanLevelCheckpoint: spanLevelCheckpoint,
+			Feed:                details,
+			JobID:               jobID,
+			UserProto:           execCtx.User().EncodeProto(),
+			Description:         description,
 		}
 
 		if haveKnobs && maybeCfKnobs.OnDistflowSpec != nil {
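
The frontier spec likewise takes the span-level checkpoint directly, dropping the checkpoint.ConvertFromLegacyCheckpoint fallback. As a heavily simplified, hypothetical sketch of what such a conversion amounts to (a legacy checkpoint is a flat span list at one timestamp, which collapses into a one-entry map; the real helper also considers the statement time, omitted here):

package main

import "fmt"

// Hypothetical stand-ins; this sketches the general idea only, not the
// actual checkpoint.ConvertFromLegacyCheckpoint implementation.
type timestamp struct{ wallNanos int64 }

type span struct{ startKey, endKey string }

// legacyCheckpoint mirrors the shape of the deprecated
// ChangefeedProgress_Checkpoint: a flat list of spans at one timestamp.
type legacyCheckpoint struct {
	spans []span
	ts    timestamp
}

// convertFromLegacy collapses a legacy checkpoint into the span-level form,
// falling back to the initial highwater when the legacy timestamp is unset.
func convertFromLegacy(
	legacy *legacyCheckpoint, initialHighWater timestamp,
) map[timestamp][]span {
	if legacy == nil || len(legacy.spans) == 0 {
		return nil
	}
	ts := legacy.ts
	if ts.wallNanos == 0 {
		ts = initialHighWater
	}
	return map[timestamp][]span{ts: legacy.spans}
}

func main() {
	legacy := &legacyCheckpoint{spans: []span{{startKey: "a", endKey: "c"}}}
	fmt.Println(convertFromLegacy(legacy, timestamp{wallNanos: 150}))
}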