Commit 71401e7
sql/ttl: improve TTL replan decision logic
Replace calculatePlanGrowth with detectNodeAvailabilityChanges to make TTL job replanning less sensitive to span changes. The new logic focuses specifically on detecting when nodes become unavailable rather than reacting to all plan differences. The previous implementation would trigger replans for span splits/merges that don't actually indicate beneficial restart scenarios.

The new approach only considers nodes missing from the original plan, which typically indicates node failures where redistributing the work would benefit from restarting the job. It also adds a stability window, so a replan is only triggered after the decision fires on consecutive evaluations. This should help alleviate spurious plan changes caused by range cache issues.

Fixes #150343

Epic: none

Release note (ops change): The 'sql.ttl.replan_flow_threshold' cluster setting may have been set to 0 to work around the TTL replanner being too sensitive. This fix alleviates that sensitivity, and any cluster that had set replan_flow_threshold to 0 can reset it back to the default.
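As a rough standalone sketch (not the committed code), the snippet below models the stability-window gating described above: every evaluation whose missing-node fraction crosses the threshold extends a streak, any evaluation below it resets the streak, and a replan only fires once the streak reaches the window. The names shouldTrigger, streak, and window are illustrative only.

package main

import (
    "fmt"
    "sync/atomic"
)

// shouldTrigger applies the stability-window rule: a replan only fires after
// `window` consecutive evaluations exceeded the threshold. Any evaluation
// below the threshold resets the streak, so a transient plan difference does
// not restart the job on its own.
func shouldTrigger(streak *atomic.Int64, window int64, missingFrac, threshold float64) bool {
    overThreshold := threshold != 0 && missingFrac > threshold
    var n int64
    if overThreshold {
        n = streak.Add(1)
    } else {
        streak.Store(0)
    }
    fire := n >= window
    if fire {
        streak.Store(0) // the job restarts, so the streak starts over
    }
    return fire
}

func main() {
    var streak atomic.Int64
    // One node out of three missing (frac 0.33), a transient recovery (0),
    // then missing again twice. With window=2, only the two consecutive
    // over-threshold evaluations at the end trigger a replan.
    fracs := []float64{0.33, 0.0, 0.33, 0.33}
    for i, f := range fracs {
        fmt.Printf("eval %d: replan=%v\n", i, shouldTrigger(&streak, 2, f, 0.1))
    }
    // Prints: false, false, false, true
}

With the default window of 2, a single noisy evaluation (for example, one caused by a stale range cache) no longer restarts the job by itself.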
1 parent c43e560 commit 71401e7

3 files changed: +314 −3 lines changed

pkg/sql/ttl/ttljob/BUILD.bazel

Lines changed: 2 additions & 0 deletions
@@ -63,6 +63,7 @@ go_test(
     size = "large",
     srcs = [
         "main_test.go",
+        "ttljob_internal_test.go",
         "ttljob_plans_test.go",
         "ttljob_processor_internal_test.go",
         "ttljob_processor_test.go",
@@ -100,6 +101,7 @@ go_test(
         "//pkg/sql/isql",
         "//pkg/sql/lexbase",
         "//pkg/sql/parser",
+        "//pkg/sql/physicalplan",
         "//pkg/sql/randgen",
         "//pkg/sql/rowenc",
         "//pkg/sql/sem/eval",

pkg/sql/ttl/ttljob/ttljob.go

Lines changed: 98 additions & 3 deletions
@@ -7,6 +7,7 @@ package ttljob

 import (
     "context"
+    "sync/atomic"
     "time"

     "github.com/cockroachdb/cockroach/pkg/base"
@@ -50,13 +51,24 @@ var replanFrequency = settings.RegisterDurationSetting(
     settings.PositiveDuration,
 )

+var replanStabilityWindow = settings.RegisterIntSetting(
+    settings.ApplicationLevel,
+    "sql.ttl.replan_stability_window",
+    "number of consecutive replan evaluations required before triggering a replan; set to 1 to disable stability window",
+    2,
+    settings.PositiveInt,
+)
+
 // rowLevelTTLResumer implements the TTL job. The job can run on any node, but
 // the job node distributes SELECT/DELETE work via DistSQL to ttlProcessor
 // nodes. DistSQL divides work into spans that each ttlProcessor scans in a
 // SELECT/DELETE loop.
 type rowLevelTTLResumer struct {
     job *jobs.Job
     st  *cluster.Settings
+
+    // consecutiveReplanDecisions tracks how many consecutive times replan was deemed necessary.
+    consecutiveReplanDecisions *atomic.Int64
 }

 var _ jobs.Resumer = (*rowLevelTTLResumer)(nil)
@@ -283,7 +295,10 @@ func (t rowLevelTTLResumer) Resume(ctx context.Context, execCtx interface{}) (re
     // the TTL job to utilize those nodes for parallel work.
     replanChecker, cancelReplanner := sql.PhysicalPlanChangeChecker(
         ctx, physicalPlan, makePlan, jobExecCtx,
-        sql.ReplanOnChangedFraction(func() float64 { return replanThreshold.Get(&execCfg.Settings.SV) }),
+        replanDecider(t.consecutiveReplanDecisions,
+            func() int64 { return replanStabilityWindow.Get(&execCfg.Settings.SV) },
+            func() float64 { return replanThreshold.Get(&execCfg.Settings.SV) },
+        ),
         func() time.Duration { return replanFrequency.Get(&execCfg.Settings.SV) },
     )

@@ -351,11 +366,91 @@ func (t rowLevelTTLResumer) CollectProfile(_ context.Context, _ interface{}) err
     return nil
 }

+// replanDecider returns a function that determines whether a TTL job should be
+// replanned based on changes in the physical execution plan. It compares the
+// old and new plans to detect node availability changes and decides if the
+// benefit of replanning (better parallelization) outweighs the cost of
+// restarting the job. It implements a stability window to avoid replanning
+// due to transient changes.
+func replanDecider(
+    consecutiveReplanDecisions *atomic.Int64,
+    stabilityWindowFn func() int64,
+    thresholdFn func() float64,
+) sql.PlanChangeDecision {
+    return func(ctx context.Context, oldPlan, newPlan *sql.PhysicalPlan) bool {
+        changed, growth := detectNodeAvailabilityChanges(oldPlan, newPlan)
+        threshold := thresholdFn()
+        shouldReplan := threshold != 0.0 && growth > threshold
+
+        stabilityWindow := stabilityWindowFn()
+
+        var currentDecisions int64
+        if shouldReplan {
+            currentDecisions = consecutiveReplanDecisions.Add(1)
+        } else {
+            consecutiveReplanDecisions.Store(0)
+            currentDecisions = 0
+        }
+
+        // If stability window is 1, replan immediately. Otherwise, require
+        // consecutive decisions to meet the window threshold.
+        replan := currentDecisions >= stabilityWindow
+
+        // Reset the counter when we decide to replan, since the job will restart.
+        if replan {
+            consecutiveReplanDecisions.Store(0)
+        }
+
+        if shouldReplan || growth > 0.1 || log.V(1) {
+            log.Infof(ctx, "Re-planning would add or alter flows on %d nodes / %.2f, threshold %.2f, consecutive decisions %d/%d, replan %v",
+                changed, growth, threshold, currentDecisions, stabilityWindow, replan)
+        }
+
+        return replan
+    }
+}
+
+// detectNodeAvailabilityChanges analyzes differences between two physical plans
+// to determine if nodes have become unavailable. It returns the number of nodes
+// that are no longer available and the fraction of the original plan affected.
+//
+// The function focuses on detecting when nodes from the original plan are missing
+// from the new plan, which typically indicates node failures. When nodes fail,
+// their work gets redistributed to remaining nodes, making a job restart
+// beneficial for better parallelization. We ignore newly added nodes since
+// continuing the current job on existing nodes is usually more efficient than
+// restarting to incorporate new capacity.
+func detectNodeAvailabilityChanges(before, after *sql.PhysicalPlan) (int, float64) {
+    var changed int
+    beforeSpecs, beforeCleanup := before.GenerateFlowSpecs()
+    defer beforeCleanup(beforeSpecs)
+    afterSpecs, afterCleanup := after.GenerateFlowSpecs()
+    defer afterCleanup(afterSpecs)
+
+    // Count nodes from the original plan that are no longer present in the new plan.
+    // We only check nodes in beforeSpecs because we specifically want to detect
+    // when nodes that were doing work are no longer available, which typically
+    // indicates beneficial restart scenarios (node failures where work can be
+    // redistributed more efficiently).
+    for n := range beforeSpecs {
+        if _, ok := afterSpecs[n]; !ok {
+            changed++
+        }
+    }
+
+    var frac float64
+    if changed > 0 {
+        frac = float64(changed) / float64(len(beforeSpecs))
+    }
+    return changed, frac
+}
+
 func init() {
     jobs.RegisterConstructor(jobspb.TypeRowLevelTTL, func(job *jobs.Job, settings *cluster.Settings) jobs.Resumer {
         return &rowLevelTTLResumer{
-            job: job,
-            st:  settings,
+            job:                        job,
+            st:                         settings,
+            consecutiveReplanDecisions: &atomic.Int64{},
         }
     }, jobs.UsesTenantCostControl)
 }
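For intuition about detectNodeAvailabilityChanges, here is a simplified standalone sketch (not the committed code): it works over plain slices of instance IDs rather than generated flow specs. Only IDs present in the original plan but absent from the refreshed plan count as changed, so newly added nodes are ignored and the returned fraction is relative to the original plan size. The names missingNodes, before, and after are made up for this example.

package main

import "fmt"

// missingNodes counts instance IDs that appear in the original plan but not
// in the refreshed plan, and returns that count plus the fraction of the
// original plan affected. Newly added IDs are deliberately ignored.
func missingNodes(before, after []int) (int, float64) {
    afterSet := make(map[int]struct{}, len(after))
    for _, id := range after {
        afterSet[id] = struct{}{}
    }
    missing := 0
    for _, id := range before {
        if _, ok := afterSet[id]; !ok {
            missing++
        }
    }
    var frac float64
    if missing > 0 {
        frac = float64(missing) / float64(len(before))
    }
    return missing, frac
}

func main() {
    // Node 2 is gone and node 4 is new: one missing node, a third of the
    // original plan, which exceeds the default 0.1 threshold.
    n, frac := missingNodes([]int{1, 2, 3}, []int{1, 3, 4})
    fmt.Printf("missing=%d frac=%.2f\n", n, frac) // missing=1 frac=0.33
}

This mirrors the "one node is replaced" scenario in the tests below: a replaced node still counts as a missing node, so the job replans even though total capacity is unchanged.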
pkg/sql/ttl/ttljob/ttljob_internal_test.go

Lines changed: 214 additions & 0 deletions
@@ -0,0 +1,214 @@
+// Copyright 2025 The Cockroach Authors.
+//
+// Use of this software is governed by the CockroachDB Software License
+// included in the /LICENSE file.
+
+package ttljob
+
+import (
+    "context"
+    "sync/atomic"
+    "testing"
+
+    "github.com/cockroachdb/cockroach/pkg/base"
+    "github.com/cockroachdb/cockroach/pkg/sql"
+    "github.com/cockroachdb/cockroach/pkg/sql/physicalplan"
+    "github.com/cockroachdb/cockroach/pkg/util/leaktest"
+    "github.com/cockroachdb/cockroach/pkg/util/log"
+    "github.com/stretchr/testify/require"
+)
+
+// NOTE: This test is for functions in ttljob.go. We already have
+// ttljob_test.go, but that is part of the ttljob_test package. This test is
+// specifically part of the ttljob package to access non-exported functions and
+// structs. Hence, the name '_internal_' in the file to signify that it accesses
+// internal functions.
+
+func TestReplanDecider(t *testing.T) {
+    defer leaktest.AfterTest(t)()
+    defer log.Scope(t).Close(t)
+
+    testCases := []struct {
+        desc         string
+        beforeNodes  []base.SQLInstanceID
+        afterNodes   []base.SQLInstanceID
+        threshold    float64
+        expectReplan bool
+    }{
+        {
+            desc:         "nodes don't change",
+            beforeNodes:  []base.SQLInstanceID{1, 2, 3},
+            afterNodes:   []base.SQLInstanceID{1, 2, 3},
+            threshold:    0.1,
+            expectReplan: false,
+        },
+        {
+            desc:         "one node is shutdown",
+            beforeNodes:  []base.SQLInstanceID{1, 2, 3},
+            afterNodes:   []base.SQLInstanceID{1, 3},
+            threshold:    0.1,
+            expectReplan: true,
+        },
+        {
+            desc:         "one node is brought online",
+            beforeNodes:  []base.SQLInstanceID{1, 2, 3},
+            afterNodes:   []base.SQLInstanceID{1, 2, 3, 4},
+            threshold:    0.1,
+            expectReplan: false,
+        },
+        {
+            desc:         "one node is replaced",
+            beforeNodes:  []base.SQLInstanceID{1, 2, 3},
+            afterNodes:   []base.SQLInstanceID{1, 2, 4},
+            threshold:    0.1,
+            expectReplan: true,
+        },
+        {
+            desc:         "multiple nodes shutdown",
+            beforeNodes:  []base.SQLInstanceID{1, 2, 3, 4, 5},
+            afterNodes:   []base.SQLInstanceID{1, 3},
+            threshold:    0.1,
+            expectReplan: true,
+        },
+        {
+            desc:         "all nodes replaced",
+            beforeNodes:  []base.SQLInstanceID{1, 2, 3},
+            afterNodes:   []base.SQLInstanceID{4, 5, 6},
+            threshold:    0.1,
+            expectReplan: true,
+        },
+        {
+            desc:         "threshold boundary: exactly at threshold",
+            beforeNodes:  []base.SQLInstanceID{1, 2, 3, 4, 5, 6, 7, 8, 9, 10},
+            afterNodes:   []base.SQLInstanceID{1, 2, 3, 4, 5, 6, 7, 8, 9},
+            threshold:    0.1,
+            expectReplan: false,
+        },
+        {
+            desc:         "threshold boundary: just above threshold",
+            beforeNodes:  []base.SQLInstanceID{1, 2, 3, 4, 5, 6, 7, 8, 9},
+            afterNodes:   []base.SQLInstanceID{1, 2, 3, 4, 5, 6, 7, 8},
+            threshold:    0.1,
+            expectReplan: true,
+        },
+        {
+            desc:         "threshold disabled",
+            beforeNodes:  []base.SQLInstanceID{1, 2, 3},
+            afterNodes:   []base.SQLInstanceID{1, 2},
+            threshold:    0.0,
+            expectReplan: false,
+        },
+        {
+            desc:         "large scale: many nodes lost",
+            beforeNodes:  []base.SQLInstanceID{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20},
+            afterNodes:   []base.SQLInstanceID{1, 2, 3, 4, 5, 6, 7, 8, 9, 10},
+            threshold:    0.1,
+            expectReplan: true,
+        },
+        {
+            desc:         "mixed scenario: nodes added and removed",
+            beforeNodes:  []base.SQLInstanceID{1, 2, 3, 4, 5},
+            afterNodes:   []base.SQLInstanceID{1, 3, 5, 6, 7, 8},
+            threshold:    0.1,
+            expectReplan: true,
+        },
+    }
+
+    for _, testCase := range testCases {
+        t.Run(testCase.desc, func(t *testing.T) {
+            // Create an atomic counter and set the stability window to 1 for
+            // an immediate replan (the previous behavior).
+            consecutiveReplanDecisions := &atomic.Int64{}
+            decider := replanDecider(consecutiveReplanDecisions, func() int64 { return 1 }, func() float64 { return testCase.threshold })
+            ctx := context.Background()
+            oldPlan := &sql.PhysicalPlan{}
+            oldPlan.PhysicalInfrastructure = &physicalplan.PhysicalInfrastructure{Processors: nil}
+            for _, nodeID := range testCase.beforeNodes {
+                oldPlan.Processors = append(oldPlan.Processors, physicalplan.Processor{SQLInstanceID: nodeID})
+            }
+            newPlan := &sql.PhysicalPlan{}
+            newPlan.PhysicalInfrastructure = &physicalplan.PhysicalInfrastructure{Processors: nil}
+            for _, nodeID := range testCase.afterNodes {
+                newPlan.Processors = append(newPlan.Processors, physicalplan.Processor{SQLInstanceID: nodeID})
+            }
+            replan := decider(ctx, oldPlan, newPlan)
+            require.Equal(t, testCase.expectReplan, replan)
+        })
+    }
+}
+
+func TestReplanDeciderStabilityWindow(t *testing.T) {
+    defer leaktest.AfterTest(t)()
+    defer log.Scope(t).Close(t)
+
+    testCases := []struct {
+        desc            string
+        stabilityWindow int64
+        threshold       float64
+        planChanges     [][]base.SQLInstanceID // sequence of plan changes
+        expectedReplans []bool                 // expected replan decision for each change
+    }{
+        {
+            desc:            "stability window 1 - immediate replan",
+            stabilityWindow: 1,
+            threshold:       0.1,
+            planChanges:     [][]base.SQLInstanceID{{2, 3}, {2, 4}, {3, 4}},
+            expectedReplans: []bool{true, true, true},
+        },
+        {
+            desc:            "stability window 2 - requires consecutive decisions",
+            stabilityWindow: 2,
+            threshold:       0.1,
+            planChanges:     [][]base.SQLInstanceID{{2, 3}, {2, 4}, {1, 2, 3}},
+            expectedReplans: []bool{false, true, false}, // first false, second true (meets window), third false (reset)
+        },
+        {
+            desc:            "stability window 2 - interrupted sequence",
+            stabilityWindow: 2,
+            threshold:       0.1,
+            planChanges:     [][]base.SQLInstanceID{{2, 3}, {1, 2, 3}, {2, 4}, {3, 4}},
+            expectedReplans: []bool{false, false, false, true}, // interrupted, then consecutive
+        },
+        {
+            desc:            "stability window 3 - three consecutive needed",
+            stabilityWindow: 3,
+            threshold:       0.1,
+            planChanges:     [][]base.SQLInstanceID{{2, 3}, {2, 4}, {3, 4}, {1, 2, 3}},
+            expectedReplans: []bool{false, false, true, false}, // third one triggers replan
+        },
+    }
+
+    for _, testCase := range testCases {
+        t.Run(testCase.desc, func(t *testing.T) {
+            consecutiveReplanDecisions := &atomic.Int64{}
+            decider := replanDecider(
+                consecutiveReplanDecisions,
+                func() int64 { return testCase.stabilityWindow },
+                func() float64 { return testCase.threshold },
+            )
+            ctx := context.Background()
+
+            // Use an initial plan with nodes 1, 2, 3.
+            initialPlan := &sql.PhysicalPlan{}
+            initialPlan.PhysicalInfrastructure = &physicalplan.PhysicalInfrastructure{Processors: nil}
+            for _, nodeID := range []base.SQLInstanceID{1, 2, 3} {
+                initialPlan.Processors = append(initialPlan.Processors, physicalplan.Processor{SQLInstanceID: nodeID})
+            }
+
+            for i, nodes := range testCase.planChanges {
+                newPlan := &sql.PhysicalPlan{}
+                newPlan.PhysicalInfrastructure = &physicalplan.PhysicalInfrastructure{Processors: nil}
+                for _, nodeID := range nodes {
+                    newPlan.Processors = append(newPlan.Processors, physicalplan.Processor{SQLInstanceID: nodeID})
+                }
+
+                replan := decider(ctx, initialPlan, newPlan)
+                if replan != testCase.expectedReplans[i] {
+                    t.Errorf("step %d: expected replan=%v, got %v (consecutive count: %d)", i, testCase.expectedReplans[i], replan, consecutiveReplanDecisions.Load())
+                }
+
+                // Update the initial plan for the next iteration to maintain state.
+                initialPlan = newPlan
+            }
+        })
+    }
+}
