Merged
247 changes: 188 additions & 59 deletions pkg/epp/flowcontrol/controller/controller.go
@@ -32,6 +32,7 @@ import (
"time"

"github.com/go-logr/logr"
k8srand "k8s.io/apimachinery/pkg/util/rand"
"k8s.io/utils/clock"

"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/contracts"
@@ -57,10 +58,11 @@ type shardProcessor interface {
// shardProcessorFactory defines the signature for a function that creates a `shardProcessor`.
// This enables dependency injection for testing.
type shardProcessorFactory func(
ctx context.Context,
shard contracts.RegistryShard,
saturationDetector contracts.SaturationDetector,
clock clock.Clock,
expiryCleanupInterval time.Duration,
clock clock.WithTicker,
cleanupSweepInterval time.Duration,
enqueueChannelBufferSize int,
logger logr.Logger,
) shardProcessor
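
Because the factory is injected, tests can swap in a stub. A hypothetical sketch (fakeProcessor is illustrative; only the factory signature is from this PR, and the real shardProcessor interface may require more methods):

```go
type fakeProcessor struct {
	submitted chan *internal.FlowItem // recorded for test assertions
}

func (f *fakeProcessor) Submit(item *internal.FlowItem) error {
	f.submitted <- item
	return nil
}

func (f *fakeProcessor) SubmitOrBlock(ctx context.Context, item *internal.FlowItem) error {
	return f.Submit(item)
}

// In a test:
fc.shardProcessorFactory = func(
	ctx context.Context,
	shard contracts.RegistryShard,
	saturationDetector contracts.SaturationDetector,
	clock clock.WithTicker,
	cleanupSweepInterval time.Duration,
	enqueueChannelBufferSize int,
	logger logr.Logger,
) shardProcessor {
	return &fakeProcessor{submitted: make(chan *internal.FlowItem, enqueueChannelBufferSize)}
}
```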
@@ -79,6 +81,14 @@ type managedWorker struct {
//
// The controller's `Run` loop executes periodically, acting as a garbage collector that keeps the pool of running
// workers synchronized with the dynamic shard topology of the `FlowRegistry`.
//
// Request Lifecycle Management:
//
// 1. Asynchronous Finalization (Controller-Owned): The Controller actively monitors the request Context
// (TTL/Cancellation) in EnqueueAndWait. If the Context expires, the Controller immediately Finalizes the item and
// unblocks the caller.
// 2. Synchronous Finalization (Processor-Owned): The Processor handles Dispatch, Capacity Rejection, and Shutdown.
// 3. Cleanup (Processor-Owned): The Processor periodically sweeps externally finalized items to reclaim capacity.
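
Both the controller (asynchronous path, on context expiry) and the processor (synchronous path) may race to finalize the same item, so finalization must be exactly-once. A minimal sketch of that guarantee, assuming stand-in names (itemSketch and finalState mirror internal.FlowItem and the value read from item.Done(), which are not shown in this diff):

```go
package sketch

import "sync"

// finalState mirrors the {Outcome, Err} pair read from item.Done().
type finalState struct {
	outcome string // stand-in for types.QueueOutcome
	err     error
}

// itemSketch guarantees exactly-once finalization: whichever side finalizes
// first wins, and later calls are no-ops, making the controller/processor
// race and the retry loop safe.
type itemSketch struct {
	once sync.Once
	done chan finalState // capacity 1, so finalize never blocks
}

func newItemSketch() *itemSketch {
	return &itemSketch{done: make(chan finalState, 1)}
}

func (it *itemSketch) finalize(fs finalState) {
	it.once.Do(func() {
		it.done <- fs
		close(it.done)
	})
}

// Done mirrors item.Done(): EnqueueAndWait blocks on this channel.
func (it *itemSketch) Done() <-chan finalState { return it.done }
```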
type FlowController struct {
// --- Immutable dependencies (set at construction) ---

@@ -129,18 +139,20 @@ func NewFlowController(

// Use the real shard processor implementation by default.
fc.shardProcessorFactory = func(
ctx context.Context,
shard contracts.RegistryShard,
saturationDetector contracts.SaturationDetector,
clock clock.Clock,
expiryCleanupInterval time.Duration,
clock clock.WithTicker,
cleanupSweepInterval time.Duration,
enqueueChannelBufferSize int,
logger logr.Logger,
) shardProcessor {
return internal.NewShardProcessor(
ctx,
shard,
saturationDetector,
clock,
expiryCleanupInterval,
cleanupSweepInterval,
enqueueChannelBufferSize,
logger)
}
@@ -189,63 +201,162 @@ func (fc *FlowController) run(ctx context.Context) {
// stack and its `context.Context`. The system only needs to signal this specific goroutine to unblock it.
// - Direct Backpressure: If queues are full, `EnqueueAndWait` returns an error immediately, providing direct
// backpressure to the caller.
func (fc *FlowController) EnqueueAndWait(req types.FlowControlRequest) (types.QueueOutcome, error) {
func (fc *FlowController) EnqueueAndWait(
ctx context.Context,
req types.FlowControlRequest,
) (types.QueueOutcome, error) {
if req == nil {
return types.QueueOutcomeRejectedOther, errors.New("request cannot be nil")
}
effectiveTTL := req.InitialEffectiveTTL()
if effectiveTTL <= 0 {
effectiveTTL = fc.config.DefaultRequestTTL
}
enqueueTime := fc.clock.Now()

// 1. Create the derived context that governs this request's lifecycle (Parent Cancellation + TTL).
reqCtx, cancel, enqueueTime := fc.createRequestContext(ctx, req)
defer cancel()

// 2. Enter the distribution loop to find a home for the request.
// This loop is responsible for retrying on ErrShardDraining.
for {
select {

select { // Non-blocking check on controller lifecycle.
case <-fc.parentCtx.Done():
return types.QueueOutcomeRejectedOther, fmt.Errorf("%w: %w", types.ErrRejected, types.ErrFlowControllerNotRunning)
default:
// The controller is running, proceed.
}

// We must create a fresh `FlowItem` on each attempt since finalization is idempotent.
// However, we use the original, preserved `enqueueTime`.
item := internal.NewItem(req, effectiveTTL, enqueueTime)
if outcome, err := fc.distributeRequest(item); err != nil {
return outcome, fmt.Errorf("%w: %w", types.ErrRejected, err)
// Attempt to distribute the request once.
item, err := fc.tryDistribution(reqCtx, req, enqueueTime)
if err != nil {
// Distribution failed terminally (e.g., no shards, context cancelled during blocking submit).
// The item has already been finalized by tryDistribution.
finalState := item.FinalState()
return finalState.Outcome, finalState.Err
}

// Block until the request is finalized (dispatched, rejected, or evicted).
// The finalization logic internally monitors for context cancellation and TTL expiry.
finalState := <-item.Done()
if errors.Is(finalState.Err, contracts.ErrShardDraining) {
fc.logger.V(logutil.DEBUG).Info("Shard is draining, retrying request", "requestID", req.ID())
// Benign race with the chosen `contracts.RegistryShard` becoming Draining post selection but before the item was
// enqueued into its respective `contracts.ManagedQueue`. Simply try again.
// Distribution was successful; ownership of the item has been transferred to a processor.
// Now, we block here in awaitFinalization until the request is finalized by either the processor (e.g., dispatched,
// rejected) or the controller itself (e.g., caller's context cancelled/TTL expired).
outcome, err := fc.awaitFinalization(reqCtx, item)
if errors.Is(err, contracts.ErrShardDraining) {
// This is a benign race condition where the chosen shard started draining after acceptance.
fc.logger.V(logutil.DEBUG).Info("Selected shard is Draining, retrying request distribution",
"flowKey", req.FlowKey(), "requestID", req.ID())
// Introduce a small, randomized delay (1-10ms) to prevent tight spinning loops and thundering herds during retry
// scenarios (e.g., shard draining).
// TODO: Replace this with a more sophisticated backoff strategy when our data parallelism story matures.
// For now, this is more than sufficient.
jitterMs := k8srand.Intn(10) + 1
fc.clock.Sleep(time.Duration(jitterMs) * time.Millisecond)
continue
}

// The outcome is terminal (Dispatched, Evicted, or a non-retriable rejection).
return outcome, err
}
}
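
A hypothetical call site, showing how a caller bounds the wait with its own context (req and the logging are illustrative):

```go
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

outcome, err := fc.EnqueueAndWait(ctx, req) // req: a types.FlowControlRequest
if err != nil {
	// Rejections wrap types.ErrRejected; expiry-driven failures carry the
	// context cause (e.g., types.ErrTTLExpired).
	log.Printf("request not dispatched: outcome=%v err=%v", outcome, err)
}
```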

var errNoShards = errors.New("no viable active shards available")

// tryDistribution handles a single attempt to select a shard and submit a request.
// If this function returns an error, it guarantees that the provided `item` has been finalized.
func (fc *FlowController) tryDistribution(
reqCtx context.Context,
req types.FlowControlRequest,
enqueueTime time.Time,
) (*internal.FlowItem, error) {
// Calculate effective TTL for item initialization (reqCtx is the enforcement mechanism).
effectiveTTL := fc.config.DefaultRequestTTL
if deadline, ok := reqCtx.Deadline(); ok {
if ttl := deadline.Sub(enqueueTime); ttl > 0 {
effectiveTTL = ttl
}
}

// We must create a fresh FlowItem on each attempt as finalization is per-lifecycle.
item := internal.NewItem(req, effectiveTTL, enqueueTime)
Contributor:

If my understanding is correct, we now have two mechanisms enforcing TTL; if so, why do we need both?

Contributor (author):

We just have one (enforced by the context deadline now). The periodic sweep only cleans up finalized items; it is no longer responsible for finalizing them based on TTL durations or checking whether the parent context was already cancelled.
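
A self-contained illustration of that single mechanism: context.WithDeadlineCause (Go 1.21+) makes context.Cause report a domain error, which is how types.ErrTTLExpired reaches the caller in this PR (errTTL below is a stand-in):

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

var errTTL = errors.New("request TTL expired") // stand-in for types.ErrTTLExpired

func main() {
	deadline := time.Now().Add(10 * time.Millisecond)
	ctx, cancel := context.WithDeadlineCause(context.Background(), deadline, errTTL)
	defer cancel()

	<-ctx.Done()
	fmt.Println(ctx.Err())          // context.DeadlineExceeded
	fmt.Println(context.Cause(ctx)) // request TTL expired
}
```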


candidates, err := fc.selectDistributionCandidates(item.OriginalRequest().FlowKey())
if err != nil {
outcome := types.QueueOutcomeRejectedOther
if errors.Is(err, errNoShards) {
outcome = types.QueueOutcomeRejectedCapacity
}
finalErr := fmt.Errorf("%w: request not accepted: %w", types.ErrRejected, err)
item.FinalizeWithOutcome(outcome, finalErr)
return item, finalErr
}

outcome, err := fc.distributeRequest(reqCtx, item, candidates)
if err == nil {
// Success: Ownership of the item has been transferred to the processor.
return item, nil
}

// For any distribution error, the controller retains ownership and must finalize the item.
var finalErr error
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
// We propagate the original context error here; EnqueueAndWait will rely on item.FinalState().Err.
finalErr = err
item.Finalize(context.Cause(reqCtx))
} else { // e.g., processor rejection (capacity) or shutdown.
finalErr = fmt.Errorf("%w: request not accepted: %w", types.ErrRejected, err)
item.FinalizeWithOutcome(outcome, finalErr)
}
return item, finalErr
}

// awaitFinalization blocks until an item is finalized, either by the processor (synchronously) or by the controller
// itself due to context expiry (asynchronously).
func (fc *FlowController) awaitFinalization(
reqCtx context.Context,
item *internal.FlowItem,
) (types.QueueOutcome, error) {
select {
case <-reqCtx.Done():
// Asynchronous Finalization (Controller-initiated):
// The request Context expired (Cancellation/TTL) while the item was being processed.
cause := context.Cause(reqCtx)
item.Finalize(cause)

// The processor will eventually discard this "zombie" item during its cleanup sweep.
finalState := item.FinalState()
return finalState.Outcome, finalState.Err

case finalState := <-item.Done():
// Synchronous Finalization (Processor-initiated):
// The processor finalized the item (Dispatch, Reject, Shutdown).
return finalState.Outcome, finalState.Err
}
}

// distributeRequest implements a flow-aware, two-phase "Join-Shortest-Queue-by-Bytes" (JSQ-Bytes) distribution strategy
// with graceful backpressure. It selects the optimal worker for a given item and attempts to submit it.
//
// The algorithm operates as follows:
// 1. Candidate Selection: It identifies all Active shards for the item's flow and ranks them by the current byte size
// of that flow's queue, from least to most loaded.
// 2. Phase 1 (Non-blocking Fast Failover): It iterates through the ranked candidates and attempts a non-blocking
// submission. The first successful submission wins.
// 3. Phase 2 (Blocking Fallback): If all non-blocking attempts fail, it performs a single blocking submission to the
// least-loaded candidate, providing backpressure.
func (fc *FlowController) distributeRequest(item *internal.FlowItem) (types.QueueOutcome, error) {
key := item.OriginalRequest().FlowKey()
reqID := item.OriginalRequest().ID()
type candidate struct {
processor shardProcessor
shardID string
byteSize uint64
// createRequestContext derives the context that governs a request's lifecycle, enforcing the TTL deadline.
func (fc *FlowController) createRequestContext(
ctx context.Context,
req types.FlowControlRequest,
) (context.Context, context.CancelFunc, time.Time) {
enqueueTime := fc.clock.Now()
effectiveTTL := req.InitialEffectiveTTL()
if effectiveTTL <= 0 {
effectiveTTL = fc.config.DefaultRequestTTL
}

if effectiveTTL > 0 {
reqCtx, cancel := context.WithDeadlineCause(ctx, enqueueTime.Add(effectiveTTL), types.ErrTTLExpired)
Contributor:

Just so I understand: previously we had a loop that periodically checked and rejected requests with expired TTLs; now we have added another one using a context with a deadline?

Contributor (author), @LukeAVanDrie, Oct 13, 2025:

> previously we had a loop that periodically checked and rejected requests with expired TTLs

Yes.

> now we have added another one using a context with a deadline

Not quite. We removed all responsibility from the processor for identifying items with expired TTLs or contexts. It is now solely responsible for sweeping already-finalized items. You can see the impact of this at the bottom of processor.go, where all the checkExpiry-related code has been deleted.

TTL expiry (or external context expiry, whichever happens first) is now detected immediately in the controller when the deadline context expires. There is now zero delay between context/TTL expiry and EnqueueAndWait unblocking the request goroutine.

The periodic cleanup loop now just sweeps "zombie" items to reclaim queue capacity. We have decoupled the cleanup loop from request-goroutine management.
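
A sketch of the decoupled sweep described above, under stated assumptions: the processor shape below is hypothetical (the real code lives in processor.go, not in this hunk), and NewTicker/C/Stop come from k8s.io/utils/clock's WithTicker and Ticker interfaces:

```go
type processorSketch struct {
	clock                clock.WithTicker // k8s.io/utils/clock
	cleanupSweepInterval time.Duration
}

// removeFinalizedItems is a hypothetical helper: it drops queue entries whose
// FinalState is already set, reclaiming their capacity.
func (p *processorSketch) removeFinalizedItems() {}

func (p *processorSketch) runCleanupSweep(ctx context.Context) {
	ticker := p.clock.NewTicker(p.cleanupSweepInterval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C():
			// Only sweep: no TTL or context checks happen here anymore.
			p.removeFinalizedItems()
		}
	}
}
```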

return reqCtx, cancel, enqueueTime
}
reqCtx, cancel := context.WithCancel(ctx)
return reqCtx, cancel, enqueueTime
}

// candidate holds the information needed to evaluate a shard as a potential target for a request.
type candidate struct {
processor shardProcessor
shardID string
byteSize uint64
}

// selectDistributionCandidates identifies all Active shards for the item's flow and ranks them by the current byte size
// of that flow's queue, from least to most loaded.
func (fc *FlowController) selectDistributionCandidates(key types.FlowKey) ([]candidate, error) {
var candidates []candidate
err := fc.registry.WithConnection(key, func(conn contracts.ActiveFlowConnection) error {
shards := conn.ActiveShards()
@@ -262,41 +373,58 @@ func (fc *FlowController) distributeRequest(item *internal.FlowItem) (types.QueueOutcome, error) {
return nil
})
if err != nil {
return types.QueueOutcomeRejectedOther, fmt.Errorf("failed to acquire lease for request %q (flow %s): %w",
reqID, key, err)
return nil, fmt.Errorf("failed to acquire lease for flow %s: %w", key, err)
}

if len(candidates) == 0 {
return types.QueueOutcomeRejectedCapacity, fmt.Errorf("no viable Active shards available for request %q (flow %s)",
reqID, key)
return nil, fmt.Errorf("%w for flow %s", errNoShards, key)
}

slices.SortFunc(candidates, func(a, b candidate) int {
return cmp.Compare(a.byteSize, b.byteSize)
})

// --- Phase 1: Fast, non-blocking failover attempt ---
return candidates, nil
}
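
A worked example of the ranking (byte sizes illustrative):

```go
cands := []candidate{
	{shardID: "shard-a", byteSize: 4096},
	{shardID: "shard-b", byteSize: 512},
	{shardID: "shard-c", byteSize: 2048},
}
slices.SortFunc(cands, func(a, b candidate) int {
	return cmp.Compare(a.byteSize, b.byteSize)
})
// cands now ranks shard-b (512) first: it is tried first in Phase 1 and,
// if every processor is busy, becomes the blocking fallback in Phase 2.
```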

// distributeRequest implements a flow-aware, two-phase "Join-Shortest-Queue-by-Bytes" (JSQ-Bytes) distribution strategy
// with graceful backpressure. It attempts to submit an item to the best-ranked candidate from the provided list.
//
// The algorithm operates as follows:
// 1. Phase 1 (Non-blocking Fast Failover): It iterates through the ranked candidates and attempts a non-blocking
// submission. The first successful submission wins.
// 2. Phase 2 (Blocking Fallback): If all non-blocking attempts fail, it performs a single blocking submission to the
// least-loaded candidate, providing backpressure.
//
// The provided context (ctx) is used for the blocking submission phase (SubmitOrBlock).
//
// Ownership Contract:
// - Returns nil: Success. Ownership transferred to Processor.
// - Returns error: Failure (context expiry, shutdown, etc.).
// Ownership retained by Controller. The Controller MUST finalize the item.
func (fc *FlowController) distributeRequest(
ctx context.Context,
item *internal.FlowItem,
candidates []candidate,
) (types.QueueOutcome, error) {
reqID := item.OriginalRequest().ID()
for _, c := range candidates {
if err := c.processor.Submit(item); err == nil {
return types.QueueOutcomeNotYetFinalized, nil // Success
return types.QueueOutcomeNotYetFinalized, nil
}
fc.logger.V(logutil.DEBUG).Info("Processor busy during fast failover, trying next candidate",
fc.logger.V(logutil.TRACE).Info("Processor busy during fast failover, trying next candidate",
"shardID", c.shardID, "requestID", reqID)
}

// --- Phase 2: All processors busy. Attempt a single blocking send to the best candidate. ---
// All processors are busy. Attempt a single blocking submission to the least-loaded candidate.
bestCandidate := candidates[0]
fc.logger.V(logutil.DEBUG).Info("All processors busy, attempting blocking submit to best candidate",
"shardID", bestCandidate.shardID, "requestID", reqID, "queueByteSize", bestCandidate.byteSize)

err = bestCandidate.processor.SubmitOrBlock(item.OriginalRequest().Context(), item)
fc.logger.V(logutil.TRACE).Info("All processors busy, attempting blocking submit to best candidate",
"shardID", bestCandidate.shardID, "requestID", reqID)
err := bestCandidate.processor.SubmitOrBlock(ctx, item)
if err != nil {
// If even the blocking attempt fails (e.g., context cancelled or processor shut down), the request is definitively
// rejected.
return types.QueueOutcomeRejectedCapacity, fmt.Errorf(
"all viable shard processors are at capacity for request %q (flow %s): %w", reqID, key, err)
return types.QueueOutcomeRejectedOther, fmt.Errorf("%w: request not accepted: %w", types.ErrRejected, err)
}
return types.QueueOutcomeNotYetFinalized, nil
return types.QueueOutcomeNotYetFinalized, nil // Success, ownership transferred.
}
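
The two phases map onto the processor's two submission primitives. A sketch of their assumed shape (the real implementation lives in the internal package; processorSubmitSketch, its enqueue channel, and errProcessorBusy are stand-ins):

```go
var errProcessorBusy = errors.New("processor enqueue channel full") // hypothetical sentinel

type processorSubmitSketch struct {
	enqueue chan *internal.FlowItem
}

// Submit is the Phase 1 primitive: a non-blocking try-send.
func (p *processorSubmitSketch) Submit(item *internal.FlowItem) error {
	select {
	case p.enqueue <- item:
		return nil // ownership transferred
	default:
		return errProcessorBusy // caller fails over to the next candidate
	}
}

// SubmitOrBlock is the Phase 2 primitive: it parks until there is room or the
// request context expires, which is the backpressure path.
func (p *processorSubmitSketch) SubmitOrBlock(ctx context.Context, item *internal.FlowItem) error {
	select {
	case p.enqueue <- item:
		return nil // ownership transferred
	case <-ctx.Done():
		return context.Cause(ctx)
	}
}
```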

// getOrStartWorker implements the lazy-loading and startup of shard processors.
@@ -311,6 +439,7 @@ func (fc *FlowController) getOrStartWorker(shard contracts.RegistryShard) *managedWorker {
// Construct a new worker, but do not start its processor goroutine yet.
processorCtx, cancel := context.WithCancel(fc.parentCtx)
processor := fc.shardProcessorFactory(
processorCtx,
shard,
fc.saturationDetector,
fc.clock,