Skip to content

Commit b721e45

Browse files
committed
changefeedccl: fix ALTER CHANGEFEED max PTS age bug
This patch fixes a bug where modifying a changefeed with ALTER CHANGEFEED in a way that either unset the `gc_protect_expires_after` option or left it unset would cause the changefeed's max PTS age to become unbounded instead of being set to the default value configured by the `changefeed.protect_timestamp.max_age` cluster setting. The bug occurred because the ALTER CHANGEFEED code was manually querying the option instead of relying on the CREATE CHANGEFEED code path, which also consults the cluster setting. This inconsistency has now been corrected. Release note (bug fix): Fixed a bug where modifying a changefeed with ALTER CHANGEFEED in a way that either unset the `gc_protect_expires_after` option or left it unset would cause the changefeed's max PTS age to become unbounded instead of being set to the default value configured by the `changefeed.protect_timestamp.max_age` cluster setting.
1 parent a2e86c9 commit b721e45

File tree

3 files changed

+110
-23
lines changed

3 files changed

+110
-23
lines changed

pkg/ccl/changefeedccl/alter_changefeed_stmt.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -227,11 +227,11 @@ func alterChangefeedPlanHook(
227227
newPayload.Details = jobspb.WrapPayloadDetails(newDetails)
228228
newPayload.Description = jobRecord.Description
229229
newPayload.DescriptorIDs = jobRecord.DescriptorIDs
230-
newExpiration, err := newOptions.GetPTSExpiration()
231-
if err != nil {
232-
return err
233-
}
234-
newPayload.MaximumPTSAge = newExpiration
230+
231+
// The maximum PTS age on jobRecord will be set correctly (based on either
232+
// the option or cluster setting) by createChangefeedJobRecord.
233+
newPayload.MaximumPTSAge = jobRecord.MaximumPTSAge
234+
235235
j, err := p.ExecCfg().JobRegistry.LoadJobWithTxn(ctx, jobID, p.InternalSQLTxn())
236236
if err != nil {
237237
return err

pkg/ccl/changefeedccl/protected_timestamps_test.go

Lines changed: 104 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -424,27 +424,114 @@ func TestChangefeedCanceledWhenPTSIsOld(t *testing.T) {
424424
// single row with multiple versions.
425425
sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY, b INT)`)
426426

427-
feed, err := f.Feed("CREATE CHANGEFEED FOR TABLE foo WITH protect_data_from_gc_on_pause, gc_protect_expires_after='24h'")
428-
require.NoError(t, err)
429-
defer func() {
430-
closeFeed(t, feed)
431-
}()
427+
t.Run("canceled due to gc_protect_expires_after option", func(t *testing.T) {
428+
testutils.RunValues(t, "initially-protected-with", []string{"none", "option", "setting"},
429+
func(t *testing.T, initialProtect string) {
430+
defer func() {
431+
sqlDB.Exec(t, `RESET CLUSTER SETTING changefeed.protect_timestamp.max_age`)
432+
}()
433+
434+
if initialProtect == "option" {
435+
// We set the cluster setting to something small to make sure that
436+
// the option alone is able to protect the PTS record.
437+
sqlDB.Exec(t, `SET CLUSTER SETTING changefeed.protect_timestamp.max_age = '1us'`)
438+
} else {
439+
sqlDB.Exec(t, `SET CLUSTER SETTING changefeed.protect_timestamp.max_age = '24h'`)
440+
}
432441

433-
jobFeed := feed.(cdctest.EnterpriseTestFeed)
434-
require.NoError(t, jobFeed.Pause())
442+
feedStmt := `CREATE CHANGEFEED FOR TABLE foo`
443+
switch initialProtect {
444+
case "none":
445+
feedStmt += ` WITH gc_protect_expires_after='1us'`
446+
case "option":
447+
feedStmt += ` WITH gc_protect_expires_after='24h'`
448+
}
449+
450+
feed, err := f.Feed(feedStmt)
451+
require.NoError(t, err)
452+
defer func() {
453+
closeFeed(t, feed)
454+
}()
435455

436-
// While the job is paused, take opportunity to test that alter changefeed
437-
// works when setting gc_protect_expires_after option.
456+
jobFeed := feed.(cdctest.EnterpriseTestFeed)
438457

439-
// Verify we can set it to 0 -- i.e. disable.
440-
sqlDB.Exec(t, fmt.Sprintf("ALTER CHANGEFEED %d SET gc_protect_expires_after = '0s'", jobFeed.JobID()))
441-
// Now, set it to something very small.
442-
sqlDB.Exec(t, fmt.Sprintf("ALTER CHANGEFEED %d SET gc_protect_expires_after = '250ms'", jobFeed.JobID()))
458+
if initialProtect != "none" {
459+
require.NoError(t, jobFeed.Pause())
460+
461+
// Wait a little bit and make sure the job ISN'T canceled.
462+
require.ErrorContains(t, jobFeed.WaitDurationForState(10*time.Second, func(s jobs.State) bool {
463+
return s == jobs.StateCanceled
464+
}), `still waiting for job status; current status is "paused"`)
465+
466+
if initialProtect == "option" {
467+
// Set the cluster setting back to something high to make sure the
468+
// option alone can cause the changefeed to be canceled.
469+
sqlDB.Exec(t, `SET CLUSTER SETTING changefeed.protect_timestamp.max_age = '24h'`)
470+
}
471+
472+
// Set option to something small so that job will be canceled.
473+
sqlDB.Exec(t, fmt.Sprintf(`ALTER CHANGEFEED %d SET gc_protect_expires_after = '1us'`, jobFeed.JobID()))
474+
}
443475

444-
// Stale PTS record should trigger job cancellation.
445-
require.NoError(t, jobFeed.WaitForState(func(s jobs.State) bool {
446-
return s == jobs.StateCanceled
447-
}))
476+
// Stale PTS record should trigger job cancellation.
477+
require.NoError(t, jobFeed.WaitForState(func(s jobs.State) bool {
478+
return s == jobs.StateCanceled
479+
}))
480+
})
481+
})
482+
483+
t.Run("canceled due to changefeed.protect_timestamp.max_age setting", func(t *testing.T) {
484+
testutils.RunValues(t, "initially-protected-with", []string{"none", "option", "setting"},
485+
func(t *testing.T, initialProtect string) {
486+
defer func() {
487+
sqlDB.Exec(t, `RESET CLUSTER SETTING changefeed.protect_timestamp.max_age`)
488+
}()
489+
490+
if initialProtect == "setting" {
491+
sqlDB.Exec(t, `SET CLUSTER SETTING changefeed.protect_timestamp.max_age = '24h'`)
492+
} else {
493+
sqlDB.Exec(t, `SET CLUSTER SETTING changefeed.protect_timestamp.max_age = '1us'`)
494+
}
495+
496+
// Set the max age cluster setting to something small.
497+
feedStmt := `CREATE CHANGEFEED FOR TABLE foo`
498+
if initialProtect == "option" {
499+
feedStmt += ` WITH gc_protect_expires_after='24h'`
500+
}
501+
feed, err := f.Feed(feedStmt)
502+
require.NoError(t, err)
503+
defer func() {
504+
closeFeed(t, feed)
505+
}()
506+
507+
jobFeed := feed.(cdctest.EnterpriseTestFeed)
508+
509+
if initialProtect != "none" {
510+
require.NoError(t, jobFeed.Pause())
511+
512+
// Wait a little bit and make sure the job ISN'T canceled.
513+
require.ErrorContains(t, jobFeed.WaitDurationForState(10*time.Second, func(s jobs.State) bool {
514+
return s == jobs.StateCanceled
515+
}), `still waiting for job status; current status is "paused"`)
516+
517+
switch initialProtect {
518+
case "option":
519+
// Reset the option so that it defaults to the cluster setting.
520+
sqlDB.Exec(t, fmt.Sprintf(`ALTER CHANGEFEED %d SET gc_protect_expires_after = '0s'`, jobFeed.JobID()))
521+
case "setting":
522+
// Modify the cluster setting and do an ALTER CHANGEFEED so that
523+
// the new value is picked up.
524+
sqlDB.Exec(t, `SET CLUSTER SETTING changefeed.protect_timestamp.max_age = '1us'`)
525+
sqlDB.Exec(t, fmt.Sprintf(`ALTER CHANGEFEED %d SET diff`, jobFeed.JobID()))
526+
}
527+
}
528+
529+
// Stale PTS record should trigger job cancellation.
530+
require.NoError(t, jobFeed.WaitForState(func(s jobs.State) bool {
531+
return s == jobs.StateCanceled
532+
}))
533+
})
534+
})
448535
}
449536

450537
cdcTestWithSystem(t, testFn, feedTestEnterpriseSinks)

pkg/ccl/changefeedccl/testfeed_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -491,7 +491,7 @@ func (f *jobFeed) WaitDurationForState(
491491
if statusPred(jobs.State(status)) {
492492
return nil
493493
}
494-
return errors.Newf("still waiting for job status; current %s", status)
494+
return errors.Newf("still waiting for job status; current status is %q", status)
495495
}, dur)
496496
}
497497

0 commit comments

Comments
 (0)