Skip to content

Commit dcd914a

Browse files
committed
changefeedccl: fix job progress checkpoint fields invariant violation
Release note (bug fix): Fixed a bug where a changefeed created before v25.2 could fail after upgrading to v25.2 with the error message `both legacy and current checkpoint set on change aggregator spec`.
1 parent ade0d13 commit dcd914a

File tree

8 files changed

+151
-25
lines changed

8 files changed

+151
-25
lines changed

pkg/ccl/changefeedccl/alter_changefeed_stmt.go

Lines changed: 13 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -170,8 +170,10 @@ func alterChangefeedPlanHook(
170170
if !changefeedProgress.Checkpoint.IsEmpty() {
171171
return errors.AssertionFailedf("both legacy and current checkpoint set on changefeed job progress")
172172
}
173-
changefeedProgress.Checkpoint = checkpoint.ConvertToLegacyCheckpoint(changefeedProgress.SpanLevelCheckpoint)
174-
changefeedProgress.SpanLevelCheckpoint = nil
173+
legacyCheckpoint := checkpoint.ConvertToLegacyCheckpoint(changefeedProgress.SpanLevelCheckpoint)
174+
if err := changefeedProgress.SetCheckpoint(legacyCheckpoint, nil); err != nil {
175+
return err
176+
}
175177
}
176178
}
177179
newChangefeedStmt.Targets = newTargets
@@ -859,21 +861,15 @@ func generateNewProgress(
859861
}
860862

861863
func removeSpansFromProgress(
862-
prevProgress jobspb.Progress, spansToRemove []roachpb.Span, statementTime hlc.Timestamp,
864+
progress jobspb.Progress, spansToRemove []roachpb.Span, statementTime hlc.Timestamp,
863865
) error {
864-
changefeedProgress := prevProgress.GetChangefeed()
865-
if changefeedProgress == nil {
866-
return nil
867-
}
868-
changefeedCheckpoint := changefeedProgress.Checkpoint
869-
if changefeedCheckpoint == nil {
870-
return nil
871-
}
872-
873-
spanLevelCheckpoint, err := getSpanLevelCheckpointFromProgress(prevProgress, statementTime)
866+
spanLevelCheckpoint, err := getSpanLevelCheckpointFromProgress(progress, statementTime)
874867
if err != nil {
875868
return err
876869
}
870+
if spanLevelCheckpoint == nil {
871+
return nil
872+
}
877873
checkpointSpansMap := make(map[hlc.Timestamp]roachpb.Spans)
878874
for ts, sp := range spanLevelCheckpoint.All() {
879875
var spanGroup roachpb.SpanGroup
@@ -883,7 +879,10 @@ func removeSpansFromProgress(
883879
checkpointSpansMap[ts] = spans
884880
}
885881
}
886-
changefeedProgress.SpanLevelCheckpoint = jobspb.NewTimestampSpansMap(checkpointSpansMap)
882+
spanLevelCheckpoint = jobspb.NewTimestampSpansMap(checkpointSpansMap)
883+
if err := progress.GetChangefeed().SetCheckpoint(nil, spanLevelCheckpoint); err != nil {
884+
return err
885+
}
887886

888887
return nil
889888
}

