Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 56 additions & 3 deletions cmd/epp/runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ import (
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datalayer"
dlmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datalayer/metrics"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datastore"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol"
fccontroller "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/controller"
fcregistry "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/registry"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics/collectors"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/plugins"
Expand All @@ -68,11 +71,25 @@ import (
)

const (
// enableExperimentalDatalayerV2 defines the environment variable
// used as feature flag for the pluggable data layer.
// enableExperimentalDatalayerV2 defines the environment variable used as feature flag for the pluggable data layer.
enableExperimentalDatalayerV2 = "ENABLE_EXPERIMENTAL_DATALAYER_V2"
// enableExperimentalFlowControlLayer defines the environment variable used as a feature flag for the pluggable flow
// control layer.
enableExperimentalFlowControlLayer = "ENABLE_EXPERIMENTAL_FLOW_CONTROL_LAYER"
)

// TODO: this is hardcoded for POC only. This needs to be hooked up to our text-based config story.
var flowControlConfig = flowcontrol.Config{
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is admittedly sloppy. I will send out follow PRs to align this with how the rest of our pluggable layers are configured (e.g., the text-based config). For now, I am prioritizing getting the POC checked in with a functional e2e story.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

pls create issues to track the followup items

Controller: fccontroller.Config{}, // Use all defaults.
Registry: fcregistry.Config{
// Define domain of accepted priority levels as this field is required. Use defaults for all optional fields.
// TODO: this should not be hardcoded.
PriorityBands: []fcregistry.PriorityBandConfig{
{Priority: 0, PriorityName: "Default"},
},
},
}

var (
grpcPort = flag.Int("grpc-port", runserver.DefaultGrpcPort, "The gRPC port used for communicating with Envoy proxy")
grpcHealthPort = flag.Int("grpc-health-port", runserver.DefaultGrpcHealthPort, "The port used for gRPC liveness and readiness probes")
Expand Down Expand Up @@ -271,7 +288,43 @@ func (r *Runner) Run(ctx context.Context) error {

saturationDetector := saturationdetector.NewDetector(sdConfig, setupLog)

director := requestcontrol.NewDirectorWithConfig(datastore, scheduler, saturationDetector, r.requestControlConfig)
// --- Admission Control Initialization ---
enableFlowControl := env.GetEnvBool(enableExperimentalFlowControlLayer, false, setupLog)
var admissionController requestcontrol.AdmissionController
if enableFlowControl {
setupLog.Info("Initializing experimental Flow Control layer")
fcCfg, err := flowControlConfig.ValidateAndApplyDefaults()
if err != nil {
setupLog.Error(err, "failed to initialize Flow Control layer")
return fmt.Errorf("invalid Flow Control config: %w", err)
}

registry, err := fcregistry.NewFlowRegistry(fcCfg.Registry, setupLog)
if err != nil {
return fmt.Errorf("failed to initialize Flow Registry: %w", err)
}
fc, err := fccontroller.NewFlowController(
ctx,
fcCfg.Controller,
registry,
saturationDetector,
setupLog,
)
if err != nil {
return fmt.Errorf("failed to initialize Flow Controller: %w", err)
}
go registry.Run(ctx)
admissionController = requestcontrol.NewFlowControlAdmissionController(saturationDetector, fc)
} else {
setupLog.Info("Experimental Flow Control layer is disabled, using legacy admission control")
admissionController = requestcontrol.NewLegacyAdmissionController(saturationDetector)
}

director := requestcontrol.NewDirectorWithConfig(
datastore,
scheduler,
admissionController,
r.requestControlConfig)

// --- Setup ExtProc Server Runner ---
serverRunner := &runserver.ExtProcServerRunner{
Expand Down
231 changes: 231 additions & 0 deletions pkg/epp/requestcontrol/admission.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,231 @@
/*
Copyright 2025 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package requestcontrol

import (
"context"
"time"

"sigs.k8s.io/controller-runtime/pkg/log"

backendmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/types"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/handlers"
errutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/error"
logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging"
)

// AdmissionController defines the interface for making admission control decisions.
// Implementations of this interface determine whether an incoming inference request should be accepted or rejected
// based on various criteria such as system load, fairness, priority, and available capacity.
type AdmissionController interface {
// Admit determines if a request should be admitted.
// It is called by the Director for each incoming request.
//
// Args:
// ctx: The request context, carrying deadlines, cancellation signals, and logger.
// reqCtx: The handlers.RequestContext containing details about the incoming request.
// candidatePods: A list of potential backend pods that can serve the request.
// priority: The priority level of the request, as determined by the InferenceObjective.
//
// Returns:
// - nil: If the request is admitted and should proceed to scheduling.
// - errutil.Error: If the request is rejected.
Admit(
ctx context.Context,
reqCtx *handlers.RequestContext,
candidatePods []backendmetrics.PodMetrics,
priority int,
) error
}

// saturationDetector defines the minimal interface required for checking if the backend pool is saturated.
type saturationDetector interface {
IsSaturated(ctx context.Context, candidatePods []backendmetrics.PodMetrics) bool
}

// flowController defines the minimal interface required by FlowControlAdmissionController for enqueuing requests and
// waiting for an admission outcome.
type flowController interface {
EnqueueAndWait(req types.FlowControlRequest) (types.QueueOutcome, error)
}

// rejectIfSheddableAndSaturated checks if a request should be immediately rejected because it's sheddable
// (priority < 0) and the system is saturated.
func rejectIfSheddableAndSaturated(
ctx context.Context,
sd saturationDetector,
reqCtx *handlers.RequestContext,
candidatePods []backendmetrics.PodMetrics,
priority int,
) error {
if priority < 0 {
logger := log.FromContext(ctx)
if sd.IsSaturated(ctx, candidatePods) {
logger.V(logutil.TRACE).Info("Request rejected: system saturated and request is sheddable",
"requestID", reqCtx.SchedulingRequest.RequestId)
return errutil.Error{
Code: errutil.InferencePoolResourceExhausted,
Msg: "system saturated, sheddable request dropped",
}
}
}
return nil
}

// --- LegacyAdmissionController ---

// LegacyAdmissionController implements saturation-based admission control.
// It rejects sheddable requests (priority < 0) if the saturationDetector indicates that the system is currently
// saturated. Non-sheddable requests always bypass the saturation check.
type LegacyAdmissionController struct {
saturationDetector saturationDetector
}

// NewLegacyAdmissionController creates a new LegacyAdmissionController.
func NewLegacyAdmissionController(sd saturationDetector) *LegacyAdmissionController {
return &LegacyAdmissionController{saturationDetector: sd}
}

// Admit implements the AdmissionController interface for the legacy strategy.
// It checks for saturation only for requests with priority < 0.
func (lac *LegacyAdmissionController) Admit(
ctx context.Context,
reqCtx *handlers.RequestContext,
candidatePods []backendmetrics.PodMetrics,
priority int,
) error {
logger := log.FromContext(ctx)
logger.V(logutil.TRACE).Info("Executing LegacyAdmissionController",
"priority", priority, "fairnessID", reqCtx.FairnessID)
if err := rejectIfSheddableAndSaturated(ctx, lac.saturationDetector, reqCtx, candidatePods, priority); err != nil {
return err
}
logger.V(logutil.TRACE).Info("Request admitted", "requestID", reqCtx.SchedulingRequest.RequestId)
return nil
}

// --- FlowControlAdmissionController ---

const (
// defaultFairnessID is the default fairness ID used when no ID is provided in the request.
// This ensures that requests without explicit fairness identifiers are still grouped and managed by the Flow Control
// system.
defaultFairnessID = "default-flow"
)

// FlowControlAdmissionController delegates admission decisions to the Flow Control layer.
// It first checks if the request is sheddable and the system is saturated, rejecting immediately if both conditions are
// true. Otherwise, it uses the provided flowController to enqueue the request and await an outcome.
type FlowControlAdmissionController struct {
saturationDetector saturationDetector
flowController flowController
}

// NewFlowControlAdmissionController creates a new FlowControlAdmissionController.
// It requires a SaturationDetector and a flowController instance.
func NewFlowControlAdmissionController(sd saturationDetector, fc flowController) *FlowControlAdmissionController {
return &FlowControlAdmissionController{
saturationDetector: sd,
flowController: fc,
}
}

// Admit implements the AdmissionController interface by checking for saturation on sheddable requests first, then
// deferring to the Flow Control system.
func (fcac *FlowControlAdmissionController) Admit(
ctx context.Context,
reqCtx *handlers.RequestContext,
candidatePods []backendmetrics.PodMetrics,
priority int,
) error {
logger := log.FromContext(ctx)
logger.V(logutil.TRACE).Info("Executing FlowControlAdmissionController",
"requestID", reqCtx.SchedulingRequest.RequestId, "priority", priority, "fairnessID", reqCtx.FairnessID)
if err := rejectIfSheddableAndSaturated(ctx, fcac.saturationDetector, reqCtx, candidatePods, priority); err != nil {
return err
}

logger.V(logutil.TRACE).Info("Request proceeding to flow control", "requestID", reqCtx.SchedulingRequest.RequestId)
fairnessID := reqCtx.FairnessID
if fairnessID == "" {
logger.V(logutil.TRACE).Info("No fairnessID provided, using default",
"requestID", reqCtx.SchedulingRequest.RequestId, "defaultFairnessID", defaultFairnessID)
fairnessID = defaultFairnessID
}

fcReq := &flowControlRequest{
ctx: ctx,
requestID: reqCtx.SchedulingRequest.RequestId,
fairnessID: fairnessID,
priority: priority,
requestByteSize: uint64(reqCtx.RequestSize),
candidatePods: candidatePods,
}

outcome, err := fcac.flowController.EnqueueAndWait(fcReq)
logger.V(logutil.DEBUG).Info("Flow control outcome",
"requestID", reqCtx.SchedulingRequest.RequestId, "outcome", outcome, "error", err)
return translateFlowControlOutcome(outcome, err)
}

// flowControlRequest is an adapter that implements the types.FlowControlRequest interface.
type flowControlRequest struct {
ctx context.Context
requestID string
fairnessID string
priority int
requestByteSize uint64
candidatePods []backendmetrics.PodMetrics
}

var _ types.FlowControlRequest = &flowControlRequest{}

func (r *flowControlRequest) Context() context.Context { return r.ctx }
func (r *flowControlRequest) ID() string { return r.requestID }
func (r *flowControlRequest) InitialEffectiveTTL() time.Duration { return 0 } // Use controller default.
func (r *flowControlRequest) ByteSize() uint64 { return r.requestByteSize }
func (r *flowControlRequest) CandidatePodsForScheduling() []backendmetrics.PodMetrics {
return r.candidatePods
}
func (r *flowControlRequest) FlowKey() types.FlowKey {
return types.FlowKey{ID: r.fairnessID, Priority: r.priority}
}

// translateFlowControlOutcome maps the context-rich outcome of the Flow Control layer to the public errutil.Error
// contract used by the Director.
func translateFlowControlOutcome(outcome types.QueueOutcome, err error) error {
msg := "request rejected by flow control"
if err != nil {
msg = err.Error()
}

switch outcome {
case types.QueueOutcomeDispatched:
return nil
case types.QueueOutcomeRejectedCapacity:
return errutil.Error{Code: errutil.InferencePoolResourceExhausted, Msg: msg}
case types.QueueOutcomeEvictedTTL:
return errutil.Error{Code: errutil.ServiceUnavailable, Msg: "request timed out in queue: " + msg}
case types.QueueOutcomeEvictedContextCancelled:
return errutil.Error{Code: errutil.ServiceUnavailable, Msg: "client disconnected: " + msg}
case types.QueueOutcomeRejectedOther, types.QueueOutcomeEvictedOther:
return errutil.Error{Code: errutil.Internal, Msg: "internal flow control error: " + msg}
default:
return errutil.Error{Code: errutil.Internal, Msg: "unhandled flow control outcome: " + msg}
}
}
Loading