Commit 706d137

sql/ttl: improve TTL replan decision logic
Replace calculatePlanGrowth with detectNodeAvailabilityChanges to make TTL job replanning less sensitive to span changes. The new logic focuses specifically on detecting when nodes become unavailable rather than reacting to all plan differences. The previous implementation would trigger replans for span splits/merges that don't actually indicate beneficial restart scenarios.

The new approach only considers nodes missing from the original plan, which typically indicates node failures where redistributing the work would benefit from restarting the job. It also adds a stability window so that a replan decision must fire on consecutive evaluations before taking effect. This should help alleviate spurious plan changes caused by range cache issues.

Fixes #150343

Epic: none

Release note (ops change): The sql.ttl.replan_flow_threshold cluster setting may have been set to 0 to work around the TTL replanner being too sensitive. This fix alleviates that sensitivity, so any cluster that set replan_flow_threshold to 0 can reset it back to the default.
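In practice the new gate works like this: a replan fires only when the fraction of original nodes missing from the freshly generated plan exceeds sql.ttl.replan_flow_threshold on sql.ttl.replan_stability_window consecutive checks. Below is a minimal standalone sketch of that gating, paraphrasing the replanDecider added in this commit; the function name and the example values are illustrative only, not part of the change.

// Sketch of the stability-window gating (illustrative; the real logic is
// replanDecider in the diff below).
package main

import (
	"fmt"
	"sync/atomic"
)

// shouldReplanNow returns true only after `window` consecutive evaluations in
// which the fraction of original nodes missing from the new plan exceeds
// `threshold`. An evaluation at or below the threshold resets the streak, and
// a firing decision resets it too, since the job restarts.
func shouldReplanNow(streak *atomic.Int64, missingFrac, threshold float64, window int64) bool {
	if threshold == 0 || missingFrac <= threshold {
		streak.Store(0)
		return false
	}
	if streak.Add(1) < window {
		return false // keep waiting for the stability window to fill
	}
	streak.Store(0)
	return true
}

func main() {
	var streak atomic.Int64
	// One of three original nodes is missing (frac ≈ 0.33) on two consecutive
	// checks, threshold 0.1, window 2: the first check only arms the counter.
	fmt.Println(shouldReplanNow(&streak, 1.0/3.0, 0.1, 2)) // false
	fmt.Println(shouldReplanNow(&streak, 1.0/3.0, 0.1, 2)) // true
}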
1 parent b8c2405 commit 706d137

2 files changed: 288 additions & 3 deletions


pkg/sql/ttl/ttljob/ttljob.go

Lines changed: 98 additions & 3 deletions
@@ -8,6 +8,7 @@ package ttljob
 import (
 	"context"
 	"math/rand"
+	"sync/atomic"
 	"time"
 
 	"github.com/cockroachdb/cockroach/pkg/base"
@@ -55,6 +56,14 @@ var replanFrequency = settings.RegisterDurationSetting(
 	settings.PositiveDuration,
 )
 
+var replanStabilityWindow = settings.RegisterIntSetting(
+	settings.ApplicationLevel,
+	"sql.ttl.replan_stability_window",
+	"number of consecutive replan evaluations required before triggering a replan; set to 1 to disable stability window",
+	2,
+	settings.PositiveInt,
+)
+
 // rowLevelTTLResumer implements the TTL job. The job can run on any node, but
 // the job node distributes SELECT/DELETE work via DistSQL to ttlProcessor
 // nodes. DistSQL divides work into spans that each ttlProcessor scans in a
@@ -65,6 +74,9 @@ type rowLevelTTLResumer struct {
 	physicalPlan *sql.PhysicalPlan
 	planCtx      *sql.PlanningCtx
 
+	// consecutiveReplanDecisions tracks how many consecutive times replan was deemed necessary.
+	consecutiveReplanDecisions *atomic.Int64
+
 	mu struct {
 		syncutil.Mutex
 		// lastUpdateTime is the wall time of the last job progress update.
@@ -296,7 +308,10 @@ func (t *rowLevelTTLResumer) Resume(ctx context.Context, execCtx interface{}) (r
 	// the TTL job to utilize those nodes for parallel work.
 	replanChecker, cancelReplanner := sql.PhysicalPlanChangeChecker(
 		ctx, t.physicalPlan, makePlan, jobExecCtx,
-		sql.ReplanOnChangedFraction(func() float64 { return replanThreshold.Get(&execCfg.Settings.SV) }),
+		replanDecider(t.consecutiveReplanDecisions,
+			func() int64 { return replanStabilityWindow.Get(&execCfg.Settings.SV) },
+			func() float64 { return replanThreshold.Get(&execCfg.Settings.SV) },
+		),
 		func() time.Duration { return replanFrequency.Get(&execCfg.Settings.SV) },
 	)
 
@@ -507,11 +522,91 @@ func (t *rowLevelTTLResumer) refreshProgress(
 	return newProgress, nil
 }
 
+// replanDecider returns a function that determines whether a TTL job should be
+// replanned based on changes in the physical execution plan. It compares the
+// old and new plans to detect node availability changes and decides if the
+// benefit of replanning (better parallelization) outweighs the cost of
+// restarting the job. It implements a stability window to avoid replanning
+// due to transient changes.
+func replanDecider(
+	consecutiveReplanDecisions *atomic.Int64,
+	stabilityWindowFn func() int64,
+	thresholdFn func() float64,
+) sql.PlanChangeDecision {
+	return func(ctx context.Context, oldPlan, newPlan *sql.PhysicalPlan) bool {
+		changed, growth := detectNodeAvailabilityChanges(oldPlan, newPlan)
+		threshold := thresholdFn()
+		shouldReplan := threshold != 0.0 && growth > threshold
+
+		stabilityWindow := stabilityWindowFn()
+
+		var currentDecisions int64
+		if shouldReplan {
+			currentDecisions = consecutiveReplanDecisions.Add(1)
+		} else {
+			consecutiveReplanDecisions.Store(0)
+			currentDecisions = 0
+		}
+
+		// If stability window is 1, replan immediately. Otherwise, require
+		// consecutive decisions to meet the window threshold.
+		replan := currentDecisions >= stabilityWindow
+
+		// Reset the counter when we decide to replan, since the job will restart
+		if replan {
+			consecutiveReplanDecisions.Store(0)
+		}
+
+		if shouldReplan || growth > 0.1 || log.V(1) {
+			log.Infof(ctx, "Re-planning would add or alter flows on %d nodes / %.2f, threshold %.2f, consecutive decisions %d/%d, replan %v",
+				changed, growth, threshold, currentDecisions, stabilityWindow, replan)
+		}
+
+		return replan
+	}
+}
+
+// detectNodeAvailabilityChanges analyzes differences between two physical plans
+// to determine if nodes have become unavailable. It returns the number of nodes
+// that are no longer available and the fraction of the original plan affected.
+//
+// The function focuses on detecting when nodes from the original plan are missing
+// from the new plan, which typically indicates node failures. When nodes fail,
+// their work gets redistributed to remaining nodes, making a job restart
+// beneficial for better parallelization. We ignore newly added nodes since
+// continuing the current job on existing nodes is usually more efficient than
+// restarting to incorporate new capacity.
+func detectNodeAvailabilityChanges(before, after *sql.PhysicalPlan) (int, float64) {
+	var changed int
+	beforeSpecs, beforeCleanup := before.GenerateFlowSpecs()
+	defer beforeCleanup(beforeSpecs)
+	afterSpecs, afterCleanup := after.GenerateFlowSpecs()
+	defer afterCleanup(afterSpecs)
+
+	// Count nodes from the original plan that are no longer present in the new plan.
+	// We only check nodes in beforeSpecs because we specifically want to detect
+	// when nodes that were doing work are no longer available, which typically
+	// indicates beneficial restart scenarios (node failures where work can be
+	// redistributed more efficiently).
+	for n := range beforeSpecs {
+		if _, ok := afterSpecs[n]; !ok {
+			changed++
+		}
+	}
+
+	var frac float64
+	if changed > 0 {
+		frac = float64(changed) / float64(len(beforeSpecs))
+	}
+	return changed, frac
+}
+
 func init() {
 	jobs.RegisterConstructor(jobspb.TypeRowLevelTTL, func(job *jobs.Job, settings *cluster.Settings) jobs.Resumer {
 		return &rowLevelTTLResumer{
-			job: job,
-			st:  settings,
+			job:                        job,
+			st:                         settings,
+			consecutiveReplanDecisions: &atomic.Int64{},
 		}
 	}, jobs.UsesTenantCostControl)
 }

pkg/sql/ttl/ttljob/ttljob_internal_test.go

Lines changed: 190 additions & 0 deletions
@@ -8,6 +8,7 @@ package ttljob
 import (
 	"context"
 	"fmt"
+	"sync/atomic"
 	"testing"
 
 	"github.com/cockroachdb/cockroach/pkg/base"
@@ -155,3 +156,192 @@ func TestTTLProgressLifecycle(t *testing.T) {
 	require.Equal(t, int64(1000), ttlProgress.JobDeletedRowCount)
 	require.Len(t, ttlProgress.ProcessorProgresses, 2)
 }
+
+func TestReplanDecider(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+
+	testCases := []struct {
+		desc         string
+		beforeNodes  []base.SQLInstanceID
+		afterNodes   []base.SQLInstanceID
+		threshold    float64
+		expectReplan bool
+	}{
+		{
+			desc:         "nodes don't change",
+			beforeNodes:  []base.SQLInstanceID{1, 2, 3},
+			afterNodes:   []base.SQLInstanceID{1, 2, 3},
+			threshold:    0.1,
+			expectReplan: false,
+		},
+		{
+			desc:         "one node is shutdown",
+			beforeNodes:  []base.SQLInstanceID{1, 2, 3},
+			afterNodes:   []base.SQLInstanceID{1, 3},
+			threshold:    0.1,
+			expectReplan: true,
+		},
+		{
+			desc:         "one node is brought online",
+			beforeNodes:  []base.SQLInstanceID{1, 2, 3},
+			afterNodes:   []base.SQLInstanceID{1, 2, 3, 4},
+			threshold:    0.1,
+			expectReplan: false,
+		},
+		{
+			desc:         "one node is replaced",
+			beforeNodes:  []base.SQLInstanceID{1, 2, 3},
+			afterNodes:   []base.SQLInstanceID{1, 2, 4},
+			threshold:    0.1,
+			expectReplan: true,
+		},
+		{
+			desc:         "multiple nodes shutdown",
+			beforeNodes:  []base.SQLInstanceID{1, 2, 3, 4, 5},
+			afterNodes:   []base.SQLInstanceID{1, 3},
+			threshold:    0.1,
+			expectReplan: true,
+		},
+		{
+			desc:         "all nodes replaced",
+			beforeNodes:  []base.SQLInstanceID{1, 2, 3},
+			afterNodes:   []base.SQLInstanceID{4, 5, 6},
+			threshold:    0.1,
+			expectReplan: true,
+		},
+		{
+			desc:         "threshold boundary: exactly at threshold",
+			beforeNodes:  []base.SQLInstanceID{1, 2, 3, 4, 5, 6, 7, 8, 9, 10},
+			afterNodes:   []base.SQLInstanceID{1, 2, 3, 4, 5, 6, 7, 8, 9},
+			threshold:    0.1,
+			expectReplan: false,
+		},
+		{
+			desc:         "threshold boundary: just above threshold",
+			beforeNodes:  []base.SQLInstanceID{1, 2, 3, 4, 5, 6, 7, 8, 9},
+			afterNodes:   []base.SQLInstanceID{1, 2, 3, 4, 5, 6, 7, 8},
+			threshold:    0.1,
+			expectReplan: true,
+		},
+		{
+			desc:         "threshold disabled",
+			beforeNodes:  []base.SQLInstanceID{1, 2, 3},
+			afterNodes:   []base.SQLInstanceID{1, 2},
+			threshold:    0.0,
+			expectReplan: false,
+		},
+		{
+			desc:         "large scale: many nodes lost",
+			beforeNodes:  []base.SQLInstanceID{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20},
+			afterNodes:   []base.SQLInstanceID{1, 2, 3, 4, 5, 6, 7, 8, 9, 10},
+			threshold:    0.1,
+			expectReplan: true,
+		},
+		{
+			desc:         "mixed scenario: nodes added and removed",
+			beforeNodes:  []base.SQLInstanceID{1, 2, 3, 4, 5},
+			afterNodes:   []base.SQLInstanceID{1, 3, 5, 6, 7, 8},
+			threshold:    0.1,
+			expectReplan: true,
+		},
+	}
+
+	for _, testCase := range testCases {
+		t.Run(testCase.desc, func(t *testing.T) {
+			// Create atomic counter and set stability window to 1 for immediate replan (current behavior)
+			consecutiveReplanDecisions := &atomic.Int64{}
+			decider := replanDecider(consecutiveReplanDecisions, func() int64 { return 1 }, func() float64 { return testCase.threshold })
+			ctx := context.Background()
+			oldPlan := &sql.PhysicalPlan{}
+			oldPlan.PhysicalInfrastructure = &physicalplan.PhysicalInfrastructure{Processors: nil}
+			for _, nodeID := range testCase.beforeNodes {
+				oldPlan.Processors = append(oldPlan.Processors, physicalplan.Processor{SQLInstanceID: nodeID})
+			}
+			newPlan := &sql.PhysicalPlan{}
+			newPlan.PhysicalInfrastructure = &physicalplan.PhysicalInfrastructure{Processors: nil}
+			for _, nodeID := range testCase.afterNodes {
+				newPlan.Processors = append(newPlan.Processors, physicalplan.Processor{SQLInstanceID: nodeID})
+			}
+			replan := decider(ctx, oldPlan, newPlan)
+			require.Equal(t, testCase.expectReplan, replan)
+		})
+	}
+}
+
+func TestReplanDeciderStabilityWindow(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+
+	testCases := []struct {
+		desc            string
+		stabilityWindow int64
+		threshold       float64
+		planChanges     [][]base.SQLInstanceID // sequence of plan changes
+		expectedReplans []bool                 // expected replan decision for each change
+	}{
+		{
+			desc:            "stability window 1 - immediate replan",
+			stabilityWindow: 1,
+			threshold:       0.1,
+			planChanges:     [][]base.SQLInstanceID{{2, 3}, {2, 4}, {3, 4}},
+			expectedReplans: []bool{true, true, true},
+		},
+		{
+			desc:            "stability window 2 - requires consecutive decisions",
+			stabilityWindow: 2,
+			threshold:       0.1,
+			planChanges:     [][]base.SQLInstanceID{{2, 3}, {2, 4}, {1, 2, 3}},
+			expectedReplans: []bool{false, true, false}, // first false, second true (meets window), third false (reset)
+		},
+		{
+			desc:            "stability window 2 - interrupted sequence",
+			stabilityWindow: 2,
+			threshold:       0.1,
+			planChanges:     [][]base.SQLInstanceID{{2, 3}, {1, 2, 3}, {2, 4}, {3, 4}},
+			expectedReplans: []bool{false, false, false, true}, // interrupted, then consecutive
+		},
+		{
+			desc:            "stability window 3 - three consecutive needed",
+			stabilityWindow: 3,
+			threshold:       0.1,
+			planChanges:     [][]base.SQLInstanceID{{2, 3}, {2, 4}, {3, 4}, {1, 2, 3}},
+			expectedReplans: []bool{false, false, true, false}, // third one triggers replan
+		},
+	}
+
+	for _, testCase := range testCases {
+		t.Run(testCase.desc, func(t *testing.T) {
+			consecutiveReplanDecisions := &atomic.Int64{}
+			decider := replanDecider(
+				consecutiveReplanDecisions,
+				func() int64 { return testCase.stabilityWindow },
+				func() float64 { return testCase.threshold },
+			)
+			ctx := context.Background()
+
+			// Use initial plan with nodes 1,2,3
+			initialPlan := &sql.PhysicalPlan{}
+			initialPlan.PhysicalInfrastructure = &physicalplan.PhysicalInfrastructure{Processors: nil}
+			for _, nodeID := range []base.SQLInstanceID{1, 2, 3} {
+				initialPlan.Processors = append(initialPlan.Processors, physicalplan.Processor{SQLInstanceID: nodeID})
+			}
+
+			for i, nodes := range testCase.planChanges {
+				newPlan := &sql.PhysicalPlan{}
+				newPlan.PhysicalInfrastructure = &physicalplan.PhysicalInfrastructure{Processors: nil}
+				for _, nodeID := range nodes {
+					newPlan.Processors = append(newPlan.Processors, physicalplan.Processor{SQLInstanceID: nodeID})
+				}
+
+				replan := decider(ctx, initialPlan, newPlan)
+				if replan != testCase.expectedReplans[i] {
+					t.Errorf("step %d: expected replan=%v, got %v (consecutive count: %d)", i, testCase.expectedReplans[i], replan, consecutiveReplanDecisions.Load())
+				}
+
+				// Update initial plan for next iteration to maintain state
+				initialPlan = newPlan
+			}
+		})
+	}
+}
