Skip to content

Commit 540a042

Browse files
craig[bot]andyyang890
andcommitted
Merge #142659
142659: changefeedccl: write new span-level checkpoint during ALTER CHANGEFEED r=asg0451 a=andyyang890 This patch starts writing the new span-level checkpoint during ALTER CHANGEFEED on 25.2+ clusters. Fixes #140509 Release note: None Co-authored-by: Andy Yang <[email protected]>
2 parents 37c1208 + 28c583c commit 540a042

File tree

12 files changed

+341
-154
lines changed

12 files changed

+341
-154
lines changed

pkg/ccl/changefeedccl/alter_changefeed_stmt.go

Lines changed: 111 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,16 @@ package changefeedccl
77

88
import (
99
"context"
10+
"maps"
1011
"net/url"
1112

1213
"github.com/cockroachdb/cockroach/pkg/backup/backupresolver"
1314
"github.com/cockroachdb/cockroach/pkg/build"
1415
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdceval"
1516
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase"
1617
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedvalidators"
18+
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/checkpoint"
19+
"github.com/cockroachdb/cockroach/pkg/clusterversion"
1720
"github.com/cockroachdb/cockroach/pkg/jobs"
1821
"github.com/cockroachdb/cockroach/pkg/jobs/jobsauth"
1922
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
@@ -154,13 +157,23 @@ func alterChangefeedPlanHook(
154157
newTargets, newProgress, newStatementTime, originalSpecs, err := generateAndValidateNewTargets(
155158
ctx, exprEval, p,
156159
alterChangefeedStmt.Cmds,
157-
newOptions.AsMap(), // TODO: Remove .AsMap()
160+
newOptions,
158161
prevDetails, job.Progress(),
159162
newSinkURI,
160163
)
161164
if err != nil {
162165
return err
163166
}
167+
if !p.ExecCfg().Settings.Version.IsActive(ctx, clusterversion.V25_2) {
168+
changefeedProgress := newProgress.GetChangefeed()
169+
if changefeedProgress != nil && !changefeedProgress.SpanLevelCheckpoint.IsEmpty() {
170+
if !changefeedProgress.Checkpoint.IsEmpty() {
171+
return errors.AssertionFailedf("both legacy and current checkpoint set on changefeed job progress")
172+
}
173+
changefeedProgress.Checkpoint = checkpoint.ConvertToLegacyCheckpoint(changefeedProgress.SpanLevelCheckpoint)
174+
changefeedProgress.SpanLevelCheckpoint = nil
175+
}
176+
}
164177
newChangefeedStmt.Targets = newTargets
165178

166179
if prevDetails.Select != "" {
@@ -375,7 +388,7 @@ func generateAndValidateNewTargets(
375388
exprEval exprutil.Evaluator,
376389
p sql.PlanHookState,
377390
alterCmds tree.AlterChangefeedCmds,
378-
opts map[string]string,
391+
opts changefeedbase.StatementOptions,
379392
prevDetails jobspb.ChangefeedDetails,
380393
prevProgress jobspb.Progress,
381394
sinkURI string,
@@ -406,7 +419,11 @@ func generateAndValidateNewTargets(
406419
// initial_scan set to only so that we only do an initial scan on an alter
407420
// changefeed with initial_scan = 'only' if the original one also had
408421
// initial_scan = 'only'.
409-
_, originalInitialScanOnlyOption := opts[changefeedbase.OptInitialScanOnly]
422+
originalInitialScanType, err := opts.GetInitialScanType()
423+
if err != nil {
424+
return nil, nil, hlc.Timestamp{}, nil, err
425+
}
426+
originalInitialScanOnlyOption := originalInitialScanType == changefeedbase.OnlyInitialScan
410427

411428
// When we add new targets with or without initial scans, indicating
412429
// initial_scan or no_initial_scan in the job description would lose its
@@ -415,9 +432,9 @@ func generateAndValidateNewTargets(
415432
// newly added targets, we will introduce the initial_scan opt after the
416433
// job record is created.
417434

418-
delete(opts, changefeedbase.OptInitialScanOnly)
419-
delete(opts, changefeedbase.OptNoInitialScan)
420-
delete(opts, changefeedbase.OptInitialScan)
435+
opts.Unset(changefeedbase.OptInitialScanOnly)
436+
opts.Unset(changefeedbase.OptNoInitialScan)
437+
opts.Unset(changefeedbase.OptInitialScan)
421438

422439
// the new progress and statement time will start from the progress and
423440
// statement time of the job prior to the alteration of the changefeed. Each
@@ -445,7 +462,7 @@ func generateAndValidateNewTargets(
445462

446463
prevTargets := AllTargets(prevDetails)
447464
noLongerExist := make(map[string]descpb.ID)
448-
err = prevTargets.EachTarget(func(targetSpec changefeedbase.Target) error {
465+
if err := prevTargets.EachTarget(func(targetSpec changefeedbase.Target) error {
449466
k := targetKey{TableID: targetSpec.TableID, FamilyName: tree.Name(targetSpec.FamilyName)}
450467
var desc catalog.TableDescriptor
451468
if d, exists := descResolver.DescByID[targetSpec.TableID]; exists {
@@ -482,9 +499,7 @@ func generateAndValidateNewTargets(
482499
StatementTimeName: string(targetSpec.StatementTimeName),
483500
}
484501
return nil
485-
})
486-
487-
if err != nil {
502+
}); err != nil {
488503
return nil, nil, hlc.Timestamp{}, nil, err
489504
}
490505

@@ -515,25 +530,14 @@ func generateAndValidateNewTargets(
515530
return nil, nil, hlc.Timestamp{}, nil, err
516531
}
517532

518-
var withInitialScan bool
519533
initialScanType, initialScanSet := targetOpts[changefeedbase.OptInitialScan]
520534
_, noInitialScanSet := targetOpts[changefeedbase.OptNoInitialScan]
521-
_, initialScanOnlySet := targetOpts[changefeedbase.OptInitialScanOnly]
522-
523-
if initialScanSet {
524-
if initialScanType == `no` || (initialScanType == `only` && !originalInitialScanOnlyOption) {
525-
withInitialScan = false
526-
} else {
527-
withInitialScan = true
528-
}
529-
} else {
530-
withInitialScan = false
531-
}
532535

533536
if initialScanType != `` && initialScanType != `yes` && initialScanType != `no` && initialScanType != `only` {
534537
return nil, nil, hlc.Timestamp{}, nil, pgerror.Newf(
535538
pgcode.InvalidParameterValue,
536-
`cannot set initial_scan to %q. possible values for initial_scan are "yes", "no", "only", or no value`, changefeedbase.OptInitialScan,
539+
`cannot set %q to %q. possible values for initial_scan are "yes", "no", "only", or no value`,
540+
changefeedbase.OptInitialScan, initialScanType,
537541
)
538542
}
539543

@@ -545,22 +549,11 @@ func generateAndValidateNewTargets(
545549
)
546550
}
547551

548-
if initialScanSet && initialScanOnlySet {
549-
return nil, nil, hlc.Timestamp{}, nil, pgerror.Newf(
550-
pgcode.InvalidParameterValue,
551-
`cannot specify both %q and %q`, changefeedbase.OptInitialScan,
552-
changefeedbase.OptInitialScanOnly,
553-
)
554-
}
555-
556-
if noInitialScanSet && initialScanOnlySet {
557-
return nil, nil, hlc.Timestamp{}, nil, pgerror.Newf(
558-
pgcode.InvalidParameterValue,
559-
`cannot specify both %q and %q`, changefeedbase.OptInitialScanOnly,
560-
changefeedbase.OptNoInitialScan,
561-
)
562-
}
552+
withInitialScan := (initialScanType == `` && initialScanSet) ||
553+
initialScanType == `yes` ||
554+
(initialScanType == `only` && originalInitialScanOnlyOption)
563555

556+
// TODO(#142376): Audit whether this list is generated correctly.
564557
var existingTargetIDs []descpb.ID
565558
for _, targetDesc := range newTableDescs {
566559
existingTargetIDs = append(existingTargetIDs, targetDesc.GetID())
@@ -659,7 +652,9 @@ func generateAndValidateNewTargets(
659652
}
660653
}
661654
droppedTargetSpans := fetchSpansForDescs(p, droppedIDs)
662-
removeSpansFromProgress(newJobProgress, droppedTargetSpans)
655+
if err := removeSpansFromProgress(newJobProgress, droppedTargetSpans, newJobStatementTime); err != nil {
656+
return nil, nil, hlc.Timestamp{}, nil, err
657+
}
663658
}
664659

665660
newTargetList := tree.ChangefeedTargets{}
@@ -750,7 +745,6 @@ func validateNewTargets(
750745
// no_initial_scan), and the current status of the job. If the progress does not
751746
// need to be updated, we will simply return the previous progress and statement
752747
// time that is passed into the function.
753-
// TODO(#140509): Update this function to work with the new span-level checkpoint.
754748
func generateNewProgress(
755749
prevProgress jobspb.Progress,
756750
prevStatementTime hlc.Timestamp,
@@ -760,12 +754,10 @@ func generateNewProgress(
760754
) (jobspb.Progress, hlc.Timestamp, error) {
761755
prevHighWater := prevProgress.GetHighWater()
762756
changefeedProgress := prevProgress.GetChangefeed()
763-
ptsRecord := uuid.UUID{}
764-
if changefeedProgress != nil {
765-
ptsRecord = changefeedProgress.ProtectedTimestampRecord
766-
}
767757

768-
haveHighwater := !(prevHighWater == nil || prevHighWater.IsEmpty())
758+
haveHighwater := prevHighWater != nil && prevHighWater.IsSet()
759+
// TODO(#142376): Whether a checkpoint exists seems orthogonal to what
760+
// we do in this function. Consider removing this flag.
769761
haveCheckpoint := changefeedProgress != nil &&
770762
(!changefeedProgress.Checkpoint.IsEmpty() || !changefeedProgress.SpanLevelCheckpoint.IsEmpty())
771763

@@ -774,6 +766,9 @@ func generateNewProgress(
774766
// * the high watermark is empty, and we would like to perform an initial scan.
775767
// * the high watermark is non-empty, the checkpoint is empty, and we do not want to
776768
// perform an initial scan.
769+
// TODO(#142376): Consider in the scenario where we have a highwater whether
770+
// we should start sending events for the new table starting at the ALTER
771+
// CHANGEFEED statement time instead of the current highwater.
777772
if (!haveHighwater && withInitialScan) || (haveHighwater && !haveCheckpoint && !withInitialScan) {
778773
return prevProgress, prevStatementTime, nil
779774
}
@@ -788,6 +783,13 @@ func generateNewProgress(
788783
)
789784
}
790785

786+
// TODO(#142369): We should create a new PTS record instead of just
787+
// copying the old one.
788+
var ptsRecord uuid.UUID
789+
if changefeedProgress != nil {
790+
ptsRecord = changefeedProgress.ProtectedTimestampRecord
791+
}
792+
791793
// Check if the user is trying to perform an initial scan while the high
792794
// watermark is non-empty but the checkpoint is empty.
793795
if haveHighwater && !haveCheckpoint && withInitialScan {
@@ -797,19 +799,18 @@ func generateNewProgress(
797799
// To avoid this, we update the statement time of the job to the previous high
798800
// watermark, and add all the existing targets to the checkpoint to skip the
799801
// initial scan on these targets.
802+
// TODO(#142376): Consider whether we want to set the new statement time
803+
// to the actual new statement time (ALTER CHANGEFEED statement time).
800804
newStatementTime := *prevHighWater
801805

802806
newProgress := jobspb.Progress{
803807
Progress: &jobspb.Progress_HighWater{},
804808
Details: &jobspb.Progress_Changefeed{
805809
Changefeed: &jobspb.ChangefeedProgress{
806-
//lint:ignore SA1019 deprecated usage
807-
Checkpoint: &jobspb.ChangefeedProgress_Checkpoint{
808-
Spans: existingTargetSpans,
809-
// TODO(#140509): ALTER CHANGEFED should handle fine grained
810-
// progress and checkpointed timestamp properly.
811-
},
812810
ProtectedTimestampRecord: ptsRecord,
811+
SpanLevelCheckpoint: jobspb.NewTimestampSpansMap(map[hlc.Timestamp]roachpb.Spans{
812+
newStatementTime: existingTargetSpans,
813+
}),
813814
},
814815
},
815816
}
@@ -826,49 +827,89 @@ func generateNewProgress(
826827
// of the newly added targets so that the changefeed will skip performing
827828
// a backfill on these targets.
828829

829-
var mergedSpanGroup roachpb.SpanGroup
830-
if haveCheckpoint {
831-
mergedSpanGroup.Add(changefeedProgress.Checkpoint.Spans...)
830+
// TODO(#142376): In the case where we have a highwater, we will resend
831+
// events since the original statement time for all existing tables.
832+
// We might want to change this. We might also want to change whether
833+
// the events for the new table start at the ALTER CHANGEFEED statement
834+
// time instead.
835+
836+
spanLevelCheckpoint, err := getSpanLevelCheckpointFromProgress(prevProgress, prevStatementTime)
837+
if err != nil {
838+
return jobspb.Progress{}, hlc.Timestamp{}, err
832839
}
833-
mergedSpanGroup.Add(newSpans...)
840+
841+
checkpointSpansMap := maps.Collect(spanLevelCheckpoint.All())
842+
var spanGroup roachpb.SpanGroup
843+
spanGroup.Add(checkpointSpansMap[prevStatementTime]...)
844+
spanGroup.Add(newSpans...)
845+
checkpointSpansMap[prevStatementTime] = spanGroup.Slice()
834846

835847
newProgress := jobspb.Progress{
836848
Progress: &jobspb.Progress_HighWater{},
837849
Details: &jobspb.Progress_Changefeed{
838850
Changefeed: &jobspb.ChangefeedProgress{
839-
//lint:ignore SA1019 deprecated usage
840-
Checkpoint: &jobspb.ChangefeedProgress_Checkpoint{
841-
Spans: mergedSpanGroup.Slice(),
842-
},
843851
ProtectedTimestampRecord: ptsRecord,
852+
SpanLevelCheckpoint: jobspb.NewTimestampSpansMap(checkpointSpansMap),
844853
},
845854
},
846855
}
847856
return newProgress, prevStatementTime, nil
848857
}
849858

850-
// TODO(#140509): Update this function to work with the new span-level checkpoint.
851-
func removeSpansFromProgress(prevProgress jobspb.Progress, spansToRemove []roachpb.Span) {
859+
func removeSpansFromProgress(
860+
prevProgress jobspb.Progress, spansToRemove []roachpb.Span, statementTime hlc.Timestamp,
861+
) error {
852862
changefeedProgress := prevProgress.GetChangefeed()
853863
if changefeedProgress == nil {
854-
return
864+
return nil
855865
}
856866
changefeedCheckpoint := changefeedProgress.Checkpoint
857867
if changefeedCheckpoint == nil {
858-
return
868+
return nil
859869
}
860-
prevSpans := changefeedCheckpoint.Spans
861870

862-
var spanGroup roachpb.SpanGroup
863-
spanGroup.Add(prevSpans...)
864-
spanGroup.Sub(spansToRemove...)
865-
changefeedProgress.Checkpoint.Spans = spanGroup.Slice()
871+
spanLevelCheckpoint, err := getSpanLevelCheckpointFromProgress(prevProgress, statementTime)
872+
if err != nil {
873+
return err
874+
}
875+
checkpointSpansMap := make(map[hlc.Timestamp]roachpb.Spans)
876+
for ts, sp := range spanLevelCheckpoint.All() {
877+
var spanGroup roachpb.SpanGroup
878+
spanGroup.Add(sp...)
879+
spanGroup.Sub(spansToRemove...)
880+
if spans := spanGroup.Slice(); len(spans) > 0 {
881+
checkpointSpansMap[ts] = spans
882+
}
883+
}
884+
changefeedProgress.SpanLevelCheckpoint = jobspb.NewTimestampSpansMap(checkpointSpansMap)
885+
886+
return nil
887+
}
888+
889+
func getSpanLevelCheckpointFromProgress(
890+
progress jobspb.Progress, statementTime hlc.Timestamp,
891+
) (*jobspb.TimestampSpansMap, error) {
892+
changefeedProgress := progress.GetChangefeed()
893+
if changefeedProgress == nil {
894+
return nil, nil
895+
}
896+
897+
if !changefeedProgress.Checkpoint.IsEmpty() && !changefeedProgress.SpanLevelCheckpoint.IsEmpty() {
898+
return nil, errors.AssertionFailedf("both legacy and current checkpoint set on changefeed job progress")
899+
}
900+
901+
if legacyCheckpoint := changefeedProgress.Checkpoint; !legacyCheckpoint.IsEmpty() {
902+
hw := progress.GetHighWater()
903+
return checkpoint.ConvertFromLegacyCheckpoint(legacyCheckpoint, statementTime, *hw), nil
904+
}
905+
906+
return changefeedProgress.SpanLevelCheckpoint, nil
866907
}
867908

868-
func fetchSpansForDescs(p sql.PlanHookState, droppedIDs []descpb.ID) (primarySpans []roachpb.Span) {
909+
func fetchSpansForDescs(p sql.PlanHookState, descIDs []descpb.ID) (primarySpans []roachpb.Span) {
869910
seen := make(map[descpb.ID]struct{})
870911
codec := p.ExtendedEvalContext().Codec
871-
for _, id := range droppedIDs {
912+
for _, id := range descIDs {
872913
if _, isDup := seen[id]; isDup {
873914
continue
874915
}

0 commit comments

Comments
 (0)