diff --git a/cmd/epp/runner/runner.go b/cmd/epp/runner/runner.go
index 5ee785c3c..06293c70c 100644
--- a/cmd/epp/runner/runner.go
+++ b/cmd/epp/runner/runner.go
@@ -50,6 +50,9 @@ import (
 	"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datalayer"
 	dlmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datalayer/metrics"
 	"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datastore"
+	"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol"
+	fccontroller "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/controller"
+	fcregistry "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/registry"
 	"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics"
 	"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics/collectors"
 	"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/plugins"
@@ -68,11 +71,25 @@ import (
 )

 const (
-	// enableExperimentalDatalayerV2 defines the environment variable
-	// used as feature flag for the pluggable data layer.
+	// enableExperimentalDatalayerV2 defines the environment variable used as a feature flag for the pluggable data layer.
 	enableExperimentalDatalayerV2 = "ENABLE_EXPERIMENTAL_DATALAYER_V2"
+	// enableExperimentalFlowControlLayer defines the environment variable used as a feature flag for the pluggable flow
+	// control layer.
+	enableExperimentalFlowControlLayer = "ENABLE_EXPERIMENTAL_FLOW_CONTROL_LAYER"
 )

+// TODO: this is hardcoded for POC only. This needs to be hooked up to our text-based config story.
+var flowControlConfig = flowcontrol.Config{
+	Controller: fccontroller.Config{}, // Use all defaults.
+	Registry: fcregistry.Config{
+		// Define the domain of accepted priority levels, as this field is required. Use defaults for all optional fields.
+		// TODO: this should not be hardcoded.
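+		// A configuration with more bands might look like this (illustrative values only):
+		//   {Priority: 10, PriorityName: "Critical"},
+		//   {Priority: 0, PriorityName: "Default"},
+		//   {Priority: -10, PriorityName: "Sheddable"}, // negative priorities are sheddable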
+ PriorityBands: []fcregistry.PriorityBandConfig{ + {Priority: 0, PriorityName: "Default"}, + }, + }, +} + var ( grpcPort = flag.Int("grpc-port", runserver.DefaultGrpcPort, "The gRPC port used for communicating with Envoy proxy") grpcHealthPort = flag.Int("grpc-health-port", runserver.DefaultGrpcHealthPort, "The port used for gRPC liveness and readiness probes") @@ -271,7 +288,43 @@ func (r *Runner) Run(ctx context.Context) error { saturationDetector := saturationdetector.NewDetector(sdConfig, setupLog) - director := requestcontrol.NewDirectorWithConfig(datastore, scheduler, saturationDetector, r.requestControlConfig) + // --- Admission Control Initialization --- + enableFlowControl := env.GetEnvBool(enableExperimentalFlowControlLayer, false, setupLog) + var admissionController requestcontrol.AdmissionController + if enableFlowControl { + setupLog.Info("Initializing experimental Flow Control layer") + fcCfg, err := flowControlConfig.ValidateAndApplyDefaults() + if err != nil { + setupLog.Error(err, "failed to initialize Flow Control layer") + return fmt.Errorf("invalid Flow Control config: %w", err) + } + + registry, err := fcregistry.NewFlowRegistry(fcCfg.Registry, setupLog) + if err != nil { + return fmt.Errorf("failed to initialize Flow Registry: %w", err) + } + fc, err := fccontroller.NewFlowController( + ctx, + fcCfg.Controller, + registry, + saturationDetector, + setupLog, + ) + if err != nil { + return fmt.Errorf("failed to initialize Flow Controller: %w", err) + } + go registry.Run(ctx) + admissionController = requestcontrol.NewFlowControlAdmissionController(saturationDetector, fc) + } else { + setupLog.Info("Experimental Flow Control layer is disabled, using legacy admission control") + admissionController = requestcontrol.NewLegacyAdmissionController(saturationDetector) + } + + director := requestcontrol.NewDirectorWithConfig( + datastore, + scheduler, + admissionController, + r.requestControlConfig) // --- Setup ExtProc Server Runner --- serverRunner := &runserver.ExtProcServerRunner{ diff --git a/pkg/epp/handlers/request.go b/pkg/epp/handlers/request.go index 7f8122195..0e04289a7 100644 --- a/pkg/epp/handlers/request.go +++ b/pkg/epp/handlers/request.go @@ -29,6 +29,13 @@ import ( errutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/error" ) +const ( + // defaultFairnessID is the default fairness ID used when no ID is provided in the request. + // This ensures that requests without explicit fairness identifiers are still grouped and managed by the Flow Control + // system. 
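+	// Requests that carry an explicit fairness ID keep their own value; this default applies only when none is set
+	// (see the check at the end of HandleRequestHeaders).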
+ defaultFairnessID = "default-flow" +) + func (s *StreamingServer) HandleRequestHeaders(reqCtx *RequestContext, req *extProcPb.ProcessingRequest_RequestHeaders) error { reqCtx.RequestReceivedTimestamp = time.Now() @@ -80,6 +87,11 @@ func (s *StreamingServer) HandleRequestHeaders(reqCtx *RequestContext, req *extP delete(reqCtx.Request.Headers, header.Key) } } + + if reqCtx.FairnessID == "" { + reqCtx.FairnessID = defaultFairnessID + } + return nil } diff --git a/pkg/epp/handlers/request_test.go b/pkg/epp/handlers/request_test.go index 4ae207803..a3ef90cb4 100644 --- a/pkg/epp/handlers/request_test.go +++ b/pkg/epp/handlers/request_test.go @@ -21,6 +21,7 @@ import ( configPb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" extProcPb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3" + "github.com/stretchr/testify/assert" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metadata" ) @@ -66,3 +67,32 @@ func TestHandleRequestHeaders(t *testing.T) { t.Errorf("expected fairness ID header to be removed from request headers, but it was not") } } + +func TestHandleRequestHeaders_DefaultFairnessID(t *testing.T) { + t.Parallel() + + server := &StreamingServer{} + reqCtx := &RequestContext{ + Request: &Request{ + Headers: make(map[string]string), + }, + } + + req := &extProcPb.ProcessingRequest_RequestHeaders{ + RequestHeaders: &extProcPb.HttpHeaders{ + Headers: &configPb.HeaderMap{ + Headers: []*configPb.HeaderValue{ + { + Key: "x-test-header", + Value: "test-value", + }, + }, + }, + EndOfStream: false, + }, + } + + err := server.HandleRequestHeaders(reqCtx, req) + assert.NoError(t, err, "expected no error") + assert.Equal(t, defaultFairnessID, reqCtx.FairnessID, "expected fairness ID to be defaulted") +} diff --git a/pkg/epp/requestcontrol/admission.go b/pkg/epp/requestcontrol/admission.go new file mode 100644 index 000000000..383d2844a --- /dev/null +++ b/pkg/epp/requestcontrol/admission.go @@ -0,0 +1,219 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package requestcontrol + +import ( + "context" + "time" + + "sigs.k8s.io/controller-runtime/pkg/log" + + backendmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/types" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/handlers" + errutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/error" + logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging" + requtil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/request" +) + +// AdmissionController defines the interface for making admission control decisions. +// Implementations of this interface determine whether an incoming inference request should be accepted or rejected +// based on various criteria such as system load, fairness, priority, and available capacity. +type AdmissionController interface { + // Admit determines if a request should be admitted. 
+ // It is called by the Director for each incoming request. + // + // Args: + // ctx: The request context, carrying deadlines, cancellation signals, and logger. + // reqCtx: The handlers.RequestContext containing details about the incoming request. + // candidatePods: A list of potential backend pods that can serve the request. + // priority: The priority level of the request, as determined by the InferenceObjective. + // + // Returns: + // - nil: If the request is admitted and should proceed to scheduling. + // - errutil.Error: If the request is rejected. + Admit( + ctx context.Context, + reqCtx *handlers.RequestContext, + candidatePods []backendmetrics.PodMetrics, + priority int, + ) error +} + +// saturationDetector defines the minimal interface required for checking if the backend pool is saturated. +type saturationDetector interface { + IsSaturated(ctx context.Context, candidatePods []backendmetrics.PodMetrics) bool +} + +// flowController defines the minimal interface required by FlowControlAdmissionController for enqueuing requests and +// waiting for an admission outcome. +type flowController interface { + EnqueueAndWait(req types.FlowControlRequest) (types.QueueOutcome, error) +} + +// rejectIfSheddableAndSaturated checks if a request should be immediately rejected because it's sheddable +// (priority < 0) and the system is saturated. +func rejectIfSheddableAndSaturated( + ctx context.Context, + sd saturationDetector, + reqCtx *handlers.RequestContext, + candidatePods []backendmetrics.PodMetrics, + priority int, +) error { + if requtil.IsSheddable(priority) { + logger := log.FromContext(ctx) + if sd.IsSaturated(ctx, candidatePods) { + logger.V(logutil.TRACE).Info("Request rejected: system saturated and request is sheddable", + "requestID", reqCtx.SchedulingRequest.RequestId) + return errutil.Error{ + Code: errutil.InferencePoolResourceExhausted, + Msg: "system saturated, sheddable request dropped", + } + } + } + return nil +} + +// --- LegacyAdmissionController --- + +// LegacyAdmissionController implements saturation-based admission control. +// It rejects sheddable requests (priority < 0) if the saturationDetector indicates that the system is currently +// saturated. Non-sheddable requests always bypass the saturation check. +type LegacyAdmissionController struct { + saturationDetector saturationDetector +} + +// NewLegacyAdmissionController creates a new LegacyAdmissionController. +func NewLegacyAdmissionController(sd saturationDetector) *LegacyAdmissionController { + return &LegacyAdmissionController{saturationDetector: sd} +} + +// Admit implements the AdmissionController interface for the legacy strategy. +// It checks for saturation only for requests with priority < 0. +func (lac *LegacyAdmissionController) Admit( + ctx context.Context, + reqCtx *handlers.RequestContext, + candidatePods []backendmetrics.PodMetrics, + priority int, +) error { + logger := log.FromContext(ctx) + logger.V(logutil.TRACE).Info("Executing LegacyAdmissionController", + "priority", priority, "fairnessID", reqCtx.FairnessID) + if err := rejectIfSheddableAndSaturated(ctx, lac.saturationDetector, reqCtx, candidatePods, priority); err != nil { + return err + } + logger.V(logutil.TRACE).Info("Request admitted", "requestID", reqCtx.SchedulingRequest.RequestId) + return nil +} + +// --- FlowControlAdmissionController --- + +// FlowControlAdmissionController delegates admission decisions to the Flow Control layer. 
+// It first checks if the request is sheddable and the system is saturated, rejecting immediately if both conditions are +// true. Otherwise, it uses the provided flowController to enqueue the request and await an outcome. +type FlowControlAdmissionController struct { + saturationDetector saturationDetector + flowController flowController +} + +// NewFlowControlAdmissionController creates a new FlowControlAdmissionController. +// It requires a SaturationDetector and a flowController instance. +func NewFlowControlAdmissionController(sd saturationDetector, fc flowController) *FlowControlAdmissionController { + return &FlowControlAdmissionController{ + saturationDetector: sd, + flowController: fc, + } +} + +// Admit implements the AdmissionController interface by checking for saturation on sheddable requests first, then +// deferring to the Flow Control system. +func (fcac *FlowControlAdmissionController) Admit( + ctx context.Context, + reqCtx *handlers.RequestContext, + candidatePods []backendmetrics.PodMetrics, + priority int, +) error { + logger := log.FromContext(ctx) + logger.V(logutil.TRACE).Info("Executing FlowControlAdmissionController", + "requestID", reqCtx.SchedulingRequest.RequestId, "priority", priority, "fairnessID", reqCtx.FairnessID) + if err := rejectIfSheddableAndSaturated(ctx, fcac.saturationDetector, reqCtx, candidatePods, priority); err != nil { + return err + } + + logger.V(logutil.TRACE).Info("Request proceeding to flow control", "requestID", reqCtx.SchedulingRequest.RequestId) + + fcReq := &flowControlRequest{ + ctx: ctx, + requestID: reqCtx.SchedulingRequest.RequestId, + fairnessID: reqCtx.FairnessID, + priority: priority, + requestByteSize: uint64(reqCtx.RequestSize), + candidatePods: candidatePods, + } + + outcome, err := fcac.flowController.EnqueueAndWait(fcReq) + logger.V(logutil.DEBUG).Info("Flow control outcome", + "requestID", reqCtx.SchedulingRequest.RequestId, "outcome", outcome, "error", err) + return translateFlowControlOutcome(outcome, err) +} + +// flowControlRequest is an adapter that implements the types.FlowControlRequest interface. +type flowControlRequest struct { + ctx context.Context + requestID string + fairnessID string + priority int + requestByteSize uint64 + candidatePods []backendmetrics.PodMetrics +} + +var _ types.FlowControlRequest = &flowControlRequest{} + +func (r *flowControlRequest) Context() context.Context { return r.ctx } +func (r *flowControlRequest) ID() string { return r.requestID } +func (r *flowControlRequest) InitialEffectiveTTL() time.Duration { return 0 } // Use controller default. +func (r *flowControlRequest) ByteSize() uint64 { return r.requestByteSize } +func (r *flowControlRequest) CandidatePodsForScheduling() []backendmetrics.PodMetrics { + return r.candidatePods +} +func (r *flowControlRequest) FlowKey() types.FlowKey { + return types.FlowKey{ID: r.fairnessID, Priority: r.priority} +} + +// translateFlowControlOutcome maps the context-rich outcome of the Flow Control layer to the public errutil.Error +// contract used by the Director. 
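+// The mapping mirrors the switch below:
+//   - QueueOutcomeDispatched                           -> nil (request admitted)
+//   - QueueOutcomeRejectedCapacity                     -> InferencePoolResourceExhausted
+//   - QueueOutcomeEvictedTTL / EvictedContextCancelled -> ServiceUnavailable
+//   - QueueOutcomeRejectedOther / EvictedOther / other -> Internal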
+func translateFlowControlOutcome(outcome types.QueueOutcome, err error) error { + msg := "request rejected by flow control" + if err != nil { + msg = err.Error() + } + + switch outcome { + case types.QueueOutcomeDispatched: + return nil + case types.QueueOutcomeRejectedCapacity: + return errutil.Error{Code: errutil.InferencePoolResourceExhausted, Msg: msg} + case types.QueueOutcomeEvictedTTL: + return errutil.Error{Code: errutil.ServiceUnavailable, Msg: "request timed out in queue: " + msg} + case types.QueueOutcomeEvictedContextCancelled: + return errutil.Error{Code: errutil.ServiceUnavailable, Msg: "client disconnected: " + msg} + case types.QueueOutcomeRejectedOther, types.QueueOutcomeEvictedOther: + return errutil.Error{Code: errutil.Internal, Msg: "internal flow control error: " + msg} + default: + return errutil.Error{Code: errutil.Internal, Msg: "unhandled flow control outcome: " + msg} + } +} diff --git a/pkg/epp/requestcontrol/admission_test.go b/pkg/epp/requestcontrol/admission_test.go new file mode 100644 index 000000000..002c50f06 --- /dev/null +++ b/pkg/epp/requestcontrol/admission_test.go @@ -0,0 +1,282 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package requestcontrol + +import ( + "context" + "errors" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + backendmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics" + fctypes "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/types" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/handlers" + schedulingtypes "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/types" + errutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/error" + logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging" +) + +// --- Mocks --- + +type mockSaturationDetector struct { + isSaturated bool +} + +func (m *mockSaturationDetector) IsSaturated(_ context.Context, _ []backendmetrics.PodMetrics) bool { + return m.isSaturated +} + +type mockFlowController struct { + outcome fctypes.QueueOutcome + err error + called bool +} + +func (m *mockFlowController) EnqueueAndWait(_ fctypes.FlowControlRequest) (fctypes.QueueOutcome, error) { + m.called = true + return m.outcome, m.err +} + +func TestLegacyAdmissionController_Admit(t *testing.T) { + t.Parallel() + ctx := logutil.NewTestLoggerIntoContext(context.Background()) + candidatePods := []backendmetrics.PodMetrics{} + reqCtx := &handlers.RequestContext{ + SchedulingRequest: &schedulingtypes.LLMRequest{RequestId: "test-req"}, + } + + testCases := []struct { + name string + priority int + isSaturated bool + expectErr bool + expectErrCode string + expectErrSubstr string + }{ + { + name: "non_sheddable_saturated_admit", + priority: 0, + isSaturated: true, + expectErr: false, + }, + { + name: "sheddable_not_saturated_admit", + priority: -1, + isSaturated: false, + expectErr: false, + }, + { + name: "sheddable_saturated_reject", + priority: -1, + 
isSaturated: true, + expectErr: true, + expectErrCode: errutil.InferencePoolResourceExhausted, + expectErrSubstr: "system saturated, sheddable request dropped", + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + saturationDetector := &mockSaturationDetector{isSaturated: tc.isSaturated} + ac := NewLegacyAdmissionController(saturationDetector) + + err := ac.Admit(ctx, reqCtx, candidatePods, tc.priority) + + if !tc.expectErr { + assert.NoError(t, err, "Admit() should not have returned an error for scenario: %s", tc.name) + } else { + require.Error(t, err, "Admit() should have returned an error for scenario: %s", tc.name) + var e errutil.Error + if assert.ErrorAs(t, err, &e, "error should be of type errutil.Error") { + assert.Equal(t, tc.expectErrCode, e.Code, "incorrect error code for scenario: %s", tc.name) + assert.Contains(t, e.Msg, tc.expectErrSubstr, "incorrect error message substring for scenario: %s", tc.name) + } + } + }) + } +} + +func TestFlowControlRequestAdapter(t *testing.T) { + t.Parallel() + ctx := context.Background() + candidatePods := []backendmetrics.PodMetrics{&backendmetrics.FakePodMetrics{}} + + testCases := []struct { + name string + requestID string + fairnessID string + priority int + requestByteSize uint64 + expectFlowKey fctypes.FlowKey + }{ + { + name: "simple", + requestID: "req-1", + fairnessID: "flow-1", + priority: 10, + requestByteSize: 1024, + expectFlowKey: fctypes.FlowKey{ID: "flow-1", Priority: 10}, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + fcReq := &flowControlRequest{ + ctx: ctx, + requestID: tc.requestID, + fairnessID: tc.fairnessID, + priority: tc.priority, + requestByteSize: tc.requestByteSize, + candidatePods: candidatePods, + } + + assert.Equal(t, ctx, fcReq.Context(), "Context() mismatch") + assert.Equal(t, tc.requestID, fcReq.ID(), "ID() mismatch") + assert.Equal(t, tc.requestByteSize, fcReq.ByteSize(), "ByteSize() mismatch") + assert.Equal(t, candidatePods, fcReq.CandidatePodsForScheduling(), "CandidatePodsForScheduling() mismatch") + assert.Equal(t, tc.expectFlowKey, fcReq.FlowKey(), "FlowKey() mismatch") + assert.Zero(t, fcReq.InitialEffectiveTTL(), "InitialEffectiveTTL() should be zero") + }) + } +} +func TestFlowControlAdmissionController_Admit(t *testing.T) { + t.Parallel() + ctx := logutil.NewTestLoggerIntoContext(context.Background()) + candidatePods := []backendmetrics.PodMetrics{} + + reqCtx := &handlers.RequestContext{ + SchedulingRequest: &schedulingtypes.LLMRequest{RequestId: "test-req"}, + } + + testCases := []struct { + name string + priority int + isSaturated bool + fcOutcome fctypes.QueueOutcome + fcErr error + expectErr bool + expectErrCode string + expectErrSubstr string + expectFCSkipped bool + }{ + { + name: "sheddable_saturated_reject", + priority: -1, + isSaturated: true, + expectErr: true, + expectErrCode: errutil.InferencePoolResourceExhausted, + expectErrSubstr: "system saturated, sheddable request dropped", + expectFCSkipped: true, + }, + { + name: "sheddable_not_saturated_dispatch", + priority: -1, + isSaturated: false, + fcOutcome: fctypes.QueueOutcomeDispatched, + expectErr: false, + }, + { + name: "non_sheddable_saturated_dispatch", + priority: 0, + isSaturated: true, + fcOutcome: fctypes.QueueOutcomeDispatched, + expectErr: false, + }, + { + name: "fc_reject_capacity", + priority: 0, + fcOutcome: fctypes.QueueOutcomeRejectedCapacity, + expectErr: true, + expectErrCode: errutil.InferencePoolResourceExhausted, + 
expectErrSubstr: "request rejected by flow control", + }, + { + name: "fc_evict_ttl", + priority: 0, + fcOutcome: fctypes.QueueOutcomeEvictedTTL, + fcErr: errors.New("timeout"), + expectErr: true, + expectErrCode: errutil.ServiceUnavailable, + expectErrSubstr: "request timed out in queue: timeout", + }, + { + name: "fc_evict_context_cancelled", + priority: 0, + fcOutcome: fctypes.QueueOutcomeEvictedContextCancelled, + expectErr: true, + expectErrCode: errutil.ServiceUnavailable, + expectErrSubstr: "client disconnected", + }, + { + name: "fc_reject_other", + priority: 0, + fcOutcome: fctypes.QueueOutcomeRejectedOther, + expectErr: true, + expectErrCode: errutil.Internal, + expectErrSubstr: "internal flow control error", + }, + { + name: "fc_evict_other", + priority: 0, + fcOutcome: fctypes.QueueOutcomeEvictedOther, + fcErr: errors.New("internal error"), + expectErr: true, + expectErrCode: errutil.Internal, + expectErrSubstr: "internal flow control error: internal error", + }, + { + name: "fc_unhandled_outcome", + priority: 0, + fcOutcome: fctypes.QueueOutcomeNotYetFinalized, + expectErr: true, + expectErrCode: errutil.Internal, + expectErrSubstr: "unhandled flow control outcome", + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + sd := &mockSaturationDetector{isSaturated: tc.isSaturated} + fc := &mockFlowController{outcome: tc.fcOutcome, err: tc.fcErr} + ac := NewFlowControlAdmissionController(sd, fc) + + err := ac.Admit(ctx, reqCtx, candidatePods, tc.priority) + + if tc.expectFCSkipped { + assert.False(t, fc.called, "FlowController should not have been called for scenario: %s", tc.name) + } else { + assert.True(t, fc.called, "FlowController should have been called for scenario: %s", tc.name) + } + + if !tc.expectErr { + assert.NoError(t, err, "Admit() returned an unexpected error for scenario: %s", tc.name) + } else { + require.Error(t, err, "Admit() should have returned an error for scenario: %s", tc.name) + var e errutil.Error + if assert.ErrorAs(t, err, &e, "error should be of type errutil.Error") { + assert.Equal(t, tc.expectErrCode, e.Code, "incorrect error code for scenario: %s", tc.name) + assert.Contains(t, e.Msg, tc.expectErrSubstr, "incorrect error message substring for scenario: %s", tc.name) + } + } + }) + } +} diff --git a/pkg/epp/requestcontrol/director.go b/pkg/epp/requestcontrol/director.go index a3e2d6d13..2f8c7389c 100644 --- a/pkg/epp/requestcontrol/director.go +++ b/pkg/epp/requestcontrol/director.go @@ -54,28 +54,36 @@ type Scheduler interface { Schedule(ctx context.Context, request *schedulingtypes.LLMRequest, candidatePods []schedulingtypes.Pod) (result *schedulingtypes.SchedulingResult, err error) } -// SaturationDetector provides a signal indicating whether the backends are considered saturated. -type SaturationDetector interface { - IsSaturated(ctx context.Context, candidatePods []backendmetrics.PodMetrics) bool -} - // NewDirectorWithConfig creates a new Director instance with all dependencies. 
-func NewDirectorWithConfig(datastore Datastore, scheduler Scheduler, saturationDetector SaturationDetector, config *Config) *Director { +func NewDirectorWithConfig( + datastore Datastore, + scheduler Scheduler, + admissionController AdmissionController, + config *Config, +) *Director { return &Director{ datastore: datastore, scheduler: scheduler, - saturationDetector: saturationDetector, + admissionController: admissionController, preRequestPlugins: config.preRequestPlugins, postResponsePlugins: config.postResponsePlugins, defaultPriority: 0, // define default priority explicitly } } -// Director orchestrates the request handling flow, including scheduling. +// Director orchestrates the request handling flow after initial parsing by the handler. +// Its responsibilities include: +// - Retrieving request metadata and relevant objectives. +// - Determining candidate pods. +// - Performing admission control via the AdmissionController. +// - Scheduling the request to target pod(s) via the Scheduler. +// - Running PreRequest plugins. +// - Preparing the request context for the Envoy ext_proc filter to route the request. +// - Running PostResponse plugins. type Director struct { datastore Datastore scheduler Scheduler - saturationDetector SaturationDetector + admissionController AdmissionController preRequestPlugins []PreRequest postResponsePlugins []PostResponse // we just need a pointer to an int variable since priority is a pointer in InferenceObjective @@ -140,8 +148,8 @@ func (d *Director) HandleRequest(ctx context.Context, reqCtx *handlers.RequestCo return reqCtx, errutil.Error{Code: errutil.ServiceUnavailable, Msg: "failed to find candidate pods for serving the request"} } - // Admission Control check - if err := d.admitRequest(ctx, candidatePods, *infObjective.Spec.Priority, reqCtx.FairnessID); err != nil { + if err := d.admissionController.Admit(ctx, reqCtx, candidatePods, *infObjective.Spec.Priority); err != nil { + logger.V(logutil.DEFAULT).Info("Request rejected by admission control", "error", err) return reqCtx, err } @@ -207,31 +215,6 @@ func (d *Director) getCandidatePodsForScheduling(ctx context.Context, requestMet return podFilteredList } -// admitRequest handles admission control to decide whether or not to accept the request -// based on the request priority and saturation state. -func (d *Director) admitRequest(ctx context.Context, candidatePods []backendmetrics.PodMetrics, requestPriority int, fairnessID string) error { - loggerTrace := log.FromContext(ctx).V(logutil.TRACE) - - loggerTrace.Info("Entering Flow Control", "priority", requestPriority, "fairnessID", fairnessID) - - // This will be removed in favor of a more robust implementation (Flow Control) in the very near future. - // TODO: Make this a configurable value. - // Tracking issue https://github.com/kubernetes-sigs/gateway-api-inference-extension/issues/1347 - if requestPriority >= 0 { - loggerTrace.Info("Non-sheddable request bypassing saturation check.") - return nil - } - - if d.saturationDetector.IsSaturated(ctx, candidatePods) { - return errutil.Error{ - Code: errutil.InferencePoolResourceExhausted, - Msg: "system saturated, sheddable request dropped", - } - } - - return nil -} - // prepareRequest populates the RequestContext and calls the registered PreRequest plugins // for allowing plugging customized logic based on the scheduling result. 
func (d *Director) prepareRequest(ctx context.Context, reqCtx *handlers.RequestContext, result *schedulingtypes.SchedulingResult) (*handlers.RequestContext, error) { diff --git a/pkg/epp/requestcontrol/director_test.go b/pkg/epp/requestcontrol/director_test.go index a0cb7c325..777e28704 100644 --- a/pkg/epp/requestcontrol/director_test.go +++ b/pkg/epp/requestcontrol/director_test.go @@ -50,12 +50,17 @@ import ( // --- Mocks --- -type mockSaturationDetector struct { - isSaturated bool +type mockAdmissionController struct { + admitErr error } -func (m *mockSaturationDetector) IsSaturated(_ context.Context, _ []backendmetrics.PodMetrics) bool { - return m.isSaturated +func (m *mockAdmissionController) Admit( + _ context.Context, + _ *handlers.RequestContext, + _ []backendmetrics.PodMetrics, + _ int, +) error { + return m.admitErr } type mockScheduler struct { @@ -71,8 +76,12 @@ type mockDatastore struct { pods []backendmetrics.PodMetrics } -func (ds *mockDatastore) PoolGet() (*v1.InferencePool, error) { return nil, nil } -func (ds *mockDatastore) ObjectiveGet(_ string) *v1alpha2.InferenceObjective { return nil } +func (ds *mockDatastore) PoolGet() (*v1.InferencePool, error) { + return nil, nil +} +func (ds *mockDatastore) ObjectiveGet(_ string) *v1alpha2.InferenceObjective { + return nil +} func (ds *mockDatastore) PodList(predicate func(backendmetrics.PodMetrics) bool) []backendmetrics.PodMetrics { res := []backendmetrics.PodMetrics{} for _, pod := range ds.pods { @@ -187,23 +196,23 @@ func TestDirector_HandleRequest(t *testing.T) { } tests := []struct { - name string - reqBodyMap map[string]any - mockSaturationDetector *mockSaturationDetector - inferenceObjectiveName string - schedulerMockSetup func(m *mockScheduler) - wantErrCode string // Expected errutil code string - wantReqCtx *handlers.RequestContext // Fields to check in the returned RequestContext - wantMutatedBodyModel string // Expected model in reqCtx.Request.Body after PostDispatch - targetModelName string // Expected model name after target model resolution + name string + reqBodyMap map[string]any + mockAdmissionController *mockAdmissionController + inferenceObjectiveName string + schedulerMockSetup func(m *mockScheduler) + wantErrCode string // Expected errutil code string + wantReqCtx *handlers.RequestContext // Fields to check in the returned RequestContext + wantMutatedBodyModel string // Expected model in reqCtx.Request.Body after PostDispatch + targetModelName string // Expected model name after target model resolution }{ { - name: "successful completions request (critical, saturation ignored)", + name: "successful completions request", reqBodyMap: map[string]any{ "model": model, "prompt": "critical prompt", }, - mockSaturationDetector: &mockSaturationDetector{isSaturated: true}, + mockAdmissionController: &mockAdmissionController{admitErr: nil}, schedulerMockSetup: func(m *mockScheduler) { m.scheduleResults = defaultSuccessfulScheduleResults }, @@ -221,7 +230,7 @@ func TestDirector_HandleRequest(t *testing.T) { targetModelName: model, }, { - name: "successful chat completions request (default critical, saturation ignored)", + name: "successful chat completions request", reqBodyMap: map[string]any{ "model": model, "messages": []any{ @@ -231,7 +240,7 @@ func TestDirector_HandleRequest(t *testing.T) { }, }, }, - mockSaturationDetector: &mockSaturationDetector{isSaturated: true}, + mockAdmissionController: &mockAdmissionController{admitErr: nil}, schedulerMockSetup: func(m *mockScheduler) { m.scheduleResults = 
defaultSuccessfulScheduleResults }, @@ -247,7 +256,7 @@ func TestDirector_HandleRequest(t *testing.T) { targetModelName: model, }, { - name: "successful chat completions request with multiple messages (critical, saturation ignored)", + name: "successful chat completions request with multiple messages", reqBodyMap: map[string]any{ "model": model, "messages": []any{ @@ -261,6 +270,7 @@ func TestDirector_HandleRequest(t *testing.T) { }, }, }, + mockAdmissionController: &mockAdmissionController{admitErr: nil}, schedulerMockSetup: func(m *mockScheduler) { m.scheduleResults = defaultSuccessfulScheduleResults }, @@ -277,36 +287,13 @@ func TestDirector_HandleRequest(t *testing.T) { inferenceObjectiveName: objectiveName, targetModelName: model, }, - { - name: "successful completions request (sheddable, not saturated)", - reqBodyMap: map[string]any{ - "model": modelSheddable, - "prompt": "sheddable prompt", - }, - mockSaturationDetector: &mockSaturationDetector{isSaturated: false}, - schedulerMockSetup: func(m *mockScheduler) { - m.scheduleResults = defaultSuccessfulScheduleResults - }, - wantReqCtx: &handlers.RequestContext{ - ObjectiveKey: objectiveNameSheddable, - TargetModelName: modelSheddable, - TargetPod: &backend.Pod{ - NamespacedName: types.NamespacedName{Namespace: "default", Name: "pod1"}, - Address: "192.168.1.100", - }, - TargetEndpoint: "192.168.1.100:8000,192.168.2.100:8000,192.168.4.100:8000", - }, - wantMutatedBodyModel: modelSheddable, - inferenceObjectiveName: objectiveNameSheddable, - targetModelName: modelSheddable, - }, { name: "successful request with target model resolution", reqBodyMap: map[string]any{ "model": modelWithResolvedTarget, "prompt": "prompt for target resolution", }, - mockSaturationDetector: &mockSaturationDetector{isSaturated: false}, + mockAdmissionController: &mockAdmissionController{admitErr: nil}, schedulerMockSetup: func(m *mockScheduler) { m.scheduleResults = defaultSuccessfulScheduleResults }, @@ -342,28 +329,27 @@ func TestDirector_HandleRequest(t *testing.T) { "model": "food-review-1", "prompt": "test prompt", }, - mockSaturationDetector: &mockSaturationDetector{isSaturated: false}, - inferenceObjectiveName: "food-review-1", - targetModelName: "food-review-1", + mockAdmissionController: &mockAdmissionController{admitErr: nil}, + inferenceObjectiveName: "food-review-1", + targetModelName: "food-review-1", }, { - name: "request dropped (sheddable, saturated)", + name: "request rejected by admission controller", reqBodyMap: map[string]any{ "model": modelSheddable, "prompt": "sheddable prompt", }, - inferenceObjectiveName: objectiveNameSheddable, - mockSaturationDetector: &mockSaturationDetector{isSaturated: true}, - wantErrCode: errutil.InferencePoolResourceExhausted, + inferenceObjectiveName: objectiveNameSheddable, + mockAdmissionController: &mockAdmissionController{admitErr: errutil.Error{Code: errutil.InferencePoolResourceExhausted, Msg: "simulated admission rejection"}}, + wantErrCode: errutil.InferencePoolResourceExhausted, }, { - name: "model not found, expect err", - reqBodyMap: map[string]any{"prompt": "p"}, - mockSaturationDetector: &mockSaturationDetector{isSaturated: false}, - wantErrCode: errutil.BadRequest, + name: "model not found, expect err", + reqBodyMap: map[string]any{"prompt": "p"}, + mockAdmissionController: &mockAdmissionController{admitErr: nil}, + wantErrCode: errutil.BadRequest, }, - { name: "prompt or messages not found, expect err", reqBodyMap: map[string]any{"model": model}, @@ -383,6 +369,7 @@ func 
TestDirector_HandleRequest(t *testing.T) { "model": model, "prompt": "prompt that causes scheduler error", }, + mockAdmissionController: &mockAdmissionController{admitErr: nil}, schedulerMockSetup: func(m *mockScheduler) { m.scheduleErr = errors.New("simulated scheduler failure") }, @@ -395,6 +382,7 @@ func TestDirector_HandleRequest(t *testing.T) { "model": model, "prompt": "prompt for nil,nil scheduler return", }, + mockAdmissionController: &mockAdmissionController{admitErr: nil}, schedulerMockSetup: func(m *mockScheduler) { m.scheduleResults = nil m.scheduleErr = nil @@ -410,7 +398,7 @@ func TestDirector_HandleRequest(t *testing.T) { if test.schedulerMockSetup != nil { test.schedulerMockSetup(mockSched) } - director := NewDirectorWithConfig(ds, mockSched, test.mockSaturationDetector, NewConfig()) + director := NewDirectorWithConfig(ds, mockSched, test.mockAdmissionController, NewConfig()) reqCtx := &handlers.RequestContext{ Request: &handlers.Request{ @@ -529,7 +517,7 @@ func TestGetCandidatePodsForScheduling(t *testing.T) { ds := &mockDatastore{pods: testInput} for _, test := range tests { t.Run(test.name, func(t *testing.T) { - director := NewDirectorWithConfig(ds, &mockScheduler{}, &mockSaturationDetector{}, NewConfig()) + director := NewDirectorWithConfig(ds, &mockScheduler{}, &mockAdmissionController{}, NewConfig()) got := director.getCandidatePodsForScheduling(context.Background(), test.metadata) @@ -598,7 +586,7 @@ func TestDirector_HandleResponse(t *testing.T) { ctx := logutil.NewTestLoggerIntoContext(context.Background()) ds := datastore.NewDatastore(t.Context(), nil) mockSched := &mockScheduler{} - director := NewDirectorWithConfig(ds, mockSched, nil, NewConfig().WithPostResponsePlugins(pr1)) + director := NewDirectorWithConfig(ds, mockSched, &mockAdmissionController{}, NewConfig().WithPostResponsePlugins(pr1)) reqCtx := &handlers.RequestContext{ Request: &handlers.Request{ diff --git a/pkg/epp/server/runserver.go b/pkg/epp/server/runserver.go index 870acc70b..1d9d1d114 100644 --- a/pkg/epp/server/runserver.go +++ b/pkg/epp/server/runserver.go @@ -42,6 +42,7 @@ import ( "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datastore" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/handlers" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/requestcontrol" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/saturationdetector" ) // ExtProcServerRunner provides methods to manage an external process server. @@ -56,7 +57,7 @@ type ExtProcServerRunner struct { RefreshPrometheusMetricsInterval time.Duration MetricsStalenessThreshold time.Duration Director *requestcontrol.Director - SaturationDetector requestcontrol.SaturationDetector + SaturationDetector *saturationdetector.Detector UseExperimentalDatalayerV2 bool // Pluggable data layer feature flag // This should only be used in tests. We won't need this once we do not inject metrics in the tests. diff --git a/pkg/epp/util/request/sheddable.go b/pkg/epp/util/request/sheddable.go new file mode 100644 index 000000000..c2f32c1f2 --- /dev/null +++ b/pkg/epp/util/request/sheddable.go @@ -0,0 +1,22 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package request
+
+// IsSheddable reports whether a request is sheddable based on its priority; requests with negative priority are sheddable.
+func IsSheddable(priority int) bool {
+	return priority < 0
+}
diff --git a/test/integration/epp/hermetic_test.go b/test/integration/epp/hermetic_test.go
index 3dc42f8ba..c2e100f79 100644
--- a/test/integration/epp/hermetic_test.go
+++ b/test/integration/epp/hermetic_test.go
@@ -1180,7 +1180,8 @@ func BeforeSuite() func() {
 	}
 	detector := saturationdetector.NewDetector(sdConfig, logger.WithName("saturation-detector"))
 	serverRunner.SaturationDetector = detector
-	serverRunner.Director = requestcontrol.NewDirectorWithConfig(serverRunner.Datastore, scheduler, detector, requestcontrol.NewConfig())
+	admissionController := requestcontrol.NewLegacyAdmissionController(detector)
+	serverRunner.Director = requestcontrol.NewDirectorWithConfig(serverRunner.Datastore, scheduler, admissionController, requestcontrol.NewConfig())
 	serverRunner.SecureServing = false

 	if err := serverRunner.SetupWithManager(context.Background(), mgr); err != nil {
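
Illustrative usage (a sketch, not part of this change): the new requestcontrol.AdmissionController seam lets the
Director's admission strategy be swapped out, for example with a trivial admit-everything stub when exercising the
Director in isolation. The hypothetical admitAll type below relies only on the interface as defined in
pkg/epp/requestcontrol/admission.go:

	package example

	import (
		"context"

		backendmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics"
		"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/handlers"
	)

	// admitAll is a minimal AdmissionController: it admits every request unconditionally.
	type admitAll struct{}

	func (admitAll) Admit(
		_ context.Context,
		_ *handlers.RequestContext,
		_ []backendmetrics.PodMetrics,
		_ int,
	) error {
		return nil
	}

	// Wiring mirrors cmd/epp/runner/runner.go and the updated hermetic test:
	//   director := requestcontrol.NewDirectorWithConfig(datastore, scheduler, admitAll{}, requestcontrol.NewConfig())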