generated from kubernetes/kubernetes-template-project
-
Notifications
You must be signed in to change notification settings - Fork 182
feat(fc): Initial wiring of the flow control layer #1701
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
k8s-ci-robot
merged 4 commits into
kubernetes-sigs:main
from
LukeAVanDrie:feat/flow-control-integration
Oct 13, 2025
+691
−100
Merged
Changes from all commits
Commits
Show all changes
4 commits
Select commit
Hold shift + click to select a range
0b28f2d
feat(fc): Initial wiring of the flow control layer
LukeAVanDrie 6114ca5
refactor: Abstract away admission control logic
LukeAVanDrie 2f14af2
feat: Address feedback on admission control
LukeAVanDrie 0919d3f
fix: admission.go boilerplate header
LukeAVanDrie File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,219 @@ | ||
/* | ||
Copyright 2025 The Kubernetes Authors. | ||
|
||
Licensed under the Apache License, Version 2.0 (the "License"); | ||
you may not use this file except in compliance with the License. | ||
You may obtain a copy of the License at | ||
|
||
http://www.apache.org/licenses/LICENSE-2.0 | ||
|
||
Unless required by applicable law or agreed to in writing, software | ||
distributed under the License is distributed on an "AS IS" BASIS, | ||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
See the License for the specific language governing permissions and | ||
limitations under the License. | ||
*/ | ||
|
||
package requestcontrol | ||
|
||
import ( | ||
"context" | ||
"time" | ||
|
||
"sigs.k8s.io/controller-runtime/pkg/log" | ||
|
||
backendmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics" | ||
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/types" | ||
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/handlers" | ||
errutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/error" | ||
logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging" | ||
requtil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/request" | ||
) | ||
|
||
// AdmissionController defines the interface for making admission control decisions. | ||
// Implementations of this interface determine whether an incoming inference request should be accepted or rejected | ||
// based on various criteria such as system load, fairness, priority, and available capacity. | ||
type AdmissionController interface { | ||
// Admit determines if a request should be admitted. | ||
// It is called by the Director for each incoming request. | ||
// | ||
// Args: | ||
// ctx: The request context, carrying deadlines, cancellation signals, and logger. | ||
// reqCtx: The handlers.RequestContext containing details about the incoming request. | ||
// candidatePods: A list of potential backend pods that can serve the request. | ||
// priority: The priority level of the request, as determined by the InferenceObjective. | ||
// | ||
// Returns: | ||
// - nil: If the request is admitted and should proceed to scheduling. | ||
// - errutil.Error: If the request is rejected. | ||
Admit( | ||
ctx context.Context, | ||
reqCtx *handlers.RequestContext, | ||
candidatePods []backendmetrics.PodMetrics, | ||
priority int, | ||
) error | ||
} | ||
|
||
// saturationDetector defines the minimal interface required for checking if the backend pool is saturated. | ||
type saturationDetector interface { | ||
IsSaturated(ctx context.Context, candidatePods []backendmetrics.PodMetrics) bool | ||
} | ||
|
||
// flowController defines the minimal interface required by FlowControlAdmissionController for enqueuing requests and | ||
// waiting for an admission outcome. | ||
type flowController interface { | ||
EnqueueAndWait(req types.FlowControlRequest) (types.QueueOutcome, error) | ||
} | ||
|
||
// rejectIfSheddableAndSaturated checks if a request should be immediately rejected because it's sheddable | ||
// (priority < 0) and the system is saturated. | ||
func rejectIfSheddableAndSaturated( | ||
ctx context.Context, | ||
sd saturationDetector, | ||
reqCtx *handlers.RequestContext, | ||
candidatePods []backendmetrics.PodMetrics, | ||
priority int, | ||
) error { | ||
if requtil.IsSheddable(priority) { | ||
logger := log.FromContext(ctx) | ||
if sd.IsSaturated(ctx, candidatePods) { | ||
logger.V(logutil.TRACE).Info("Request rejected: system saturated and request is sheddable", | ||
"requestID", reqCtx.SchedulingRequest.RequestId) | ||
return errutil.Error{ | ||
Code: errutil.InferencePoolResourceExhausted, | ||
Msg: "system saturated, sheddable request dropped", | ||
} | ||
} | ||
} | ||
return nil | ||
} | ||
|
||
// --- LegacyAdmissionController --- | ||
|
||
// LegacyAdmissionController implements saturation-based admission control. | ||
// It rejects sheddable requests (priority < 0) if the saturationDetector indicates that the system is currently | ||
// saturated. Non-sheddable requests always bypass the saturation check. | ||
type LegacyAdmissionController struct { | ||
saturationDetector saturationDetector | ||
} | ||
|
||
// NewLegacyAdmissionController creates a new LegacyAdmissionController. | ||
func NewLegacyAdmissionController(sd saturationDetector) *LegacyAdmissionController { | ||
return &LegacyAdmissionController{saturationDetector: sd} | ||
} | ||
|
||
// Admit implements the AdmissionController interface for the legacy strategy. | ||
// It checks for saturation only for requests with priority < 0. | ||
func (lac *LegacyAdmissionController) Admit( | ||
ctx context.Context, | ||
reqCtx *handlers.RequestContext, | ||
candidatePods []backendmetrics.PodMetrics, | ||
priority int, | ||
) error { | ||
logger := log.FromContext(ctx) | ||
logger.V(logutil.TRACE).Info("Executing LegacyAdmissionController", | ||
"priority", priority, "fairnessID", reqCtx.FairnessID) | ||
if err := rejectIfSheddableAndSaturated(ctx, lac.saturationDetector, reqCtx, candidatePods, priority); err != nil { | ||
return err | ||
} | ||
logger.V(logutil.TRACE).Info("Request admitted", "requestID", reqCtx.SchedulingRequest.RequestId) | ||
return nil | ||
} | ||
|
||
// --- FlowControlAdmissionController --- | ||
|
||
// FlowControlAdmissionController delegates admission decisions to the Flow Control layer. | ||
// It first checks if the request is sheddable and the system is saturated, rejecting immediately if both conditions are | ||
// true. Otherwise, it uses the provided flowController to enqueue the request and await an outcome. | ||
type FlowControlAdmissionController struct { | ||
saturationDetector saturationDetector | ||
flowController flowController | ||
} | ||
|
||
// NewFlowControlAdmissionController creates a new FlowControlAdmissionController. | ||
// It requires a SaturationDetector and a flowController instance. | ||
func NewFlowControlAdmissionController(sd saturationDetector, fc flowController) *FlowControlAdmissionController { | ||
return &FlowControlAdmissionController{ | ||
saturationDetector: sd, | ||
flowController: fc, | ||
} | ||
} | ||
|
||
// Admit implements the AdmissionController interface by checking for saturation on sheddable requests first, then | ||
// deferring to the Flow Control system. | ||
func (fcac *FlowControlAdmissionController) Admit( | ||
ctx context.Context, | ||
reqCtx *handlers.RequestContext, | ||
candidatePods []backendmetrics.PodMetrics, | ||
priority int, | ||
) error { | ||
logger := log.FromContext(ctx) | ||
logger.V(logutil.TRACE).Info("Executing FlowControlAdmissionController", | ||
"requestID", reqCtx.SchedulingRequest.RequestId, "priority", priority, "fairnessID", reqCtx.FairnessID) | ||
if err := rejectIfSheddableAndSaturated(ctx, fcac.saturationDetector, reqCtx, candidatePods, priority); err != nil { | ||
return err | ||
} | ||
|
||
logger.V(logutil.TRACE).Info("Request proceeding to flow control", "requestID", reqCtx.SchedulingRequest.RequestId) | ||
|
||
fcReq := &flowControlRequest{ | ||
ctx: ctx, | ||
requestID: reqCtx.SchedulingRequest.RequestId, | ||
fairnessID: reqCtx.FairnessID, | ||
priority: priority, | ||
requestByteSize: uint64(reqCtx.RequestSize), | ||
candidatePods: candidatePods, | ||
} | ||
|
||
outcome, err := fcac.flowController.EnqueueAndWait(fcReq) | ||
logger.V(logutil.DEBUG).Info("Flow control outcome", | ||
"requestID", reqCtx.SchedulingRequest.RequestId, "outcome", outcome, "error", err) | ||
return translateFlowControlOutcome(outcome, err) | ||
} | ||
|
||
// flowControlRequest is an adapter that implements the types.FlowControlRequest interface. | ||
type flowControlRequest struct { | ||
ctx context.Context | ||
requestID string | ||
fairnessID string | ||
priority int | ||
requestByteSize uint64 | ||
candidatePods []backendmetrics.PodMetrics | ||
} | ||
|
||
var _ types.FlowControlRequest = &flowControlRequest{} | ||
|
||
func (r *flowControlRequest) Context() context.Context { return r.ctx } | ||
func (r *flowControlRequest) ID() string { return r.requestID } | ||
func (r *flowControlRequest) InitialEffectiveTTL() time.Duration { return 0 } // Use controller default. | ||
func (r *flowControlRequest) ByteSize() uint64 { return r.requestByteSize } | ||
func (r *flowControlRequest) CandidatePodsForScheduling() []backendmetrics.PodMetrics { | ||
return r.candidatePods | ||
} | ||
func (r *flowControlRequest) FlowKey() types.FlowKey { | ||
return types.FlowKey{ID: r.fairnessID, Priority: r.priority} | ||
} | ||
|
||
// translateFlowControlOutcome maps the context-rich outcome of the Flow Control layer to the public errutil.Error | ||
// contract used by the Director. | ||
func translateFlowControlOutcome(outcome types.QueueOutcome, err error) error { | ||
msg := "request rejected by flow control" | ||
if err != nil { | ||
msg = err.Error() | ||
} | ||
|
||
switch outcome { | ||
case types.QueueOutcomeDispatched: | ||
return nil | ||
case types.QueueOutcomeRejectedCapacity: | ||
return errutil.Error{Code: errutil.InferencePoolResourceExhausted, Msg: msg} | ||
case types.QueueOutcomeEvictedTTL: | ||
return errutil.Error{Code: errutil.ServiceUnavailable, Msg: "request timed out in queue: " + msg} | ||
case types.QueueOutcomeEvictedContextCancelled: | ||
return errutil.Error{Code: errutil.ServiceUnavailable, Msg: "client disconnected: " + msg} | ||
case types.QueueOutcomeRejectedOther, types.QueueOutcomeEvictedOther: | ||
return errutil.Error{Code: errutil.Internal, Msg: "internal flow control error: " + msg} | ||
default: | ||
return errutil.Error{Code: errutil.Internal, Msg: "unhandled flow control outcome: " + msg} | ||
} | ||
} |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is admittedly sloppy. I will send out follow-up PRs to align this with how the rest of our pluggable layers are configured (e.g., the text-based config). For now, I am prioritizing getting the POC checked in with a functional e2e story.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please create issues to track the follow-up items.