Skip to content

Commit e958cbd

Browse files
authored
scheduler redesign continuation (#937)
* implement agreed points on scheduler redesign Signed-off-by: Nir Rozenbaum <[email protected]> * addressed code review comments Signed-off-by: Nir Rozenbaum <[email protected]> --------- Signed-off-by: Nir Rozenbaum <[email protected]>
1 parent 1698146 commit e958cbd

File tree

17 files changed

+150
-108
lines changed

17 files changed

+150
-108
lines changed

cmd/epp/main.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ import (
4747
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework"
4848
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework/plugins/multi/prefix"
4949
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework/plugins/picker"
50-
profilepicker "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework/plugins/profile-picker"
50+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework/plugins/profile"
5151
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework/plugins/scorer"
5252
runserver "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/server"
5353
envutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/env"
@@ -222,7 +222,7 @@ func run() error {
222222
}
223223
}
224224

225-
schedulerConfig := scheduling.NewSchedulerConfig(profilepicker.NewAllProfilesPicker(), map[string]*framework.SchedulerProfile{"schedulerv2": schedulerProfile})
225+
schedulerConfig := scheduling.NewSchedulerConfig(profile.NewSingleProfileHandler(), map[string]*framework.SchedulerProfile{"schedulerv2": schedulerProfile})
226226
scheduler = scheduling.NewSchedulerWithConfig(datastore, schedulerConfig)
227227
}
228228

conformance/testing-epp/scheduler.go

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,14 +21,17 @@ import (
2121
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling"
2222
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework"
2323
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework/plugins/picker"
24-
profilepicker "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework/plugins/profile-picker"
24+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework/plugins/profile"
2525
)
2626

2727
// NewReqHeaderBasedScheduler creates a scheduler for conformance tests that selects
2828
// an endpoint based on the "test-epp-endpoint-selection" request header. If the
2929
// header is missing or the specified endpoint doesn't exist, no endpoint is returned.
3030
func NewReqHeaderBasedScheduler(datastore scheduling.Datastore) *scheduling.Scheduler {
31-
predicatableSchedulerProfile := framework.NewSchedulerProfile().WithFilters(filter.NewHeaderBasedTestingFilter()).WithPicker(picker.NewMaxScorePicker())
31+
predicatableSchedulerProfile := framework.NewSchedulerProfile().
32+
WithFilters(filter.NewHeaderBasedTestingFilter()).
33+
WithPicker(picker.NewMaxScorePicker())
34+
3235
return scheduling.NewSchedulerWithConfig(datastore, scheduling.NewSchedulerConfig(
33-
profilepicker.NewAllProfilesPicker(), map[string]*framework.SchedulerProfile{"req-header-based-profile": predicatableSchedulerProfile}))
36+
profile.NewSingleProfileHandler(), map[string]*framework.SchedulerProfile{"req-header-based-profile": predicatableSchedulerProfile}))
3437
}

