
Commit 384a28d

removed datastore dependency from saturation detector
Signed-off-by: Nir Rozenbaum <[email protected]>
1 parent f483650 commit 384a28d
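
The gist of the change: the saturation detector no longer reaches into the shared datastore to list pods; the Director now hands it the candidate pod metrics directly via IsSaturated(ctx, candidatePods). As a rough, self-contained sketch of that shape (the type names, metric fields, and threshold values below are illustrative assumptions, not this repository's actual types):

    package main

    import "fmt"

    // metricsSnapshot stands in for a pod's most recently scraped metrics.
    type metricsSnapshot struct {
    	WaitingQueueSize    int
    	KVCacheUsagePercent float64
    }

    // podMetrics stands in for the per-pod metrics handle the detector now receives.
    type podMetrics struct {
    	name    string
    	metrics metricsSnapshot
    }

    // isSaturated reports whether every candidate pod is past the given thresholds,
    // mirroring the "candidates passed in, no datastore lookup" flow of this commit.
    func isSaturated(candidates []podMetrics, maxQueueDepth int, maxKVCacheUtil float64) bool {
    	if len(candidates) == 0 {
    		return true // nothing available to serve the request
    	}
    	for _, p := range candidates {
    		if p.metrics.WaitingQueueSize <= maxQueueDepth && p.metrics.KVCacheUsagePercent <= maxKVCacheUtil {
    			return false // at least one pod still has headroom
    		}
    	}
    	return true
    }

    func main() {
    	pods := []podMetrics{
    		{name: "pod1", metrics: metricsSnapshot{WaitingQueueSize: 12, KVCacheUsagePercent: 0.97}},
    		{name: "pod2", metrics: metricsSnapshot{WaitingQueueSize: 1, KVCacheUsagePercent: 0.40}},
    	}
    	fmt.Println(isSaturated(pods, 5, 0.90)) // false: pod2 still has headroom
    }

The mockSaturationDetector change in director_test.go below shows the same two-argument signature from the caller's side.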

File tree

6 files changed (+127, -203 lines)


cmd/epp/runner/runner.go

Lines changed: 1 addition & 1 deletion
@@ -333,7 +333,7 @@ func (r *Runner) Run(ctx context.Context) error {

 	scheduler := scheduling.NewSchedulerWithConfig(r.schedulerConfig)

-	saturationDetector := saturationdetector.NewDetector(sdConfig, datastore, setupLog)
+	saturationDetector := saturationdetector.NewDetector(sdConfig, setupLog)

 	director := requestcontrol.NewDirectorWithConfig(datastore, scheduler, saturationDetector, r.requestControlConfig)


pkg/epp/requestcontrol/director.go

Lines changed: 30 additions & 26 deletions
@@ -30,10 +30,10 @@ import (
 	"github.com/go-logr/logr"
 	"sigs.k8s.io/controller-runtime/pkg/log"

+	v1 "sigs.k8s.io/gateway-api-inference-extension/api/v1"
 	"sigs.k8s.io/gateway-api-inference-extension/apix/v1alpha2"
 	"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend"
 	backendmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics"
-	"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datastore"
 	"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/handlers"
 	"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics"
 	schedulingtypes "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/types"
@@ -47,18 +47,25 @@ const (
 	subsetHintKey = "x-gateway-destination-endpoint-subset"
 )

+// Datastore defines the interface required by the Director.
+type Datastore interface {
+	PoolGet() (*v1.InferencePool, error)
+	ObjectiveGet(modelName string) *v1alpha2.InferenceObjective
+	PodList(predicate func(backendmetrics.PodMetrics) bool) []backendmetrics.PodMetrics
+}
+
 // Scheduler defines the interface required by the Director for scheduling.
 type Scheduler interface {
 	Schedule(ctx context.Context, request *schedulingtypes.LLMRequest, candidatePods []schedulingtypes.Pod) (result *schedulingtypes.SchedulingResult, err error)
 }

 // SaturationDetector provides a signal indicating whether the backends are considered saturated.
 type SaturationDetector interface {
-	IsSaturated(ctx context.Context) bool
+	IsSaturated(ctx context.Context, candidatePods []backendmetrics.PodMetrics) bool
 }

 // NewDirectorWithConfig creates a new Director instance with all dependencies.
-func NewDirectorWithConfig(datastore datastore.Datastore, scheduler Scheduler, saturationDetector SaturationDetector, config *Config) *Director {
+func NewDirectorWithConfig(datastore Datastore, scheduler Scheduler, saturationDetector SaturationDetector, config *Config) *Director {
 	return &Director{
 		datastore:          datastore,
 		scheduler:          scheduler,
@@ -70,24 +77,19 @@ func NewDirectorWithConfig(datastore datastore.Datastore, scheduler Scheduler, s

 // Director orchestrates the request handling flow, including scheduling.
 type Director struct {
-	datastore           datastore.Datastore
+	datastore           Datastore
 	scheduler           Scheduler
 	saturationDetector  SaturationDetector
 	preRequestPlugins   []PreRequest
 	postResponsePlugins []PostResponse
 }

-// HandleRequest orchestrates the request lifecycle:
-// 1. Parses request details.
-// 2. Calls admitRequest for admission control.
-// 3. Calls Scheduler.Schedule if request is approved.
-// 4. Calls prepareRequest to populate RequestContext with result and call PreRequest plugins.
-//
+// HandleRequest orchestrates the request lifecycle.
 // It always returns the requestContext even in the error case, as the request context is used in error handling.
 func (d *Director) HandleRequest(ctx context.Context, reqCtx *handlers.RequestContext) (*handlers.RequestContext, error) {
 	logger := log.FromContext(ctx)

-	// --- 1. Parse Request, Resolve Target Models, and Determine Parameters ---
+	// Parse Request, Resolve Target Models, and Determine Parameters
 	var ok bool
 	requestBodyMap := reqCtx.Request.Body
 	reqCtx.Model, ok = requestBodyMap["model"].(string)
@@ -138,17 +140,18 @@ func (d *Director) HandleRequest(ctx context.Context, reqCtx *handlers.RequestCo
 	ctx = log.IntoContext(ctx, logger)
 	logger.V(logutil.DEBUG).Info("LLM request assembled")

-	// --- 2. Admission Control check --
-	if err := d.admitRequest(ctx, requestCriticality, reqCtx.FairnessID); err != nil {
-		return reqCtx, err
-	}
-
-	// --- 3. Call Scheduler (with the relevant candidate pods) ---
+	// Get candidate pods for scheduling
 	candidatePods := d.getCandidatePodsForScheduling(ctx, reqCtx.Request.Metadata)
 	if len(candidatePods) == 0 {
 		return reqCtx, errutil.Error{Code: errutil.ServiceUnavailable, Msg: "failed to find candidate pods for serving the request"}
 	}
-	result, err := d.scheduler.Schedule(ctx, reqCtx.SchedulingRequest, candidatePods)
+
+	// Admission Control check
+	if err := d.admitRequest(ctx, candidatePods, requestCriticality, reqCtx.FairnessID); err != nil {
+		return reqCtx, err
+	}
+
+	result, err := d.scheduler.Schedule(ctx, reqCtx.SchedulingRequest, d.toSchedulerPodMetrics(candidatePods))
 	if err != nil {
 		return reqCtx, errutil.Error{Code: errutil.InferencePoolResourceExhausted, Msg: fmt.Errorf("failed to find target pod: %w", err).Error()}
 	}
@@ -165,8 +168,9 @@ func (d *Director) HandleRequest(ctx context.Context, reqCtx *handlers.RequestCo
 }

 // admitRequest handles admission control to decide whether or not to accept the request
-// based on the request criticality and system saturation state.
-func (d *Director) admitRequest(ctx context.Context, requestCriticality v1alpha2.Criticality, fairnessID string) error {
+// based on the request criticality and the saturation state of the candidate pods.
+func (d *Director) admitRequest(ctx context.Context, candidatePods []backendmetrics.PodMetrics,
+	requestCriticality v1alpha2.Criticality, fairnessID string) error {
 	logger := log.FromContext(ctx)

 	logger.V(logutil.TRACE).Info("Entering Flow Control", "criticality", requestCriticality, "fairnessID", fairnessID)
@@ -177,7 +181,7 @@ func (d *Director) admitRequest(ctx context.Context, requestCriticality v1alpha2
 	}

 	logger.V(logutil.DEBUG).Info("Performing saturation check for non-critical request.")
-	if d.saturationDetector.IsSaturated(ctx) { // Assuming non-nil Saturation Detector
+	if d.saturationDetector.IsSaturated(ctx, candidatePods) {
 		return errutil.Error{
 			Code: errutil.InferencePoolResourceExhausted,
 			Msg:  "system saturated, non-critical request dropped",
@@ -193,21 +197,21 @@ func (d *Director) admitRequest(ctx context.Context, requestCriticality v1alpha2
 // Snapshot pod metrics from the datastore to:
 // 1. Reduce concurrent access to the datastore.
 // 2. Ensure consistent data during the scheduling operation of a request between all scheduling cycles.
-func (d *Director) getCandidatePodsForScheduling(ctx context.Context, requestMetadata map[string]any) []schedulingtypes.Pod {
+func (d *Director) getCandidatePodsForScheduling(ctx context.Context, requestMetadata map[string]any) []backendmetrics.PodMetrics {
 	loggerTrace := log.FromContext(ctx).V(logutil.TRACE)

 	subsetMap, found := requestMetadata[subsetHintNamespace].(map[string]any)
 	if !found {
-		return d.toSchedulerPodMetrics(d.datastore.PodList(backendmetrics.AllPodPredicate))
+		return d.datastore.PodList(backendmetrics.AllPodPredicate)
 	}

 	// Check if endpoint key is present in the subset map and ensure there is at least one value
 	endpointSubsetList, found := subsetMap[subsetHintKey].([]any)
 	if !found {
-		return d.toSchedulerPodMetrics(d.datastore.PodList(backendmetrics.AllPodPredicate))
+		return d.datastore.PodList(backendmetrics.AllPodPredicate)
 	} else if len(endpointSubsetList) == 0 {
 		loggerTrace.Info("found empty subset filter in request metadata, filtering all pods")
-		return []schedulingtypes.Pod{}
+		return []backendmetrics.PodMetrics{}
 	}

 	// Create a map of endpoint addresses for easy lookup
@@ -230,7 +234,7 @@ func (d *Director) getCandidatePodsForScheduling(ctx context.Context, requestMet

 	loggerTrace.Info("filtered candidate pods by subset filtering", "podTotalCount", podTotalCount, "filteredCount", len(podFitleredList))

-	return d.toSchedulerPodMetrics(podFitleredList)
+	return podFitleredList
 }

 // prepareRequest populates the RequestContext and calls the registered PreRequest plugins

pkg/epp/requestcontrol/director_test.go

Lines changed: 48 additions & 81 deletions
@@ -31,6 +31,7 @@ import (
 	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/apimachinery/pkg/types"
 	clientgoscheme "k8s.io/client-go/kubernetes/scheme"
+	"k8s.io/utils/ptr"
 	"sigs.k8s.io/controller-runtime/pkg/client/fake"

 	v1 "sigs.k8s.io/gateway-api-inference-extension/api/v1"
@@ -53,7 +54,7 @@ type mockSaturationDetector struct {
 	isSaturated bool
 }

-func (m *mockSaturationDetector) IsSaturated(_ context.Context) bool {
+func (m *mockSaturationDetector) IsSaturated(_ context.Context, _ []backendmetrics.PodMetrics) bool {
 	return m.isSaturated
 }

@@ -66,6 +67,23 @@ func (m *mockScheduler) Schedule(_ context.Context, _ *schedulingtypes.LLMReques
 	return m.scheduleResults, m.scheduleErr
 }

+type mockDatastore struct {
+	pods []backendmetrics.PodMetrics
+}
+
+func (ds *mockDatastore) PoolGet() (*v1.InferencePool, error)                { return nil, nil }
+func (ds *mockDatastore) ObjectiveGet(_ string) *v1alpha2.InferenceObjective { return nil }
+func (ds *mockDatastore) PodList(predicate func(backendmetrics.PodMetrics) bool) []backendmetrics.PodMetrics {
+	res := []backendmetrics.PodMetrics{}
+	for _, pod := range ds.pods {
+		if predicate(pod) {
+			res = append(res, pod)
+		}
+	}
+
+	return res
+}
+
 func TestDirector_HandleRequest(t *testing.T) {
 	ctx := logutil.NewTestLoggerIntoContext(context.Background())

@@ -425,125 +443,78 @@ func TestDirector_HandleRequest(t *testing.T) {
 func TestGetCandidatePodsForScheduling(t *testing.T) {
 	var makeFilterMetadata = func(data []any) map[string]any {
 		return map[string]any{
-			"envoy.lb.subset_hint": map[string]any{
-				"x-gateway-destination-endpoint-subset": data,
+			subsetHintNamespace: map[string]any{
+				subsetHintKey: data,
 			},
 		}
 	}

-	testInput := []*corev1.Pod{
-		{
-			ObjectMeta: metav1.ObjectMeta{
-				Name: "pod1",
-			},
-			Status: corev1.PodStatus{
-				PodIP: "10.0.0.1",
-			},
-		},
-		{
-			ObjectMeta: metav1.ObjectMeta{
-				Name: "pod2",
-			},
-			Status: corev1.PodStatus{
-				PodIP: "10.0.0.2",
-			},
-		},
-	}
-
-	outputPod1 := &backend.Pod{
+	pod1 := &backend.Pod{
 		NamespacedName: types.NamespacedName{Name: "pod1"},
 		Address:        "10.0.0.1",
 		Labels:         map[string]string{},
 	}

-	outputPod2 := &backend.Pod{
+	pod2 := &backend.Pod{
 		NamespacedName: types.NamespacedName{Name: "pod2"},
 		Address:        "10.0.0.2",
 		Labels:         map[string]string{},
 	}

+	testInput := []backendmetrics.PodMetrics{
+		&backendmetrics.FakePodMetrics{Pod: pod1},
+		&backendmetrics.FakePodMetrics{Pod: pod2},
+	}
+
 	tests := []struct {
 		name     string
 		metadata map[string]any
-		output   []schedulingtypes.Pod
+		output   []backendmetrics.PodMetrics
 	}{
 		{
 			name:     "SubsetFilter, filter not present — return all pods",
 			metadata: map[string]any{},
-			output: []schedulingtypes.Pod{
-				&schedulingtypes.PodMetrics{
-					Pod:          outputPod1,
-					MetricsState: backendmetrics.NewMetricsState(),
-				},
-				&schedulingtypes.PodMetrics{
-					Pod:          outputPod2,
-					MetricsState: backendmetrics.NewMetricsState(),
-				},
-			},
+			output:   testInput,
 		},
 		{
 			name:     "SubsetFilter, namespace present filter not present — return all pods",
-			metadata: map[string]any{"envoy.lb.subset_hint": map[string]any{}},
-			output: []schedulingtypes.Pod{
-				&schedulingtypes.PodMetrics{
-					Pod:          outputPod1,
-					MetricsState: backendmetrics.NewMetricsState(),
-				},
-				&schedulingtypes.PodMetrics{
-					Pod:          outputPod2,
-					MetricsState: backendmetrics.NewMetricsState(),
-				},
-			},
+			metadata: map[string]any{subsetHintNamespace: map[string]any{}},
+			output:   testInput,
 		},
 		{
 			name:     "SubsetFilter, filter present with empty list — return error",
 			metadata: makeFilterMetadata([]any{}),
-			output:   []schedulingtypes.Pod{},
+			output:   []backendmetrics.PodMetrics{},
 		},
 		{
 			name:     "SubsetFilter, subset with one matching pod",
 			metadata: makeFilterMetadata([]any{"10.0.0.1"}),
-			output: []schedulingtypes.Pod{
-				&schedulingtypes.PodMetrics{
-					Pod:          outputPod1,
-					MetricsState: backendmetrics.NewMetricsState(),
+			output: []backendmetrics.PodMetrics{
+				&backendmetrics.FakePodMetrics{
+					Pod: pod1,
 				},
 			},
 		},
 		{
 			name:     "SubsetFilter, subset with multiple matching pods",
 			metadata: makeFilterMetadata([]any{"10.0.0.1", "10.0.0.2", "10.0.0.3"}),
-			output: []schedulingtypes.Pod{
-				&schedulingtypes.PodMetrics{
-					Pod:          outputPod1,
-					MetricsState: backendmetrics.NewMetricsState(),
-				},
-				&schedulingtypes.PodMetrics{
-					Pod:          outputPod2,
-					MetricsState: backendmetrics.NewMetricsState(),
-				},
-			},
+			output:   testInput,
 		},
 		{
 			name:     "SubsetFilter, subset with no matching pods",
 			metadata: makeFilterMetadata([]any{"10.0.0.3"}),
-			output:   []schedulingtypes.Pod{},
+			output:   []backendmetrics.PodMetrics{},
 		},
 	}

-	pmf := backendmetrics.NewPodMetricsFactory(&backendmetrics.FakePodMetricsClient{}, time.Second, time.Second*2)
-	ds := datastore.NewDatastore(t.Context(), pmf)
-	for _, testPod := range testInput {
-		ds.PodUpdateOrAddIfNotExist(testPod)
-	}
-
+	ds := &mockDatastore{pods: testInput}
 	for _, test := range tests {
 		t.Run(test.name, func(t *testing.T) {
 			director := NewDirectorWithConfig(ds, &mockScheduler{}, &mockSaturationDetector{}, NewConfig())

 			got := director.getCandidatePodsForScheduling(context.Background(), test.metadata)

-			diff := cmp.Diff(test.output, got, cmpopts.SortSlices(func(a, b schedulingtypes.Pod) bool {
+			diff := cmp.Diff(test.output, got, cmpopts.SortSlices(func(a, b backendmetrics.PodMetrics) bool {
 				return a.GetPod().NamespacedName.String() < b.GetPod().NamespacedName.String()
 			}))
 			if diff != "" {
@@ -567,8 +538,8 @@ func TestRandomWeightedDraw(t *testing.T) {
 			model: &v1alpha2.InferenceObjective{
 				Spec: v1alpha2.InferenceObjectiveSpec{
 					TargetModels: []v1alpha2.TargetModel{
-						{Name: "canary", Weight: pointer(50)},
-						{Name: "v1", Weight: pointer(50)},
+						{Name: "canary", Weight: ptr.To(int32(50))},
+						{Name: "v1", Weight: ptr.To(int32(50))},
 					},
 				},
 			},
@@ -579,9 +550,9 @@ func TestRandomWeightedDraw(t *testing.T) {
 			model: &v1alpha2.InferenceObjective{
 				Spec: v1alpha2.InferenceObjectiveSpec{
 					TargetModels: []v1alpha2.TargetModel{
-						{Name: "canary", Weight: pointer(25)},
-						{Name: "v1.1", Weight: pointer(55)},
-						{Name: "v1", Weight: pointer(50)},
+						{Name: "canary", Weight: ptr.To(int32(25))},
+						{Name: "v1.1", Weight: ptr.To(int32(55))},
+						{Name: "v1", Weight: ptr.To(int32(50))},
 					},
 				},
 			},
@@ -592,9 +563,9 @@ func TestRandomWeightedDraw(t *testing.T) {
 			model: &v1alpha2.InferenceObjective{
 				Spec: v1alpha2.InferenceObjectiveSpec{
 					TargetModels: []v1alpha2.TargetModel{
-						{Name: "canary", Weight: pointer(20)},
-						{Name: "v1.1", Weight: pointer(20)},
-						{Name: "v1", Weight: pointer(10)},
+						{Name: "canary", Weight: ptr.To(int32(20))},
+						{Name: "v1.1", Weight: ptr.To(int32(20))},
+						{Name: "v1", Weight: ptr.To(int32(10))},
 					},
 				},
 			},
@@ -672,10 +643,6 @@ func TestGetRandomPod(t *testing.T) {
 	}
 }

-func pointer(v int32) *int32 {
-	return &v
-}
-
 func TestDirector_HandleResponse(t *testing.T) {
 	pr1 := newTestPostResponse("pr1")


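One small cleanup in the test diff above: the hand-rolled pointer(v int32) *int32 helper is removed in favor of the generic ptr.To from k8s.io/utils/ptr, which the new import makes available. A tiny usage sketch:

    package main

    import (
    	"fmt"

    	"k8s.io/utils/ptr"
    )

    func main() {
    	// ptr.To returns a pointer to its argument; ptr.To(int32(50)) replaces pointer(50).
    	w := ptr.To(int32(50))
    	fmt.Println(*w) // 50
    }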