Skip to content

Commit c46a7e7

Browse files
authored
remove NewScheduler function (#1153)
* initial update to load plugins from file

  Signed-off-by: Nir Rozenbaum <[email protected]>

* remove NewScheduler function and all its usage

  Signed-off-by: Nir Rozenbaum <[email protected]>

---------

Signed-off-by: Nir Rozenbaum <[email protected]>
1 parent e696e68 commit c46a7e7

File tree

4 files changed

+89
-63
lines changed

4 files changed

+89
-63
lines changed

cmd/epp/runner/runner.go

Lines changed: 5 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -272,16 +272,16 @@ func (r *Runner) Run(ctx context.Context) error {
272272
}
273273
}
274274

275-
err = r.parseConfiguration(ctx)
275+
err = r.parsePluginsConfiguration(ctx)
276276
if err != nil {
277-
setupLog.Error(err, "Failed to parse the configuration")
277+
setupLog.Error(err, "Failed to parse plugins configuration")
278278
return err
279279
}
280280

281281
// --- Initialize Core EPP Components ---
282-
scheduler := r.initializeScheduler()
282+
scheduler := scheduling.NewSchedulerWithConfig(r.schedulerConfig)
283283

284-
saturationDetector := saturationdetector.NewDetector(sdConfig, datastore, ctrl.Log)
284+
saturationDetector := saturationdetector.NewDetector(sdConfig, datastore, setupLog)
285285

286286
director := requestcontrol.NewDirectorWithConfig(datastore, scheduler, saturationDetector, r.requestControlConfig)
287287

@@ -326,16 +326,7 @@ func (r *Runner) Run(ctx context.Context) error {
326326
return nil
327327
}
328328