pkg/ccl/changefeedccl/changefeed_dist.go

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -261,16 +261,33 @@ func startDistChangefeed(
261261
dsp := execCtx.DistSQLPlanner()
262262

263263
//lint:ignore SA1019 deprecated usage
264-
var checkpoint *jobspb.ChangefeedProgress_Checkpoint
264+
var legacyCheckpoint *jobspb.ChangefeedProgress_Checkpoint
265265
if progress := localState.progress.GetChangefeed(); progress != nil && progress.Checkpoint != nil {
266-
checkpoint = progress.Checkpoint
266+
legacyCheckpoint = progress.Checkpoint
267267
}
268268
var spanLevelCheckpoint *jobspb.TimestampSpansMap
269269
if progress := localState.progress.GetChangefeed(); progress != nil && progress.SpanLevelCheckpoint != nil {
270270
spanLevelCheckpoint = progress.SpanLevelCheckpoint
271271
}
272+
if legacyCheckpoint != nil && spanLevelCheckpoint != nil {
273+
if legacyCheckpoint.Timestamp.After(spanLevelCheckpoint.MinTimestamp()) {
274+
// We should never be writing the legacy checkpoint again once we
275+
// start writing the new checkpoint format. If we do, that signals
276+
// a missing or incorrect version gate check somewhere.
277+
return errors.AssertionFailedf("both legacy and current checkpoint set on " +
278+
"changefeed job progress and legacy checkpoint has later timestamp")
279+
}
280+
// This should be an assertion failure but unfortunately due to a bug
281+
// that was included in earlier versions of 25.2 (#148620), we may fail
282+
// to clear the legacy checkpoint when we start writing the new one.
283+
// We instead discard the legacy checkpoint here and it will eventually be
284+
// cleared once the cluster is running a newer patch release with the fix.
285+
log.Warningf(ctx, "both legacy and current checkpoint set on changefeed job progress; "+
286+
"discarding legacy checkpoint")
287+
legacyCheckpoint = nil
288+
}
272289
p, planCtx, err := makePlan(execCtx, jobID, details, description, initialHighWater,
273-
trackedSpans, checkpoint, spanLevelCheckpoint, localState.drainingNodes)(ctx, dsp)
290+
trackedSpans, legacyCheckpoint, spanLevelCheckpoint, localState.drainingNodes)(ctx, dsp)
274291
if err != nil {
275292
return err
276293
}

pkg/ccl/changefeedccl/changefeed_processors.go

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1148,11 +1148,11 @@ func (cs *cachedState) SetHighwater(frontier hlc.Timestamp) {
11481148
}
11491149

