Skip to content

Commit b2b903d

Browse files
committed
feat: wire V2 saturation analyzer into engine, gated by analyzerName
Integrate the V2 token-based saturation analyzer into the optimization engine behind a config gate (analyzerName: "saturation"). When active, it replaces the V1 percentage-based analyzer inside RunSaturationAnalysis while keeping the rest of the pipeline (enforcer, limiter, decision converter) unchanged via an adapter pattern. Also introduces the CostAwareOptimizer — the first ScalingOptimizer implementation for the V2 pipeline — which handles unlimited-mode multi-variant scaling with cost-based replica allocation. Engine integration: - Add saturationV2Analyzer, capacityStore, and optimizer fields to Engine struct, initialized once in NewEngine() - Gate V2 path in optimize() via analyzerName == "saturation" from global config - optimizeV2() three-stage pipeline: collect ModelScalingRequests, call optimizer.Optimize(), apply enforcer per-model via bridge - Enforcer bridge: extractTargetsFromDecisions, buildVariantAnalysesFromDecisions, applyEnforcedTargetsToDecisions CostAwareOptimizer (unlimited mode): - Scale-up: allocate to most cost-efficient variant (lowest cost/perReplicaCapacity). Variants with pending replicas are NOT skipped — the analyzer already accounts for their capacity in the supply calculation, so RequiredCapacity > 0 means demand exceeds total supply including pending. - Scale-down: remove from most expensive variant (highest absolute cost). The cheapest variant is protected at min 1 replica only when it is the last variant with replicas — this prevents scale-down deadlocks where the expensive variant's per-replica capacity exceeds spare but cheaper replicas could be removed. - Skips variants with zero capacity Limiter infrastructure: - ResourcePool, ResourceConstraints, ConstraintProvider interface for future V2 limited-mode path (GreedyBySaturationOptimizer) - DefaultLimiter implements ConstraintProvider via ComputeConstraints() - TypeInventory.GetResourcePools() for per-type resource availability
1 parent 4edeb8f commit b2b903d

File tree

10 files changed

+1476
-60
lines changed

10 files changed