329-
func (r *Runner) initializeScheduler() *scheduling.Scheduler {
330-
if r.schedulerConfig != nil {
331-
return scheduling.NewSchedulerWithConfig(r.schedulerConfig)
332-
}
333-
334-
// otherwise, no one configured from outside scheduler config. use existing configuration
335-
return scheduling.NewScheduler()
336-
}
337-
338-
func (r *Runner) parseConfiguration(ctx context.Context) error {
329+
func (r *Runner) parsePluginsConfiguration(ctx context.Context) error {
339330
if *configText == "" && *configFile == "" {
340331
return nil // configuring through code, not through file
341332
}

pkg/epp/scheduling/scheduler.go

Lines changed: 0 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,7 @@ import (
2525
"sigs.k8s.io/controller-runtime/pkg/log"
2626
backendmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics"
2727
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics"
28-
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/config"
2928
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework"
30-
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework/plugins/filter"
31-
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework/plugins/picker"
32-
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework/plugins/profile"
3329
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/types"
3430
logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging"
3531
)
@@ -38,46 +34,6 @@ type Datastore interface {
3834
PodGetAll() []backendmetrics.PodMetrics
3935
}
4036

41-
// NewScheduler returns a new scheduler with default scheduler plugins configuration.
42-
func NewScheduler() *Scheduler {
43-
// When the scheduler is initialized with the NewScheduler function, the below config will be used as default.
44-
// it's possible to call NewSchedulerWithConfig to pass a different scheduler config.
45-
// For build-time plugin changes, it's recommended to call NewSchedulerWithConfig in main.go.
46-
loraAffinityFilter := filter.NewLoraAffinityFilter(config.Conf.LoraAffinityThreshold)
47-
leastQueueFilter := filter.NewLeastQueueFilter()
48-
leastKvCacheFilter := filter.NewLeastKVCacheFilter()
49-
50-
lowLatencyFilter := &filter.DecisionTreeFilter{
51-
Current: filter.NewLowQueueFilter(config.Conf.QueueingThresholdLoRA),
52-
NextOnSuccess: &filter.DecisionTreeFilter{
53-
Current: loraAffinityFilter,
54-
NextOnSuccessOrFailure: &filter.DecisionTreeFilter{
55-
Current: leastQueueFilter,
56-
NextOnSuccessOrFailure: &filter.DecisionTreeFilter{
57-
Current: leastKvCacheFilter,
58-
},
59-
},
60-
},
61-
NextOnFailure: &filter.DecisionTreeFilter{
62-
Current: leastQueueFilter,
63-
NextOnSuccessOrFailure: &filter.DecisionTreeFilter{
64-
Current: loraAffinityFilter,
65-
NextOnSuccessOrFailure: &filter.DecisionTreeFilter{
66-
Current: leastKvCacheFilter,
67-
},
68-
},
69-
},
70-
}
71-
72-
defaultProfile := framework.NewSchedulerProfile().
73-
WithFilters(lowLatencyFilter).
74-
WithPicker(picker.NewRandomPicker(picker.DefaultMaxNumOfEndpoints))
75-
76-
profileHandler := profile.NewSingleProfileHandler()
77-
78-
return NewSchedulerWithConfig(NewSchedulerConfig(profileHandler, map[string]*framework.SchedulerProfile{"default": defaultProfile}))
79-
}
80-
8137
// NewSchedulerWithConfig returns a new scheduler with the given scheduler plugins configuration.
8238
func NewSchedulerWithConfig(config *SchedulerConfig) *Scheduler {
8339
return &Scheduler{

pkg/epp/scheduling/scheduler_test.go

Lines changed: 40 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,50 @@ import (
2525
k8stypes "k8s.io/apimachinery/pkg/types"
2626
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend"
2727
backendmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics" // Import config for thresholds
28+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/config"
29+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework"
30+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework/plugins/filter"
31+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework/plugins/picker"
32+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework/plugins/profile"
2833
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/types"
2934
)
3035

3136
// Tests the default scheduler configuration and expected behavior.
3237
func TestSchedule(t *testing.T) {
38+
loraAffinityFilter := filter.NewLoraAffinityFilter(config.Conf.LoraAffinityThreshold)
39+
leastQueueFilter := filter.NewLeastQueueFilter()
40+
leastKvCacheFilter := filter.NewLeastKVCacheFilter()
41+
42+
lowLatencyFilter := &filter.DecisionTreeFilter{
43+
Current: filter.NewLowQueueFilter(config.Conf.QueueingThresholdLoRA),
44+
NextOnSuccess: &filter.DecisionTreeFilter{
45+
Current: loraAffinityFilter,
46+
NextOnSuccessOrFailure: &filter.DecisionTreeFilter{
47+
Current: leastQueueFilter,
48+
NextOnSuccessOrFailure: &filter.DecisionTreeFilter{
49+
Current: leastKvCacheFilter,
50+
},
51+
},
52+
},
53+
NextOnFailure: &filter.DecisionTreeFilter{
54+
Current: leastQueueFilter,
55+
NextOnSuccessOrFailure: &filter.DecisionTreeFilter{
56+
Current: loraAffinityFilter,
57+
NextOnSuccessOrFailure: &filter.DecisionTreeFilter{
58+
Current: leastKvCacheFilter,
59+
},
60+
},
61+
},
62+
}
63+
64+
defaultProfile := framework.NewSchedulerProfile().
65+
WithFilters(lowLatencyFilter).
66+
WithPicker(picker.NewRandomPicker(picker.DefaultMaxNumOfEndpoints))
67+
68+
profileHandler := profile.NewSingleProfileHandler()
69+
70+
schedulerConfig := NewSchedulerConfig(profileHandler, map[string]*framework.SchedulerProfile{"default": defaultProfile})
71+
3372
tests := []struct {
3473
name string
3574
req *types.LLMRequest
@@ -120,7 +159,7 @@ func TestSchedule(t *testing.T) {
120159

121160
for _, test := range tests {
122161
t.Run(test.name, func(t *testing.T) {
123-
scheduler := NewScheduler()
162+
scheduler := NewSchedulerWithConfig(schedulerConfig)
124163
got, err := scheduler.Schedule(context.Background(), test.req, test.input)
125164
if test.err != (err != nil) {
126165
t.Errorf("Unexpected error, got %v, want %v", err, test.err)

test/integration/epp/hermetic_test.go

Lines changed: 44 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -50,11 +50,13 @@ import (
5050
ctrl "sigs.k8s.io/controller-runtime"
5151
"sigs.k8s.io/controller-runtime/pkg/cache"
5252
k8sclient "sigs.k8s.io/controller-runtime/pkg/client"
53-
"sigs.k8s.io/controller-runtime/pkg/config"
53+
crconfig "sigs.k8s.io/controller-runtime/pkg/config"
5454
"sigs.k8s.io/controller-runtime/pkg/envtest"
5555
crmetrics "sigs.k8s.io/controller-runtime/pkg/metrics"
5656
"sigs.k8s.io/controller-runtime/pkg/metrics/filters"
5757
metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"
58+
"sigs.k8s.io/yaml"
59+
5860
"sigs.k8s.io/gateway-api-inference-extension/api/v1alpha2"
5961
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend"
6062
backendmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics"
@@ -63,11 +65,15 @@ import (
6365
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/requestcontrol"
6466
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/saturationdetector"
6567
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling"
68+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/config"
69+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework"
70+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework/plugins/filter"
71+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework/plugins/picker"
72+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework/plugins/profile"
6673
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/server"
6774
logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging"
6875
epptestutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/testing"
6976
integrationutils "sigs.k8s.io/gateway-api-inference-extension/test/integration"
70-
"sigs.k8s.io/yaml"
7177
)
7278

7379
const (
@@ -1018,7 +1024,41 @@ func BeforeSuite() func() {
10181024
// Adjust from defaults
10191025
serverRunner.PoolNamespacedName = types.NamespacedName{Name: testPoolName, Namespace: testNamespace}
10201026
serverRunner.Datastore = datastore.NewDatastore(context.Background(), pmf)
1021-
scheduler := scheduling.NewScheduler()
1027+
1028+
loraAffinityFilter := filter.NewLoraAffinityFilter(config.Conf.LoraAffinityThreshold)
1029+
leastQueueFilter := filter.NewLeastQueueFilter()
1030+
leastKvCacheFilter := filter.NewLeastKVCacheFilter()
1031+
1032+
lowLatencyFilter := &filter.DecisionTreeFilter{
1033+
Current: filter.NewLowQueueFilter(config.Conf.QueueingThresholdLoRA),
1034+
NextOnSuccess: &filter.DecisionTreeFilter{
1035+
Current: loraAffinityFilter,
1036+
NextOnSuccessOrFailure: &filter.DecisionTreeFilter{
1037+
Current: leastQueueFilter,
1038+
NextOnSuccessOrFailure: &filter.DecisionTreeFilter{
1039+
Current: leastKvCacheFilter,
1040+
},
1041+
},
1042+
},
1043+
NextOnFailure: &filter.DecisionTreeFilter{
1044+
Current: leastQueueFilter,
1045+
NextOnSuccessOrFailure: &filter.DecisionTreeFilter{
1046+
Current: loraAffinityFilter,
1047+
NextOnSuccessOrFailure: &filter.DecisionTreeFilter{
1048+
Current: leastKvCacheFilter,
1049+
},
1050+
},
1051+
},
1052+
}
1053+
1054+
defaultProfile := framework.NewSchedulerProfile().
1055+
WithFilters(lowLatencyFilter).
1056+
WithPicker(picker.NewRandomPicker(picker.DefaultMaxNumOfEndpoints))
1057+
1058+
profileHandler := profile.NewSingleProfileHandler()
1059+
1060+
schedulerConfig := scheduling.NewSchedulerConfig(profileHandler, map[string]*framework.SchedulerProfile{"default": defaultProfile})
1061+
scheduler := scheduling.NewSchedulerWithConfig(schedulerConfig)
10221062

10231063
sdConfig := &saturationdetector.Config{
10241064
QueueDepthThreshold: saturationdetector.DefaultQueueDepthThreshold,
@@ -1125,7 +1165,7 @@ func managerTestOptions(namespace, name string, metricsServerOptions metricsserv
11251165
},
11261166
},
11271167
},
1128-
Controller: config.Controller{
1168+
Controller: crconfig.Controller{
11291169
SkipNameValidation: boolPointer(true),
11301170
},
11311171
Metrics: metricsServerOptions,

0 commit comments

Comments
 (0)