11501150
// SetCheckpoint implements the eval.ChangefeedState interface.
1151-
func (cs *cachedState) SetCheckpoint(checkpoint *jobspb.TimestampSpansMap) {
1152-
// NB: It's not necessary to set the legacy checkpoint field because this
1151+
func (cs *cachedState) SetCheckpoint(checkpoint *jobspb.TimestampSpansMap) error {
1152+
// It's not necessary to do a version gate check here because this
11531153
// copy of the checkpoint is only used in-memory on a coordinator node that
11541154
// knows about the new field.
1155-
cs.progress.Details.(*jobspb.Progress_Changefeed).Changefeed.SpanLevelCheckpoint = checkpoint
1155+
return cs.progress.Details.(*jobspb.Progress_Changefeed).Changefeed.SetCheckpoint(nil, checkpoint)
11561156
}
11571157

11581158
// AggregatorFrontierSpans returns an iterator over the spans in the aggregator
@@ -1864,10 +1864,14 @@ func (cf *changeFrontier) checkpointJobProgress(
18641864

18651865
changefeedProgress := progress.Details.(*jobspb.Progress_Changefeed).Changefeed
18661866
if cv.IsActive(cf.Ctx(), clusterversion.V25_2) {
1867-
changefeedProgress.SpanLevelCheckpoint = spanLevelCheckpoint
1867+
if err := changefeedProgress.SetCheckpoint(nil, spanLevelCheckpoint); err != nil {
1868+
return err
1869+
}
18681870
} else {
18691871
legacyCheckpoint = checkpoint.ConvertToLegacyCheckpoint(spanLevelCheckpoint)
1870-
changefeedProgress.Checkpoint = legacyCheckpoint
1872+
if err := changefeedProgress.SetCheckpoint(legacyCheckpoint, nil); err != nil {
1873+
return err
1874+
}
18711875
}
18721876

18731877
if ptsUpdated, err = cf.manageProtectedTimestamps(ctx, txn, changefeedProgress); err != nil {
@@ -1900,7 +1904,9 @@ func (cf *changeFrontier) checkpointJobProgress(
19001904
}
19011905

19021906
cf.localState.SetHighwater(frontier)
1903-
cf.localState.SetCheckpoint(spanLevelCheckpoint)
1907+
if err := cf.localState.SetCheckpoint(spanLevelCheckpoint); err != nil {
1908+
return false, err
1909+
}
19041910

19051911
return true, nil
19061912
}

pkg/ccl/changefeedccl/changefeed_stmt.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1642,7 +1642,9 @@ func reconcileJobStateWithLocalState(
16421642
if updateHW {
16431643
localState.SetHighwater(sf.Frontier())
16441644
}
1645-
localState.SetCheckpoint(checkpoint)
1645+
if err := localState.SetCheckpoint(checkpoint); err != nil {
1646+
return err
1647+
}
16461648
if log.V(1) {
16471649
log.Infof(ctx, "Applying checkpoint to job record: hw=%v, cf=%v",
16481650
localState.progress.GetHighWater(), localState.progress.GetChangefeed())

pkg/ccl/changefeedccl/changefeed_test.go

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11703,3 +11703,83 @@ func TestCloudstorageParallelCompression(t *testing.T) {
1170311703
}
1170411704
})
1170511705
}
11706+
11707+
// TestChangefeedResumeWithBothLegacyAndCurrentCheckpoint is a regression
11708+
// test for #148620, which was a bug where the legacy checkpoint was not
11709+
// being cleared after the cluster was upgraded to 25.2 and subsequently
11710+
// caused an assertion error when resuming.
11711+
func TestChangefeedResumeWithBothLegacyAndCurrentCheckpoint(t *testing.T) {
11712+
defer leaktest.AfterTest(t)()
11713+
defer log.Scope(t).Close(t)
11714+
11715+
testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
11716+
ctx := context.Background()
11717+
sqlDB := sqlutils.MakeSQLRunner(s.DB)
11718+
11719+
// Create a table with 11 ranges.
11720+
const numRows = 10
11721+
const numRanges = numRows + 1
11722+
sqlDB.ExecMultiple(t,
11723+
`CREATE TABLE foo (a INT PRIMARY KEY)`,
11724+
fmt.Sprintf(`INSERT INTO foo SELECT * FROM generate_series(1, %d)`, numRows),
11725+
fmt.Sprintf(`ALTER TABLE foo SPLIT AT (SELECT * FROM generate_series(1, %d))`, numRows),
11726+
)
11727+
fooSpan := desctestutils.
11728+
TestingGetPublicTableDescriptor(s.Server.DB(), s.Codec, "d", "foo").
11729+
PrimaryIndexSpan(s.Codec)
11730+
ranges, _, err := s.Server.
11731+
DistSenderI().(*kvcoord.DistSender).
11732+
AllRangeSpans(ctx, []roachpb.Span{fooSpan})
11733+
require.NoError(t, err)
11734+
require.Len(t, ranges, numRanges)
11735+
11736+
cf := feed(t, f, `CREATE CHANGEFEED FOR foo WITH no_initial_scan`)
11737+
defer closeFeed(t, cf)
11738+
11739+
jobFeed, ok := cf.(cdctest.EnterpriseTestFeed)
11740+
require.True(t, ok)
11741+
11742+
sqlDB.Exec(t, `PAUSE JOB $1`, jobFeed.JobID())
11743+
waitForJobState(sqlDB, t, jobFeed.JobID(), jobs.StatePaused)
11744+
hw, err := jobFeed.HighWaterMark()
11745+
require.NoError(t, err)
11746+
11747+
registry := s.Server.JobRegistry().(*jobs.Registry)
11748+
11749+
// Manually insert both a legacy and current checkpoint containing
11750+
// a random range from the table.
11751+
rnd, _ := randutil.NewTestRand()
11752+
randomRange := ranges[rnd.Intn(len(ranges))]
11753+
err = registry.UpdateJobWithTxn(ctx, jobFeed.JobID(), nil,
11754+
func(txn isql.Txn, md jobs.JobMetadata, ju *jobs.JobUpdater) error {
11755+
checkpointTS := hw.Add(int64(time.Nanosecond), 0)
11756+
progress := md.Progress
11757+
progress.Details = jobspb.WrapProgressDetails(jobspb.ChangefeedProgress{
11758+
//lint:ignore SA1019 deprecated usage
11759+
Checkpoint: &jobspb.ChangefeedProgress_Checkpoint{
11760+
Spans: []roachpb.Span{randomRange},
11761+
Timestamp: checkpointTS,
11762+
},
11763+
SpanLevelCheckpoint: jobspb.NewTimestampSpansMap(
11764+
map[hlc.Timestamp]roachpb.Spans{
11765+
checkpointTS: []roachpb.Span{randomRange},
11766+
},
11767+
),
11768+
})
11769+
ju.UpdateProgress(progress)
11770+
return nil
11771+
})
11772+
require.NoError(t, err)
11773+
11774+
sqlDB.Exec(t, `RESUME JOB $1`, jobFeed.JobID())
11775+
waitForJobState(sqlDB, t, jobFeed.JobID(), jobs.StateRunning)
11776+
11777+
// Wait for highwater to advance past the current time.
11778+
var tsStr string
11779+
sqlDB.QueryRow(t, `SELECT cluster_logical_timestamp()`).Scan(&tsStr)
11780+
ts := parseTimeToHLC(t, tsStr)
11781+
require.NoError(t, jobFeed.WaitForHighWaterMark(ts))
11782+
}
11783+
11784+
cdcTest(t, testFn, feedTestEnterpriseSinks)
11785+
}

