Skip to content

Commit ab56d4a

Browse files
Add scorer dependencies to ensure the SLO scorer has the requisite scores from other plugins
1 parent 620715a commit ab56d4a

File tree

10 files changed

+137
-7
lines changed

10 files changed

+137
-7
lines changed

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ require (
77
github.com/elastic/crd-ref-docs v0.2.0
88
github.com/envoyproxy/go-control-plane/envoy v1.35.0
99
github.com/go-logr/logr v1.4.3
10+
github.com/go-logr/zapr v1.3.0
1011
github.com/google/go-cmp v0.7.0
1112
github.com/google/uuid v1.6.0
1213
github.com/hashicorp/golang-lru/v2 v2.0.7
@@ -61,7 +62,6 @@ require (
6162
github.com/fsnotify/fsnotify v1.9.0 // indirect
6263
github.com/fxamacker/cbor/v2 v2.9.0 // indirect
6364
github.com/go-logr/stdr v1.2.2 // indirect
64-
github.com/go-logr/zapr v1.3.0 // indirect
6565
github.com/go-openapi/jsonpointer v0.21.2 // indirect
6666
github.com/go-openapi/jsonreference v0.21.0 // indirect
6767
github.com/go-openapi/swag v0.23.1 // indirect

pkg/epp/scheduling/framework/plugins.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ type Filter interface {
6161
type Scorer interface {
6262
plugins.Plugin
6363
// Score returns a score for each candidate pod. The scheduler profile passes
// every returned score through enforceScoreRange and multiplies it by the
// scorer's weight before adding it to the pod's running total.
Score(ctx context.Context, cycleState *types.CycleState, request *types.LLMRequest, pods []types.Pod) map[types.Pod]float64
64+
// Dependencies returns the typed names of the scorers that must run before
// this one. The scheduler profile orders scorers topologically from these
// declarations; a scorer with no prerequisites returns an empty list.
Dependencies() []plugins.TypedName
6465
}
6566

6667
// Picker picks the final pod(s) to send the request to.

pkg/epp/scheduling/framework/plugins/multi/prefix/plugin.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,11 @@ type Plugin struct {
9191
wg sync.WaitGroup
9292
}
9393

94+
// Dependencies implements framework.Scorer.
95+
func (p *Plugin) Dependencies() []plugins.TypedName {
96+
return []plugins.TypedName{} // No dependencies
97+
}
98+
9499
// podSet holds the set of pod servers that may have a specific prefix hash.
95100
type podSet map[ServerID]struct{}
96101

pkg/epp/scheduling/framework/plugins/multi/slo_aware_router/plugin.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -219,6 +219,12 @@ type SLOAwareRouter struct {
219219
headroomStrategy HeadroomStrategy
220220
}
221221

222+
func (s *SLOAwareRouter) Dependencies() []plugins.TypedName {
223+
return []plugins.TypedName{
224+
{Type: "prefix-cache-scorer", Name: "prefix-cache-scorer"},
225+
}
226+
}
227+
222228
var _ framework.Scorer = &SLOAwareRouter{}
223229

224230
func NewSLOAwareRouter(latencypredictor latencypredictor.PredictorInterface, strategy HeadroomStrategy) *SLOAwareRouter {

pkg/epp/scheduling/framework/plugins/scorer/kvcache_utilization.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,10 @@ type KVCacheUtilizationScorer struct {
5050
typedName plugins.TypedName
5151
}
5252

53+
func (s *KVCacheUtilizationScorer) Dependencies() []plugins.TypedName {
54+
return []plugins.TypedName{} // No dependencies
55+
}
56+
5357
// TypedName returns the type and name tuple of this plugin instance.
5458
func (s *KVCacheUtilizationScorer) TypedName() plugins.TypedName {
5559
return s.typedName

pkg/epp/scheduling/framework/plugins/scorer/lora_affinity.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,10 @@ type LoraAffinityScorer struct {
5050
tn plugins.TypedName
5151
}
5252

53+
func (s *LoraAffinityScorer) Dependencies() []plugins.TypedName {
54+
return []plugins.TypedName{} // No dependencies
55+
}
56+
5357
// TypedName returns the type and name tuple of this plugin instance.
5458
func (s *LoraAffinityScorer) TypedName() plugins.TypedName {
5559
return s.tn

pkg/epp/scheduling/framework/plugins/scorer/queue.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,10 @@ type QueueScorer struct {
5252
typedName plugins.TypedName
5353
}
5454

55+
func (s *QueueScorer) Dependencies() []plugins.TypedName {
56+
return []plugins.TypedName{} // No dependencies
57+
}
58+
5559
// TypedName returns the type and name tuple of this plugin instance.
5660
func (s *QueueScorer) TypedName() plugins.TypedName {
5761
return s.typedName

pkg/epp/scheduling/framework/scheduler_profile.go

Lines changed: 80 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -151,18 +151,25 @@ func (p *SchedulerProfile) runScorerPlugins(ctx context.Context, request *types.
151151
logger := log.FromContext(ctx)
152152
logger.V(logutil.DEBUG).Info("Before running scorer plugins", "pods", pods)
153153

154+
sortedScorers, err := p.topologicalSortScorers()
155+
if err != nil {
156+
logger.Error(err, "Failed to resolve scorer dependencies")
157+
// Fallback to original order if dependency resolution fails
158+
sortedScorers = p.scorers
159+
}
160+
154161
weightedScorePerPod := make(map[types.Pod]float64, len(pods))
155162
for _, pod := range pods {
156163
weightedScorePerPod[pod] = float64(0) // initialize weighted score per pod with 0 value
157164
}
158165
// Iterate through each scorer in the chain and accumulate the weighted scores.
159-
for _, scorer := range p.scorers {
166+
for _, scorer := range sortedScorers {
160167
logger.V(logutil.DEBUG).Info("Running scorer plugin", "plugin", scorer.TypedName())
161168
before := time.Now()
162169
scores := scorer.Score(ctx, cycleState, request, pods)
163170
metrics.RecordPluginProcessingLatency(ScorerExtensionPoint, scorer.TypedName().Type, scorer.TypedName().Name, time.Since(before))
164171
for pod, score := range scores { // weight is relative to the sum of weights
165-
logger.V(logutil.DEBUG).Info("Calculated score", "plugin", scorer.TypedName(), "endpoint", pod.GetPod().NamespacedName, "score", score)
172+
logger.V(logutil.DEBUG).Info("Calculated score", "plugin", scorer.TypedName(), "endpoint", pod.GetPod().NamespacedName, "score", score, "weight", scorer.Weight())
166173
weightedScorePerPod[pod] += enforceScoreRange(score) * float64(scorer.Weight())
167174
}
168175
logger.V(logutil.DEBUG).Info("Completed running scorer plugin successfully", "plugin", scorer.TypedName())
@@ -190,6 +197,77 @@ func (p *SchedulerProfile) runPickerPlugin(ctx context.Context, cycleState *type
190197
return result
191198
}
192199

200+
func (p *SchedulerProfile) topologicalSortScorers() ([]*WeightedScorer, error) {
201+
if len(p.scorers) == 0 {
202+
return p.scorers, nil
203+
}
204+
205+
// Create maps for efficient lookups
206+
scorerByName := make(map[string]*WeightedScorer)
207+
inDegree := make(map[string]int)
208+
adjList := make(map[string][]string)
209+
210+
// Initialize data structures
211+
for _, scorer := range p.scorers {
212+
name := scorer.TypedName().String()
213+
scorerByName[name] = scorer
214+
inDegree[name] = 0
215+
adjList[name] = []string{}
216+
}
217+
218+
// Build adjacency list and calculate in-degrees
219+
for _, scorer := range p.scorers {
220+
scorerName := scorer.TypedName().String()
221+
for _, dep := range scorer.Dependencies() {
222+
depName := dep.String()
223+
224+
// Check if dependency exists in our scorer list
225+
if _, exists := scorerByName[depName]; !exists {
226+
return nil, fmt.Errorf("scorer '%s' depends on '%s' which is not registered in the profile", scorerName, depName)
227+
}
228+
229+
// Add edge: dependency -> dependent
230+
adjList[depName] = append(adjList[depName], scorerName)
231+
inDegree[scorerName]++
232+
}
233+
}
234+
235+
// Kahn's algorithm for topological sorting
236+
var queue []string
237+
var result []*WeightedScorer
238+
239+
// Find all nodes with no incoming edges
240+
for name, degree := range inDegree {
241+
if degree == 0 {
242+
queue = append(queue, name)
243+
}
244+
}
245+
246+
for len(queue) > 0 {
247+
// Remove a node from queue
248+
current := queue[0]
249+
queue = queue[1:]
250+
251+
// Add to result
252+
result = append(result, scorerByName[current])
253+
254+
// For each neighbor of current node
255+
for _, neighbor := range adjList[current] {
256+
inDegree[neighbor]--
257+
if inDegree[neighbor] == 0 {
258+
queue = append(queue, neighbor)
259+
}
260+
}
261+
}
262+
263+
// Check for cycles
264+
if len(result) != len(p.scorers) {
265+
return nil, fmt.Errorf("circular dependency detected in scorer plugins")
266+
}
267+
268+
return result, nil
269+
}
270+
193271
func enforceScoreRange(score float64) float64 {
194272
if score < 0 {
195273
return 0

pkg/epp/scheduling/framework/scheduler_profile_test.go

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,7 @@ func TestSchedulePlugins(t *testing.T) {
144144
},
145145
}
146146

147-
if diff := cmp.Diff(wantRes, got); diff != "" {
147+
if diff := cmp.Diff(wantRes, got, podScoresTransformer); diff != "" {
148148
t.Errorf("Unexpected output (-want +got): %v", diff)
149149
}
150150
// Validate plugin execution counts dynamically
@@ -182,6 +182,21 @@ var _ Filter = &testPlugin{}
182182
var _ Scorer = &testPlugin{}
183183
var _ Picker = &testPlugin{}
184184

185+
// podScoresTransformer converts a map keyed by types.Pod into a map keyed by
186+
// the pod's unique name string. This allows cmp.Diff to compare the maps based
187+
// on their semantic content rather than on pointer addresses.
188+
var podScoresTransformer = cmp.Transformer("podScores", func(in map[types.Pod]float64) map[string]float64 {
189+
out := make(map[string]float64, len(in))
190+
if in == nil {
191+
return nil
192+
}
193+
for pod, score := range in {
194+
// Use the pod's unique NamespacedName as the stable key
195+
out[pod.GetPod().NamespacedName.String()] = score
196+
}
197+
return out
198+
})
199+
185200
// testPlugin is an implementation useful in unit tests.
186201
type testPlugin struct {
187202
typedName plugins.TypedName
@@ -197,6 +212,11 @@ type testPlugin struct {
197212
WinnerPodScore float64
198213
}
199214

215+
// Dependencies implements Scorer.
216+
func (tp *testPlugin) Dependencies() []plugins.TypedName {
217+
return []plugins.TypedName{} // No dependencies
218+
}
219+
200220
func (tp *testPlugin) TypedName() plugins.TypedName {
201221
return tp.typedName
202222
}

pkg/epp/scheduling/framework/weighted_scorer.go

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,21 +16,29 @@ limitations under the License.
1616

1717
package framework
1818

19+
import "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/plugins"
20+
1921
// NewWeightedScorer initializes a new WeightedScorer and returns its pointer.
2022
func NewWeightedScorer(scorer Scorer, weight int) *WeightedScorer {
2123
return &WeightedScorer{
22-
Scorer: scorer,
23-
weight: weight,
24+
Scorer: scorer,
25+
weight: weight,
26+
dependencies: scorer.Dependencies(),
2427
}
2528
}
2629

2730
// WeightedScorer is a struct that encapsulates a scorer with its weight.
2831
type WeightedScorer struct {
2932
Scorer
30-
weight int
33+
weight int
34+
dependencies []plugins.TypedName
3135
}
3236

3337
// Weight returns the weight of the scorer, i.e. the integer factor supplied
// to NewWeightedScorer.
3438
func (s *WeightedScorer) Weight() int {
3539
return s.weight
3640
}
41+
42+
func (ws *WeightedScorer) Dependencies() []plugins.TypedName {
43+
return ws.dependencies
44+
}

0 commit comments

Comments
 (0)