conformance/testing-epp/sheduler_test.go

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ func TestSchedule(t *testing.T) {
3333
name string
3434
input []*backendmetrics.FakePodMetrics
3535
req *types.LLMRequest
36-
wantRes map[string]*types.Result
36+
wantRes *types.SchedulingResult
3737
err bool
3838
}{
3939
{
@@ -79,17 +79,20 @@ func TestSchedule(t *testing.T) {
7979
Headers: map[string]string{"test-epp-endpoint-selection": "matched-endpoint"},
8080
RequestId: uuid.NewString(),
8181
},
82-
wantRes: map[string]*types.Result{
83-
"req-header-based-profile": {
84-
TargetPod: &types.ScoredPod{
85-
Pod: &types.PodMetrics{
86-
Pod: &backend.Pod{
87-
Address: "matched-endpoint",
88-
Labels: map[string]string{},
82+
wantRes: &types.SchedulingResult{
83+
ProfileResults: map[string]*types.ProfileRunResult{
84+
"req-header-based-profile": {
85+
TargetPod: &types.ScoredPod{
86+
Pod: &types.PodMetrics{
87+
Pod: &backend.Pod{
88+
Address: "matched-endpoint",
89+
Labels: map[string]string{},
90+
},
8991
},
9092
},
9193
},
9294
},
95+
PrimaryProfileName: "req-header-based-profile",
9396
},
9497
},
9598
}

pkg/epp/requestcontrol/director.go

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ import (
4040

4141
// Scheduler defines the interface required by the Director for scheduling.
4242
type Scheduler interface {
43-
Schedule(ctx context.Context, b *schedulingtypes.LLMRequest) (result map[string]*schedulingtypes.Result, err error)
43+
Schedule(ctx context.Context, b *schedulingtypes.LLMRequest) (result *schedulingtypes.SchedulingResult, err error)
4444
}
4545

4646
// SaturationDetector provides a signal indicating whether the backends are considered saturated.
@@ -171,7 +171,7 @@ func (d *Director) PreDispatch(ctx context.Context, reqCtx *handlers.RequestCont
171171
}
172172

173173
// Dispatch runs one or many scheduling cycles.
174-
func (d *Director) Dispatch(ctx context.Context, llmReq *schedulingtypes.LLMRequest) (map[string]*schedulingtypes.Result, error) {
174+
func (d *Director) Dispatch(ctx context.Context, llmReq *schedulingtypes.LLMRequest) (*schedulingtypes.SchedulingResult, error) {
175175
var err error
176176
res, err := d.scheduler.Schedule(ctx, llmReq)
177177
if err != nil {
@@ -182,17 +182,14 @@ func (d *Director) Dispatch(ctx context.Context, llmReq *schedulingtypes.LLMRequ
182182
}
183183

184184
// PostDispatch populates the RequestContext based on scheduling results.
185-
func (d *Director) PostDispatch(ctx context.Context, reqCtx *handlers.RequestContext, results map[string]*schedulingtypes.Result) (*handlers.RequestContext, error) {
185+
func (d *Director) PostDispatch(ctx context.Context, reqCtx *handlers.RequestContext, result *schedulingtypes.SchedulingResult) (*handlers.RequestContext, error) {
186186
logger := log.FromContext(ctx)
187187
// currently only get a single result. Will refactor to pluggably implement the PostSchedule
188-
if len(results) == 0 {
188+
if result == nil || len(result.ProfileResults) == 0 {
189189
return reqCtx, errutil.Error{Code: errutil.Internal, Msg: "results must be greater than zero"}
190190
}
191-
var targetPod *backend.Pod
192-
// TODO should handle multi cycle results, this should be pluggable logic
193-
for _, result := range results {
194-
targetPod = result.TargetPod.GetPod()
195-
}
191+
// primary profile is used to set destination
192+
targetPod := result.ProfileResults[result.PrimaryProfileName].TargetPod.GetPod()
196193

197194
pool, err := d.datastore.PoolGet()
198195
if err != nil {

pkg/epp/requestcontrol/director_test.go

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -54,11 +54,11 @@ func (m *mockSaturationDetector) IsSaturated(_ context.Context) bool {
5454
}
5555

5656
type mockScheduler struct {
57-
scheduleResults map[string]*schedulingtypes.Result
57+
scheduleResults *schedulingtypes.SchedulingResult
5858
scheduleErr error
5959
}
6060

61-
func (m *mockScheduler) Schedule(ctx context.Context, req *schedulingtypes.LLMRequest) (map[string]*schedulingtypes.Result, error) {
61+
func (m *mockScheduler) Schedule(ctx context.Context, req *schedulingtypes.LLMRequest) (*schedulingtypes.SchedulingResult, error) {
6262
return m.scheduleResults, m.scheduleErr
6363
}
6464

@@ -126,17 +126,20 @@ func TestDirector_HandleRequest(t *testing.T) {
126126
}
127127
ds.PodUpdateOrAddIfNotExist(testPod)
128128

129-
defaultSuccessfulScheduleResults := map[string]*schedulingtypes.Result{
130-
"testProfile": {
131-
TargetPod: &schedulingtypes.ScoredPod{
132-
Pod: &schedulingtypes.PodMetrics{
133-
Pod: &backend.Pod{
134-
Address: "192.168.1.100",
135-
NamespacedName: k8stypes.NamespacedName{Name: "pod1", Namespace: "default"},
129+
defaultSuccessfulScheduleResults := &schedulingtypes.SchedulingResult{
130+
ProfileResults: map[string]*schedulingtypes.ProfileRunResult{
131+
"testProfile": {
132+
TargetPod: &schedulingtypes.ScoredPod{
133+
Pod: &schedulingtypes.PodMetrics{
134+
Pod: &backend.Pod{
135+
Address: "192.168.1.100",
136+
NamespacedName: k8stypes.NamespacedName{Name: "pod1", Namespace: "default"},
137+
},
136138
},
137139
},
138140
},
139141
},
142+
PrimaryProfileName: "testProfile",
140143
}
141144

142145
tests := []struct {

pkg/epp/scheduling/framework/plugins.go

Lines changed: 20 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -24,18 +24,26 @@ import (
2424
)
2525

2626
const (
27-
ProfilePickerType = "ProfilePicker"
28-
FilterPluginType = "Filter"
29-
ScorerPluginType = "Scorer"
30-
PickerPluginType = "Picker"
31-
PostCyclePluginType = "PostCycle"
27+
ProfilePickerType = "ProfilePicker"
28+
FilterPluginType = "Filter"
29+
ScorerPluginType = "Scorer"
30+
PickerPluginType = "Picker"
31+
PostCyclePluginType = "PostCycle"
32+
ProcessProfilesResultsType = "ProcessProfilesResults"
3233
)
3334

34-
// ProfilePicker selects the SchedulingProfiles to run from a list of candidate profiles, while taking into consideration the request properties
35-
// and the previously executed SchedulerProfile cycles along with their results.
36-
type ProfilePicker interface {
35+
// ProfileHandler defines the extension points for handling multi SchedulerProfile instances.
36+
// More specifically, this interface defines the 'Pick' and 'ProcessResults' extension points.
37+
type ProfileHandler interface {
3738
plugins.Plugin
38-
Pick(ctx context.Context, request *types.LLMRequest, profiles map[string]*SchedulerProfile, executionResults map[string]*types.Result) map[string]*SchedulerProfile
39+
// Pick selects the SchedulingProfiles to run from a list of candidate profiles, while taking into consideration the request properties
40+
// and the previously executed SchedulerProfile cycles along with their results.
41+
Pick(ctx context.Context, request *types.LLMRequest, profiles map[string]*SchedulerProfile, profileResults map[string]*types.ProfileRunResult) map[string]*SchedulerProfile
42+
43+
// ProcessResults handles the outcome of the profile runs after all profiles ran successfully.
44+
// It may aggregate results, log test profile outputs, or apply custom logic. It specifies in the SchedulingResult the
45+
// key of the primary profile that should be used to get the request selected destination.
46+
ProcessResults(ctx context.Context, request *types.LLMRequest, profileResults map[string]*types.ProfileRunResult) *types.SchedulingResult
3947
}
4048

4149
// Filter defines the interface for filtering a list of pods based on context.
@@ -54,11 +62,12 @@ type Scorer interface {
5462
// Picker picks the final pod(s) to send the request to.
5563
type Picker interface {
5664
plugins.Plugin
57-
Pick(ctx context.Context, cycleState *types.CycleState, scoredPods []*types.ScoredPod) *types.Result
65+
Pick(ctx context.Context, cycleState *types.CycleState, scoredPods []*types.ScoredPod) *types.ProfileRunResult
5866
}
5967

6068
// PostCycle is called by the scheduler after it selects a targetPod for the request in the SchedulerProfile cycle.
69+
// Deprecated: do not use PostCycle. It is in the process of deprecation.
6170
type PostCycle interface {
6271
plugins.Plugin
63-
PostCycle(ctx context.Context, cycleState *types.CycleState, res *types.Result)
72+
PostCycle(ctx context.Context, cycleState *types.CycleState, res *types.ProfileRunResult)
6473
}

pkg/epp/scheduling/framework/plugins/multi/prefix/plugin.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -167,7 +167,7 @@ func (m *Plugin) Score(ctx context.Context, request *types.LLMRequest, cycleStat
167167
}
168168

169169
// PostCycle records in the plugin cache the result of the scheduling selection.
170-
func (m *Plugin) PostCycle(ctx context.Context, cycleState *types.CycleState, res *types.Result) {
170+
func (m *Plugin) PostCycle(ctx context.Context, cycleState *types.CycleState, res *types.ProfileRunResult) {
171171
targetPod := res.TargetPod.GetPod()
172172
state, err := m.getPrefixState(cycleState)
173173
if err != nil {

pkg/epp/scheduling/framework/plugins/multi/prefix/plugin_test.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ func TestPrefixPlugin(t *testing.T) {
5656
assert.Equal(t, float64(0), scores[pod2], "score for pod2")
5757

5858
// Simulate pod1 was picked.
59-
plugin.PostCycle(context.Background(), cycleState1, &types.Result{TargetPod: pod1})
59+
plugin.PostCycle(context.Background(), cycleState1, &types.ProfileRunResult{TargetPod: pod1})
6060

6161
// Second request doesn't share any prefix with first one. It should be added to the cache but
6262
// the pod score should be 0.
@@ -77,7 +77,7 @@ func TestPrefixPlugin(t *testing.T) {
7777
assert.Equal(t, float64(0), scores[pod2], "score for pod2")
7878

7979
// Simulate pod2 was picked.
80-
plugin.PostCycle(context.Background(), cycleState2, &types.Result{TargetPod: pod2})
80+
plugin.PostCycle(context.Background(), cycleState2, &types.ProfileRunResult{TargetPod: pod2})
8181

8282
// Third request shares partial prefix with first one.
8383
req3 := &types.LLMRequest{
@@ -96,7 +96,7 @@ func TestPrefixPlugin(t *testing.T) {
9696
assert.Equal(t, float64(2)/float64(3), scores[pod1], "score should be 2/3 - the model and the first prefix block match")
9797
assert.Equal(t, float64(0), scores[pod2], "score for pod2")
9898

99-
plugin.PostCycle(context.Background(), cycleState3, &types.Result{TargetPod: pod1})
99+
plugin.PostCycle(context.Background(), cycleState3, &types.ProfileRunResult{TargetPod: pod1})
100100

101101
// 4th request is same as req3 except the model is different, still no match.
102102
req4 := &types.LLMRequest{
@@ -115,7 +115,7 @@ func TestPrefixPlugin(t *testing.T) {
115115
assert.Equal(t, float64(0), scores[pod1], "score for pod1")
116116
assert.Equal(t, float64(0), scores[pod2], "score for pod2")
117117

118-
plugin.PostCycle(context.Background(), cycleState4, &types.Result{TargetPod: pod1})
118+
plugin.PostCycle(context.Background(), cycleState4, &types.ProfileRunResult{TargetPod: pod1})
119119

120120
// 5th request shares partial prefix with 3rd one.
121121
req5 := &types.LLMRequest{
@@ -134,5 +134,5 @@ func TestPrefixPlugin(t *testing.T) {
134134
assert.Equal(t, 0.75, scores[pod1], "score should be 0.75 - the model and the first 2 prefix blocks match")
135135
assert.Equal(t, float64(0), scores[pod2], "score for pod2")
136136

137-
plugin.PostCycle(context.Background(), cycleState5, &types.Result{TargetPod: pod1})
137+
plugin.PostCycle(context.Background(), cycleState5, &types.ProfileRunResult{TargetPod: pod1})
138138
}

pkg/epp/scheduling/framework/plugins/picker/max_score_picker.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ func (p *MaxScorePicker) Name() string {
4747
}
4848

4949
// Pick selects the pod with the maximum score from the list of candidates.
50-
func (p *MaxScorePicker) Pick(ctx context.Context, cycleState *types.CycleState, scoredPods []*types.ScoredPod) *types.Result {
50+
func (p *MaxScorePicker) Pick(ctx context.Context, cycleState *types.CycleState, scoredPods []*types.ScoredPod) *types.ProfileRunResult {
5151
log.FromContext(ctx).V(logutil.DEBUG).Info(fmt.Sprintf("Selecting a pod with the max score from %d candidates: %+v", len(scoredPods), scoredPods))
5252

5353
highestScorePods := []*types.ScoredPod{}
@@ -65,5 +65,5 @@ func (p *MaxScorePicker) Pick(ctx context.Context, cycleState *types.CycleState,
6565
return p.random.Pick(ctx, cycleState, highestScorePods) // pick randomly from the highest score pods
6666
}
6767

68-
return &types.Result{TargetPod: highestScorePods[0]}
68+
return &types.ProfileRunResult{TargetPod: highestScorePods[0]}
6969
}

pkg/epp/scheduling/framework/plugins/picker/random_picker.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,8 +44,8 @@ func (p *RandomPicker) Name() string {
4444
}
4545

4646
// Pick selects a random pod from the list of candidates.
47-
func (p *RandomPicker) Pick(ctx context.Context, _ *types.CycleState, scoredPods []*types.ScoredPod) *types.Result {
47+
func (p *RandomPicker) Pick(ctx context.Context, _ *types.CycleState, scoredPods []*types.ScoredPod) *types.ProfileRunResult {
4848
log.FromContext(ctx).V(logutil.DEBUG).Info(fmt.Sprintf("Selecting a random pod from %d candidates: %+v", len(scoredPods), scoredPods))
4949
i := rand.Intn(len(scoredPods))
50-
return &types.Result{TargetPod: scoredPods[i]}
50+
return &types.ProfileRunResult{TargetPod: scoredPods[i]}
5151
}

0 commit comments

Comments
 (0)