Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion pkg/ccl/changefeedccl/changefeed_dist.go
Original file line number Diff line number Diff line change
Expand Up @@ -461,7 +461,8 @@ func makePlan(
var progressConfig *execinfrapb.ChangefeedProgressConfig
if execCtx.ExecCfg().Settings.Version.IsActive(ctx, clusterversion.V25_4) {
perTableTrackingEnabled := changefeedbase.TrackPerTableProgress.Get(sv)
perTableProtectedTimestampsEnabled := changefeedbase.PerTableProtectedTimestamps.Get(sv)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we should remove this cluster setting altogether since there are no longer any production usages?

Copy link
Contributor Author

@aerfrei aerfrei Oct 1, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pros:
It means that if this builds it's very clear that we didn't miss any cases where the flag could potentially do something. (I think this makes it worthwhile for me, so I'll make the change.)
We could theoretically rename this flag and not have to worry about backwards compatibility to 25.4.
It prevents people from setting this flag which could have implications if they upgrade.

Potential Cons:
I think this would mean we have to touch existing tests that specify this flag (to set it to false) and remove those references as well, that would increase the footprint of this pr by another two tests and mean if we wanted to make any changes to those tests and backport them, there could be more merge conflicts. But we very likely won't backport things that interact with this PTS code.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could theoretically rename this flag and not have to worry about backwards compatibility to 25.4.
It prevents people from setting this flag which could have implications if they upgrade.

Yeah these are the two main benefits I was thinking of. Also, similar to the first benefit, if we decide to not add this flag, we won't have to add it to the list of retired settings.

// In 25.4 we are hard disabling per table protected timestamps.
perTableProtectedTimestampsEnabled := false
progressConfig = &execinfrapb.ChangefeedProgressConfig{
PerTableTracking: perTableTrackingEnabled,
// If the per table pts flag was turned on between changefeed creation and now,
Expand Down
3 changes: 2 additions & 1 deletion pkg/ccl/changefeedccl/changefeed_stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,8 @@ func changefeedPlanHook(

// We do not yet have the progress config here, so we need to check the settings directly.
perTableTrackingEnabled := changefeedbase.TrackPerTableProgress.Get(&p.ExecCfg().Settings.SV)
perTableProtectedTimestampsEnabled := changefeedbase.PerTableProtectedTimestamps.Get(&p.ExecCfg().Settings.SV)
// In 25.4 we are hard disabling per table protected timestamps.
perTableProtectedTimestampsEnabled := false
usingPerTablePTS := perTableTrackingEnabled && perTableProtectedTimestampsEnabled
if usingPerTablePTS {
protectedTimestampRecords := make(map[descpb.ID]uuid.UUID)
Expand Down
4 changes: 3 additions & 1 deletion pkg/ccl/changefeedccl/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12065,7 +12065,9 @@ WITH resolved='10ms', min_checkpoint_frequency='10ms', no_initial_scan`
require.NoError(t, err)
return ts
}
perTablePTSEnabled := changefeedbase.PerTableProtectedTimestamps.Get(&s.Server.ClusterSettings().SV)

// In 25.4 we are hard disabling per table protected timestamps.
perTablePTSEnabled := false
perTableProgressEnabled := changefeedbase.TrackPerTableProgress.Get(&s.Server.ClusterSettings().SV)
getPTS := func() hlc.Timestamp {
if perTablePTSEnabled && perTableProgressEnabled {
Expand Down
9 changes: 0 additions & 9 deletions pkg/ccl/changefeedccl/changefeedbase/settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,15 +215,6 @@ var ProtectTimestampLag = settings.RegisterDurationSetting(
10*time.Minute,
settings.PositiveDuration)

// PerTableProtectedTimestamps enables per-table protected timestamp records
// instead of a single record for all tables in a changefeed.
var PerTableProtectedTimestamps = settings.RegisterBoolSetting(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we'll have to update/skip the per-table PTS benchmark roachtests too

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good call out. Since they those roachtests are generally for multi-table feeds, I'll keep them in but only with per-table pts set to false and remove the actual query that sets the setting.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That run failed with VM preemption for the 50000 table test. It failed while initializing the workload.

Started a new run here. I hope it will also be helped by the fact that the backport removing the latency assertion has now been merged.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like we missed the frontier persistence benchmarks, opened #154832

settings.ApplicationLevel,
"changefeed.protect_timestamp.per_table.enabled",
"if true, creates separate protected timestamp records for each table in a changefeed; "+
"if false, uses a single protected timestamp record for all tables",
metamorphic.ConstantWithTestBool("changefeed.protect_timestamp.per_table.enabled", false))

// MaxProtectedTimestampAge controls the frequency of protected timestamp record updates
var MaxProtectedTimestampAge = settings.RegisterDurationSetting(
settings.ApplicationLevel,
Expand Down
26 changes: 8 additions & 18 deletions pkg/ccl/changefeedccl/protected_timestamps_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
Expand Down Expand Up @@ -404,9 +405,8 @@ func TestChangefeedAlterPTS(t *testing.T) {

_, _ = expectResolvedTimestamp(t, f2)

perTablePTSEnabled :=
changefeedbase.PerTableProtectedTimestamps.Get(&s.Server.ClusterSettings().SV) &&
changefeedbase.TrackPerTableProgress.Get(&s.Server.ClusterSettings().SV)
// In 25.4 we are hard disabling per table protected timestamps.
perTablePTSEnabled := false

if perTablePTSEnabled {
eFeed, ok := f2.(cdctest.EnterpriseTestFeed)
Expand Down Expand Up @@ -751,8 +751,6 @@ func TestChangefeedMigratesProtectedTimestampTargets(t *testing.T) {
context.Background(), &s.Server.ClusterSettings().SV, ptsInterval)
changefeedbase.ProtectTimestampLag.Override(
context.Background(), &s.Server.ClusterSettings().SV, ptsInterval)
changefeedbase.PerTableProtectedTimestamps.Override(
context.Background(), &s.Server.ClusterSettings().SV, false)

sqlDB := sqlutils.MakeSQLRunner(s.DB)
sysDB := sqlutils.MakeSQLRunner(s.SystemServer.SQLConn(t))
Expand Down Expand Up @@ -870,12 +868,6 @@ func TestChangefeedMigratesProtectedTimestamps(t *testing.T) {
changefeedbase.ProtectTimestampLag.Override(
context.Background(), &s.Server.ClusterSettings().SV, ptsInterval)

// Since old style PTS records should not be created when per-table PTS records are enabled,
// we disable them for this test. Per-table PTS breaks assumptions about where we can find
// the PTS record uuids.
changefeedbase.PerTableProtectedTimestamps.Override(
context.Background(), &s.Server.ClusterSettings().SV, false)

sqlDB := sqlutils.MakeSQLRunner(s.DB)
sysDB := sqlutils.MakeSQLRunner(s.SystemServer.SQLConn(t))

Expand Down Expand Up @@ -977,10 +969,6 @@ func TestChangefeedProtectedTimestampUpdateForMultipleTables(t *testing.T) {
changefeedbase.ProtectTimestampLag.Override(
context.Background(), &s.Server.ClusterSettings().SV, 10*time.Hour)

// Ensure we use legacy single protected timestamp behavior for this test
changefeedbase.PerTableProtectedTimestamps.Override(
context.Background(), &s.Server.ClusterSettings().SV, false)

sqlDB.Exec(t, `CREATE TABLE foo (id INT)`)
sqlDB.Exec(t, `CREATE TABLE bar (id INT)`)
registry := s.Server.JobRegistry().(*jobs.Registry)
Expand Down Expand Up @@ -1087,12 +1075,14 @@ func TestChangefeedPerTableProtectedTimestampProgression(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

skip.WithIssue(t, 93793, "unreleased feature as of 25.4")

testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
sqlDB := sqlutils.MakeSQLRunner(s.DB)

// Enable per-table protected timestamps and progress tracking
changefeedbase.PerTableProtectedTimestamps.Override(
context.Background(), &s.Server.ClusterSettings().SV, true)
// It's not possible to enable per-table protected timestamps
// since it's disabled in 25.4. This is where we will enable the setting
// for 26.1.
changefeedbase.TrackPerTableProgress.Override(
context.Background(), &s.Server.ClusterSettings().SV, true)

Expand Down
6 changes: 1 addition & 5 deletions pkg/cmd/roachtest/tests/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -1837,10 +1837,6 @@ func runCDCMultiTablePTSBenchmark(
numRanges = params.numRanges
}

if _, err := db.Exec("SET CLUSTER SETTING changefeed.protect_timestamp.per_table.enabled = $1", params.perTablePTS); err != nil {
t.Fatalf("failed to set per-table protected timestamps: %v", err)
}

initCmd := fmt.Sprintf("./cockroach workload init bank --rows=%d --ranges=%d --tables=%d {pgurl%s}",
params.numRows, numRanges, params.numTables, ct.crdbNodes.RandNode())
if err := c.RunE(ctx, option.WithNodes(ct.workloadNode), initCmd); err != nil {
Expand Down Expand Up @@ -3106,7 +3102,7 @@ func registerCDC(r registry.Registry) {
CompatibleClouds: registry.AllExceptIBM,
Run: runMessageTooLarge,
})
for _, perTablePTS := range []bool{false, true} {
for _, perTablePTS := range []bool{false} {
for _, config := range []struct {
numTables int
numRanges int
Expand Down