Commit 9321425

nirrozenbaum authored and kfswain committed
removed datastore dependency from saturation detector (kubernetes-sigs#1293)
* remove datastore dependency from saturation detector
Signed-off-by: Nir Rozenbaum <[email protected]>

* addressed code review comments
Signed-off-by: Nir Rozenbaum <[email protected]>

---------
Signed-off-by: Nir Rozenbaum <[email protected]>
1 parent 0e0bd14 commit 9321425

File tree

6 files changed: +161 -239 lines changed


cmd/epp/runner/runner.go

Lines changed: 1 addition & 1 deletion
@@ -250,7 +250,7 @@ func (r *Runner) Run(ctx context.Context) error {
 
 	scheduler := scheduling.NewSchedulerWithConfig(r.schedulerConfig)
 
-	saturationDetector := saturationdetector.NewDetector(sdConfig, datastore, setupLog)
+	saturationDetector := saturationdetector.NewDetector(sdConfig, setupLog)
 
 	director := requestcontrol.NewDirectorWithConfig(datastore, scheduler, saturationDetector, r.requestControlConfig)
 
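The runner now constructs the saturation detector from its config and logger alone; the datastore is handed only to the Director. Below is a minimal, self-contained Go sketch of a detector with that shape, judging saturation purely from the pod snapshot passed into each call. The type names, the queue-depth policy, and the omission of the logger are illustrative assumptions, not the project's actual saturationdetector implementation (which is not shown on this page).

package main

import (
	"context"
	"fmt"
)

// Hypothetical stand-ins for the real config and metrics types.
type PodMetrics struct {
	Name             string
	WaitingQueueSize int
}

type Config struct {
	QueueDepthThreshold int
}

// Detector holds only its configuration; it no longer needs a datastore
// because the caller supplies the candidate pods on every check.
type Detector struct {
	cfg Config
}

func NewDetector(cfg Config) *Detector { return &Detector{cfg: cfg} }

// IsSaturated reports saturation when every candidate pod exceeds the queue-depth
// threshold (one plausible policy; the real detector's criteria are not shown here).
func (d *Detector) IsSaturated(_ context.Context, candidatePods []PodMetrics) bool {
	if len(candidatePods) == 0 {
		return true // nothing can serve the request
	}
	for _, pm := range candidatePods {
		if pm.WaitingQueueSize <= d.cfg.QueueDepthThreshold {
			return false // at least one pod still has headroom
		}
	}
	return true
}

func main() {
	d := NewDetector(Config{QueueDepthThreshold: 4})
	pods := []PodMetrics{
		{Name: "pod1", WaitingQueueSize: 7},
		{Name: "pod2", WaitingQueueSize: 2},
	}
	fmt.Println(d.IsSaturated(context.Background(), pods)) // false: pod2 has headroom
}
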
pkg/epp/requestcontrol/director.go

Lines changed: 55 additions & 51 deletions
@@ -29,10 +29,10 @@ import (
 
 	"sigs.k8s.io/controller-runtime/pkg/log"
 
+	v1 "sigs.k8s.io/gateway-api-inference-extension/api/v1"
 	"sigs.k8s.io/gateway-api-inference-extension/apix/v1alpha2"
 	"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend"
 	backendmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics"
-	"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datastore"
 	"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/handlers"
 	"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metadata"
 	"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics"
@@ -42,30 +42,38 @@ import (
 	requtil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/request"
 )
 
+// Datastore defines the interface required by the Director.
+type Datastore interface {
+	PoolGet() (*v1.InferencePool, error)
+	ObjectiveGet(modelName string) *v1alpha2.InferenceObjective
+	PodList(predicate func(backendmetrics.PodMetrics) bool) []backendmetrics.PodMetrics
+}
+
 // Scheduler defines the interface required by the Director for scheduling.
 type Scheduler interface {
 	Schedule(ctx context.Context, request *schedulingtypes.LLMRequest, candidatePods []schedulingtypes.Pod) (result *schedulingtypes.SchedulingResult, err error)
 }
 
 // SaturationDetector provides a signal indicating whether the backends are considered saturated.
 type SaturationDetector interface {
-	IsSaturated(ctx context.Context) bool
+	IsSaturated(ctx context.Context, candidatePods []backendmetrics.PodMetrics) bool
 }
 
 // NewDirectorWithConfig creates a new Director instance with all dependencies.
-func NewDirectorWithConfig(datastore datastore.Datastore, scheduler Scheduler, saturationDetector SaturationDetector, config *Config) *Director {
+func NewDirectorWithConfig(datastore Datastore, scheduler Scheduler, saturationDetector SaturationDetector, config *Config) *Director {
 	return &Director{
 		datastore:           datastore,
 		scheduler:           scheduler,
 		saturationDetector:  saturationDetector,
 		preRequestPlugins:   config.preRequestPlugins,
 		postResponsePlugins: config.postResponsePlugins,
+		defaultPriority:     0, // define default priority explicitly
 	}
 }
 
 // Director orchestrates the request handling flow, including scheduling.
 type Director struct {
-	datastore           datastore.Datastore
+	datastore           Datastore
 	scheduler           Scheduler
 	saturationDetector  SaturationDetector
 	preRequestPlugins   []PreRequest
@@ -76,17 +84,12 @@ type Director struct {
 	defaultPriority int
 }
 
-// HandleRequest orchestrates the request lifecycle:
-// 1. Parses request details.
-// 2. Calls admitRequest for admission control.
-// 3. Calls Scheduler.Schedule if request is approved.
-// 4. Calls prepareRequest to populate RequestContext with result and call PreRequest plugins.
-//
+// HandleRequest orchestrates the request lifecycle.
 // It always returns the requestContext even in the error case, as the request context is used in error handling.
 func (d *Director) HandleRequest(ctx context.Context, reqCtx *handlers.RequestContext) (*handlers.RequestContext, error) {
 	logger := log.FromContext(ctx)
 
-	// --- 1. Parse Request, Resolve Target Models, and Determine Parameters ---
+	// Parse Request, Resolve Target Models, and Determine Parameters
 	requestBodyMap := reqCtx.Request.Body
 	var ok bool
 	reqCtx.IncomingModelName, ok = requestBodyMap["model"].(string)
@@ -130,22 +133,23 @@ func (d *Director) HandleRequest(ctx context.Context, reqCtx *handlers.RequestCo
 	ctx = log.IntoContext(ctx, logger)
 	logger.V(logutil.DEBUG).Info("LLM request assembled")
 
-	// --- 2. Admission Control check --
-	if err := d.admitRequest(ctx, *infObjective.Spec.Priority, reqCtx.FairnessID); err != nil {
-		return reqCtx, err
-	}
-
-	// --- 3. Call Scheduler (with the relevant candidate pods) ---
+	// Get candidate pods for scheduling
 	candidatePods := d.getCandidatePodsForScheduling(ctx, reqCtx.Request.Metadata)
 	if len(candidatePods) == 0 {
 		return reqCtx, errutil.Error{Code: errutil.ServiceUnavailable, Msg: "failed to find candidate pods for serving the request"}
 	}
-	result, err := d.scheduler.Schedule(ctx, reqCtx.SchedulingRequest, candidatePods)
+
+	// Admission Control check
+	if err := d.admitRequest(ctx, candidatePods, *infObjective.Spec.Priority, reqCtx.FairnessID); err != nil {
+		return reqCtx, err
+	}
+
+	result, err := d.scheduler.Schedule(ctx, reqCtx.SchedulingRequest, d.toSchedulerPodMetrics(candidatePods))
 	if err != nil {
 		return reqCtx, errutil.Error{Code: errutil.InferencePoolResourceExhausted, Msg: fmt.Errorf("failed to find target pod: %w", err).Error()}
 	}
 
-	// --- 4. Prepare Request (Populates RequestContext and call PreRequest plugins) ---
+	// Prepare Request (Populates RequestContext and call PreRequest plugins)
 	// Insert target endpoint to instruct Envoy to route requests to the specified target pod and attach the port number.
 	// Invoke PreRequest registered plugins.
 	reqCtx, err = d.prepareRequest(ctx, reqCtx, result)
@@ -156,52 +160,27 @@ func (d *Director) HandleRequest(ctx context.Context, reqCtx *handlers.RequestCo
 	return reqCtx, nil
 }
 
-// admitRequest handles admission control to decide whether or not to accept the request
-// based on the request priority and system saturation state.
-func (d *Director) admitRequest(ctx context.Context, requestPriority int, fairnessID string) error {
-	logger := log.FromContext(ctx)
-
-	logger.V(logutil.TRACE).Info("Entering Flow Control", "priority", requestPriority, "fairnessID", fairnessID)
-
-	// This will be removed in favor of a more robust implementation (Flow Control) in the very near future.
-	// TODO: Make this a configurable value.
-	// Tracking issue https://github.com/kubernetes-sigs/gateway-api-inference-extension/issues/1347
-	if requestPriority >= 0 {
-		logger.V(logutil.TRACE).Info("Non-sheddable request bypassing saturation check.")
-		return nil
-	}
-
-	if d.saturationDetector.IsSaturated(ctx) { // Assuming non-nil Saturation Detector
-		return errutil.Error{
-			Code: errutil.InferencePoolResourceExhausted,
-			Msg:  "system saturated, sheddable request dropped",
-		}
-	}
-
-	return nil
-}
-
 // getCandidatePodsForScheduling gets the list of relevant endpoints for the scheduling cycle from the datastore.
 // according to EPP protocol, if "x-gateway-destination-endpoint-subset" is set on the request metadata and specifies
 // a subset of endpoints, only these endpoints will be considered as candidates for the scheduler.
 // Snapshot pod metrics from the datastore to:
 // 1. Reduce concurrent access to the datastore.
 // 2. Ensure consistent data during the scheduling operation of a request between all scheduling cycles.
-func (d *Director) getCandidatePodsForScheduling(ctx context.Context, requestMetadata map[string]any) []schedulingtypes.Pod {
+func (d *Director) getCandidatePodsForScheduling(ctx context.Context, requestMetadata map[string]any) []backendmetrics.PodMetrics {
 	loggerTrace := log.FromContext(ctx).V(logutil.TRACE)
 
 	subsetMap, found := requestMetadata[metadata.SubsetFilterNamespace].(map[string]any)
 	if !found {
-		return d.toSchedulerPodMetrics(d.datastore.PodList(backendmetrics.AllPodsPredicate))
+		return d.datastore.PodList(backendmetrics.AllPodsPredicate)
	}
 
 	// Check if endpoint key is present in the subset map and ensure there is at least one value
 	endpointSubsetList, found := subsetMap[metadata.SubsetFilterKey].([]any)
 	if !found {
-		return d.toSchedulerPodMetrics(d.datastore.PodList(backendmetrics.AllPodsPredicate))
+		return d.datastore.PodList(backendmetrics.AllPodsPredicate)
 	} else if len(endpointSubsetList) == 0 {
 		loggerTrace.Info("found empty subset filter in request metadata, filtering all pods")
-		return []schedulingtypes.Pod{}
+		return []backendmetrics.PodMetrics{}
 	}
 
 	// Create a map of endpoint addresses for easy lookup
@@ -214,17 +193,42 @@ func (d *Director) getCandidatePodsForScheduling(ctx context.Context, requestMet
 	}
 
 	podTotalCount := 0
-	podFitleredList := d.datastore.PodList(func(pm backendmetrics.PodMetrics) bool {
+	podFilteredList := d.datastore.PodList(func(pm backendmetrics.PodMetrics) bool {
 		podTotalCount++
 		if _, found := endpoints[pm.GetPod().Address]; found {
 			return true
 		}
 		return false
 	})
 
-	loggerTrace.Info("filtered candidate pods by subset filtering", "podTotalCount", podTotalCount, "filteredCount", len(podFitleredList))
+	loggerTrace.Info("filtered candidate pods by subset filtering", "podTotalCount", podTotalCount, "filteredCount", len(podFilteredList))
+
+	return podFilteredList
+}
+
+// admitRequest handles admission control to decide whether or not to accept the request
+// based on the request priority and saturation state.
+func (d *Director) admitRequest(ctx context.Context, candidatePods []backendmetrics.PodMetrics, requestPriority int, fairnessID string) error {
+	loggerTrace := log.FromContext(ctx).V(logutil.TRACE)
+
+	loggerTrace.Info("Entering Flow Control", "priority", requestPriority, "fairnessID", fairnessID)
+
+	// This will be removed in favor of a more robust implementation (Flow Control) in the very near future.
+	// TODO: Make this a configurable value.
+	// Tracking issue https://github.com/kubernetes-sigs/gateway-api-inference-extension/issues/1347
+	if requestPriority >= 0 {
+		loggerTrace.Info("Non-sheddable request bypassing saturation check.")
+		return nil
+	}
+
+	if d.saturationDetector.IsSaturated(ctx, candidatePods) {
+		return errutil.Error{
+			Code: errutil.InferencePoolResourceExhausted,
+			Msg:  "system saturated, sheddable request dropped",
+		}
+	}
 
-	return d.toSchedulerPodMetrics(podFitleredList)
+	return nil
 }
 
 // prepareRequest populates the RequestContext and calls the registered PreRequest plugins
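
The main behavioral change in HandleRequest is the ordering: candidate pods are resolved first, and the same snapshot now feeds both the admission (saturation) check and the scheduler. The following self-contained Go sketch mirrors only that priority and saturation logic; the Director, PodMetrics, and handle names here are simplified stand-ins rather than the project's real types.

package main

import (
	"context"
	"errors"
	"fmt"
)

type PodMetrics struct{ Name string }

// SaturationDetector matches the shape introduced by this commit: the caller
// passes the candidate pods it already gathered for scheduling.
type SaturationDetector interface {
	IsSaturated(ctx context.Context, candidatePods []PodMetrics) bool
}

type Director struct {
	detector SaturationDetector
}

// handle is a condensed stand-in for HandleRequest's admission path.
func (d *Director) handle(ctx context.Context, priority int, candidatePods []PodMetrics) error {
	if len(candidatePods) == 0 {
		return errors.New("failed to find candidate pods for serving the request")
	}
	// Only sheddable (negative-priority) requests are subject to the saturation check.
	if priority < 0 && d.detector.IsSaturated(ctx, candidatePods) {
		return errors.New("system saturated, sheddable request dropped")
	}
	// ...schedule the request onto one of candidatePods...
	return nil
}

// alwaysSaturated is a trivial detector used to exercise both branches.
type alwaysSaturated struct{}

func (alwaysSaturated) IsSaturated(context.Context, []PodMetrics) bool { return true }

func main() {
	d := &Director{detector: alwaysSaturated{}}
	pods := []PodMetrics{{Name: "pod1"}}
	fmt.Println(d.handle(context.Background(), 0, pods))  // <nil>: non-sheddable bypasses the check
	fmt.Println(d.handle(context.Background(), -1, pods)) // sheddable request dropped
}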

pkg/epp/requestcontrol/director_test.go

Lines changed: 37 additions & 66 deletions
@@ -34,6 +34,7 @@ import (
 	"sigs.k8s.io/controller-runtime/pkg/client/fake"
 
 	v1 "sigs.k8s.io/gateway-api-inference-extension/api/v1"
+	"sigs.k8s.io/gateway-api-inference-extension/apix/v1alpha2"
 	"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend"
 	backendmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics"
 	"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datastore"
@@ -53,7 +54,7 @@ type mockSaturationDetector struct {
 	isSaturated bool
 }
 
-func (m *mockSaturationDetector) IsSaturated(_ context.Context) bool {
+func (m *mockSaturationDetector) IsSaturated(_ context.Context, _ []backendmetrics.PodMetrics) bool {
 	return m.isSaturated
 }
 
@@ -66,6 +67,23 @@ func (m *mockScheduler) Schedule(_ context.Context, _ *schedulingtypes.LLMReques
 	return m.scheduleResults, m.scheduleErr
 }
 
+type mockDatastore struct {
+	pods []backendmetrics.PodMetrics
+}
+
+func (ds *mockDatastore) PoolGet() (*v1.InferencePool, error) { return nil, nil }
+func (ds *mockDatastore) ObjectiveGet(_ string) *v1alpha2.InferenceObjective { return nil }
+func (ds *mockDatastore) PodList(predicate func(backendmetrics.PodMetrics) bool) []backendmetrics.PodMetrics {
+	res := []backendmetrics.PodMetrics{}
+	for _, pod := range ds.pods {
+		if predicate(pod) {
+			res = append(res, pod)
+		}
+	}
+
+	return res
+}
+
 func TestDirector_HandleRequest(t *testing.T) {
 	ctx := logutil.NewTestLoggerIntoContext(context.Background())
 
@@ -450,119 +468,72 @@ func TestGetCandidatePodsForScheduling(t *testing.T) {
 		}
 	}
 
-	testInput := []*corev1.Pod{
-		{
-			ObjectMeta: metav1.ObjectMeta{
-				Name: "pod1",
-			},
-			Status: corev1.PodStatus{
-				PodIP: "10.0.0.1",
-			},
-		},
-		{
-			ObjectMeta: metav1.ObjectMeta{
-				Name: "pod2",
-			},
-			Status: corev1.PodStatus{
-				PodIP: "10.0.0.2",
-			},
-		},
-	}
-
-	outputPod1 := &backend.Pod{
+	pod1 := &backend.Pod{
 		NamespacedName: types.NamespacedName{Name: "pod1"},
 		Address:        "10.0.0.1",
 		Labels:         map[string]string{},
 	}
 
-	outputPod2 := &backend.Pod{
+	pod2 := &backend.Pod{
 		NamespacedName: types.NamespacedName{Name: "pod2"},
 		Address:        "10.0.0.2",
 		Labels:         map[string]string{},
 	}
 
+	testInput := []backendmetrics.PodMetrics{
+		&backendmetrics.FakePodMetrics{Pod: pod1},
+		&backendmetrics.FakePodMetrics{Pod: pod2},
+	}
+
 	tests := []struct {
 		name     string
 		metadata map[string]any
-		output   []schedulingtypes.Pod
+		output   []backendmetrics.PodMetrics
 	}{
 		{
 			name:     "SubsetFilter, filter not present — return all pods",
 			metadata: map[string]any{},
-			output: []schedulingtypes.Pod{
-				&schedulingtypes.PodMetrics{
-					Pod:          outputPod1,
-					MetricsState: backendmetrics.NewMetricsState(),
-				},
-				&schedulingtypes.PodMetrics{
-					Pod:          outputPod2,
-					MetricsState: backendmetrics.NewMetricsState(),
-				},
-			},
+			output:   testInput,
 		},
 		{
 			name:     "SubsetFilter, namespace present filter not present — return all pods",
 			metadata: map[string]any{metadata.SubsetFilterNamespace: map[string]any{}},
-			output: []schedulingtypes.Pod{
-				&schedulingtypes.PodMetrics{
-					Pod:          outputPod1,
-					MetricsState: backendmetrics.NewMetricsState(),
-				},
-				&schedulingtypes.PodMetrics{
-					Pod:          outputPod2,
-					MetricsState: backendmetrics.NewMetricsState(),
-				},
-			},
+			output:   testInput,
 		},
 		{
 			name:     "SubsetFilter, filter present with empty list — return error",
 			metadata: makeFilterMetadata([]any{}),
-			output:   []schedulingtypes.Pod{},
+			output:   []backendmetrics.PodMetrics{},
 		},
 		{
 			name:     "SubsetFilter, subset with one matching pod",
 			metadata: makeFilterMetadata([]any{"10.0.0.1"}),
-			output: []schedulingtypes.Pod{
-				&schedulingtypes.PodMetrics{
-					Pod:          outputPod1,
-					MetricsState: backendmetrics.NewMetricsState(),
+			output: []backendmetrics.PodMetrics{
+				&backendmetrics.FakePodMetrics{
+					Pod: pod1,
 				},
 			},
 		},
 		{
 			name:     "SubsetFilter, subset with multiple matching pods",
 			metadata: makeFilterMetadata([]any{"10.0.0.1", "10.0.0.2", "10.0.0.3"}),
-			output: []schedulingtypes.Pod{
-				&schedulingtypes.PodMetrics{
-					Pod:          outputPod1,
-					MetricsState: backendmetrics.NewMetricsState(),
-				},
-				&schedulingtypes.PodMetrics{
-					Pod:          outputPod2,
-					MetricsState: backendmetrics.NewMetricsState(),
-				},
-			},
+			output:   testInput,
 		},
 		{
 			name:     "SubsetFilter, subset with no matching pods",
 			metadata: makeFilterMetadata([]any{"10.0.0.3"}),
-			output:   []schedulingtypes.Pod{},
+			output:   []backendmetrics.PodMetrics{},
 		},
 	}
 
-	pmf := backendmetrics.NewPodMetricsFactory(&backendmetrics.FakePodMetricsClient{}, time.Second)
-	ds := datastore.NewDatastore(t.Context(), pmf)
-	for _, testPod := range testInput {
-		ds.PodUpdateOrAddIfNotExist(testPod)
-	}
-
+	ds := &mockDatastore{pods: testInput}
 	for _, test := range tests {
 		t.Run(test.name, func(t *testing.T) {
 			director := NewDirectorWithConfig(ds, &mockScheduler{}, &mockSaturationDetector{}, NewConfig())
 
 			got := director.getCandidatePodsForScheduling(context.Background(), test.metadata)
 
-			diff := cmp.Diff(test.output, got, cmpopts.SortSlices(func(a, b schedulingtypes.Pod) bool {
+			diff := cmp.Diff(test.output, got, cmpopts.SortSlices(func(a, b backendmetrics.PodMetrics) bool {
 				return a.GetPod().NamespacedName.String() < b.GetPod().NamespacedName.String()
 			}))
 			if diff != "" {
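
With the Director depending on its own narrow Datastore interface, the test file can swap in the small mockDatastore above instead of building a real datastore. A common companion idiom (not used in this commit) is a compile-time assertion that the fake keeps satisfying the interface; the sketch below shows it with stand-in types rather than the real gateway-api-inference-extension interfaces.

package main

// Stand-in interfaces; the real Datastore and PodMetrics live in the packages shown in the diff above.
type PodMetrics interface{ Name() string }

type Datastore interface {
	PodList(predicate func(PodMetrics) bool) []PodMetrics
}

type mockDatastore struct{ pods []PodMetrics }

func (ds *mockDatastore) PodList(predicate func(PodMetrics) bool) []PodMetrics {
	res := []PodMetrics{}
	for _, p := range ds.pods {
		if predicate(p) {
			res = append(res, p)
		}
	}
	return res
}

// Compile-time check: the build fails if mockDatastore stops implementing
// Datastore, so interface drift is caught before any test runs.
var _ Datastore = (*mockDatastore)(nil)

func main() {}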
