Skip to content

Commit 233b6ad

Browse files
committed
feat(disaggregatedset): Port planner fix: source-aware surge baseline and maxUnavailable floor
Port fixes from commit 7c66541 to the N-phase planner: 1. Source-aware surge baseline: - Change surge constraint from old + new <= target + surge to old + new <= max(source, target) + surge - Ensures scale-down scenarios don't block scale-up since the system already runs source replicas 2. maxUnavailable floor enforcement: - Add minOld constraint: old + new >= target - maxUnavailable - Only enforce when source >= target (scale-down scenario) - Applied to both proportional drain and fallback drain paths Test updates: - Update expected sequences for scale-down and mixed-scale scenarios - Add asymmetric_5_3_surge2 test case All tests pass with 90.2% coverage. Signed-off-by: Mathis Felardos <mathis@mistral.ai>
1 parent 81cfa97 commit 233b6ad

File tree

3 files changed

+69
-43
lines changed

3 files changed

+69
-43
lines changed

disaggregatedset/internal/controller/planner.go

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -262,14 +262,16 @@ func ComputeNextStep(source, currentOld, currentNew, targetNew PhaseReplicaState
262262
}
263263
}
264264

265-
// Check surge constraint for scale-up (old + new <= target + surge)
265+
// Check surge constraint for scale-up (old + new <= max(source, target) + surge)
266+
// Use max(source, target) so that dimensions scaling down (source > target) don't
267+
// block scale-up — the system already runs source replicas, surge is relative to that.
266268
// Skip removed phases (target=0) - they don't need surge protection
267269
surgeOK := true
268270
for i := 0; i < numPhases; i++ {
269271
if targetNew[i] == 0 {
270272
continue // Removed phases just drain, no surge constraint
271273
}
272-
if currentOld[i]+nextNewState[i] > targetNew[i]+config.MaxSurge[i] {
274+
if currentOld[i]+nextNewState[i] > max(source[i], targetNew[i])+config.MaxSurge[i] {
273275
surgeOK = false
274276
break
275277
}
@@ -286,6 +288,17 @@ func ComputeNextStep(source, currentOld, currentNew, targetNew PhaseReplicaState
286288
// Scale down: first try proportional drain
287289
nextOldState := computeNextOldReplicas(source, currentOld, totalNumSteps)
288290

291+
// Enforce maxUnavailable constraint: old + new >= target - maxUnavailable per phase.
292+
// Only enforce when source >= target for that phase — when scaling up (source < target),
293+
// the system never had enough replicas to maintain the target level.
294+
minOld := make([]int, numPhases)
295+
for i := 0; i < numPhases; i++ {
296+
if source[i] >= targetNew[i] {
297+
minOld[i] = max(0, targetNew[i]-config.MaxUnavailable[i]-currentNew[i])
298+
}
299+
nextOldState[i] = max(nextOldState[i], minOld[i])
300+
}
301+
289302
needsScaleDown := false
290303
for i := 0; i < numPhases; i++ {
291304
if nextOldState[i] < currentOld[i] {
@@ -308,8 +321,10 @@ func ComputeNextStep(source, currentOld, currentNew, targetNew PhaseReplicaState
308321
drainedOld := make([]int, numPhases)
309322
needsDrain := false
310323
for i := 0; i < numPhases; i++ {
311-
maxOld := targetNew[i] + config.MaxSurge[i] - nextNewState[i]
324+
maxOld := max(source[i], targetNew[i]) + config.MaxSurge[i] - nextNewState[i]
312325
drainedOld[i] = max(0, min(currentOld[i], maxOld))
326+
// Enforce maxUnavailable constraint on fallback drain path
327+
drainedOld[i] = max(drainedOld[i], minOld[i])
313328
if drainedOld[i] < currentOld[i] {
314329
needsDrain = true
315330
}

disaggregatedset/internal/controller/planner_test.go

Lines changed: 45 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -370,13 +370,10 @@ func TestComputeAllSteps_ExactSequence(t *testing.T) {
370370
config: DefaultRollingUpdateConfig(2),
371371
expected: []UpdateStep{
372372
step([]int{5, 5}, []int{0, 0}),
373-
step([]int{4, 4}, []int{0, 0}), // scale down (can't scale up: surge=1, 5+1>2+1)
374-
step([]int{3, 3}, []int{0, 0}), // scale down
375-
step([]int{2, 2}, []int{0, 0}), // scale down (now old=2=target)
376-
step([]int{2, 2}, []int{1, 1}), // scale up (surge: 2+1<=2+1)
377-
step([]int{1, 1}, []int{1, 1}), // scale down
378-
step([]int{1, 1}, []int{2, 2}), // scale up (new at target)
379-
step([]int{0, 0}, []int{2, 2}), // scale down
373+
step([]int{5, 5}, []int{1, 1}), // scale up (surge uses max(5,2)+1=6: 5+1<=6)
374+
step([]int{4, 4}, []int{1, 1}), // scale down
375+
step([]int{4, 4}, []int{2, 2}), // scale up (new at target)
376+
step([]int{0, 0}, []int{2, 2}), // drain all old
380377
},
381378
},
382379
{
@@ -385,16 +382,16 @@ func TestComputeAllSteps_ExactSequence(t *testing.T) {
385382
config: DefaultRollingUpdateConfig(2),
386383
expected: []UpdateStep{
387384
step([]int{3, 5}, []int{0, 0}),
388-
step([]int{3, 4}, []int{0, 0}),
389-
step([]int{2, 3}, []int{0, 0}),
390-
step([]int{2, 3}, []int{1, 1}),
391-
step([]int{2, 2}, []int{1, 1}),
392-
step([]int{2, 2}, []int{2, 2}),
393-
step([]int{2, 2}, []int{3, 2}),
394-
step([]int{1, 1}, []int{3, 2}),
395-
step([]int{1, 1}, []int{4, 3}),
396-
step([]int{1, 1}, []int{5, 3}),
397-
step([]int{0, 0}, []int{5, 3}),
385+
step([]int{3, 5}, []int{1, 1}), // scale up (surge uses max(3,5)+1=6: 3+1<=6, 5+1<=6)
386+
step([]int{3, 4}, []int{1, 1}), // scale down decode
387+
step([]int{3, 4}, []int{2, 2}), // scale up
388+
step([]int{3, 4}, []int{3, 2}), // scale up
389+
step([]int{2, 3}, []int{3, 2}), // scale down
390+
step([]int{2, 3}, []int{4, 3}), // scale up
391+
step([]int{2, 2}, []int{4, 3}), // scale down decode
392+
step([]int{1, 1}, []int{4, 3}), // scale down
393+
step([]int{1, 1}, []int{5, 3}), // scale up (new at target)
394+
step([]int{0, 0}, []int{5, 3}), // drain all old
398395
},
399396
},
400397
{
@@ -403,14 +400,13 @@ func TestComputeAllSteps_ExactSequence(t *testing.T) {
403400
config: DefaultRollingUpdateConfig(2),
404401
expected: []UpdateStep{
405402
step([]int{2, 4}, []int{0, 0}),
406-
step([]int{2, 3}, []int{0, 0}),
407-
step([]int{1, 2}, []int{0, 0}),
408-
step([]int{1, 2}, []int{1, 1}),
409-
step([]int{1, 2}, []int{2, 1}),
410-
step([]int{1, 1}, []int{2, 1}),
411-
step([]int{1, 1}, []int{3, 2}),
412-
step([]int{1, 1}, []int{4, 2}),
413-
step([]int{0, 0}, []int{4, 2}),
403+
step([]int{2, 4}, []int{1, 1}), // scale up (surge uses max(2,4)+1=5: 2+1<=5, 4+1<=5)
404+
step([]int{2, 4}, []int{2, 1}), // scale up
405+
step([]int{2, 3}, []int{2, 1}), // scale down decode
406+
step([]int{2, 3}, []int{3, 2}), // scale up
407+
step([]int{1, 2}, []int{3, 2}), // scale down
408+
step([]int{1, 2}, []int{4, 2}), // scale up (new at target)
409+
step([]int{0, 0}, []int{4, 2}), // drain all old
414410
},
415411
},
416412
{
@@ -419,15 +415,15 @@ func TestComputeAllSteps_ExactSequence(t *testing.T) {
419415
config: DefaultRollingUpdateConfig(2),
420416
expected: []UpdateStep{
421417
step([]int{3, 5}, []int{0, 0}),
422-
step([]int{3, 4}, []int{0, 0}),
423-
step([]int{2, 3}, []int{0, 0}),
424-
step([]int{2, 2}, []int{0, 0}),
425-
step([]int{2, 2}, []int{1, 1}),
426-
step([]int{2, 2}, []int{2, 1}),
427-
step([]int{1, 1}, []int{2, 1}),
428-
step([]int{1, 1}, []int{3, 2}),
429-
step([]int{1, 1}, []int{4, 2}),
430-
step([]int{0, 0}, []int{4, 2}),
418+
step([]int{3, 5}, []int{1, 1}), // scale up (surge uses max(3,4)+1=5, max(5,2)+1=6: 3+1<=5, 5+1<=6)
419+
step([]int{3, 5}, []int{2, 1}), // scale up
420+
step([]int{3, 4}, []int{2, 1}), // scale down decode
421+
step([]int{2, 3}, []int{2, 1}), // scale down
422+
step([]int{2, 3}, []int{3, 2}), // scale up
423+
step([]int{2, 2}, []int{3, 2}), // scale down decode
424+
step([]int{1, 1}, []int{3, 2}), // scale down
425+
step([]int{1, 1}, []int{4, 2}), // scale up (new at target)
426+
step([]int{0, 0}, []int{4, 2}), // drain all old
431427
},
432428
},
433429
{
@@ -454,6 +450,21 @@ func TestComputeAllSteps_ExactSequence(t *testing.T) {
454450
step([]int{0, 0}, []int{4, 6}),
455451
},
456452
},
453+
{
454+
name: "asymmetric_5_3_surge2",
455+
sourcePhase0: 5, sourcePhase1: 3, targetPhase0: 5, targetPhase1: 3,
456+
config: config([]int{2, 2}, []int{0, 0}),
457+
expected: []UpdateStep{
458+
step([]int{5, 3}, []int{0, 0}),
459+
step([]int{5, 3}, []int{2, 1}),
460+
step([]int{4, 2}, []int{2, 1}),
461+
step([]int{3, 2}, []int{2, 1}),
462+
step([]int{3, 2}, []int{4, 2}),
463+
step([]int{2, 1}, []int{4, 2}),
464+
step([]int{2, 1}, []int{5, 3}),
465+
step([]int{0, 0}, []int{5, 3}),
466+
},
467+
},
457468
// Edge cases
458469
{
459470
name: "fresh_deploy_0_0_to_3_3",

disaggregatedset/internal/controller/service_manager.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -64,19 +64,19 @@ func (manager *ServiceManager) ReconcileServices(
6464
// Find revisions where all phases are ready (readyReplicas >= 1)
6565
var readyRevisions []string
6666
for _, group := range groupedWorkloads {
67-
allReady := true
67+
phasesReady := true
6868
logArgs := []interface{}{"revision", group.Revision}
6969

7070
for _, phaseName := range phaseNames {
7171
phaseInfo, hasPhase := group.Phases[phaseName]
7272
if !hasPhase || phaseInfo.ReadyReplicas < 1 {
73-
allReady = false
73+
phasesReady = false
7474
break
7575
}
7676
logArgs = append(logArgs, phaseName+"Ready", phaseInfo.ReadyReplicas)
7777
}
7878

79-
if allReady {
79+
if phasesReady {
8080
readyRevisions = append(readyRevisions, group.Revision)
8181
log.V(1).Info("Revision is ready on all phases", logArgs...)
8282
}
@@ -199,15 +199,15 @@ func (manager *ServiceManager) cleanupDrainedServices(
199199
// Build a set of revisions that still have ready replicas on all phases
200200
readyRevisionSet := make(map[string]bool)
201201
for _, group := range groupedWorkloads {
202-
allReady := true
202+
phasesReady := true
203203
for _, phaseName := range phaseNames {
204204
phaseInfo, hasPhase := group.Phases[phaseName]
205205
if !hasPhase || phaseInfo.ReadyReplicas < 1 {
206-
allReady = false
206+
phasesReady = false
207207
break
208208
}
209209
}
210-
if allReady {
210+
if phasesReady {
211211
readyRevisionSet[group.Revision] = true
212212
}
213213
}

0 commit comments

Comments
 (0)