diff --git a/pkg/epp/flowcontrol/controller/controller.go b/pkg/epp/flowcontrol/controller/controller.go index 93d0330c7..a11ef26a5 100644 --- a/pkg/epp/flowcontrol/controller/controller.go +++ b/pkg/epp/flowcontrol/controller/controller.go @@ -14,12 +14,11 @@ See the License for the specific language governing permissions and limitations under the License. */ -// Package controller contains the implementation of the `FlowController` engine. +// Package controller contains the implementation of the FlowController engine. // -// The FlowController is the central processing engine of the Flow Control system. It is a sharded, high-throughput +// The FlowController is the central processing engine of the Flow Control layer. It is a sharded, high-throughput // component responsible for managing the lifecycle of all incoming requests. It achieves this by acting as a stateless -// supervisor that orchestrates a pool of stateful workers (`internal.ShardProcessor`), distributing incoming requests -// among them using a sophisticated load-balancing algorithm. +// supervisor that orchestrates a pool of stateful workers (ShardProcessors), distributing incoming requests among them. package controller import ( @@ -43,22 +42,20 @@ import ( logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging" ) -// registryClient defines the minimal interface that the `FlowController` needs to interact with the `FlowRegistry`. +// registryClient defines the minimal interface that the FlowController needs to interact with the FlowRegistry. type registryClient interface { contracts.FlowRegistryObserver contracts.FlowRegistryDataPlane } -// shardProcessor is the minimal internal interface that the `FlowController` requires from its workers. -// This abstraction allows for the injection of mock processors during testing. +// shardProcessor is the minimal internal interface that the FlowController requires from its workers. type shardProcessor interface { Run(ctx context.Context) Submit(item *internal.FlowItem) error SubmitOrBlock(ctx context.Context, item *internal.FlowItem) error } -// shardProcessorFactory defines the signature for a function that creates a `shardProcessor`. -// This enables dependency injection for testing. +// shardProcessorFactory defines the signature for creating a shardProcessor. type shardProcessorFactory func( ctx context.Context, shard contracts.RegistryShard, @@ -74,15 +71,16 @@ var _ shardProcessor = &internal.ShardProcessor{} // managedWorker holds the state for a single supervised worker. type managedWorker struct { processor shardProcessor - cancel context.CancelFunc + // cancel function for the worker-specific context. Used during shutdown and GC. + cancel context.CancelFunc } -// FlowController is the central, high-throughput engine of the Flow Control system. -// It is designed as a stateless distributor that orchestrates a pool of stateful workers (`internal.ShardProcessor`), -// following a "supervisor-worker" pattern. +// FlowController is the central, high-throughput engine of the Flow Control layer. +// It is designed as a stateless distributor that orchestrates a pool of stateful workers (ShardProcessor), following a +// supervisor-worker pattern. // -// The controller's `Run` loop executes periodically, acting as a garbage collector that keeps the pool of running -// workers synchronized with the dynamic shard topology of the `FlowRegistry`. 
+// The controller's run loop executes periodically, acting as a garbage collector that keeps the pool of running +// workers synchronized with the dynamic shard topology of the FlowRegistry. // // Request Lifecycle Management: // @@ -103,25 +101,26 @@ type FlowController struct { // --- Lifecycle state --- - // parentCtx is the root context for the controller's lifecycle, established when `Run` is called. + // parentCtx is the root context for the controller's lifecycle, established when NewFlowController is called. // It is the parent for all long-lived worker goroutines. parentCtx context.Context // --- Concurrent state --- - // workers is a highly concurrent map storing the `managedWorker` for each shard. + // workers is a highly concurrent map storing the managedWorker for each shard. // It is the controller's source of truth for the worker pool. - // The key is the shard ID (`string`), and the value is a `*managedWorker`. - workers sync.Map + workers sync.Map // key: shard ID (string); value: *managedWorker + // wg waits for all worker goroutines to terminate during shutdown. wg sync.WaitGroup } -// flowControllerOption is a function that applies a configuration change to a `FlowController`. +// flowControllerOption is a function that applies a configuration change. // test-only type flowControllerOption func(*FlowController) -// NewFlowController creates a new `FlowController` instance. +// NewFlowController creates and starts a new FlowController instance. +// The provided context governs the lifecycle of the controller and all its workers. func NewFlowController( ctx context.Context, config Config, @@ -131,7 +130,7 @@ func NewFlowController( opts ...flowControllerOption, ) (*FlowController, error) { fc := &FlowController{ - config: *config.deepCopy(), + config: config, registry: registry, saturationDetector: sd, clock: clock.RealClock{}, @@ -139,7 +138,6 @@ func NewFlowController( parentCtx: ctx, } - // Use the real shard processor implementation by default. fc.shardProcessorFactory = func( ctx context.Context, shard contracts.RegistryShard, @@ -167,9 +165,9 @@ func NewFlowController( return fc, nil } -// run starts the `FlowController`'s main reconciliation loop. +// run starts the FlowController's main reconciliation loop (supervisor loop). // This loop is responsible for garbage collecting workers whose shards no longer exist in the registry. -// This method blocks until the provided context is cancelled and ALL worker goroutines have fully terminated. +// This method blocks until the provided context is cancelled and all worker goroutines have fully terminated. func (fc *FlowController) run(ctx context.Context) { fc.logger.Info("Starting FlowController reconciliation loop.") defer fc.logger.Info("FlowController reconciliation loop stopped.") @@ -194,23 +192,19 @@ func (fc *FlowController) run(ctx context.Context) { // # Design Rationale: The Synchronous Model // // This blocking model is deliberately chosen for its simplicity and robustness, especially in the context of Envoy -// External Processing (`ext_proc`), which operates on a stream-based protocol. +// External Processing (ext_proc), which operates on a stream-based protocol. // -// - `ext_proc` Alignment: A single goroutine typically manages the stream for a given HTTP request. -// `EnqueueAndWait` fits this perfectly: the request-handling goroutine calls it, blocks, and upon return, has a +// - ext_proc Alignment: A single goroutine typically manages the stream for a given HTTP request. 
+// EnqueueAndWait fits this perfectly: the request-handling goroutine calls it, blocks, and upon return, has a // definitive outcome to act upon. // - Simplified State Management: The state of a "waiting" request is implicitly managed by the blocked goroutine's -// stack and its `context.Context`. The system only needs to signal this specific goroutine to unblock it. -// - Direct Backpressure: If queues are full, `EnqueueAndWait` returns an error immediately, providing direct +// stack and its Context. The system only needs to signal this specific goroutine to unblock it. +// - Direct Backpressure: If queues are full, EnqueueAndWait returns an error immediately, providing direct // backpressure to the caller. func (fc *FlowController) EnqueueAndWait( ctx context.Context, req types.FlowControlRequest, ) (types.QueueOutcome, error) { - if req == nil { - return types.QueueOutcomeRejectedOther, errors.New("request cannot be nil") - } - flowKey := req.FlowKey() fairnessID := flowKey.ID priority := strconv.Itoa(flowKey.Priority) @@ -365,17 +359,22 @@ type candidate struct { // of that flow's queue, from least to most loaded. func (fc *FlowController) selectDistributionCandidates(key types.FlowKey) ([]candidate, error) { var candidates []candidate + + // Acquire a connection to the registry for the flow key. This ensures a consistent view of the ActiveShards for the + // duration of the shard selection process, preventing races with concurrent shard topology changes. err := fc.registry.WithConnection(key, func(conn contracts.ActiveFlowConnection) error { shards := conn.ActiveShards() - candidates = make([]candidate, len(shards)) - for i, shard := range shards { + candidates = make([]candidate, 0, len(shards)) + for _, shard := range shards { worker := fc.getOrStartWorker(shard) mq, err := shard.ManagedQueue(key) if err != nil { - panic(fmt.Sprintf("invariant violation: ManagedQueue for leased flow %s failed on shard %s: %v", - key, shard.ID(), err)) + fc.logger.Error(err, + "Invariant violation. Failed to get ManagedQueue for a leased flow on an Active shard. Skipping shard.", + "flowKey", key, "shardID", shard.ID()) + continue } - candidates[i] = candidate{worker.processor, shard.ID(), mq.ByteSize()} + candidates = append(candidates, candidate{worker.processor, shard.ID(), mq.ByteSize()}) } return nil }) @@ -435,15 +434,15 @@ func (fc *FlowController) distributeRequest( } // getOrStartWorker implements the lazy-loading and startup of shard processors. -// It attempts to retrieve an existing worker for a shard. If one doesn't exist, it constructs a new worker and attempts -// to register it atomically. The worker's processor goroutine is only started *after* it has successfully been -// registered, preventing race conditions where multiple goroutines create and start the same worker. +// It ensures that exactly one worker goroutine is started for each shard, using atomic operations +// (sync.Map.LoadOrStore). The worker's processor goroutine is only started after it has successfully been registered, +// preventing race conditions where multiple goroutines create and start the same worker. func (fc *FlowController) getOrStartWorker(shard contracts.RegistryShard) *managedWorker { if w, ok := fc.workers.Load(shard.ID()); ok { return w.(*managedWorker) } - // Construct a new worker, but do not start its processor goroutine yet. + // Construct a new worker, but do not start its goroutine yet. 
processorCtx, cancel := context.WithCancel(fc.parentCtx) processor := fc.shardProcessorFactory( processorCtx, @@ -459,18 +458,17 @@ func (fc *FlowController) getOrStartWorker(shard contracts.RegistryShard) *manag cancel: cancel, } - // Atomically load or store. This is the critical step for preventing race conditions. + // Atomically load or store. This is the critical synchronization step. actual, loaded := fc.workers.LoadOrStore(shard.ID(), newWorker) if loaded { // Another goroutine beat us to it. The `newWorker` we created was not stored. - // We must cancel the context we created for it to prevent a leak, but we do not need to do anything else, as its - // processor was never started. + // We must cancel the context we created to prevent a leak. cancel() return actual.(*managedWorker) } - // We won the race. The `newWorker` was successfully stored. - // Now, and only now, do we start the processor's long-running goroutine. + // We won the race. The newWorker was stored. Now, start the processor's long-running goroutine. + fc.logger.V(logutil.DEFAULT).Info("Starting new ShardProcessor worker.", "shardID", shard.ID()) fc.wg.Add(1) go func() { defer fc.wg.Done() @@ -481,11 +479,10 @@ func (fc *FlowController) getOrStartWorker(shard contracts.RegistryShard) *manag } // reconcileProcessors is the supervisor's core garbage collection loop. -// It fetches the current list of Active shards from the registry and removes any workers whose corresponding shards -// have been fully drained and garbage collected by the registry. +// It identifies and stops workers whose corresponding shards have been removed from the registry. func (fc *FlowController) reconcileProcessors() { stats := fc.registry.ShardStats() - shards := make(map[string]struct{}, len(stats)) // `map[shardID] -> isActive` + shards := make(map[string]struct{}, len(stats)) // map[shardID] -> isActive for _, s := range stats { shards[s.ID] = struct{}{} } @@ -493,12 +490,11 @@ func (fc *FlowController) reconcileProcessors() { fc.workers.Range(func(key, value any) bool { shardID := key.(string) worker := value.(*managedWorker) - - // GC check: Is the shard no longer in the registry at all? if _, exists := shards[shardID]; !exists { - fc.logger.Info("Stale worker detected for GC'd shard, shutting down.", "shardID", shardID) - worker.cancel() - fc.workers.Delete(shardID) + fc.logger.V(logutil.DEFAULT).Info("Stale worker detected for GC'd shard, initiating shutdown.", + "shardID", shardID) + worker.cancel() // Cancel the worker's context, initiating the Processor's graceful shutdown sequence. + fc.workers.Delete(shardID) // Delete from the map so no new requests are routed to it. 
} return true }) @@ -515,7 +511,6 @@ func (fc *FlowController) shutdown() { worker.cancel() return true }) - fc.wg.Wait() fc.logger.Info("All shard processors have shut down.") } diff --git a/pkg/epp/flowcontrol/controller/controller_test.go b/pkg/epp/flowcontrol/controller/controller_test.go index 74802f2df..917a5a1e5 100644 --- a/pkg/epp/flowcontrol/controller/controller_test.go +++ b/pkg/epp/flowcontrol/controller/controller_test.go @@ -317,17 +317,6 @@ func TestFlowController_EnqueueAndWait(t *testing.T) { t.Run("Rejections", func(t *testing.T) { t.Parallel() - t.Run("OnNilRequest", func(t *testing.T) { - t.Parallel() - h := newUnitHarness(t, t.Context(), Config{}, nil) - - outcome, err := h.fc.EnqueueAndWait(context.Background(), nil) - require.Error(t, err, "EnqueueAndWait must reject a nil request") - assert.Equal(t, "request cannot be nil", err.Error(), "error message must be specific") - assert.Equal(t, types.QueueOutcomeRejectedOther, outcome, - "outcome should be QueueOutcomeRejectedOther for invalid inputs") - }) - t.Run("OnReqCtxExpiredBeforeDistribution", func(t *testing.T) { t.Parallel() // Test that if the request context provided to EnqueueAndWait is already expired, it returns immediately. @@ -418,14 +407,13 @@ func TestFlowController_EnqueueAndWait(t *testing.T) { "outcome should be QueueOutcomeRejectedOther for transient registry errors") }) - // This test validates the documented invariant handling in distributeRequest. - t.Run("PanicsOnManagedQueueError", func(t *testing.T) { + t.Run("OnManagedQueueError", func(t *testing.T) { t.Parallel() mockRegistry := &mockRegistryClient{} h := newUnitHarness(t, t.Context(), Config{}, mockRegistry) // Create a faulty shard that successfully leases the flow but fails to return the - // ManagedQueue. + // ManagedQueue. This shard should be considered as unavailable. faultyShard := &mocks.MockRegistryShard{ IDFunc: func() string { return "faulty-shard" }, ManagedQueueFunc: func(_ types.FlowKey) (contracts.ManagedQueue, error) { @@ -440,9 +428,11 @@ func TestFlowController_EnqueueAndWait(t *testing.T) { } req := newTestRequest(defaultFlowKey) - assert.Panics(t, func() { - _, _ = h.fc.EnqueueAndWait(context.Background(), req) - }, "EnqueueAndWait must panic when a registry implementation violates the ManagedQueue contract") + outcome, err := h.fc.EnqueueAndWait(context.Background(), req) + require.Error(t, err, "EnqueueAndWait must reject requests if no shards are available") + assert.ErrorIs(t, err, types.ErrRejected, "error should wrap ErrRejected") + assert.Equal(t, types.QueueOutcomeRejectedCapacity, outcome, + "outcome should be QueueOutcomeRejectedCapacity when no shards exist for the flow") }) }) diff --git a/pkg/epp/flowcontrol/controller/doc.go b/pkg/epp/flowcontrol/controller/doc.go index 4f2620c5a..0d2ea3687 100644 --- a/pkg/epp/flowcontrol/controller/doc.go +++ b/pkg/epp/flowcontrol/controller/doc.go @@ -14,23 +14,48 @@ See the License for the specific language governing permissions and limitations under the License. */ -// Package controller contains the implementation of the `FlowController` engine. +// Package controller contains the implementation of the FlowController engine. // // # Overview // -// The `FlowController` is the central processing engine of the Flow Control system. 
It acts as a stateless supervisor -// that orchestrates a pool of stateful workers (`internal.ShardProcessor`), managing the lifecycle of all incoming -// requests from initial submission to a terminal outcome (dispatch, rejection, or eviction). +// The FlowController is the central processing engine of the Flow Control layer. It acts as a stateless supervisor that +// orchestrates a pool of stateful workers (internal.ShardProcessor), managing the lifecycle of all incoming requests +// from initial submission to a terminal outcome (dispatch, rejection, or eviction). // // # Architecture: Supervisor-Worker Pattern // -// This package implements a classic "supervisor-worker" pattern to achieve high throughput and dynamic scalability. +// This package implements a supervisor-worker pattern to achieve high throughput and dynamic scalability. // -// - The `FlowController` (Supervisor): The public-facing API of the system. Its primary responsibilities are to -// execute a distribution algorithm to select the optimal worker for a new request and to manage the lifecycle of -// the worker pool, ensuring it stays synchronized with the underlying shard topology defined by the -// `contracts.FlowRegistry`. -// - The `internal.ShardProcessor` (Worker): A stateful, single-goroutine actor responsible for the entire lifecycle -// of requests on a single shard. The supervisor manages a pool of these workers, one for each -// `contracts.RegistryShard`. +// - The FlowController (Supervisor): The public-facing API of the system. Its primary responsibilities are to execute +// a distribution algorithm to select the optimal worker for a new request and to manage the lifecycle of the worker +// pool, ensuring it stays synchronized with the underlying shard topology defined by the contracts.FlowRegistry. +// - The internal.ShardProcessor (Worker): A stateful, single-goroutine actor responsible for the entire lifecycle of +// requests on a single shard. The supervisor manages a pool of these workers, one for each contracts.RegistryShard. +// +// # Concurrency Model +// +// The FlowController is designed to be highly concurrent and thread-safe. It acts primarily as a stateless distributor. +// +// - EnqueueAndWait: Can be called concurrently by many goroutines. +// - Worker Management: Uses a sync.Map (workers) for concurrent access and lazy initialization of workers. +// - Supervision: A single background goroutine (run) manages the worker pool lifecycle (garbage collection). +// +// It achieves high throughput by minimizing shared state and relying on the internal ShardProcessors to handle state +// mutations serially (using an actor model). +// +// # Request Lifecycle and Ownership +// +// A request (represented internally as a FlowItem) has a lifecycle managed cooperatively by the Controller and a +// Processor. Defining ownership is critical for ensuring an item is finalized exactly once. +// +// 1. Submission (Controller): The Controller attempts to hand off the item to a Processor. +// 2. Handoff: +// - Success: Ownership transfers to the Processor, which is now responsible for Finalization. +// - Failure: Ownership remains with the Controller, which must Finalize the item. +// 3. Processing (Processor): The Processor enqueues, manages, and eventually dispatches or rejects the item. +// 4. Finalization: The terminal outcome is set. This can happen: +// - Synchronously: The Processor determines the outcome (e.g., Dispatch, Capacity Rejection). 
+//   - Asynchronously: The Controller observes the request's Context expiry (TTL/Cancellation) and calls Finalize.
+//
+// The FlowItem uses atomic operations to safely coordinate the Finalization state across goroutines.
 package controller
diff --git a/pkg/epp/flowcontrol/controller/internal/doc.go b/pkg/epp/flowcontrol/controller/internal/doc.go
index 083654682..0599d5387 100644
--- a/pkg/epp/flowcontrol/controller/internal/doc.go
+++ b/pkg/epp/flowcontrol/controller/internal/doc.go
@@ -22,11 +22,15 @@ limitations under the License.
 // # Design Philosophy: The Single-Writer Actor Model
 //
 // The concurrency model for this package is built around a single-writer, channel-based actor pattern, as implemented
-// in the `ShardProcessor`. All state-mutating operations for a given shard (primarily enqueuing new requests) are
-// funneled through a single "Run" goroutine.
+// in the ShardProcessor. All state-mutating operations for a given shard (primarily enqueuing new requests) are
+// funneled through a single Run goroutine.
 //
 // This design makes complex, multi-step transactions (like a hierarchical capacity check against both a shard's total
 // limit and a priority band's limit) inherently atomic without locks. This avoids the performance bottleneck of a
-// coarse, shard-wide lock and allows the top-level `controller.FlowController` to remain decoupled and highly
-// concurrent.
+// coarse, shard-wide lock and allows the top-level Controller to remain decoupled and highly concurrent.
+//
+// # Key Components
+//
+//   - ShardProcessor: The implementation of the worker actor. Manages the lifecycle of requests for a single shard.
+//   - FlowItem: The internal representation of a request, managing its state and synchronization across goroutines.
 package internal
diff --git a/pkg/epp/flowcontrol/controller/internal/processor.go b/pkg/epp/flowcontrol/controller/internal/processor.go
index e3cccc196..2370fd646 100644
--- a/pkg/epp/flowcontrol/controller/internal/processor.go
+++ b/pkg/epp/flowcontrol/controller/internal/processor.go
@@ -38,9 +38,8 @@ import (
 // from overwhelming the Go scheduler with too many goroutines.
 const maxCleanupWorkers = 4
 
-// ErrProcessorBusy is a sentinel error returned by a the processor's `Submit` method.
-// It indicates that the processor's internal buffer is momentarily full and cannot accept new work.
-// This is used as a signal for the `controller.FlowController`'s "fast failover" logic.
+// ErrProcessorBusy is a sentinel error returned by the processor's Submit method indicating that the processor's
+// internal buffer is momentarily full and cannot accept new work.
 var ErrProcessorBusy = errors.New("shard processor is busy")
 
 // ShardProcessor is the core worker of the FlowController.
@@ -206,9 +205,8 @@ func (sp *ShardProcessor) Run(ctx context.Context) {
 	}
 }
 
-// enqueue is responsible for adding a new item to its designated queue. It is always run from the single main `Run`
-// goroutine, which makes its multi-step "check-then-act" logic for capacity management inherently atomic and safe from
-// race conditions.
+// enqueue processes an item received from the enqueueChan.
+// It handles capacity checks, checks for external finalization, and either admits the item to a queue or rejects it.
func (sp *ShardProcessor) enqueue(item *FlowItem) { req := item.OriginalRequest() key := req.FlowKey() @@ -263,8 +261,9 @@ func (sp *ShardProcessor) enqueue(item *FlowItem) { "flowKey", key, "reqID", req.ID(), "priorityName", band.PriorityName()) } -// hasCapacity checks if the shard and the specific priority band have enough capacity to accommodate an item of a given -// size. This check is only safe because it is called from the single-writer `enqueue` method. +// hasCapacity checks if the shard and the specific priority band have enough capacity. +// This check reflects actual resource utilization, including "zombie" items (finalized but unswept), to prevent +// physical resource overcommitment. func (sp *ShardProcessor) hasCapacity(priority int, itemByteSize uint64) bool { if itemByteSize == 0 { return true @@ -275,85 +274,64 @@ func (sp *ShardProcessor) hasCapacity(priority int, itemByteSize uint64) bool { } bandStats, ok := stats.PerPriorityBandStats[priority] if !ok { - // This should not happen if the registry is consistent, but we fail closed just in case. - return false + return false // Fail closed if configuration is inconsistent. } return bandStats.ByteSize+itemByteSize <= bandStats.CapacityBytes } -// dispatchCycle attempts to dispatch a single item by iterating through all priority bands from highest to lowest. +// dispatchCycle attempts to dispatch a single item by iterating through priority bands from highest to lowest. // It applies the configured policies for each band to select an item and then attempts to dispatch it. // It returns true if an item was successfully dispatched, and false otherwise. +// It enforces Head-of-Line (HoL) blocking if the selected item is saturated. // -// # Error Handling Philosophy: Failure Isolation & Work Conservation -// -// A problem in one priority band (e.g., a failing policy) must not halt processing for other, healthy bands. -// Therefore, any error during selection or dispatch for a given band is logged, and the processor immediately continues -// to the next-lower priority band to maximize system throughput. -// -// # Strict Policy Adherence vs. Work Conservation -// -// This function's logic strictly adheres to the scheduling decisions made by the configured policies, even at the cost -// of work conservation. After the inter-flow (fairness) and intra-flow (ordering) policies select a request (e.g., -// `A_1` from flow `A`), a post-selection viability check is performed. +// # Work Conservation and Head-of-Line (HoL) Blocking // -// If request `A_1` targets saturated backends, this function will stop the entire dispatch cycle for the current tick. -// It will NOT attempt to find other work (like request `B_1` or `A_2`). Instead, it respects the policy decision that -// `A_1` is next and enforces Head-of-Line blocking on it. -// -// # Future Extension Point -// -// The iteration over priority bands is currently a simple, strict-priority loop. This could be abstracted into a third -// policy tier (e.g., an `InterBandDispatchPolicy`) if more complex scheduling between bands, such as Weighted Fair -// Queuing (WFQ), is ever required. +// The cycle attempts to be work-conserving by skipping bands where selection fails. +// However, if a selected item is saturated (cannot be scheduled), the cycle stops immediately. This enforces HoL +// blocking to respect the policy's decision and prevent priority inversion, where dispatching lower-priority work might +// exacerbate the saturation affecting the high-priority item. 
func (sp *ShardProcessor) dispatchCycle(ctx context.Context) bool { - baseLogger := sp.logger.WithName("dispatchCycle") for _, priority := range sp.shard.AllOrderedPriorityLevels() { originalBand, err := sp.shard.PriorityBandAccessor(priority) if err != nil { - baseLogger.Error(err, "Failed to get PriorityBandAccessor, skipping band", "priority", priority) + sp.logger.Error(err, "Failed to get PriorityBandAccessor, skipping band", "priority", priority) continue } - logger := baseLogger.WithValues("priority", priority, "priorityName", originalBand.PriorityName()) - item, err := sp.selectItem(originalBand, logger) + item, err := sp.selectItem(originalBand) if err != nil { - logger.Error(err, "Failed to select item, skipping priority band for this cycle") - continue + sp.logger.Error(err, "Failed to select item, skipping priority band for this cycle", + "priority", priority, "priorityName", originalBand.PriorityName()) + continue // Continue to the next band to maximize work conservation. } if item == nil { - logger.V(logutil.TRACE).Info("No item selected by dispatch policies, skipping band") continue } - logger = logger.WithValues( - "flowKey", item.OriginalRequest().FlowKey(), - "flowID", item.OriginalRequest().FlowKey().ID, - "flowPriority", item.OriginalRequest().FlowKey().Priority, - "reqID", item.OriginalRequest().ID(), - "reqByteSize", item.OriginalRequest().ByteSize()) - - candidatePods := item.OriginalRequest().CandidatePodsForScheduling() + // --- Viability Check (Saturation/HoL Blocking) --- + req := item.OriginalRequest() + candidatePods := req.CandidatePodsForScheduling() if sp.saturationDetector.IsSaturated(ctx, candidatePods) { - logger.V(logutil.VERBOSE).Info("Policy's chosen item is for a saturated flow; pausing dispatch and blocking on HoL") + sp.logger.V(logutil.DEBUG).Info("Policy's chosen item is saturated; enforcing HoL blocking.", + "flowKey", req.FlowKey(), "reqID", req.ID(), "priorityName", originalBand.PriorityName()) + // Stop the dispatch cycle entirely to respect strict policy decision and prevent priority inversion where + // lower-priority work might exacerbate the saturation affecting high-priority work. return false } - if err := sp.dispatchItem(item, logger); err != nil { - logger.Error(err, "Failed to dispatch item, skipping priority band for this cycle") - continue + // --- Dispatch --- + if err := sp.dispatchItem(item); err != nil { + sp.logger.Error(err, "Failed to dispatch item, skipping priority band for this cycle", + "flowKey", req.FlowKey(), "reqID", req.ID(), "priorityName", originalBand.PriorityName()) + continue // Continue to the next band to maximize work conservation. } return true } return false } -// selectItem applies the configured inter- and intra-flow dispatch policies to select a single item from a priority -// band. -func (sp *ShardProcessor) selectItem( - band framework.PriorityBandAccessor, - logger logr.Logger, -) (types.QueueItemAccessor, error) { +// selectItem applies the configured inter- and intra-flow dispatch policies to select a single item. 
+func (sp *ShardProcessor) selectItem(band framework.PriorityBandAccessor) (types.QueueItemAccessor, error) { interP, err := sp.shard.InterFlowDispatchPolicy(band.Priority()) if err != nil { return nil, fmt.Errorf("could not get InterFlowDispatchPolicy: %w", err) @@ -363,14 +341,9 @@ func (sp *ShardProcessor) selectItem( return nil, fmt.Errorf("InterFlowDispatchPolicy %q failed to select queue: %w", interP.Name(), err) } if queue == nil { - logger.V(logutil.TRACE).Info("No queue selected by InterFlowDispatchPolicy") return nil, nil } key := queue.FlowKey() - logger = logger.WithValues( - "selectedFlowKey", key, - "selectedFlowID", key.ID, - "selectedFlowPriority", key.Priority) intraP, err := sp.shard.IntraFlowDispatchPolicy(key) if err != nil { return nil, fmt.Errorf("could not get IntraFlowDispatchPolicy for flow %s: %w", key, err) @@ -379,32 +352,25 @@ func (sp *ShardProcessor) selectItem( if err != nil { return nil, fmt.Errorf("IntraFlowDispatchPolicy %q failed to select item for flow %s: %w", intraP.Name(), key, err) } - if item == nil { - logger.V(logutil.TRACE).Info("No item selected by IntraFlowDispatchPolicy") - return nil, nil - } return item, nil } -// dispatchItem handles the final steps of dispatching an item after it has been selected by policies. This includes -// removing it from its queue and finalizing its outcome. -func (sp *ShardProcessor) dispatchItem(itemAcc types.QueueItemAccessor, logger logr.Logger) error { - logger = logger.WithName("dispatchItem") - +// dispatchItem handles the final steps of dispatching an item: removing it from the queue and finalizing its outcome. +func (sp *ShardProcessor) dispatchItem(itemAcc types.QueueItemAccessor) error { req := itemAcc.OriginalRequest() - // We must look up the queue by its specific priority, as a flow might have draining queues at other levels. - managedQ, err := sp.shard.ManagedQueue(req.FlowKey()) + key := req.FlowKey() + managedQ, err := sp.shard.ManagedQueue(key) if err != nil { - return fmt.Errorf("failed to get ManagedQueue for flow %s: %w", req.FlowKey(), err) + return fmt.Errorf("failed to get ManagedQueue for flow %s: %w", key, err) } - // The core mutation: remove the item from the queue. removedItemAcc, err := managedQ.Remove(itemAcc.Handle()) if err != nil { - // This can happen benignly if the item was already removed by the expiry cleanup loop between the time it was - // selected by the policy and the time this function is called. - logger.V(logutil.VERBOSE).Info("Item already removed from queue, likely by expiry cleanup", "err", err) - return fmt.Errorf("failed to remove item %q from queue for flow %s: %w", req.ID(), req.FlowKey(), err) + // This happens benignly if the item was already removed by the cleanup sweep loop. + // We log it at a low level for visibility but return nil so the dispatch cycle proceeds. 
+ sp.logger.V(logutil.DEBUG).Info("Failed to remove item during dispatch (likely already finalized and swept).", + "flowKey", key, "reqID", req.ID(), "error", err) + return nil } removedItem := removedItemAcc.(*FlowItem) diff --git a/pkg/epp/flowcontrol/controller/internal/processor_test.go b/pkg/epp/flowcontrol/controller/internal/processor_test.go index 2280b82d7..73fc5b13d 100644 --- a/pkg/epp/flowcontrol/controller/internal/processor_test.go +++ b/pkg/epp/flowcontrol/controller/internal/processor_test.go @@ -913,19 +913,6 @@ func TestShardProcessor(t *testing.T) { }, expectedErr: registryErr, }, - { - name: "on queue remove failure", - setupMocks: func(h *testHarness) { - h.ManagedQueueFunc = func(types.FlowKey) (contracts.ManagedQueue, error) { - return &mocks.MockManagedQueue{ - RemoveFunc: func(types.QueueItemHandle) (types.QueueItemAccessor, error) { - return nil, registryErr - }, - }, nil - } - }, - expectedErr: registryErr, - }, } for _, tc := range testCases { @@ -934,7 +921,7 @@ func TestShardProcessor(t *testing.T) { h := newTestHarness(t, testCleanupTick) tc.setupMocks(h) item := h.newTestItem("req-dispatch-fail", testFlow, testTTL) - err := h.processor.dispatchItem(item, h.logger) + err := h.processor.dispatchItem(item) require.Error(t, err, "dispatchItem should return an error") assert.ErrorIs(t, err, tc.expectedErr, "The underlying registry error should be preserved") }) @@ -957,7 +944,7 @@ func TestShardProcessor(t *testing.T) { } // --- ACT --- - err := h.processor.dispatchItem(item, h.logger) + err := h.processor.dispatchItem(item) // --- ASSERT --- require.NoError(t, err, "dispatchItem should return no error for an already finalized item")
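The worker-startup handoff documented in getOrStartWorker above (construct the worker and its cancellable context first, publish it with sync.Map.LoadOrStore, start the goroutine only on winning the race, cancel the unused context on losing) is a reusable pattern. The sketch below is a minimal, self-contained illustration of that pattern only; supervisor, worker, and getOrStartWorker here are hypothetical names, not code from this patch.

package main

import (
	"context"
	"fmt"
	"sync"
)

// worker pairs a per-shard goroutine with the cancel function for its context.
type worker struct {
	id     string
	cancel context.CancelFunc
}

type supervisor struct {
	parentCtx context.Context
	workers   sync.Map // key: shard ID (string); value: *worker
	wg        sync.WaitGroup
}

// getOrStartWorker starts at most one goroutine per shard ID, even under concurrent callers:
// the caller whose LoadOrStore wins starts the worker; losers cancel the context they created.
func (s *supervisor) getOrStartWorker(shardID string) *worker {
	if w, ok := s.workers.Load(shardID); ok {
		return w.(*worker)
	}

	ctx, cancel := context.WithCancel(s.parentCtx)
	candidate := &worker{id: shardID, cancel: cancel}

	actual, loaded := s.workers.LoadOrStore(shardID, candidate)
	if loaded {
		cancel() // Lost the race: release the unused context so it cannot leak.
		return actual.(*worker)
	}

	// Won the race: only now start the long-running goroutine.
	s.wg.Add(1)
	go func() {
		defer s.wg.Done()
		<-ctx.Done() // Stand-in for the real processor's Run loop.
		fmt.Println("worker stopped:", shardID)
	}()
	return candidate
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	s := &supervisor{parentCtx: ctx}

	var callers sync.WaitGroup
	for i := 0; i < 4; i++ {
		callers.Add(1)
		go func() {
			defer callers.Done()
			_ = s.getOrStartWorker("shard-1") // Every caller observes the same single worker.
		}()
	}
	callers.Wait()

	cancel() // Cancelling the parent context shuts every worker down.
	s.wg.Wait()
}

The ordering property is the point: the goroutine is started only after LoadOrStore succeeds, so concurrent callers can never start two workers for the same shard, and the losing caller releases its context immediately.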
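Similarly, the single-writer actor model described in internal/doc.go (all state mutations funneled through one Run goroutine, making multi-step capacity checks atomic without locks) can be shown in isolation. The following sketch is an illustrative miniature under assumed names, not the ShardProcessor itself: one goroutine owns a byte budget and serially decides admit/reject for requests submitted over a channel.

package main

import (
	"context"
	"fmt"
)

// request is a minimal stand-in for a queued item: a payload size plus a reply channel.
type request struct {
	bytes    uint64
	admitted chan bool
}

// actor owns all state for one shard; only its run goroutine ever touches usedBytes.
type actor struct {
	capacityBytes uint64
	usedBytes     uint64
	submitCh      chan request
}

// run is the single writer. Because every mutation arrives over submitCh and is handled
// sequentially here, the check-then-admit sequence is atomic without any locks.
func (a *actor) run(ctx context.Context) {
	for {
		select {
		case <-ctx.Done():
			return
		case req := <-a.submitCh:
			if a.usedBytes+req.bytes <= a.capacityBytes {
				a.usedBytes += req.bytes // Admit: state is mutated only from this goroutine.
				req.admitted <- true
			} else {
				req.admitted <- false // Reject: the shard's byte budget would be exceeded.
			}
		}
	}
}

// submit hands a request to the actor and blocks for the decision, mirroring (in miniature)
// the synchronous hand-off-and-wait contract used by the controller.
func (a *actor) submit(bytes uint64) bool {
	req := request{bytes: bytes, admitted: make(chan bool, 1)}
	a.submitCh <- req
	return <-req.admitted
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	a := &actor{capacityBytes: 100, submitCh: make(chan request)}
	go a.run(ctx)

	fmt.Println(a.submit(60)) // true: 60 bytes fit within the 100-byte budget
	fmt.Println(a.submit(50)) // false: 60+50 would exceed the budget
	fmt.Println(a.submit(40)) // true: fills the remaining 40 bytes exactly
}

Because only the run goroutine reads and writes usedBytes, the check-then-admit sequence cannot interleave with another admission, which is the property the ShardProcessor relies on for its hierarchical capacity checks.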