Commit c631612

removed datastore dependency from saturation detector
Signed-off-by: Nir Rozenbaum <[email protected]>
1 parent 02f8613 commit c631612

6 files changed, +125 -201 lines changed

cmd/epp/runner/runner.go

Lines changed: 1 addition & 1 deletion
@@ -331,7 +331,7 @@ func (r *Runner) Run(ctx context.Context) error {
 
 	scheduler := scheduling.NewSchedulerWithConfig(r.schedulerConfig)
 
-	saturationDetector := saturationdetector.NewDetector(sdConfig, datastore, setupLog)
+	saturationDetector := saturationdetector.NewDetector(sdConfig, setupLog)
 
 	director := requestcontrol.NewDirectorWithConfig(datastore, scheduler, saturationDetector, r.requestControlConfig)
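The net effect of this hunk: the detector is constructed from its config and a logger only, while the Director keeps the datastore and now hands the detector an explicit pod snapshot on each saturation check (see the director.go hunks below). A sketch of the resulting wiring, using only identifiers visible in the diff context; illustrative, not the full runner code:

	// The detector no longer receives the datastore.
	saturationDetector := saturationdetector.NewDetector(sdConfig, setupLog)

	// The Director still owns the datastore and later calls
	// saturationDetector.IsSaturated(ctx, candidatePods) with the pods it selected.
	director := requestcontrol.NewDirectorWithConfig(datastore, scheduler, saturationDetector, r.requestControlConfig)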

pkg/epp/requestcontrol/director.go

Lines changed: 31 additions & 27 deletions
@@ -30,10 +30,10 @@ import (
 	"github.com/go-logr/logr"
 	"sigs.k8s.io/controller-runtime/pkg/log"
 
+	v1 "sigs.k8s.io/gateway-api-inference-extension/api/v1"
 	"sigs.k8s.io/gateway-api-inference-extension/apix/v1alpha2"
 	"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend"
 	backendmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics"
-	"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datastore"
 	"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/handlers"
 	"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metadata"
 	"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics"
@@ -43,18 +43,25 @@ import (
 	requtil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/request"
 )
 
+// Datastore defines the interface required by the Director.
+type Datastore interface {
+	PoolGet() (*v1.InferencePool, error)
+	ObjectiveGet(modelName string) *v1alpha2.InferenceObjective
+	PodList(predicate func(backendmetrics.PodMetrics) bool) []backendmetrics.PodMetrics
+}
+
 // Scheduler defines the interface required by the Director for scheduling.
 type Scheduler interface {
 	Schedule(ctx context.Context, request *schedulingtypes.LLMRequest, candidatePods []schedulingtypes.Pod) (result *schedulingtypes.SchedulingResult, err error)
 }
 
 // SaturationDetector provides a signal indicating whether the backends are considered saturated.
 type SaturationDetector interface {
-	IsSaturated(ctx context.Context) bool
+	IsSaturated(ctx context.Context, candidatePods []backendmetrics.PodMetrics) bool
 }
 
 // NewDirectorWithConfig creates a new Director instance with all dependencies.
-func NewDirectorWithConfig(datastore datastore.Datastore, scheduler Scheduler, saturationDetector SaturationDetector, config *Config) *Director {
+func NewDirectorWithConfig(datastore Datastore, scheduler Scheduler, saturationDetector SaturationDetector, config *Config) *Director {
 	return &Director{
 		datastore:          datastore,
 		scheduler:          scheduler,
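With this signature change, any saturation policy that can be computed from the supplied pod snapshot satisfies the interface, and datastore access stays behind the Director's own Datastore abstraction. A hypothetical detector for illustration only; the queue-depth threshold and the WaitingQueueSize field are assumptions, not the actual pkg/epp/saturationdetector logic:

	// queueDepthDetector is a made-up example: it reports saturation only when
	// every candidate pod's waiting queue exceeds a fixed threshold. It needs no
	// datastore because the Director passes in the pods to inspect.
	type queueDepthDetector struct{ maxQueue int }

	func (q *queueDepthDetector) IsSaturated(_ context.Context, candidatePods []backendmetrics.PodMetrics) bool {
		if len(candidatePods) == 0 {
			return true // no capacity available at all
		}
		for _, pm := range candidatePods {
			if m := pm.GetMetrics(); m != nil && m.WaitingQueueSize <= q.maxQueue {
				return false // at least one pod still has headroom
			}
		}
		return true
	}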
@@ -66,24 +73,19 @@ func NewDirectorWithConfig(datastore datastore.Datastore, scheduler Scheduler, s
 
 // Director orchestrates the request handling flow, including scheduling.
 type Director struct {
-	datastore           datastore.Datastore
+	datastore           Datastore
 	scheduler           Scheduler
 	saturationDetector  SaturationDetector
 	preRequestPlugins   []PreRequest
 	postResponsePlugins []PostResponse
 }
 
-// HandleRequest orchestrates the request lifecycle:
-// 1. Parses request details.
-// 2. Calls admitRequest for admission control.
-// 3. Calls Scheduler.Schedule if request is approved.
-// 4. Calls prepareRequest to populate RequestContext with result and call PreRequest plugins.
-//
+// HandleRequest orchestrates the request lifecycle.
 // It always returns the requestContext even in the error case, as the request context is used in error handling.
 func (d *Director) HandleRequest(ctx context.Context, reqCtx *handlers.RequestContext) (*handlers.RequestContext, error) {
 	logger := log.FromContext(ctx)
 
-	// --- 1. Parse Request, Resolve Target Models, and Determine Parameters ---
+	// Parse Request, Resolve Target Models, and Determine Parameters
 	requestBodyMap := reqCtx.Request.Body
 	var ok bool
 	reqCtx.IncomingModelName, ok = requestBodyMap["model"].(string)
@@ -134,22 +136,23 @@ func (d *Director) HandleRequest(ctx context.Context, reqCtx *handlers.RequestCo
 	ctx = log.IntoContext(ctx, logger)
 	logger.V(logutil.DEBUG).Info("LLM request assembled")
 
-	// --- 2. Admission Control check --
-	if err := d.admitRequest(ctx, *infObjective.Spec.Criticality, reqCtx.FairnessID); err != nil {
-		return reqCtx, err
-	}
-
-	// --- 3. Call Scheduler (with the relevant candidate pods) ---
+	// Get candidate pods for scheduling
 	candidatePods := d.getCandidatePodsForScheduling(ctx, reqCtx.Request.Metadata)
 	if len(candidatePods) == 0 {
 		return reqCtx, errutil.Error{Code: errutil.ServiceUnavailable, Msg: "failed to find candidate pods for serving the request"}
 	}
-	result, err := d.scheduler.Schedule(ctx, reqCtx.SchedulingRequest, candidatePods)
+
+	// Admission Control check
+	if err := d.admitRequest(ctx, candidatePods, *infObjective.Spec.Criticality, reqCtx.FairnessID); err != nil {
+		return reqCtx, err
+	}
+
+	result, err := d.scheduler.Schedule(ctx, reqCtx.SchedulingRequest, d.toSchedulerPodMetrics(candidatePods))
 	if err != nil {
 		return reqCtx, errutil.Error{Code: errutil.InferencePoolResourceExhausted, Msg: fmt.Errorf("failed to find target pod: %w", err).Error()}
 	}
 
-	// --- 4. Prepare Request (Populates RequestContext and call PreRequest plugins) ---
+	// Prepare Request (Populates RequestContext and call PreRequest plugins)
 	// Insert target endpoint to instruct Envoy to route requests to the specified target pod and attach the port number.
 	// Invoke PreRequest registered plugins.
 	reqCtx, err = d.prepareRequest(ctx, reqCtx, result)
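Note the ordering change in this hunk: candidate pods are now selected before admission control, so the saturation check evaluates exactly the (possibly subset-filtered) pods the scheduler will consider rather than global datastore state, and the same snapshot is then converted via toSchedulerPodMetrics for the Schedule call.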
@@ -161,8 +164,9 @@ func (d *Director) HandleRequest(ctx context.Context, reqCtx *handlers.RequestCo
 }
 
 // admitRequest handles admission control to decide whether or not to accept the request
-// based on the request criticality and system saturation state.
-func (d *Director) admitRequest(ctx context.Context, requestCriticality v1alpha2.Criticality, fairnessID string) error {
+// based on the request criticality and the saturation state of the candidate pods.
+func (d *Director) admitRequest(ctx context.Context, candidatePods []backendmetrics.PodMetrics,
+	requestCriticality v1alpha2.Criticality, fairnessID string) error {
 	logger := log.FromContext(ctx)
 
 	logger.V(logutil.TRACE).Info("Entering Flow Control", "criticality", requestCriticality, "fairnessID", fairnessID)
@@ -173,7 +177,7 @@ func (d *Director) admitRequest(ctx context.Context, requestCriticality v1alpha2
 	}
 
 	logger.V(logutil.DEBUG).Info("Performing saturation check for non-critical request.")
-	if d.saturationDetector.IsSaturated(ctx) { // Assuming non-nil Saturation Detector
+	if d.saturationDetector.IsSaturated(ctx, candidatePods) {
 		return errutil.Error{
 			Code: errutil.InferencePoolResourceExhausted,
 			Msg:  "system saturated, non-critical request dropped",
@@ -189,21 +193,21 @@ func (d *Director) admitRequest(ctx context.Context, requestCriticality v1alpha2
 // Snapshot pod metrics from the datastore to:
 // 1. Reduce concurrent access to the datastore.
 // 2. Ensure consistent data during the scheduling operation of a request between all scheduling cycles.
-func (d *Director) getCandidatePodsForScheduling(ctx context.Context, requestMetadata map[string]any) []schedulingtypes.Pod {
+func (d *Director) getCandidatePodsForScheduling(ctx context.Context, requestMetadata map[string]any) []backendmetrics.PodMetrics {
 	loggerTrace := log.FromContext(ctx).V(logutil.TRACE)
 
 	subsetMap, found := requestMetadata[metadata.SubsetFilterNamespace].(map[string]any)
 	if !found {
-		return d.toSchedulerPodMetrics(d.datastore.PodList(backendmetrics.AllPodPredicate))
+		return d.datastore.PodList(backendmetrics.AllPodPredicate)
 	}
 
 	// Check if endpoint key is present in the subset map and ensure there is at least one value
 	endpointSubsetList, found := subsetMap[metadata.SubsetFilterKey].([]any)
 	if !found {
-		return d.toSchedulerPodMetrics(d.datastore.PodList(backendmetrics.AllPodPredicate))
+		return d.datastore.PodList(backendmetrics.AllPodPredicate)
 	} else if len(endpointSubsetList) == 0 {
 		loggerTrace.Info("found empty subset filter in request metadata, filtering all pods")
-		return []schedulingtypes.Pod{}
+		return []backendmetrics.PodMetrics{}
 	}
 
 	// Create a map of endpoint addresses for easy lookup
@@ -226,7 +230,7 @@ func (d *Director) getCandidatePodsForScheduling(ctx context.Context, requestMet
 
 	loggerTrace.Info("filtered candidate pods by subset filtering", "podTotalCount", podTotalCount, "filteredCount", len(podFitleredList))
 
-	return d.toSchedulerPodMetrics(podFitleredList)
+	return podFitleredList
 }
 
 // prepareRequest populates the RequestContext and calls the registered PreRequest plugins
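Because the conversion to scheduler pod types now happens only at the Schedule call, subset filtering and admission both operate directly on the backendmetrics.PodMetrics values returned by PodList. A small usage sketch of the interface as the Director consumes it; the name-matching predicate in the second call is hypothetical:

	// AllPodPredicate accepts every pod; a custom predicate narrows the list.
	allPods := d.datastore.PodList(backendmetrics.AllPodPredicate)
	onlyPod1 := d.datastore.PodList(func(pm backendmetrics.PodMetrics) bool {
		return pm.GetPod().NamespacedName.Name == "pod1"
	})
	_, _ = allPods, onlyPod1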

pkg/epp/requestcontrol/director_test.go

Lines changed: 45 additions & 78 deletions
@@ -31,6 +31,7 @@ import (
 	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/apimachinery/pkg/types"
 	clientgoscheme "k8s.io/client-go/kubernetes/scheme"
+	"k8s.io/utils/ptr"
 	"sigs.k8s.io/controller-runtime/pkg/client/fake"
 
 	v1 "sigs.k8s.io/gateway-api-inference-extension/api/v1"
@@ -54,7 +55,7 @@ type mockSaturationDetector struct {
 	isSaturated bool
 }
 
-func (m *mockSaturationDetector) IsSaturated(_ context.Context) bool {
+func (m *mockSaturationDetector) IsSaturated(_ context.Context, _ []backendmetrics.PodMetrics) bool {
 	return m.isSaturated
 }
 
@@ -67,6 +68,23 @@ func (m *mockScheduler) Schedule(_ context.Context, _ *schedulingtypes.LLMReques
 	return m.scheduleResults, m.scheduleErr
 }
 
+type mockDatastore struct {
+	pods []backendmetrics.PodMetrics
+}
+
+func (ds *mockDatastore) PoolGet() (*v1.InferencePool, error)                { return nil, nil }
+func (ds *mockDatastore) ObjectiveGet(_ string) *v1alpha2.InferenceObjective { return nil }
+func (ds *mockDatastore) PodList(predicate func(backendmetrics.PodMetrics) bool) []backendmetrics.PodMetrics {
+	res := []backendmetrics.PodMetrics{}
+	for _, pod := range ds.pods {
+		if predicate(pod) {
+			res = append(res, pod)
+		}
+	}
+
+	return res
+}
+
 func TestDirector_HandleRequest(t *testing.T) {
 	ctx := logutil.NewTestLoggerIntoContext(context.Background())

@@ -442,119 +460,72 @@ func TestGetCandidatePodsForScheduling(t *testing.T) {
 		}
 	}
 
-	testInput := []*corev1.Pod{
-		{
-			ObjectMeta: metav1.ObjectMeta{
-				Name: "pod1",
-			},
-			Status: corev1.PodStatus{
-				PodIP: "10.0.0.1",
-			},
-		},
-		{
-			ObjectMeta: metav1.ObjectMeta{
-				Name: "pod2",
-			},
-			Status: corev1.PodStatus{
-				PodIP: "10.0.0.2",
-			},
-		},
-	}
-
-	outputPod1 := &backend.Pod{
+	pod1 := &backend.Pod{
 		NamespacedName: types.NamespacedName{Name: "pod1"},
 		Address:        "10.0.0.1",
 		Labels:         map[string]string{},
 	}
 
-	outputPod2 := &backend.Pod{
+	pod2 := &backend.Pod{
 		NamespacedName: types.NamespacedName{Name: "pod2"},
 		Address:        "10.0.0.2",
 		Labels:         map[string]string{},
 	}
 
+	testInput := []backendmetrics.PodMetrics{
+		&backendmetrics.FakePodMetrics{Pod: pod1},
+		&backendmetrics.FakePodMetrics{Pod: pod2},
+	}
+
 	tests := []struct {
 		name     string
 		metadata map[string]any
-		output   []schedulingtypes.Pod
+		output   []backendmetrics.PodMetrics
 	}{
 		{
 			name:     "SubsetFilter, filter not present — return all pods",
 			metadata: map[string]any{},
-			output: []schedulingtypes.Pod{
-				&schedulingtypes.PodMetrics{
-					Pod:          outputPod1,
-					MetricsState: backendmetrics.NewMetricsState(),
-				},
-				&schedulingtypes.PodMetrics{
-					Pod:          outputPod2,
-					MetricsState: backendmetrics.NewMetricsState(),
-				},
-			},
+			output:   testInput,
 		},
 		{
 			name:     "SubsetFilter, namespace present filter not present — return all pods",
 			metadata: map[string]any{metadata.SubsetFilterNamespace: map[string]any{}},
-			output: []schedulingtypes.Pod{
-				&schedulingtypes.PodMetrics{
-					Pod:          outputPod1,
-					MetricsState: backendmetrics.NewMetricsState(),
-				},
-				&schedulingtypes.PodMetrics{
-					Pod:          outputPod2,
-					MetricsState: backendmetrics.NewMetricsState(),
-				},
-			},
+			output:   testInput,
 		},
 		{
 			name:     "SubsetFilter, filter present with empty list — return error",
 			metadata: makeFilterMetadata([]any{}),
-			output:   []schedulingtypes.Pod{},
+			output:   []backendmetrics.PodMetrics{},
 		},
 		{
 			name:     "SubsetFilter, subset with one matching pod",
 			metadata: makeFilterMetadata([]any{"10.0.0.1"}),
-			output: []schedulingtypes.Pod{
-				&schedulingtypes.PodMetrics{
-					Pod:          outputPod1,
-					MetricsState: backendmetrics.NewMetricsState(),
+			output: []backendmetrics.PodMetrics{
+				&backendmetrics.FakePodMetrics{
+					Pod: pod1,
 				},
 			},
 		},
 		{
 			name:     "SubsetFilter, subset with multiple matching pods",
 			metadata: makeFilterMetadata([]any{"10.0.0.1", "10.0.0.2", "10.0.0.3"}),
-			output: []schedulingtypes.Pod{
-				&schedulingtypes.PodMetrics{
-					Pod:          outputPod1,
-					MetricsState: backendmetrics.NewMetricsState(),
-				},
-				&schedulingtypes.PodMetrics{
-					Pod:          outputPod2,
-					MetricsState: backendmetrics.NewMetricsState(),
-				},
-			},
+			output:   testInput,
 		},
 		{
 			name:     "SubsetFilter, subset with no matching pods",
 			metadata: makeFilterMetadata([]any{"10.0.0.3"}),
-			output:   []schedulingtypes.Pod{},
+			output:   []backendmetrics.PodMetrics{},
 		},
 	}
 
-	pmf := backendmetrics.NewPodMetricsFactory(&backendmetrics.FakePodMetricsClient{}, time.Second, time.Second*2)
-	ds := datastore.NewDatastore(t.Context(), pmf)
-	for _, testPod := range testInput {
-		ds.PodUpdateOrAddIfNotExist(testPod)
-	}
-
+	ds := &mockDatastore{pods: testInput}
 	for _, test := range tests {
 		t.Run(test.name, func(t *testing.T) {
 			director := NewDirectorWithConfig(ds, &mockScheduler{}, &mockSaturationDetector{}, NewConfig())
 
 			got := director.getCandidatePodsForScheduling(context.Background(), test.metadata)
 
-			diff := cmp.Diff(test.output, got, cmpopts.SortSlices(func(a, b schedulingtypes.Pod) bool {
+			diff := cmp.Diff(test.output, got, cmpopts.SortSlices(func(a, b backendmetrics.PodMetrics) bool {
 				return a.GetPod().NamespacedName.String() < b.GetPod().NamespacedName.String()
 			}))
 			if diff != "" {
@@ -578,8 +549,8 @@ func TestRandomWeightedDraw(t *testing.T) {
 			model: &v1alpha2.InferenceObjective{
 				Spec: v1alpha2.InferenceObjectiveSpec{
 					TargetModels: []v1alpha2.TargetModel{
-						{Name: "canary", Weight: pointer(50)},
-						{Name: "v1", Weight: pointer(50)},
+						{Name: "canary", Weight: ptr.To(int32(50))},
+						{Name: "v1", Weight: ptr.To(int32(50))},
 					},
 				},
 			},
@@ -590,9 +561,9 @@ func TestRandomWeightedDraw(t *testing.T) {
 			model: &v1alpha2.InferenceObjective{
 				Spec: v1alpha2.InferenceObjectiveSpec{
 					TargetModels: []v1alpha2.TargetModel{
-						{Name: "canary", Weight: pointer(25)},
-						{Name: "v1.1", Weight: pointer(55)},
-						{Name: "v1", Weight: pointer(50)},
+						{Name: "canary", Weight: ptr.To(int32(25))},
+						{Name: "v1.1", Weight: ptr.To(int32(55))},
+						{Name: "v1", Weight: ptr.To(int32(50))},
 					},
 				},
 			},
@@ -603,9 +574,9 @@ func TestRandomWeightedDraw(t *testing.T) {
 			model: &v1alpha2.InferenceObjective{
 				Spec: v1alpha2.InferenceObjectiveSpec{
 					TargetModels: []v1alpha2.TargetModel{
-						{Name: "canary", Weight: pointer(20)},
-						{Name: "v1.1", Weight: pointer(20)},
-						{Name: "v1", Weight: pointer(10)},
+						{Name: "canary", Weight: ptr.To(int32(20))},
+						{Name: "v1.1", Weight: ptr.To(int32(20))},
+						{Name: "v1", Weight: ptr.To(int32(10))},
 					},
 				},
 			},
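These hunks swap the package-local pointer helper (removed at the end of this file) for the generic helper from k8s.io/utils/ptr; for the int32 weights used here the two are equivalent:

	w := ptr.To(int32(50)) // *int32 pointing at 50, same as the removed pointer(50)
	_ = w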
@@ -683,10 +654,6 @@ func TestGetRandomPod(t *testing.T) {
 	}
 }
 
-func pointer(v int32) *int32 {
-	return &v
-}
-
 func TestDirector_HandleResponse(t *testing.T) {
 	pr1 := newTestPostResponse("pr1")