pkg/jobs/jobspb/jobs.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import (
1616
"github.com/cockroachdb/cockroach/pkg/util/hlc"
1717
"github.com/cockroachdb/cockroach/pkg/util/iterutil"
1818
"github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb"
19+
"github.com/cockroachdb/errors"
1920
)
2021

2122
// JobID is the ID of a job.
@@ -152,6 +153,20 @@ func (m *ChangefeedProgress_Checkpoint) IsEmpty() bool {
152153
return m == nil || (len(m.Spans) == 0 && m.Timestamp.IsEmpty())
153154
}
154155

156+
// SetCheckpoint is a setter for the checkpoint fields in ChangefeedProgress.
157+
// It enforces the invariant that at most one of them can be non-nil.
158+
func (m *ChangefeedProgress) SetCheckpoint(
159+
legacyCheckpoint *ChangefeedProgress_Checkpoint, spanLevelCheckpoint *TimestampSpansMap,
160+
) error {
161+
if legacyCheckpoint != nil && spanLevelCheckpoint != nil {
162+
return errors.AssertionFailedf(
163+
"attempting to set both legacy and current checkpoint on changefeed job progress")
164+
}
165+
m.Checkpoint = legacyCheckpoint
166+
m.SpanLevelCheckpoint = spanLevelCheckpoint
167+
return nil
168+
}
169+
155170
func (r RestoreDetails) OnlineImpl() bool {
156171
return r.ExperimentalCopy || r.ExperimentalOnline
157172
}

pkg/jobs/jobspb/jobs.proto

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1217,6 +1217,10 @@ message ChangefeedProgress {
12171217
// changefeed. It consists of a list of spans and a single timestamp
12181218
// representing the minimum resolved timestamp of the spans.
12191219
// It is now deprecated in favor of SpanLevelCheckpoint.
1220+
//
1221+
// Invariant: At most one of Checkpoint and SpanLevelCheckpoint should be
1222+
// non-nil at any time.
1223+
//
12201224
// TODO(#139734): Delete this field and mark field number as reserved.
12211225
Checkpoint checkpoint = 4 [deprecated=true];
12221226

@@ -1243,6 +1247,9 @@ message ChangefeedProgress {
12431247
// checkpoint to its corresponding resolved timestamp, which will be higher
12441248
// than the overall resolved timestamp and thus allow us to do less work.
12451249
// This is especially useful during backfills or if some spans are lagging.
1250+
//
1251+
// Invariant: At most one of Checkpoint and SpanLevelCheckpoint should be
1252+
// non-nil at any time.
12461253
TimestampSpansMap span_level_checkpoint = 5;
12471254
}
12481255

pkg/sql/sem/eval/deps.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -665,7 +665,7 @@ type ChangefeedState interface {
665665
SetHighwater(frontier hlc.Timestamp)
666666

667667
// SetCheckpoint sets the checkpoint for the changefeed.
668-
SetCheckpoint(checkpoint *jobspb.TimestampSpansMap)
668+
SetCheckpoint(checkpoint *jobspb.TimestampSpansMap) error
669669
}
670670

671671
// TenantOperator is capable of interacting with tenant state, allowing SQL

0 commit comments

Comments
 (0)