Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 59 additions & 3 deletions cmd/epp/runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ import (
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datalayer"
dlmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datalayer/metrics"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datastore"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol"
fccontroller "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/controller"
fcregistry "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/registry"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics/collectors"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/plugins"
Expand All @@ -68,11 +71,25 @@ import (
)

const (
// enableExperimentalDatalayerV2 defines the environment variable
// used as feature flag for the pluggable data layer.
// enableExperimentalDatalayerV2 defines the environment variable used as feature flag for the pluggable data layer.
enableExperimentalDatalayerV2 = "ENABLE_EXPERIMENTAL_DATALAYER_V2"
// enableExperimentalFlowControlLayer defines the environment variable used as a feature flag for the pluggable flow
// control layer.
enableExperimentalFlowControlLayer = "ENABLE_EXPERIMENTAL_FLOW_CONTROL_LAYER"
)

// TODO: this is hardcoded for POC only. This needs to be hooked up to our text-based config story.
var flowControlConfig = flowcontrol.Config{
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is admittedly sloppy. I will send out follow PRs to align this with how the rest of our pluggable layers are configured (e.g., the text-based config). For now, I am prioritizing getting the POC checked in with a functional e2e story.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

pls create issues to track the followup items

Controller: fccontroller.Config{}, // Use all defaults.
Registry: fcregistry.Config{
// Define domain of accepted priority levels as this field is required. Use defaults for all optional fields.
// TODO: this should not be hardcoded.
PriorityBands: []fcregistry.PriorityBandConfig{
{Priority: 0, PriorityName: "Default"},
},
},
}

var (
grpcPort = flag.Int("grpc-port", runserver.DefaultGrpcPort, "The gRPC port used for communicating with Envoy proxy")
grpcHealthPort = flag.Int("grpc-health-port", runserver.DefaultGrpcHealthPort, "The port used for gRPC liveness and readiness probes")
Expand Down Expand Up @@ -271,7 +288,46 @@ func (r *Runner) Run(ctx context.Context) error {

saturationDetector := saturationdetector.NewDetector(sdConfig, setupLog)

director := requestcontrol.NewDirectorWithConfig(datastore, scheduler, saturationDetector, r.requestControlConfig)
// --- Flow Control Initialization (Experimental) ---

enableFlowControl := env.GetEnvBool(enableExperimentalFlowControlLayer, false, setupLog)
var flowController *fccontroller.FlowController
if enableFlowControl {
setupLog.Info("Initializing experimental Flow Control layer")
cfg, err := flowControlConfig.ValidateAndApplyDefaults()
if err != nil {
setupLog.Error(err, "failed to initialize Flow Control layer")
return fmt.Errorf("invalid Flow Control config: %w", err)
}

registry, err := fcregistry.NewFlowRegistry(cfg.Registry, setupLog)
if err != nil {
return fmt.Errorf("failed to initialize Flow Registry: %w", err)
}
fc, err := fccontroller.NewFlowController(
ctx,
cfg.Controller,
registry,
saturationDetector,
setupLog,
)
if err != nil {
return fmt.Errorf("failed to initialize Flow Controller: %w", err)
}
flowController = fc

go registry.Run(ctx)
} else {
setupLog.Info("Experimental Flow Control layer is disabled")
}

director := requestcontrol.NewDirectorWithConfig(
datastore,
scheduler,
saturationDetector,
flowController,
r.requestControlConfig,
enableFlowControl)

// --- Setup ExtProc Server Runner ---
serverRunner := &runserver.ExtProcServerRunner{
Expand Down
107 changes: 95 additions & 12 deletions pkg/epp/requestcontrol/director.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"sigs.k8s.io/gateway-api-inference-extension/apix/v1alpha2"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend"
backendmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/types"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/handlers"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metadata"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics"
Expand All @@ -59,15 +60,29 @@ type SaturationDetector interface {
IsSaturated(ctx context.Context, candidatePods []backendmetrics.PodMetrics) bool
}

// flowController defines the minimal interface required by the Director for flow control.
type flowController interface {
EnqueueAndWait(req types.FlowControlRequest) (types.QueueOutcome, error)
}

// NewDirectorWithConfig creates a new Director instance with all dependencies.
func NewDirectorWithConfig(datastore Datastore, scheduler Scheduler, saturationDetector SaturationDetector, config *Config) *Director {
func NewDirectorWithConfig(
datastore Datastore,
scheduler Scheduler,
saturationDetector SaturationDetector,
fc flowController,
config *Config,
enableFlowControl bool,
) *Director {
return &Director{
datastore: datastore,
scheduler: scheduler,
saturationDetector: saturationDetector,
flowController: fc,
preRequestPlugins: config.preRequestPlugins,
postResponsePlugins: config.postResponsePlugins,
defaultPriority: 0, // define default priority explicitly
enableFlowControl: enableFlowControl,
}
}

Expand All @@ -76,12 +91,14 @@ type Director struct {
datastore Datastore
scheduler Scheduler
saturationDetector SaturationDetector
flowController flowController
preRequestPlugins []PreRequest
postResponsePlugins []PostResponse
// we just need a pointer to an int variable since priority is a pointer in InferenceObjective
// no need to set this in the constructor, since the value we want is the default int val
// and value types cannot be nil
defaultPriority int
defaultPriority int
enableFlowControl bool
}

// HandleRequest orchestrates the request lifecycle.
Expand Down Expand Up @@ -141,7 +158,7 @@ func (d *Director) HandleRequest(ctx context.Context, reqCtx *handlers.RequestCo
}

// Admission Control check
if err := d.admitRequest(ctx, candidatePods, *infObjective.Spec.Priority, reqCtx.FairnessID); err != nil {
if err := d.admitRequest(ctx, reqCtx, candidatePods, *infObjective.Spec.Priority); err != nil {
return reqCtx, err
}

Expand Down Expand Up @@ -209,27 +226,43 @@ func (d *Director) getCandidatePodsForScheduling(ctx context.Context, requestMet

// admitRequest handles admission control to decide whether or not to accept the request
// based on the request priority and saturation state.
func (d *Director) admitRequest(ctx context.Context, candidatePods []backendmetrics.PodMetrics, requestPriority int, fairnessID string) error {
func (d *Director) admitRequest(ctx context.Context, reqCtx *handlers.RequestContext, candidatePods []backendmetrics.PodMetrics, requestPriority int) error {
loggerTrace := log.FromContext(ctx).V(logutil.TRACE)

loggerTrace.Info("Entering Flow Control", "priority", requestPriority, "fairnessID", fairnessID)
loggerTrace.Info("Entering admission control", "priority", requestPriority, "fairnessID", reqCtx.FairnessID, "flowControlEnabled", d.flowController != nil)

// This will be removed in favor of a more robust implementation (Flow Control) in the very near future.
// TODO: Make this a configurable value.
// Tracking issue https://github.com/kubernetes-sigs/gateway-api-inference-extension/issues/1347
if requestPriority >= 0 {
loggerTrace.Info("Non-sheddable request bypassing saturation check.")
return nil
}

if d.saturationDetector.IsSaturated(ctx, candidatePods) {
isSheddable := requestPriority < 0
if isSheddable && d.saturationDetector.IsSaturated(ctx, candidatePods) {
return errutil.Error{
Code: errutil.InferencePoolResourceExhausted,
Msg: "system saturated, sheddable request dropped",
}
}

return nil
if !d.enableFlowControl {
loggerTrace.Info("Non-sheddable request bypassing saturation check.")
return nil
}

fairnessID := reqCtx.FairnessID
if fairnessID == "" {
fairnessID = "default-flow"
}

fcReq := &flowControlRequest{
ctx: ctx,
requestID: reqCtx.SchedulingRequest.RequestId,
fairnessID: fairnessID,
priority: requestPriority,
requestByteSize: uint64(reqCtx.RequestSize),
candidatePods: candidatePods,
}

outcome, err := d.flowController.EnqueueAndWait(fcReq)
return translateFlowControlOutcome(outcome, err)
}

// prepareRequest populates the RequestContext and calls the registered PreRequest plugins
Expand Down Expand Up @@ -323,3 +356,53 @@ func (d *Director) runPostResponsePlugins(ctx context.Context, request *scheduli
loggerDebug.Info("Completed running post-response plugin successfully", "plugin", plugin.TypedName())
}
}

// --- Flow Control Integration ---

// flowControlRequest is an adapter that implements the types.FlowControlRequest interface, wrapping the director's
// internal request context.
type flowControlRequest struct {
ctx context.Context
requestID string
fairnessID string
priority int
requestByteSize uint64
candidatePods []backendmetrics.PodMetrics
}

var _ types.FlowControlRequest = &flowControlRequest{}

func (r *flowControlRequest) Context() context.Context { return r.ctx }
func (r *flowControlRequest) ID() string { return r.requestID }
func (r *flowControlRequest) InitialEffectiveTTL() time.Duration { return 0 } // Use controller default.
func (r *flowControlRequest) ByteSize() uint64 { return r.requestByteSize }
func (r *flowControlRequest) CandidatePodsForScheduling() []backendmetrics.PodMetrics {
return r.candidatePods
}
func (r *flowControlRequest) FlowKey() types.FlowKey {
return types.FlowKey{ID: r.fairnessID, Priority: r.priority}
}

// translateFlowControlOutcome maps the context-rich outcome of the Flow Control layer to the public errutil.Error
// contract.
func translateFlowControlOutcome(outcome types.QueueOutcome, err error) error {
msg := "request rejected by flow control"
if err != nil {
msg = err.Error()
}

switch outcome {
case types.QueueOutcomeDispatched:
return nil
case types.QueueOutcomeRejectedCapacity:
return errutil.Error{Code: errutil.InferencePoolResourceExhausted, Msg: msg}
case types.QueueOutcomeEvictedTTL:
return errutil.Error{Code: errutil.ServiceUnavailable, Msg: "request timed out in queue: " + msg}
case types.QueueOutcomeEvictedContextCancelled:
return errutil.Error{Code: errutil.ServiceUnavailable, Msg: "client disconnected: " + msg}
case types.QueueOutcomeRejectedOther, types.QueueOutcomeEvictedOther:
return errutil.Error{Code: errutil.Internal, Msg: msg}
default:
return errutil.Error{Code: errutil.Internal, Msg: "unhandled flow control outcome: " + msg}
}
}
Loading