Skip to content

Commit 708882c

Browse files
authored
remove datastore dependency from the scheduler (#1049)
* remove datastore dependency from the scheduler Signed-off-by: Nir Rozenbaum <[email protected]> * added back comments on snapshotting pods from datastore before calling schedule Signed-off-by: Nir Rozenbaum <[email protected]> * removed fake datastore from conformance scheduler test Signed-off-by: Nir Rozenbaum <[email protected]> --------- Signed-off-by: Nir Rozenbaum <[email protected]>
1 parent 3f98291 commit 708882c

File tree

9 files changed

+45
-73
lines changed

9 files changed

+45
-73
lines changed

cmd/epp/runner/runner.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -241,7 +241,7 @@ func (r *Runner) Run(ctx context.Context) error {
241241
}
242242

243243
// --- Initialize Core EPP Components ---
244-
scheduler, err := r.initializeScheduler(datastore)
244+
scheduler, err := r.initializeScheduler()
245245
if err != nil {
246246
setupLog.Error(err, "Failed to create scheduler")
247247
return err
@@ -292,13 +292,13 @@ func (r *Runner) Run(ctx context.Context) error {
292292
return nil
293293
}
294294

295-
func (r *Runner) initializeScheduler(datastore datastore.Datastore) (*scheduling.Scheduler, error) {
295+
func (r *Runner) initializeScheduler() (*scheduling.Scheduler, error) {
296296
if r.schedulerConfig != nil {
297-
return scheduling.NewSchedulerWithConfig(datastore, r.schedulerConfig), nil
297+
return scheduling.NewSchedulerWithConfig(r.schedulerConfig), nil
298298
}
299299

300300
// Otherwise, no scheduler config was provided from outside; use the existing configuration.
301-
scheduler := scheduling.NewScheduler(datastore)
301+
scheduler := scheduling.NewScheduler()
302302
if schedulerV2 {
303303
queueScorerWeight := envutil.GetEnvInt("QUEUE_SCORE_WEIGHT", scorer.DefaultQueueScorerWeight, setupLog)
304304
kvCacheScorerWeight := envutil.GetEnvInt("KV_CACHE_SCORE_WEIGHT", scorer.DefaultKVCacheScorerWeight, setupLog)
@@ -316,11 +316,11 @@ func (r *Runner) initializeScheduler(datastore datastore.Datastore) (*scheduling
316316
}
317317

318318
schedulerConfig := scheduling.NewSchedulerConfig(profile.NewSingleProfileHandler(), map[string]*framework.SchedulerProfile{"schedulerv2": schedulerProfile})
319-
scheduler = scheduling.NewSchedulerWithConfig(datastore, schedulerConfig)
319+
scheduler = scheduling.NewSchedulerWithConfig(schedulerConfig)
320320
}
321321

322322
if reqHeaderBasedSchedulerForTesting {
323-
scheduler = conformance_epp.NewReqHeaderBasedScheduler(datastore)
323+
scheduler = conformance_epp.NewReqHeaderBasedScheduler()
324324
}
325325

326326
return scheduler, nil

conformance/testing-epp/scheduler.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,11 +27,11 @@ import (
2727
// NewReqHeaderBasedScheduler creates a scheduler for conformance tests that selects
2828
// an endpoint based on the "test-epp-endpoint-selection" request header. If the
2929
// header is missing or the specified endpoint doesn't exist, no endpoint is returned.
30-
func NewReqHeaderBasedScheduler(datastore scheduling.Datastore) *scheduling.Scheduler {
30+
func NewReqHeaderBasedScheduler() *scheduling.Scheduler {
3131
predicatableSchedulerProfile := framework.NewSchedulerProfile().
3232
WithFilters(filter.NewHeaderBasedTestingFilter()).
3333
WithPicker(picker.NewMaxScorePicker())
3434

35-
return scheduling.NewSchedulerWithConfig(datastore, scheduling.NewSchedulerConfig(
35+
return scheduling.NewSchedulerWithConfig(scheduling.NewSchedulerConfig(
3636
profile.NewSingleProfileHandler(), map[string]*framework.SchedulerProfile{"req-header-based-profile": predicatableSchedulerProfile}))
3737
}

conformance/testing-epp/sheduler_test.go

Lines changed: 13 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -31,13 +31,13 @@ import (
3131
func TestSchedule(t *testing.T) {
3232
tests := []struct {
3333
name string
34-
input []*backendmetrics.FakePodMetrics
34+
input []backendmetrics.PodMetrics
3535
req *types.LLMRequest
3636
wantRes *types.SchedulingResult
3737
err bool
3838
}{
3939
{
40-
name: "no pods in datastore and req header is set",
40+
name: "no candidate pods and req header is set",
4141
req: &types.LLMRequest{
4242
Headers: map[string]string{"test-epp-endpoint-selection": "random-endpoint"},
4343
RequestId: uuid.NewString(),
@@ -47,8 +47,8 @@ func TestSchedule(t *testing.T) {
4747
},
4848
{
4949
name: "req header not set",
50-
input: []*backendmetrics.FakePodMetrics{
51-
{Pod: &backend.Pod{Address: "random-endpoint"}},
50+
input: []backendmetrics.PodMetrics{
51+
&backendmetrics.FakePodMetrics{Pod: &backend.Pod{Address: "random-endpoint"}},
5252
},
5353
req: &types.LLMRequest{
5454
Headers: map[string]string{}, // Deliberately set an empty header.
@@ -58,9 +58,9 @@ func TestSchedule(t *testing.T) {
5858
err: true,
5959
},
6060
{
61-
name: "no pods address in datastore matches req header address",
62-
input: []*backendmetrics.FakePodMetrics{
63-
{Pod: &backend.Pod{Address: "nonmatched-endpoint"}},
61+
name: "no pods address from the candidate pods matches req header address",
62+
input: []backendmetrics.PodMetrics{
63+
&backendmetrics.FakePodMetrics{Pod: &backend.Pod{Address: "nonmatched-endpoint"}},
6464
},
6565
req: &types.LLMRequest{
6666
Headers: map[string]string{"test-epp-endpoint-selection": "matched-endpoint"},
@@ -70,10 +70,10 @@ func TestSchedule(t *testing.T) {
7070
err: true,
7171
},
7272
{
73-
name: "one pod address in datastore matches req header address",
74-
input: []*backendmetrics.FakePodMetrics{
75-
{Pod: &backend.Pod{Address: "nonmatched-endpoint"}},
76-
{Pod: &backend.Pod{Address: "matched-endpoint"}},
73+
name: "one pod address from the candidate pods matches req header address",
74+
input: []backendmetrics.PodMetrics{
75+
&backendmetrics.FakePodMetrics{Pod: &backend.Pod{Address: "nonmatched-endpoint"}},
76+
&backendmetrics.FakePodMetrics{Pod: &backend.Pod{Address: "matched-endpoint"}},
7777
},
7878
req: &types.LLMRequest{
7979
Headers: map[string]string{"test-epp-endpoint-selection": "matched-endpoint"},
@@ -99,8 +99,8 @@ func TestSchedule(t *testing.T) {
9999

100100
for _, test := range tests {
101101
t.Run(test.name, func(t *testing.T) {
102-
scheduler := NewReqHeaderBasedScheduler(&fakeDataStore{pods: test.input})
103-
got, err := scheduler.Schedule(context.Background(), test.req)
102+
scheduler := NewReqHeaderBasedScheduler()
103+
got, err := scheduler.Schedule(context.Background(), test.req, types.ToSchedulerPodMetrics(test.input))
104104
if test.err != (err != nil) {
105105
t.Errorf("Unexpected error, got %v, want %v", err, test.err)
106106
}
@@ -111,15 +111,3 @@ func TestSchedule(t *testing.T) {
111111
})
112112
}
113113
}
114-
115-
type fakeDataStore struct {
116-
pods []*backendmetrics.FakePodMetrics
117-
}
118-
119-
func (fds *fakeDataStore) PodGetAll() []backendmetrics.PodMetrics {
120-
pm := make([]backendmetrics.PodMetrics, 0, len(fds.pods))
121-
for _, pod := range fds.pods {
122-
pm = append(pm, pod)
123-
}
124-
return pm
125-
}

pkg/epp/requestcontrol/director.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ import (
4141

4242
// Scheduler defines the interface required by the Director for scheduling.
4343
type Scheduler interface {
44-
Schedule(ctx context.Context, b *schedulingtypes.LLMRequest) (result *schedulingtypes.SchedulingResult, err error)
44+
Schedule(ctx context.Context, request *schedulingtypes.LLMRequest, candidatePods []schedulingtypes.Pod) (result *schedulingtypes.SchedulingResult, err error)
4545
}
4646

4747
// SaturationDetector provides a signal indicating whether the backends are considered saturated.
@@ -135,7 +135,11 @@ func (d *Director) HandleRequest(ctx context.Context, reqCtx *handlers.RequestCo
135135
}
136136

137137
// --- 3. Call Scheduler ---
138-
results, err := d.scheduler.Schedule(ctx, reqCtx.SchedulingRequest)
138+
// Snapshot pod metrics from the datastore to:
139+
// 1. Reduce concurrent access to the datastore.
140+
// 2. Ensure consistent data during the scheduling operation of a request between all scheduling cycles.
141+
candidatePods := schedulingtypes.ToSchedulerPodMetrics(d.datastore.PodGetAll())
142+
results, err := d.scheduler.Schedule(ctx, reqCtx.SchedulingRequest, candidatePods)
139143
if err != nil {
140144
return reqCtx, errutil.Error{Code: errutil.InferencePoolResourceExhausted, Msg: fmt.Errorf("failed to find target pod: %w", err).Error()}
141145
}

pkg/epp/requestcontrol/director_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ type mockScheduler struct {
5858
scheduleErr error
5959
}
6060

61-
func (m *mockScheduler) Schedule(ctx context.Context, req *schedulingtypes.LLMRequest) (*schedulingtypes.SchedulingResult, error) {
61+
func (m *mockScheduler) Schedule(_ context.Context, _ *schedulingtypes.LLMRequest, _ []schedulingtypes.Pod) (*schedulingtypes.SchedulingResult, error) {
6262
return m.scheduleResults, m.scheduleErr
6363
}
6464

pkg/epp/scheduling/framework/scheduler_profile.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -106,8 +106,8 @@ func (p *SchedulerProfile) AddPlugins(pluginObjects ...plugins.Plugin) error {
106106

107107
// Run runs a SchedulerProfile cycle. In other words, it invokes all the SchedulerProfile plugins in this
108108
// order - Filters, Scorers, Picker, PostCyclePlugins. After completing all, it returns the result.
109-
func (p *SchedulerProfile) Run(ctx context.Context, request *types.LLMRequest, cycleState *types.CycleState, podsSnapshot []types.Pod) (*types.ProfileRunResult, error) {
110-
pods := p.runFilterPlugins(ctx, request, cycleState, podsSnapshot)
109+
func (p *SchedulerProfile) Run(ctx context.Context, request *types.LLMRequest, cycleState *types.CycleState, candidatePods []types.Pod) (*types.ProfileRunResult, error) {
110+
pods := p.runFilterPlugins(ctx, request, cycleState, candidatePods)
111111
if len(pods) == 0 {
112112
return nil, errutil.Error{Code: errutil.Internal, Msg: "no pods available for the given request"}
113113
}

pkg/epp/scheduling/scheduler.go

Lines changed: 5 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ type Datastore interface {
3939
}
4040

4141
// NewScheduler returns a new scheduler with default scheduler plugins configuration.
42-
func NewScheduler(datastore Datastore) *Scheduler {
42+
func NewScheduler() *Scheduler {
4343
// When the scheduler is initialized with the NewScheduler function, the config below will be used as default.
4444
// it's possible to call NewSchedulerWithConfig to pass a different scheduler config.
4545
// For build-time plugin changes, it's recommended to call NewSchedulerWithConfig in main.go.
@@ -75,26 +75,24 @@ func NewScheduler(datastore Datastore) *Scheduler {
7575

7676
profileHandler := profile.NewSingleProfileHandler()
7777

78-
return NewSchedulerWithConfig(datastore, NewSchedulerConfig(profileHandler, map[string]*framework.SchedulerProfile{"default": defaultProfile}))
78+
return NewSchedulerWithConfig(NewSchedulerConfig(profileHandler, map[string]*framework.SchedulerProfile{"default": defaultProfile}))
7979
}
8080

8181
// NewSchedulerWithConfig returns a new scheduler with the given scheduler plugins configuration.
82-
func NewSchedulerWithConfig(datastore Datastore, config *SchedulerConfig) *Scheduler {
82+
func NewSchedulerWithConfig(config *SchedulerConfig) *Scheduler {
8383
return &Scheduler{
84-
datastore: datastore,
8584
profileHandler: config.profileHandler,
8685
profiles: config.profiles,
8786
}
8887
}
8988

9089
type Scheduler struct {
91-
datastore Datastore
9290
profileHandler framework.ProfileHandler
9391
profiles map[string]*framework.SchedulerProfile
9492
}
9593

9694
// Schedule finds the target pod based on metrics and the requested lora adapter.
97-
func (s *Scheduler) Schedule(ctx context.Context, request *types.LLMRequest) (*types.SchedulingResult, error) {
95+
func (s *Scheduler) Schedule(ctx context.Context, request *types.LLMRequest, candidatePods []types.Pod) (*types.SchedulingResult, error) {
9896
logger := log.FromContext(ctx).WithValues("request", request)
9997
loggerDebug := logger.V(logutil.DEBUG)
10098

@@ -103,12 +101,6 @@ func (s *Scheduler) Schedule(ctx context.Context, request *types.LLMRequest) (*t
103101
metrics.RecordSchedulerE2ELatency(time.Since(scheduleStart))
104102
}()
105103

106-
// Snapshot pod metrics from the datastore to:
107-
// 1. Reduce concurrent access to the datastore.
108-
// 2. Ensure consistent data during the scheduling operation of a request between all scheduling cycles.
109-
podsSnapshot := types.ToSchedulerPodMetrics(s.datastore.PodGetAll())
110-
loggerDebug.Info(fmt.Sprintf("Scheduling a request, Metrics: %+v", podsSnapshot))
111-
112104
profileRunResults := map[string]*types.ProfileRunResult{}
113105
cycleState := types.NewCycleState()
114106

@@ -122,7 +114,7 @@ func (s *Scheduler) Schedule(ctx context.Context, request *types.LLMRequest) (*t
122114

123115
for name, profile := range profiles {
124116
// run the selected profiles and collect results (current code runs all profiles)
125-
profileRunResult, err := profile.Run(ctx, request, cycleState, podsSnapshot)
117+
profileRunResult, err := profile.Run(ctx, request, cycleState, candidatePods)
126118
if err != nil {
127119
loggerDebug.Info("failed to run scheduler profile", "profile", name, "error", err.Error())
128120
}

pkg/epp/scheduling/scheduler_test.go

Lines changed: 9 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -33,17 +33,17 @@ func TestSchedule(t *testing.T) {
3333
tests := []struct {
3434
name string
3535
req *types.LLMRequest
36-
input []*backendmetrics.FakePodMetrics
36+
input []backendmetrics.PodMetrics
3737
wantRes *types.SchedulingResult
3838
err bool
3939
}{
4040
{
41-
name: "no pods in datastore",
41+
name: "no candidate pods",
4242
req: &types.LLMRequest{
4343
TargetModel: "any-model",
4444
RequestId: uuid.NewString(),
4545
},
46-
input: []*backendmetrics.FakePodMetrics{},
46+
input: []backendmetrics.PodMetrics{},
4747
wantRes: nil,
4848
err: true,
4949
},
@@ -55,8 +55,8 @@ func TestSchedule(t *testing.T) {
5555
},
5656
// pod2 will be picked because it has relatively low queue size, with the requested
5757
// model being active, and has low KV cache.
58-
input: []*backendmetrics.FakePodMetrics{
59-
{
58+
input: []backendmetrics.PodMetrics{
59+
&backendmetrics.FakePodMetrics{
6060
Pod: &backend.Pod{NamespacedName: k8stypes.NamespacedName{Name: "pod1"}},
6161
Metrics: &backendmetrics.MetricsState{
6262
WaitingQueueSize: 0,
@@ -68,7 +68,7 @@ func TestSchedule(t *testing.T) {
6868
},
6969
},
7070
},
71-
{
71+
&backendmetrics.FakePodMetrics{
7272
Pod: &backend.Pod{NamespacedName: k8stypes.NamespacedName{Name: "pod2"}},
7373
Metrics: &backendmetrics.MetricsState{
7474
WaitingQueueSize: 3,
@@ -80,7 +80,7 @@ func TestSchedule(t *testing.T) {
8080
},
8181
},
8282
},
83-
{
83+
&backendmetrics.FakePodMetrics{
8484
Pod: &backend.Pod{NamespacedName: k8stypes.NamespacedName{Name: "pod3"}},
8585
Metrics: &backendmetrics.MetricsState{
8686
WaitingQueueSize: 10,
@@ -119,8 +119,8 @@ func TestSchedule(t *testing.T) {
119119

120120
for _, test := range tests {
121121
t.Run(test.name, func(t *testing.T) {
122-
scheduler := NewScheduler(&fakeDataStore{pods: test.input})
123-
got, err := scheduler.Schedule(context.Background(), test.req)
122+
scheduler := NewScheduler()
123+
got, err := scheduler.Schedule(context.Background(), test.req, types.ToSchedulerPodMetrics(test.input))
124124
if test.err != (err != nil) {
125125
t.Errorf("Unexpected error, got %v, want %v", err, test.err)
126126
}
@@ -131,15 +131,3 @@ func TestSchedule(t *testing.T) {
131131
})
132132
}
133133
}
134-
135-
type fakeDataStore struct {
136-
pods []*backendmetrics.FakePodMetrics
137-
}
138-
139-
func (fds *fakeDataStore) PodGetAll() []backendmetrics.PodMetrics {
140-
pm := make([]backendmetrics.PodMetrics, 0, len(fds.pods))
141-
for _, pod := range fds.pods {
142-
pm = append(pm, pod)
143-
}
144-
return pm
145-
}

test/integration/epp/hermetic_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -925,7 +925,7 @@ func BeforeSuite() func() {
925925
// Adjust from defaults
926926
serverRunner.PoolNamespacedName = types.NamespacedName{Name: testPoolName, Namespace: testNamespace}
927927
serverRunner.Datastore = datastore.NewDatastore(context.Background(), pmf)
928-
scheduler := scheduling.NewScheduler(serverRunner.Datastore)
928+
scheduler := scheduling.NewScheduler()
929929

930930
sdConfig := &saturationdetector.Config{
931931
QueueDepthThreshold: saturationdetector.DefaultQueueDepthThreshold,

0 commit comments

Comments
 (0)