+1476
-60
lines changed
Lines changed: 297 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,297 @@
1+
package pipeline
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"math"
7+
"sort"
8+
9+
ctrl "sigs.k8s.io/controller-runtime"
10+
11+
"github.com/llm-d-incubation/workload-variant-autoscaler/internal/interfaces"
12+
)
13+
14+
// CostAwareOptimizer is a per-model optimizer that minimizes total cost while
15+
// meeting capacity requirements. It processes each model independently:
16+
//
17+
// - Scale-up: adds replicas to the most cost-efficient variant (lowest cost / perReplicaCapacity)
18+
// - Scale-down: removes replicas from the most expensive variant (highest absolute cost)
19+
// - Only the cheapest variant is protected at >=1 replica; others can scale to 0
20+
// - Variants with pending replicas are skipped for scale-up
21+
//
22+
// This optimizer ignores ResourceConstraints (unlimited mode). For GPU-limited
23+
// environments, use GreedyBySaturationOptimizer instead.
24+
type CostAwareOptimizer struct{}
25+
26+
// NewCostAwareOptimizer creates a new CostAwareOptimizer.
27+
func NewCostAwareOptimizer() *CostAwareOptimizer {
28+
return &CostAwareOptimizer{}
29+
}
30+
31+
// Name returns the optimizer identifier.
32+
func (o *CostAwareOptimizer) Name() string {
33+
return "cost-aware"
34+
}
35+
36+
// Optimize produces VariantDecisions for all models.
37+
// Constraints are ignored in unlimited mode (CostAwareOptimizer).
38+
func (o *CostAwareOptimizer) Optimize(
39+
ctx context.Context,
40+
requests []ModelScalingRequest,
41+
constraints []*ResourceConstraints,
42+
) []interfaces.VariantDecision {
43+
logger := ctrl.LoggerFrom(ctx)
44+
var allDecisions []interfaces.VariantDecision
45+
46+
for _, req := range requests {
47+
if req.Result == nil {
48+
continue
49+
}
50+
51+
stateMap := buildStateMap(req.VariantStates)
52+
vcMap := buildCapacityMap(req.Result.VariantCapacities)
53+
targets := initTargets(req.VariantStates)
54+
55+
if req.Result.RequiredCapacity > 0 {
56+
costAwareScaleUp(ctx, req.Result, vcMap, targets)
57+
} else if req.Result.SpareCapacity > 0 {
58+
costAwareScaleDown(ctx, req.Result, vcMap, targets)
59+
}
60+
61+
decisions := buildDecisions(req, stateMap, vcMap, targets)
62+
logger.V(1).Info("Cost-aware optimizer decisions",
63+
"modelID", req.ModelID,
64+
"decisions", len(decisions))
65+
allDecisions = append(allDecisions, decisions...)
66+
}
67+
68+
return allDecisions
69+
}
70+
71+
// costAwareScaleUp adds replicas to the most cost-efficient variant.
72+
// Sorts by cost-efficiency (cost/perReplicaCapacity) ascending, picks first eligible.
73+
// Pending replicas are not skipped because the analyzer already accounts for their
74+
// capacity in the supply calculation — if RequiredCapacity > 0, demand exceeds total
75+
// supply including pending.
76+
func costAwareScaleUp(
77+
ctx context.Context,
78+
result *interfaces.AnalyzerResult,
79+
vcMap map[string]interfaces.VariantCapacity,
80+
targets map[string]int,
81+
) {
82+
logger := ctrl.LoggerFrom(ctx)
83+
84+
sorted := sortByCostEfficiencyAsc(result.VariantCapacities)
85+
remaining := result.RequiredCapacity
86+
87+
for _, vc := range sorted {
88+
if remaining <= 0 {
89+
break
90+
}
91+
if vc.PerReplicaCapacity <= 0 {
92+
continue
93+
}
94+
95+
replicasNeeded := int(math.Ceil(remaining / vc.PerReplicaCapacity))
96+
targets[vc.VariantName] = targets[vc.VariantName] + replicasNeeded
97+
remaining -= float64(replicasNeeded) * vc.PerReplicaCapacity
98+
99+
logger.V(1).Info("Scale-up allocation",
100+
"variant", vc.VariantName,
101+
"added", replicasNeeded,
102+
"costEfficiency", costEfficiency(vc))
103+
}
104+
}
105+
106+
// costAwareScaleDown removes replicas from the most expensive variant.
107+
// Sorts by absolute cost descending, removes from most expensive first.
108+
// The cheapest variant is protected at min 1 replica only when no other variant
109+
// has replicas — this prevents scale-down deadlocks where the expensive variant's
110+
// per-replica capacity exceeds spare but cheaper replicas could be removed.
111+
func costAwareScaleDown(
112+
ctx context.Context,
113+
result *interfaces.AnalyzerResult,
114+
vcMap map[string]interfaces.VariantCapacity,
115+
targets map[string]int,
116+
) {
117+
logger := ctrl.LoggerFrom(ctx)
118+
119+
sorted := sortByCostDesc(result.VariantCapacities)
120+
cheapest := findCheapestVariant(result.VariantCapacities)
121+
remaining := result.SpareCapacity
122+
123+
for _, vc := range sorted {
124+
if remaining <= 0 {
125+
break
126+
}
127+
if vc.PerReplicaCapacity <= 0 {
128+
continue
129+
}
130+
131+
current := targets[vc.VariantName]
132+
minReplicas := 0
133+
if vc.VariantName == cheapest {
134+
// Protect cheapest at 1 only if it's the last variant with replicas
135+
otherHasReplicas := false
136+
for name, t := range targets {
137+
if name != cheapest && t > 0 {
138+
otherHasReplicas = true
139+
break
140+
}
141+
}
142+
if !otherHasReplicas {
143+
minReplicas = 1
144+
}
145+
}
146+
147+
removable := current - minReplicas
148+
if removable <= 0 {
149+
continue
150+
}
151+
152+
replicasToRemove := int(math.Floor(remaining / vc.PerReplicaCapacity))
153+
if replicasToRemove > removable {
154+
replicasToRemove = removable
155+
}
156+
if replicasToRemove <= 0 {
157+
continue
158+
}
159+
160+
targets[vc.VariantName] = current - replicasToRemove
161+
remaining -= float64(replicasToRemove) * vc.PerReplicaCapacity
162+
163+
logger.V(1).Info("Scale-down allocation",
164+
"variant", vc.VariantName,
165+
"removed", replicasToRemove,
166+
"cost", vc.Cost)
167+
}
168+
}
169+
170+
// buildStateMap creates a lookup map from variant name to VariantReplicaState.
171+
func buildStateMap(states []interfaces.VariantReplicaState) map[string]interfaces.VariantReplicaState {
172+
m := make(map[string]interfaces.VariantReplicaState, len(states))
173+
for _, s := range states {
174+
m[s.VariantName] = s
175+
}
176+
return m
177+
}
178+
179+
// buildCapacityMap creates a lookup map from variant name to VariantCapacity.
180+
func buildCapacityMap(capacities []interfaces.VariantCapacity) map[string]interfaces.VariantCapacity {
181+
m := make(map[string]interfaces.VariantCapacity, len(capacities))
182+
for _, vc := range capacities {
183+
m[vc.VariantName] = vc
184+
}
185+
return m
186+
}
187+
188+
// initTargets creates initial targets from current replica counts.
189+
func initTargets(states []interfaces.VariantReplicaState) map[string]int {
190+
targets := make(map[string]int, len(states))
191+
for _, s := range states {
192+
targets[s.VariantName] = s.CurrentReplicas
193+
}
194+
return targets
195+
}
196+
197+
// findCheapestVariant returns the variant name with the lowest cost.
198+
func findCheapestVariant(capacities []interfaces.VariantCapacity) string {
199+
cheapest := ""
200+
minCost := math.MaxFloat64
201+
for _, vc := range capacities {
202+
if vc.Cost < minCost {
203+
minCost = vc.Cost
204+
cheapest = vc.VariantName
205+
}
206+
}
207+
return cheapest
208+
}
209+
210+
// sortByCostEfficiencyAsc returns variants sorted by cost/perReplicaCapacity ascending.
211+
func sortByCostEfficiencyAsc(capacities []interfaces.VariantCapacity) []interfaces.VariantCapacity {
212+
sorted := make([]interfaces.VariantCapacity, len(capacities))
213+
copy(sorted, capacities)
214+
sort.Slice(sorted, func(i, j int) bool {
215+
return costEfficiency(sorted[i]) < costEfficiency(sorted[j])
216+
})
217+
return sorted
218+
}
219+
220+
// sortByCostDesc returns variants sorted by absolute cost descending.
221+
func sortByCostDesc(capacities []interfaces.VariantCapacity) []interfaces.VariantCapacity {
222+
sorted := make([]interfaces.VariantCapacity, len(capacities))
223+
copy(sorted, capacities)
224+
sort.Slice(sorted, func(i, j int) bool {
225+
return sorted[i].Cost > sorted[j].Cost
226+
})
227+
return sorted
228+
}
229+
230+
// costEfficiency returns the cost per unit of capacity.
231+
func costEfficiency(vc interfaces.VariantCapacity) float64 {
232+
if vc.PerReplicaCapacity <= 0 {
233+
return math.MaxFloat64
234+
}
235+
return vc.Cost / vc.PerReplicaCapacity
236+
}
237+
238+
// buildDecisions converts targets map into VariantDecision slice.
239+
func buildDecisions(
240+
req ModelScalingRequest,
241+
stateMap map[string]interfaces.VariantReplicaState,
242+
vcMap map[string]interfaces.VariantCapacity,
243+
targets map[string]int,
244+
) []interfaces.VariantDecision {
245+
decisions := make([]interfaces.VariantDecision, 0, len(targets))
246+
for name, target := range targets {
247+
state := stateMap[name]
248+
vc := vcMap[name]
249+
250+
var action interfaces.SaturationAction
251+
var reason string
252+
switch {
253+
case target > state.CurrentReplicas:
254+
action = interfaces.ActionScaleUp
255+
reason = fmt.Sprintf("V2 scale-up (optimizer: cost-aware, required: %.0f)", req.Result.RequiredCapacity)
256+
case target < state.CurrentReplicas:
257+
action = interfaces.ActionScaleDown
258+
reason = fmt.Sprintf("V2 scale-down (optimizer: cost-aware, spare: %.0f)", req.Result.SpareCapacity)
259+
default:
260+
action = interfaces.ActionNoChange
261+
reason = "V2 steady state"
262+
}
263+
264+
decisions = append(decisions, interfaces.VariantDecision{
265+
VariantName: name,
266+
ModelID: req.ModelID,
267+
Namespace: req.Namespace,
268+
AcceleratorName: vc.AcceleratorName,
269+
Cost: vc.Cost,
270+
CurrentReplicas: state.CurrentReplicas,
271+
TargetReplicas: target,
272+
Action: action,
273+
Reason: reason,
274+
})
275+
}
276+
return decisions
277+
}
278+
279+
// mergeConstraints combines constraints from multiple providers.
280+
// Currently unused in CostAwareOptimizer but available for limited mode.
281+
func mergeConstraints(constraints []*ResourceConstraints) map[string]int {
282+
merged := make(map[string]int)
283+
for _, c := range constraints {
284+
if c == nil {
285+
continue
286+
}
287+
for accType, pool := range c.Pools {
288+
if existing, ok := merged[accType]; !ok || pool.Available < existing {
289+
merged[accType] = pool.Available
290+
}
291+
}
292+
}
293+
return merged
294+
}
295+
296+
// Ensure CostAwareOptimizer implements ScalingOptimizer
297+
var _ ScalingOptimizer = (*CostAwareOptimizer)(nil)

0 commit comments

Comments
 (0)