Skip to content

Commit 5355313

Browse files
committed
changefeedccl: delete legacy checkpoint code
Since 25.2 is a required upgrade, we can now safely delete all of the legacy checkpoint code. Release note: None
1 parent 36b970c commit 5355313

15 files changed

+66
-781
lines changed

pkg/ccl/changefeedccl/alter_changefeed_stmt.go

Lines changed: 7 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,6 @@ import (
1515
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdceval"
1616
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase"
1717
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedvalidators"
18-
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/checkpoint"
19-
"github.com/cockroachdb/cockroach/pkg/clusterversion"
2018
"github.com/cockroachdb/cockroach/pkg/jobs"
2119
"github.com/cockroachdb/cockroach/pkg/jobs/jobsauth"
2220
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
@@ -161,18 +159,6 @@ func alterChangefeedPlanHook(
161159
if err != nil {
162160
return err
163161
}
164-
if !p.ExecCfg().Settings.Version.IsActive(ctx, clusterversion.V25_2) {
165-
changefeedProgress := newProgress.GetChangefeed()
166-
if changefeedProgress != nil && !changefeedProgress.SpanLevelCheckpoint.IsEmpty() {
167-
if !changefeedProgress.Checkpoint.IsEmpty() {
168-
return errors.AssertionFailedf("both legacy and current checkpoint set on changefeed job progress")
169-
}
170-
legacyCheckpoint := checkpoint.ConvertToLegacyCheckpoint(changefeedProgress.SpanLevelCheckpoint)
171-
if err := changefeedProgress.SetCheckpoint(legacyCheckpoint, nil); err != nil {
172-
return err
173-
}
174-
}
175-
}
176162
newChangefeedStmt.TableTargets = newTargets
177163

178164
if prevDetails.Select != "" {
@@ -653,7 +639,7 @@ func generateAndValidateNewTargets(
653639
}
654640
}
655641
droppedTargetSpans := fetchSpansForDescs(p, droppedIDs)
656-
if err := removeSpansFromProgress(newJobProgress, droppedTargetSpans, newJobStatementTime); err != nil {
642+
if err := removeSpansFromProgress(newJobProgress, droppedTargetSpans); err != nil {
657643
return nil, nil, hlc.Timestamp{}, nil, err
658644
}
659645
}
@@ -759,8 +745,7 @@ func generateNewProgress(
759745
haveHighwater := prevHighWater != nil && prevHighWater.IsSet()
760746
// TODO(#142376): Whether a checkpoint exists seems orthogonal to what
761747
// we do in this function. Consider removing this flag.
762-
haveCheckpoint := changefeedProgress != nil &&
763-
(!changefeedProgress.Checkpoint.IsEmpty() || !changefeedProgress.SpanLevelCheckpoint.IsEmpty())
748+
haveCheckpoint := changefeedProgress != nil && !changefeedProgress.SpanLevelCheckpoint.IsEmpty()
764749

765750
// Check if the progress does not need to be updated. The progress does not
766751
// need to be updated if:
@@ -834,7 +819,7 @@ func generateNewProgress(
834819
// the events for the new table start at the ALTER CHANGEFEED statement
835820
// time instead.
836821

837-
spanLevelCheckpoint, err := getSpanLevelCheckpointFromProgress(prevProgress, prevStatementTime)
822+
spanLevelCheckpoint, err := getSpanLevelCheckpointFromProgress(prevProgress)
838823
if err != nil {
839824
return jobspb.Progress{}, hlc.Timestamp{}, err
840825
}
@@ -857,10 +842,8 @@ func generateNewProgress(
857842
return newProgress, prevStatementTime, nil
858843
}
859844

860-
func removeSpansFromProgress(
861-
progress jobspb.Progress, spansToRemove []roachpb.Span, statementTime hlc.Timestamp,
862-
) error {
863-
spanLevelCheckpoint, err := getSpanLevelCheckpointFromProgress(progress, statementTime)
845+
func removeSpansFromProgress(progress jobspb.Progress, spansToRemove []roachpb.Span) error {
846+
spanLevelCheckpoint, err := getSpanLevelCheckpointFromProgress(progress)
864847
if err != nil {
865848
return err
866849
}
@@ -876,31 +859,18 @@ func removeSpansFromProgress(
876859
checkpointSpansMap[ts] = spans
877860
}
878861
}
879-
spanLevelCheckpoint = jobspb.NewTimestampSpansMap(checkpointSpansMap)
880-
if err := progress.GetChangefeed().SetCheckpoint(nil, spanLevelCheckpoint); err != nil {
881-
return err
882-
}
862+
progress.GetChangefeed().SpanLevelCheckpoint = jobspb.NewTimestampSpansMap(checkpointSpansMap)
883863

884864
return nil
885865
}
886866

887867
func getSpanLevelCheckpointFromProgress(
888-
progress jobspb.Progress, statementTime hlc.Timestamp,
868+
progress jobspb.Progress,
889869
) (*jobspb.TimestampSpansMap, error) {
890870
changefeedProgress := progress.GetChangefeed()
891871
if changefeedProgress == nil {
892872
return nil, nil
893873
}
894-
895-
if !changefeedProgress.Checkpoint.IsEmpty() && !changefeedProgress.SpanLevelCheckpoint.IsEmpty() {
896-
return nil, errors.AssertionFailedf("both legacy and current checkpoint set on changefeed job progress")
897-
}
898-
899-
if legacyCheckpoint := changefeedProgress.Checkpoint; !legacyCheckpoint.IsEmpty() {
900-
hw := progress.GetHighWater()
901-
return checkpoint.ConvertFromLegacyCheckpoint(legacyCheckpoint, statementTime, *hw), nil
902-
}
903-
904874
return changefeedProgress.SpanLevelCheckpoint, nil
905875
}
906876

pkg/ccl/changefeedccl/changefeed_dist.go

Lines changed: 13 additions & 85 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,7 @@ import (
1313

1414
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdceval"
1515
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase"
16-
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/checkpoint"
1716
"github.com/cockroachdb/cockroach/pkg/ccl/kvccl/kvfollowerreadsccl"
18-
"github.com/cockroachdb/cockroach/pkg/clusterversion"
1917
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
2018
"github.com/cockroachdb/cockroach/pkg/jobs/jobsprofiler"
2119
"github.com/cockroachdb/cockroach/pkg/keys"
@@ -260,38 +258,15 @@ func startDistChangefeed(
260258

261259
dsp := execCtx.DistSQLPlanner()
262260

263-
//lint:ignore SA1019 deprecated usage
264-
var legacyCheckpoint *jobspb.ChangefeedProgress_Checkpoint
265-
if progress := localState.progress.GetChangefeed(); progress != nil && progress.Checkpoint != nil {
266-
legacyCheckpoint = progress.Checkpoint
267-
}
268261
var spanLevelCheckpoint *jobspb.TimestampSpansMap
269262
if progress := localState.progress.GetChangefeed(); progress != nil && progress.SpanLevelCheckpoint != nil {
270263
spanLevelCheckpoint = progress.SpanLevelCheckpoint
271-
}
272-
if legacyCheckpoint != nil && spanLevelCheckpoint != nil {
273-
if legacyCheckpoint.Timestamp.After(spanLevelCheckpoint.MinTimestamp()) {
274-
// We should never be writing the legacy checkpoint again once we
275-
// start writing the new checkpoint format. If we do, that signals
276-
// a missing or incorrect version gate check somewhere.
277-
return errors.AssertionFailedf("both legacy and current checkpoint set on " +
278-
"changefeed job progress and legacy checkpoint has later timestamp")
279-
}
280-
// This should always be an assertion failure but unfortunately due to a bug
281-
// that was included in earlier versions of 25.2 (#148620), we may fail
282-
// to clear the legacy checkpoint when we start writing the new one.
283-
// We instead discard the legacy checkpoint here and it will eventually be
284-
// cleared once the cluster is running a newer patch release with the fix.
285-
if buildutil.CrdbTestBuild {
286-
return errors.AssertionFailedf("both legacy and current checkpoint set on " +
287-
"changefeed job progress")
264+
if log.V(2) {
265+
log.Infof(ctx, "span-level checkpoint: %s", spanLevelCheckpoint)
288266
}
289-
log.Warningf(ctx, "both legacy and current checkpoint set on changefeed job progress; "+
290-
"discarding legacy checkpoint")
291-
legacyCheckpoint = nil
292267
}
293268
p, planCtx, err := makePlan(execCtx, jobID, details, description, initialHighWater,
294-
trackedSpans, legacyCheckpoint, spanLevelCheckpoint, localState.drainingNodes)(ctx, dsp)
269+
trackedSpans, spanLevelCheckpoint, localState.drainingNodes)(ctx, dsp)
295270
if err != nil {
296271
return err
297272
}
@@ -410,8 +385,6 @@ func makePlan(
410385
description string,
411386
initialHighWater hlc.Timestamp,
412387
trackedSpans []roachpb.Span,
413-
//lint:ignore SA1019 deprecated usage
414-
legacyCheckpoint *jobspb.ChangefeedProgress_Checkpoint,
415388
spanLevelCheckpoint *jobspb.TimestampSpansMap,
416389
drainingNodes []roachpb.NodeID,
417390
) func(context.Context, *sql.DistSQLPlanner) (*sql.PhysicalPlan, *sql.PlanningCtx, error) {
@@ -480,62 +453,22 @@ func makePlan(
480453
maybeCfKnobs.SpanPartitionsCallback(spanPartitions)
481454
}
482455

483-
// Use the same checkpoint for all aggregators; each aggregator will only look at
484-
// spans that are assigned to it.
485-
// We could compute per-aggregator checkpoint, but that's probably an overkill.
486-
//lint:ignore SA1019 deprecated usage
487-
var aggregatorCheckpoint execinfrapb.ChangeAggregatorSpec_Checkpoint
488-
var checkpointSpanGroup roachpb.SpanGroup
489-
490-
if legacyCheckpoint != nil {
491-
checkpointSpanGroup.Add(legacyCheckpoint.Spans...)
492-
aggregatorCheckpoint.Spans = legacyCheckpoint.Spans
493-
aggregatorCheckpoint.Timestamp = legacyCheckpoint.Timestamp
494-
}
495-
if log.V(2) {
496-
log.Infof(ctx, "aggregator checkpoint: %s", aggregatorCheckpoint)
497-
}
498-
499456
aggregatorSpecs := make([]*execinfrapb.ChangeAggregatorSpec, len(spanPartitions))
500457
for i, sp := range spanPartitions {
501458
if log.ExpensiveLogEnabled(ctx, 2) {
502459
log.Infof(ctx, "watched spans for node %d: %v", sp.SQLInstanceID, sp)
503460
}
504-
watches := make([]execinfrapb.ChangeAggregatorSpec_Watch, len(sp.Spans))
505461

506-
var initialHighWaterPtr *hlc.Timestamp
462+
watches := make([]execinfrapb.ChangeAggregatorSpec_Watch, len(sp.Spans))
507463
for watchIdx, nodeSpan := range sp.Spans {
508-
if evalCtx.Settings.Version.IsActive(ctx, clusterversion.V25_2) {
509-
// If the cluster has been fully upgraded to v25.2, we should populate
510-
// the initial highwater of ChangeAggregatorSpec_Watch and leave the
511-
// initial resolved of each span empty. We rely on the aggregators to
512-
// forward the checkpointed timestamp for every span based on
513-
// aggregatorCheckpoint.
514-
watches[watchIdx] = execinfrapb.ChangeAggregatorSpec_Watch{
515-
Span: nodeSpan,
516-
}
517-
initialHighWaterPtr = &initialHighWater
518-
} else {
519-
// If the cluster has not been fully upgraded to v25.2, we should
520-
// leave the initial highwater of ChangeAggregatorSpec_Watch as nil.
521-
// We rely on this to tell the aggregators to the initial resolved
522-
// timestamp for each span to infer the initial highwater. Read more
523-
// from changeAggregator.getInitialHighWaterAndSpans.
524-
initialResolved := initialHighWater
525-
if checkpointSpanGroup.Encloses(nodeSpan) {
526-
initialResolved = legacyCheckpoint.Timestamp
527-
}
528-
watches[watchIdx] = execinfrapb.ChangeAggregatorSpec_Watch{
529-
Span: nodeSpan,
530-
InitialResolved: initialResolved,
531-
}
464+
watches[watchIdx] = execinfrapb.ChangeAggregatorSpec_Watch{
465+
Span: nodeSpan,
532466
}
533467
}
534468

535469
aggregatorSpecs[i] = &execinfrapb.ChangeAggregatorSpec{
536470
Watches: watches,
537-
Checkpoint: aggregatorCheckpoint,
538-
InitialHighWater: initialHighWaterPtr,
471+
InitialHighWater: &initialHighWater,
539472
SpanLevelCheckpoint: spanLevelCheckpoint,
540473
Feed: details,
541474
UserProto: execCtx.User().EncodeProto(),
@@ -550,17 +483,12 @@ func makePlan(
550483
// is created, even if it is paused and unpaused, but #28982 describes some
551484
// ways that this might happen in the future.
552485
changeFrontierSpec := execinfrapb.ChangeFrontierSpec{
553-
TrackedSpans: trackedSpans,
554-
Feed: details,
555-
JobID: jobID,
556-
UserProto: execCtx.User().EncodeProto(),
557-
Description: description,
558-
}
559-
560-
if spanLevelCheckpoint != nil {
561-
changeFrontierSpec.SpanLevelCheckpoint = spanLevelCheckpoint
562-
} else {
563-
changeFrontierSpec.SpanLevelCheckpoint = checkpoint.ConvertFromLegacyCheckpoint(legacyCheckpoint, details.StatementTime, initialHighWater)
486+
TrackedSpans: trackedSpans,
487+
SpanLevelCheckpoint: spanLevelCheckpoint,
488+
Feed: details,
489+
JobID: jobID,
490+
UserProto: execCtx.User().EncodeProto(),
491+
Description: description,
564492
}
565493

566494
if haveKnobs && maybeCfKnobs.OnDistflowSpec != nil {

0 commit comments

Comments
 (0)