From ee0bbbaa368431e272f1b89acfc87d2636ae44e8 Mon Sep 17 00:00:00 2001 From: Luke Van Drie Date: Fri, 10 Oct 2025 20:32:41 +0000 Subject: [PATCH 1/2] feat(flowcontrol): Introduce explicit context handling in Controller This commit completes the refactoring of the request lifecycle by making context management explicit and adapting the FlowController to the new internal components. This is the final commit in a series: 1. Harden FlowItem for concurrent finalization. 2. Adapt ShardProcessor to new FlowItem lifecycle. 3. This commit: Introduce explicit context handling. Key changes: - The `FlowControlRequest` interface no longer includes `Context()`. - `FlowController.EnqueueAndWait` now accepts a `context.Context` as its first argument, making the request lifecycle explicit and caller-controlled. - The controller now actively monitors this context and will initiate an "asynchronous finalization" if the context expires after an item has been handed off to a processor. - Adds extensive integration and concurrency tests to validate the new end-to-end lifecycle management under contention. --- pkg/epp/flowcontrol/controller/controller.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/pkg/epp/flowcontrol/controller/controller.go b/pkg/epp/flowcontrol/controller/controller.go index 93d0330c7..1e13e95d0 100644 --- a/pkg/epp/flowcontrol/controller/controller.go +++ b/pkg/epp/flowcontrol/controller/controller.go @@ -227,6 +227,10 @@ func (fc *FlowController) EnqueueAndWait( select { // Non-blocking check on controller lifecycle. case <-fc.parentCtx.Done(): return types.QueueOutcomeRejectedOther, fmt.Errorf("%w: %w", types.ErrRejected, types.ErrFlowControllerNotRunning) + case <-reqCtx.Done(): + return types.QueueOutcomeRejectedOther, fmt.Errorf( + "%w: request context cancelled or TTL expired before distribution: %w", + types.ErrRejected, context.Cause(reqCtx)) default: } From 62a5ec4cbcab33e56724518caddcf2600f3f631e Mon Sep 17 00:00:00 2001 From: Luke Van Drie Date: Tue, 14 Oct 2025 21:18:44 +0000 Subject: [PATCH 2/2] chore(fc): Improve controller docs and logging - Overhaul package-level documentation in doc.go files for controller and internal packages to clarify architecture, concurrency, and ownership. - Refine GoDoc and inline comments across all files for clarity, consistency, and to better explain intent. - Remove low signal-to-noise inline comments. - Adjust log levels for better signal-to-noise ratio (e.g., TRACE for high-frequency events). - Remove unnecessary logger.WithValues calls in hot paths within the ShardProcessor to reduce allocation overhead. - Remove overly defensive nil check in EnqueueAndWait. - Remove config deep copy in NewFlowController (we already pass by value) - Replace a panic with logger.Error in distributeRequest for a ManagedQueue invariant violation. 
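For illustration only (not part of this change), a minimal caller-side sketch of the contract established by this series: the caller owns the request lifecycle and passes its context explicitly to `EnqueueAndWait`. The handler name, TTL value, and import paths are assumptions based on the repository layout, not code from this patch.

    package example

    import (
        "context"
        "fmt"
        "time"

        "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/controller"
        "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/types"
    )

    // admitRequest is a hypothetical helper showing the blocking call pattern.
    func admitRequest(ctx context.Context, fc *controller.FlowController, req types.FlowControlRequest) error {
        // The request lifecycle is explicit and caller-controlled: derive a context
        // carrying the desired TTL/cancellation and hand it to the controller.
        reqCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
        defer cancel()

        outcome, err := fc.EnqueueAndWait(reqCtx, req)
        if err != nil {
            // Rejections and evictions wrap sentinel errors such as types.ErrRejected.
            return fmt.Errorf("flow control did not dispatch request (outcome: %v): %w", outcome, err)
        }
        // Dispatched: the request may proceed.
        return nil
    }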
--- pkg/epp/flowcontrol/controller/controller.go | 113 ++++++++-------- .../flowcontrol/controller/controller_test.go | 24 +--- pkg/epp/flowcontrol/controller/doc.go | 49 +++++-- .../flowcontrol/controller/internal/doc.go | 12 +- .../controller/internal/processor.go | 122 +++++++----------- .../controller/internal/processor_test.go | 17 +-- 6 files changed, 150 insertions(+), 187 deletions(-) diff --git a/pkg/epp/flowcontrol/controller/controller.go b/pkg/epp/flowcontrol/controller/controller.go index 1e13e95d0..a11ef26a5 100644 --- a/pkg/epp/flowcontrol/controller/controller.go +++ b/pkg/epp/flowcontrol/controller/controller.go @@ -14,12 +14,11 @@ See the License for the specific language governing permissions and limitations under the License. */ -// Package controller contains the implementation of the `FlowController` engine. +// Package controller contains the implementation of the FlowController engine. // -// The FlowController is the central processing engine of the Flow Control system. It is a sharded, high-throughput +// The FlowController is the central processing engine of the Flow Control layer. It is a sharded, high-throughput // component responsible for managing the lifecycle of all incoming requests. It achieves this by acting as a stateless -// supervisor that orchestrates a pool of stateful workers (`internal.ShardProcessor`), distributing incoming requests -// among them using a sophisticated load-balancing algorithm. +// supervisor that orchestrates a pool of stateful workers (ShardProcessors), distributing incoming requests among them. package controller import ( @@ -43,22 +42,20 @@ import ( logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging" ) -// registryClient defines the minimal interface that the `FlowController` needs to interact with the `FlowRegistry`. +// registryClient defines the minimal interface that the FlowController needs to interact with the FlowRegistry. type registryClient interface { contracts.FlowRegistryObserver contracts.FlowRegistryDataPlane } -// shardProcessor is the minimal internal interface that the `FlowController` requires from its workers. -// This abstraction allows for the injection of mock processors during testing. +// shardProcessor is the minimal internal interface that the FlowController requires from its workers. type shardProcessor interface { Run(ctx context.Context) Submit(item *internal.FlowItem) error SubmitOrBlock(ctx context.Context, item *internal.FlowItem) error } -// shardProcessorFactory defines the signature for a function that creates a `shardProcessor`. -// This enables dependency injection for testing. +// shardProcessorFactory defines the signature for creating a shardProcessor. type shardProcessorFactory func( ctx context.Context, shard contracts.RegistryShard, @@ -74,15 +71,16 @@ var _ shardProcessor = &internal.ShardProcessor{} // managedWorker holds the state for a single supervised worker. type managedWorker struct { processor shardProcessor - cancel context.CancelFunc + // cancel function for the worker-specific context. Used during shutdown and GC. + cancel context.CancelFunc } -// FlowController is the central, high-throughput engine of the Flow Control system. -// It is designed as a stateless distributor that orchestrates a pool of stateful workers (`internal.ShardProcessor`), -// following a "supervisor-worker" pattern. +// FlowController is the central, high-throughput engine of the Flow Control layer. 
+// It is designed as a stateless distributor that orchestrates a pool of stateful workers (ShardProcessor), following a +// supervisor-worker pattern. // -// The controller's `Run` loop executes periodically, acting as a garbage collector that keeps the pool of running -// workers synchronized with the dynamic shard topology of the `FlowRegistry`. +// The controller's run loop executes periodically, acting as a garbage collector that keeps the pool of running +// workers synchronized with the dynamic shard topology of the FlowRegistry. // // Request Lifecycle Management: // @@ -103,25 +101,26 @@ type FlowController struct { // --- Lifecycle state --- - // parentCtx is the root context for the controller's lifecycle, established when `Run` is called. + // parentCtx is the root context for the controller's lifecycle, established when NewFlowController is called. // It is the parent for all long-lived worker goroutines. parentCtx context.Context // --- Concurrent state --- - // workers is a highly concurrent map storing the `managedWorker` for each shard. + // workers is a highly concurrent map storing the managedWorker for each shard. // It is the controller's source of truth for the worker pool. - // The key is the shard ID (`string`), and the value is a `*managedWorker`. - workers sync.Map + workers sync.Map // key: shard ID (string); value: *managedWorker + // wg waits for all worker goroutines to terminate during shutdown. wg sync.WaitGroup } -// flowControllerOption is a function that applies a configuration change to a `FlowController`. +// flowControllerOption is a function that applies a configuration change. // test-only type flowControllerOption func(*FlowController) -// NewFlowController creates a new `FlowController` instance. +// NewFlowController creates and starts a new FlowController instance. +// The provided context governs the lifecycle of the controller and all its workers. func NewFlowController( ctx context.Context, config Config, @@ -131,7 +130,7 @@ func NewFlowController( opts ...flowControllerOption, ) (*FlowController, error) { fc := &FlowController{ - config: *config.deepCopy(), + config: config, registry: registry, saturationDetector: sd, clock: clock.RealClock{}, @@ -139,7 +138,6 @@ func NewFlowController( parentCtx: ctx, } - // Use the real shard processor implementation by default. fc.shardProcessorFactory = func( ctx context.Context, shard contracts.RegistryShard, @@ -167,9 +165,9 @@ func NewFlowController( return fc, nil } -// run starts the `FlowController`'s main reconciliation loop. +// run starts the FlowController's main reconciliation loop (supervisor loop). // This loop is responsible for garbage collecting workers whose shards no longer exist in the registry. -// This method blocks until the provided context is cancelled and ALL worker goroutines have fully terminated. +// This method blocks until the provided context is cancelled and all worker goroutines have fully terminated. func (fc *FlowController) run(ctx context.Context) { fc.logger.Info("Starting FlowController reconciliation loop.") defer fc.logger.Info("FlowController reconciliation loop stopped.") @@ -194,23 +192,19 @@ func (fc *FlowController) run(ctx context.Context) { // # Design Rationale: The Synchronous Model // // This blocking model is deliberately chosen for its simplicity and robustness, especially in the context of Envoy -// External Processing (`ext_proc`), which operates on a stream-based protocol. +// External Processing (ext_proc), which operates on a stream-based protocol. 
// -// - `ext_proc` Alignment: A single goroutine typically manages the stream for a given HTTP request. -// `EnqueueAndWait` fits this perfectly: the request-handling goroutine calls it, blocks, and upon return, has a +// - ext_proc Alignment: A single goroutine typically manages the stream for a given HTTP request. +// EnqueueAndWait fits this perfectly: the request-handling goroutine calls it, blocks, and upon return, has a // definitive outcome to act upon. // - Simplified State Management: The state of a "waiting" request is implicitly managed by the blocked goroutine's -// stack and its `context.Context`. The system only needs to signal this specific goroutine to unblock it. -// - Direct Backpressure: If queues are full, `EnqueueAndWait` returns an error immediately, providing direct +// stack and its Context. The system only needs to signal this specific goroutine to unblock it. +// - Direct Backpressure: If queues are full, EnqueueAndWait returns an error immediately, providing direct // backpressure to the caller. func (fc *FlowController) EnqueueAndWait( ctx context.Context, req types.FlowControlRequest, ) (types.QueueOutcome, error) { - if req == nil { - return types.QueueOutcomeRejectedOther, errors.New("request cannot be nil") - } - flowKey := req.FlowKey() fairnessID := flowKey.ID priority := strconv.Itoa(flowKey.Priority) @@ -227,10 +221,6 @@ func (fc *FlowController) EnqueueAndWait( select { // Non-blocking check on controller lifecycle. case <-fc.parentCtx.Done(): return types.QueueOutcomeRejectedOther, fmt.Errorf("%w: %w", types.ErrRejected, types.ErrFlowControllerNotRunning) - case <-reqCtx.Done(): - return types.QueueOutcomeRejectedOther, fmt.Errorf( - "%w: request context cancelled or TTL expired before distribution: %w", - types.ErrRejected, context.Cause(reqCtx)) default: } @@ -369,17 +359,22 @@ type candidate struct { // of that flow's queue, from least to most loaded. func (fc *FlowController) selectDistributionCandidates(key types.FlowKey) ([]candidate, error) { var candidates []candidate + + // Acquire a connection to the registry for the flow key. This ensures a consistent view of the ActiveShards for the + // duration of the shard selection process, preventing races with concurrent shard topology changes. err := fc.registry.WithConnection(key, func(conn contracts.ActiveFlowConnection) error { shards := conn.ActiveShards() - candidates = make([]candidate, len(shards)) - for i, shard := range shards { + candidates = make([]candidate, 0, len(shards)) + for _, shard := range shards { worker := fc.getOrStartWorker(shard) mq, err := shard.ManagedQueue(key) if err != nil { - panic(fmt.Sprintf("invariant violation: ManagedQueue for leased flow %s failed on shard %s: %v", - key, shard.ID(), err)) + fc.logger.Error(err, + "Invariant violation. Failed to get ManagedQueue for a leased flow on an Active shard. Skipping shard.", + "flowKey", key, "shardID", shard.ID()) + continue } - candidates[i] = candidate{worker.processor, shard.ID(), mq.ByteSize()} + candidates = append(candidates, candidate{worker.processor, shard.ID(), mq.ByteSize()}) } return nil }) @@ -439,15 +434,15 @@ func (fc *FlowController) distributeRequest( } // getOrStartWorker implements the lazy-loading and startup of shard processors. -// It attempts to retrieve an existing worker for a shard. If one doesn't exist, it constructs a new worker and attempts -// to register it atomically. 
The worker's processor goroutine is only started *after* it has successfully been -// registered, preventing race conditions where multiple goroutines create and start the same worker. +// It ensures that exactly one worker goroutine is started for each shard, using atomic operations +// (sync.Map.LoadOrStore). The worker's processor goroutine is only started after it has successfully been registered, +// preventing race conditions where multiple goroutines create and start the same worker. func (fc *FlowController) getOrStartWorker(shard contracts.RegistryShard) *managedWorker { if w, ok := fc.workers.Load(shard.ID()); ok { return w.(*managedWorker) } - // Construct a new worker, but do not start its processor goroutine yet. + // Construct a new worker, but do not start its goroutine yet. processorCtx, cancel := context.WithCancel(fc.parentCtx) processor := fc.shardProcessorFactory( processorCtx, @@ -463,18 +458,17 @@ func (fc *FlowController) getOrStartWorker(shard contracts.RegistryShard) *manag cancel: cancel, } - // Atomically load or store. This is the critical step for preventing race conditions. + // Atomically load or store. This is the critical synchronization step. actual, loaded := fc.workers.LoadOrStore(shard.ID(), newWorker) if loaded { // Another goroutine beat us to it. The `newWorker` we created was not stored. - // We must cancel the context we created for it to prevent a leak, but we do not need to do anything else, as its - // processor was never started. + // We must cancel the context we created to prevent a leak. cancel() return actual.(*managedWorker) } - // We won the race. The `newWorker` was successfully stored. - // Now, and only now, do we start the processor's long-running goroutine. + // We won the race. The newWorker was stored. Now, start the processor's long-running goroutine. + fc.logger.V(logutil.DEFAULT).Info("Starting new ShardProcessor worker.", "shardID", shard.ID()) fc.wg.Add(1) go func() { defer fc.wg.Done() @@ -485,11 +479,10 @@ func (fc *FlowController) getOrStartWorker(shard contracts.RegistryShard) *manag } // reconcileProcessors is the supervisor's core garbage collection loop. -// It fetches the current list of Active shards from the registry and removes any workers whose corresponding shards -// have been fully drained and garbage collected by the registry. +// It identifies and stops workers whose corresponding shards have been removed from the registry. func (fc *FlowController) reconcileProcessors() { stats := fc.registry.ShardStats() - shards := make(map[string]struct{}, len(stats)) // `map[shardID] -> isActive` + shards := make(map[string]struct{}, len(stats)) // map[shardID] -> isActive for _, s := range stats { shards[s.ID] = struct{}{} } @@ -497,12 +490,11 @@ func (fc *FlowController) reconcileProcessors() { fc.workers.Range(func(key, value any) bool { shardID := key.(string) worker := value.(*managedWorker) - - // GC check: Is the shard no longer in the registry at all? if _, exists := shards[shardID]; !exists { - fc.logger.Info("Stale worker detected for GC'd shard, shutting down.", "shardID", shardID) - worker.cancel() - fc.workers.Delete(shardID) + fc.logger.V(logutil.DEFAULT).Info("Stale worker detected for GC'd shard, initiating shutdown.", + "shardID", shardID) + worker.cancel() // Cancel the worker's context, initiating the Processor's graceful shutdown sequence. + fc.workers.Delete(shardID) // Delete from the map so no new requests are routed to it. 
} return true }) } @@ -519,7 +511,6 @@ func (fc *FlowController) shutdown() { worker.cancel() return true }) - fc.wg.Wait() fc.logger.Info("All shard processors have shut down.") } diff --git a/pkg/epp/flowcontrol/controller/controller_test.go b/pkg/epp/flowcontrol/controller/controller_test.go index 74802f2df..917a5a1e5 100644 --- a/pkg/epp/flowcontrol/controller/controller_test.go +++ b/pkg/epp/flowcontrol/controller/controller_test.go @@ -317,17 +317,6 @@ func TestFlowController_EnqueueAndWait(t *testing.T) { t.Run("Rejections", func(t *testing.T) { t.Parallel() - t.Run("OnNilRequest", func(t *testing.T) { - t.Parallel() - h := newUnitHarness(t, t.Context(), Config{}, nil) - - outcome, err := h.fc.EnqueueAndWait(context.Background(), nil) - require.Error(t, err, "EnqueueAndWait must reject a nil request") - assert.Equal(t, "request cannot be nil", err.Error(), "error message must be specific") - assert.Equal(t, types.QueueOutcomeRejectedOther, outcome, - "outcome should be QueueOutcomeRejectedOther for invalid inputs") - }) - t.Run("OnReqCtxExpiredBeforeDistribution", func(t *testing.T) { t.Parallel() // Test that if the request context provided to EnqueueAndWait is already expired, it returns immediately. @@ -418,14 +407,13 @@ func TestFlowController_EnqueueAndWait(t *testing.T) { "outcome should be QueueOutcomeRejectedOther for transient registry errors") }) - // This test validates the documented invariant handling in distributeRequest. - t.Run("PanicsOnManagedQueueError", func(t *testing.T) { + t.Run("OnManagedQueueError", func(t *testing.T) { t.Parallel() mockRegistry := &mockRegistryClient{} h := newUnitHarness(t, t.Context(), Config{}, mockRegistry) // Create a faulty shard that successfully leases the flow but fails to return the - // ManagedQueue. + // ManagedQueue. This shard should be treated as unavailable. faultyShard := &mocks.MockRegistryShard{ IDFunc: func() string { return "faulty-shard" }, ManagedQueueFunc: func(_ types.FlowKey) (contracts.ManagedQueue, error) { @@ -440,9 +428,11 @@ func TestFlowController_EnqueueAndWait(t *testing.T) { } req := newTestRequest(defaultFlowKey) - assert.Panics(t, func() { - _, _ = h.fc.EnqueueAndWait(context.Background(), req) - }, "EnqueueAndWait must panic when a registry implementation violates the ManagedQueue contract") + outcome, err := h.fc.EnqueueAndWait(context.Background(), req) + require.Error(t, err, "EnqueueAndWait must reject requests if no shards are available") + assert.ErrorIs(t, err, types.ErrRejected, "error should wrap ErrRejected") + assert.Equal(t, types.QueueOutcomeRejectedCapacity, outcome, + "outcome should be QueueOutcomeRejectedCapacity when no viable shards are available for the flow") }) }) diff --git a/pkg/epp/flowcontrol/controller/doc.go b/pkg/epp/flowcontrol/controller/doc.go index 4f2620c5a..0d2ea3687 100644 --- a/pkg/epp/flowcontrol/controller/doc.go +++ b/pkg/epp/flowcontrol/controller/doc.go @@ -14,23 +14,48 @@ See the License for the specific language governing permissions and limitations under the License. */ -// Package controller contains the implementation of the `FlowController` engine. +// Package controller contains the implementation of the FlowController engine. // // # Overview // -// The `FlowController` is the central processing engine of the Flow Control system.
It acts as a stateless supervisor -// that orchestrates a pool of stateful workers (`internal.ShardProcessor`), managing the lifecycle of all incoming -// requests from initial submission to a terminal outcome (dispatch, rejection, or eviction). +// The FlowController is the central processing engine of the Flow Control layer. It acts as a stateless supervisor that +// orchestrates a pool of stateful workers (internal.ShardProcessor), managing the lifecycle of all incoming requests +// from initial submission to a terminal outcome (dispatch, rejection, or eviction). // // # Architecture: Supervisor-Worker Pattern // -// This package implements a classic "supervisor-worker" pattern to achieve high throughput and dynamic scalability. +// This package implements a supervisor-worker pattern to achieve high throughput and dynamic scalability. // -// - The `FlowController` (Supervisor): The public-facing API of the system. Its primary responsibilities are to -// execute a distribution algorithm to select the optimal worker for a new request and to manage the lifecycle of -// the worker pool, ensuring it stays synchronized with the underlying shard topology defined by the -// `contracts.FlowRegistry`. -// - The `internal.ShardProcessor` (Worker): A stateful, single-goroutine actor responsible for the entire lifecycle -// of requests on a single shard. The supervisor manages a pool of these workers, one for each -// `contracts.RegistryShard`. +// - The FlowController (Supervisor): The public-facing API of the system. Its primary responsibilities are to execute +// a distribution algorithm to select the optimal worker for a new request and to manage the lifecycle of the worker +// pool, ensuring it stays synchronized with the underlying shard topology defined by the contracts.FlowRegistry. +// - The internal.ShardProcessor (Worker): A stateful, single-goroutine actor responsible for the entire lifecycle of +// requests on a single shard. The supervisor manages a pool of these workers, one for each contracts.RegistryShard. +// +// # Concurrency Model +// +// The FlowController is designed to be highly concurrent and thread-safe. It acts primarily as a stateless distributor. +// +// - EnqueueAndWait: Can be called concurrently by many goroutines. +// - Worker Management: Uses a sync.Map (workers) for concurrent access and lazy initialization of workers. +// - Supervision: A single background goroutine (run) manages the worker pool lifecycle (garbage collection). +// +// It achieves high throughput by minimizing shared state and relying on the internal ShardProcessors to handle state +// mutations serially (using an actor model). +// +// # Request Lifecycle and Ownership +// +// A request (represented internally as a FlowItem) has a lifecycle managed cooperatively by the Controller and a +// Processor. Defining ownership is critical for ensuring an item is finalized exactly once. +// +// 1. Submission (Controller): The Controller attempts to hand off the item to a Processor. +// 2. Handoff: +// - Success: Ownership transfers to the Processor, which is now responsible for Finalization. +// - Failure: Ownership remains with the Controller, which must Finalize the item. +// 3. Processing (Processor): The Processor enqueues, manages, and eventually dispatches or rejects the item. +// 4. Finalization: The terminal outcome is set. This can happen: +// - Synchronously: The Processor determines the outcome (e.g., Dispatch, Capacity Rejection). 
+// - Asynchronously: The Controller observes the request's Context expiry (TTL/Cancellation) and calls Finalize. +// +// The FlowItem uses atomic operations to safely coordinate the Finalization state across goroutines. package controller diff --git a/pkg/epp/flowcontrol/controller/internal/doc.go b/pkg/epp/flowcontrol/controller/internal/doc.go index 083654682..0599d5387 100644 --- a/pkg/epp/flowcontrol/controller/internal/doc.go +++ b/pkg/epp/flowcontrol/controller/internal/doc.go @@ -22,11 +22,15 @@ limitations under the License. // # Design Philosophy: The Single-Writer Actor Model // // The concurrency model for this package is built around a single-writer, channel-based actor pattern, as implemented -// in the `ShardProcessor`. All state-mutating operations for a given shard (primarily enqueuing new requests) are -// funneled through a single "Run" goroutine. +// in the ShardProcessor. All state-mutating operations for a given shard (primarily enqueuing new requests) are +// funneled through a single Run goroutine. // // This design makes complex, multi-step transactions (like a hierarchical capacity check against both a shard's total // limit and a priority band's limit) inherently atomic without locks. This avoids the performance bottleneck of a -// coarse, shard-wide lock and allows the top-level `controller.FlowController` to remain decoupled and highly -// concurrent. +// coarse, shard-wide lock and allows the top-level Controller to remain decoupled and highly concurrent. +// +// # Key Components +// +// - ShardProcessor: The implementation of the worker actor. Manages the lifecycle of requests for a single shard. +// - FlowItem: The internal representation of a request, managing its state and synchronization across goroutines. package internal diff --git a/pkg/epp/flowcontrol/controller/internal/processor.go b/pkg/epp/flowcontrol/controller/internal/processor.go index e3cccc196..2370fd646 100644 --- a/pkg/epp/flowcontrol/controller/internal/processor.go +++ b/pkg/epp/flowcontrol/controller/internal/processor.go @@ -38,9 +38,8 @@ import ( // from overwhelming the Go scheduler with too many goroutines. const maxCleanupWorkers = 4 -// ErrProcessorBusy is a sentinel error returned by a the processor's `Submit` method. -// It indicates that the processor's internal buffer is momentarily full and cannot accept new work. -// This is used as a signal for the `controller.FlowController`'s "fast failover" logic. +// ErrProcessorBusy is a sentinel error returned by the processor's Submit method indicating that the processor's +// internal buffer is momentarily full and cannot accept new work. var ErrProcessorBusy = errors.New("shard processor is busy") // ShardProcessor is the core worker of the FlowController. @@ -206,9 +205,8 @@ func (sp *ShardProcessor) Run(ctx context.Context) { } } -// enqueue is responsible for adding a new item to its designated queue. It is always run from the single main `Run` -// goroutine, which makes its multi-step "check-then-act" logic for capacity management inherently atomic and safe from -// race conditions. +// enqueue processes an item received from the enqueueChan. +// It performs capacity checks, detects external finalization, and either admits the item to a queue or rejects it.
func (sp *ShardProcessor) enqueue(item *FlowItem) { req := item.OriginalRequest() key := req.FlowKey() @@ -263,8 +261,9 @@ func (sp *ShardProcessor) enqueue(item *FlowItem) { "flowKey", key, "reqID", req.ID(), "priorityName", band.PriorityName()) } -// hasCapacity checks if the shard and the specific priority band have enough capacity to accommodate an item of a given -// size. This check is only safe because it is called from the single-writer `enqueue` method. +// hasCapacity checks if the shard and the specific priority band have enough capacity. +// This check reflects actual resource utilization, including "zombie" items (finalized but unswept), to prevent +// physical resource overcommitment. func (sp *ShardProcessor) hasCapacity(priority int, itemByteSize uint64) bool { if itemByteSize == 0 { return true @@ -275,85 +274,64 @@ func (sp *ShardProcessor) hasCapacity(priority int, itemByteSize uint64) bool { } bandStats, ok := stats.PerPriorityBandStats[priority] if !ok { - // This should not happen if the registry is consistent, but we fail closed just in case. - return false + return false // Fail closed if configuration is inconsistent. } return bandStats.ByteSize+itemByteSize <= bandStats.CapacityBytes } -// dispatchCycle attempts to dispatch a single item by iterating through all priority bands from highest to lowest. +// dispatchCycle attempts to dispatch a single item by iterating through priority bands from highest to lowest. // It applies the configured policies for each band to select an item and then attempts to dispatch it. // It returns true if an item was successfully dispatched, and false otherwise. +// It enforces Head-of-Line (HoL) blocking if the selected item is saturated. // -// # Error Handling Philosophy: Failure Isolation & Work Conservation -// -// A problem in one priority band (e.g., a failing policy) must not halt processing for other, healthy bands. -// Therefore, any error during selection or dispatch for a given band is logged, and the processor immediately continues -// to the next-lower priority band to maximize system throughput. -// -// # Strict Policy Adherence vs. Work Conservation -// -// This function's logic strictly adheres to the scheduling decisions made by the configured policies, even at the cost -// of work conservation. After the inter-flow (fairness) and intra-flow (ordering) policies select a request (e.g., -// `A_1` from flow `A`), a post-selection viability check is performed. +// # Work Conservation and Head-of-Line (HoL) Blocking // -// If request `A_1` targets saturated backends, this function will stop the entire dispatch cycle for the current tick. -// It will NOT attempt to find other work (like request `B_1` or `A_2`). Instead, it respects the policy decision that -// `A_1` is next and enforces Head-of-Line blocking on it. -// -// # Future Extension Point -// -// The iteration over priority bands is currently a simple, strict-priority loop. This could be abstracted into a third -// policy tier (e.g., an `InterBandDispatchPolicy`) if more complex scheduling between bands, such as Weighted Fair -// Queuing (WFQ), is ever required. +// The cycle attempts to be work-conserving by skipping bands where selection fails. +// However, if a selected item is saturated (cannot be scheduled), the cycle stops immediately. This enforces HoL +// blocking to respect the policy's decision and prevent priority inversion, where dispatching lower-priority work might +// exacerbate the saturation affecting the high-priority item. 
func (sp *ShardProcessor) dispatchCycle(ctx context.Context) bool { - baseLogger := sp.logger.WithName("dispatchCycle") for _, priority := range sp.shard.AllOrderedPriorityLevels() { originalBand, err := sp.shard.PriorityBandAccessor(priority) if err != nil { - baseLogger.Error(err, "Failed to get PriorityBandAccessor, skipping band", "priority", priority) + sp.logger.Error(err, "Failed to get PriorityBandAccessor, skipping band", "priority", priority) continue } - logger := baseLogger.WithValues("priority", priority, "priorityName", originalBand.PriorityName()) - item, err := sp.selectItem(originalBand, logger) + item, err := sp.selectItem(originalBand) if err != nil { - logger.Error(err, "Failed to select item, skipping priority band for this cycle") - continue + sp.logger.Error(err, "Failed to select item, skipping priority band for this cycle", + "priority", priority, "priorityName", originalBand.PriorityName()) + continue // Continue to the next band to maximize work conservation. } if item == nil { - logger.V(logutil.TRACE).Info("No item selected by dispatch policies, skipping band") continue } - logger = logger.WithValues( - "flowKey", item.OriginalRequest().FlowKey(), - "flowID", item.OriginalRequest().FlowKey().ID, - "flowPriority", item.OriginalRequest().FlowKey().Priority, - "reqID", item.OriginalRequest().ID(), - "reqByteSize", item.OriginalRequest().ByteSize()) - - candidatePods := item.OriginalRequest().CandidatePodsForScheduling() + // --- Viability Check (Saturation/HoL Blocking) --- + req := item.OriginalRequest() + candidatePods := req.CandidatePodsForScheduling() if sp.saturationDetector.IsSaturated(ctx, candidatePods) { - logger.V(logutil.VERBOSE).Info("Policy's chosen item is for a saturated flow; pausing dispatch and blocking on HoL") + sp.logger.V(logutil.DEBUG).Info("Policy's chosen item is saturated; enforcing HoL blocking.", + "flowKey", req.FlowKey(), "reqID", req.ID(), "priorityName", originalBand.PriorityName()) + // Stop the dispatch cycle entirely to respect the strict policy decision and prevent priority inversion where + // lower-priority work might exacerbate the saturation affecting high-priority work. return false } - if err := sp.dispatchItem(item, logger); err != nil { - logger.Error(err, "Failed to dispatch item, skipping priority band for this cycle") - continue + // --- Dispatch --- + if err := sp.dispatchItem(item); err != nil { + sp.logger.Error(err, "Failed to dispatch item, skipping priority band for this cycle", + "flowKey", req.FlowKey(), "reqID", req.ID(), "priorityName", originalBand.PriorityName()) + continue // Continue to the next band to maximize work conservation. } return true } return false } -// selectItem applies the configured inter- and intra-flow dispatch policies to select a single item from a priority -// band. -func (sp *ShardProcessor) selectItem( - band framework.PriorityBandAccessor, - logger logr.Logger, -) (types.QueueItemAccessor, error) { +// selectItem applies the configured inter- and intra-flow dispatch policies to select a single item.
+func (sp *ShardProcessor) selectItem(band framework.PriorityBandAccessor) (types.QueueItemAccessor, error) { interP, err := sp.shard.InterFlowDispatchPolicy(band.Priority()) if err != nil { return nil, fmt.Errorf("could not get InterFlowDispatchPolicy: %w", err) @@ -363,14 +341,9 @@ func (sp *ShardProcessor) selectItem( return nil, fmt.Errorf("InterFlowDispatchPolicy %q failed to select queue: %w", interP.Name(), err) } if queue == nil { - logger.V(logutil.TRACE).Info("No queue selected by InterFlowDispatchPolicy") return nil, nil } key := queue.FlowKey() - logger = logger.WithValues( - "selectedFlowKey", key, - "selectedFlowID", key.ID, - "selectedFlowPriority", key.Priority) intraP, err := sp.shard.IntraFlowDispatchPolicy(key) if err != nil { return nil, fmt.Errorf("could not get IntraFlowDispatchPolicy for flow %s: %w", key, err) @@ -379,32 +352,25 @@ func (sp *ShardProcessor) selectItem( if err != nil { return nil, fmt.Errorf("IntraFlowDispatchPolicy %q failed to select item for flow %s: %w", intraP.Name(), key, err) } - if item == nil { - logger.V(logutil.TRACE).Info("No item selected by IntraFlowDispatchPolicy") - return nil, nil - } return item, nil } -// dispatchItem handles the final steps of dispatching an item after it has been selected by policies. This includes -// removing it from its queue and finalizing its outcome. -func (sp *ShardProcessor) dispatchItem(itemAcc types.QueueItemAccessor, logger logr.Logger) error { - logger = logger.WithName("dispatchItem") - +// dispatchItem handles the final steps of dispatching an item: removing it from the queue and finalizing its outcome. +func (sp *ShardProcessor) dispatchItem(itemAcc types.QueueItemAccessor) error { req := itemAcc.OriginalRequest() - // We must look up the queue by its specific priority, as a flow might have draining queues at other levels. - managedQ, err := sp.shard.ManagedQueue(req.FlowKey()) + key := req.FlowKey() + managedQ, err := sp.shard.ManagedQueue(key) if err != nil { - return fmt.Errorf("failed to get ManagedQueue for flow %s: %w", req.FlowKey(), err) + return fmt.Errorf("failed to get ManagedQueue for flow %s: %w", key, err) } - // The core mutation: remove the item from the queue. removedItemAcc, err := managedQ.Remove(itemAcc.Handle()) if err != nil { - // This can happen benignly if the item was already removed by the expiry cleanup loop between the time it was - // selected by the policy and the time this function is called. - logger.V(logutil.VERBOSE).Info("Item already removed from queue, likely by expiry cleanup", "err", err) - return fmt.Errorf("failed to remove item %q from queue for flow %s: %w", req.ID(), req.FlowKey(), err) + // This happens benignly if the item was already removed by the cleanup sweep loop. + // We log it at a low level for visibility but return nil so the dispatch cycle proceeds. 
+ sp.logger.V(logutil.DEBUG).Info("Failed to remove item during dispatch (likely already finalized and swept).", + "flowKey", key, "reqID", req.ID(), "error", err) + return nil } removedItem := removedItemAcc.(*FlowItem) diff --git a/pkg/epp/flowcontrol/controller/internal/processor_test.go b/pkg/epp/flowcontrol/controller/internal/processor_test.go index 2280b82d7..73fc5b13d 100644 --- a/pkg/epp/flowcontrol/controller/internal/processor_test.go +++ b/pkg/epp/flowcontrol/controller/internal/processor_test.go @@ -913,19 +913,6 @@ func TestShardProcessor(t *testing.T) { }, expectedErr: registryErr, }, - { - name: "on queue remove failure", - setupMocks: func(h *testHarness) { - h.ManagedQueueFunc = func(types.FlowKey) (contracts.ManagedQueue, error) { - return &mocks.MockManagedQueue{ - RemoveFunc: func(types.QueueItemHandle) (types.QueueItemAccessor, error) { - return nil, registryErr - }, - }, nil - } - }, - expectedErr: registryErr, - }, } for _, tc := range testCases { @@ -934,7 +921,7 @@ func TestShardProcessor(t *testing.T) { h := newTestHarness(t, testCleanupTick) tc.setupMocks(h) item := h.newTestItem("req-dispatch-fail", testFlow, testTTL) - err := h.processor.dispatchItem(item, h.logger) + err := h.processor.dispatchItem(item) require.Error(t, err, "dispatchItem should return an error") assert.ErrorIs(t, err, tc.expectedErr, "The underlying registry error should be preserved") }) @@ -957,7 +944,7 @@ func TestShardProcessor(t *testing.T) { } // --- ACT --- - err := h.processor.dispatchItem(item, h.logger) + err := h.processor.dispatchItem(item) // --- ASSERT --- require.NoError(t, err, "dispatchItem should return no error for an already finalized item")