diff --git a/pkg/epp/flowcontrol/contracts/mocks/mocks.go b/pkg/epp/flowcontrol/contracts/mocks/mocks.go index 02f9863fb..10f093b11 100644 --- a/pkg/epp/flowcontrol/contracts/mocks/mocks.go +++ b/pkg/epp/flowcontrol/contracts/mocks/mocks.go @@ -34,6 +34,7 @@ import ( "fmt" "sync" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/contracts" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/types" @@ -112,12 +113,12 @@ func (m *MockRegistryShard) Stats() contracts.ShardStats { // MockSaturationDetector is a simple "stub-style" mock for testing. type MockSaturationDetector struct { - IsSaturatedFunc func(ctx context.Context) bool + IsSaturatedFunc func(ctx context.Context, candidatePods []metrics.PodMetrics) bool } -func (m *MockSaturationDetector) IsSaturated(ctx context.Context) bool { +func (m *MockSaturationDetector) IsSaturated(ctx context.Context, candidatePods []metrics.PodMetrics) bool { if m.IsSaturatedFunc != nil { - return m.IsSaturatedFunc(ctx) + return m.IsSaturatedFunc(ctx, candidatePods) } return false } diff --git a/pkg/epp/flowcontrol/contracts/saturationdetector.go b/pkg/epp/flowcontrol/contracts/saturationdetector.go index 91d2406c5..15037d50a 100644 --- a/pkg/epp/flowcontrol/contracts/saturationdetector.go +++ b/pkg/epp/flowcontrol/contracts/saturationdetector.go @@ -16,7 +16,11 @@ limitations under the License. package contracts -import "context" +import ( + "context" + + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics" +) // SaturationDetector defines the contract for a component that provides real-time load signals to the // `controller.FlowController`. @@ -32,8 +36,8 @@ import "context" // // Implementations MUST be goroutine-safe. type SaturationDetector interface { - // IsSaturated returns true if the system's backend resources are considered saturated. + // IsSaturated returns true if the system's backend resources are considered saturated for a set of candidate pods. // `controller.FlowController`'s dispatch workers call this method to decide whether to pause or throttle dispatch // operations to prevent overwhelming the backends. - IsSaturated(ctx context.Context) bool + IsSaturated(ctx context.Context, candidatePods []metrics.PodMetrics) bool } diff --git a/pkg/epp/flowcontrol/controller/controller.go b/pkg/epp/flowcontrol/controller/controller.go index 8449159b1..aeb0bdd87 100644 --- a/pkg/epp/flowcontrol/controller/controller.go +++ b/pkg/epp/flowcontrol/controller/controller.go @@ -58,7 +58,7 @@ type shardProcessor interface { // This enables dependency injection for testing. type shardProcessorFactory func( shard contracts.RegistryShard, - dispatchFilter internal.BandFilter, + saturationDetector contracts.SaturationDetector, clock clock.Clock, expiryCleanupInterval time.Duration, enqueueChannelBufferSize int, @@ -130,7 +130,7 @@ func NewFlowController( // Use the real shard processor implementation by default. 
fc.shardProcessorFactory = func( shard contracts.RegistryShard, - dispatchFilter internal.BandFilter, + saturationDetector contracts.SaturationDetector, clock clock.Clock, expiryCleanupInterval time.Duration, enqueueChannelBufferSize int, @@ -138,7 +138,7 @@ func NewFlowController( ) shardProcessor { return internal.NewShardProcessor( shard, - dispatchFilter, + saturationDetector, clock, expiryCleanupInterval, enqueueChannelBufferSize, @@ -310,10 +310,9 @@ func (fc *FlowController) getOrStartWorker(shard contracts.RegistryShard) *manag // Construct a new worker, but do not start its processor goroutine yet. processorCtx, cancel := context.WithCancel(fc.parentCtx) - dispatchFilter := internal.NewSaturationFilter(fc.saturationDetector) processor := fc.shardProcessorFactory( shard, - dispatchFilter, + fc.saturationDetector, fc.clock, fc.config.ExpiryCleanupInterval, fc.config.EnqueueChannelBufferSize, diff --git a/pkg/epp/flowcontrol/controller/controller_test.go b/pkg/epp/flowcontrol/controller/controller_test.go index 435cdb510..c832e2d6d 100644 --- a/pkg/epp/flowcontrol/controller/controller_test.go +++ b/pkg/epp/flowcontrol/controller/controller_test.go @@ -209,7 +209,7 @@ type mockShardProcessorFactory struct { func (f *mockShardProcessorFactory) new( shard contracts.RegistryShard, - _ internal.BandFilter, + _ contracts.SaturationDetector, _ clock.Clock, _ time.Duration, _ int, @@ -640,7 +640,7 @@ func TestFlowController_Lifecycle(t *testing.T) { h := newUnitHarness(t, t.Context(), Config{}, &mockRegistryClient{}) h.fc.shardProcessorFactory = func( shard contracts.RegistryShard, - _ internal.BandFilter, + _ contracts.SaturationDetector, _ clock.Clock, _ time.Duration, _ int, diff --git a/pkg/epp/flowcontrol/controller/internal/filter.go b/pkg/epp/flowcontrol/controller/internal/filter.go deleted file mode 100644 index 12788d4cb..000000000 --- a/pkg/epp/flowcontrol/controller/internal/filter.go +++ /dev/null @@ -1,144 +0,0 @@ -/* -Copyright 2025 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package internal - -import ( - "context" - - "github.com/go-logr/logr" - - "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/contracts" - "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework" - "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/types" - logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging" -) - -// BandFilter is a function that acts as a pre-policy gate. It takes a complete view of a priority band and returns a -// potentially filtered `framework.PriorityBandAccessor` containing only the flows that are viable candidates for a -// subsequent policy decision. It can also return a boolean signal to pause the entire operation for the band. -// -// This abstraction decouples the logic of determining *viability* (e.g., is a flow subject to backpressure?) from the -// logic of *selection* (e.g., which of the viable flows is the fairest to pick next?). 
This separation simplifies the -// mental model for policy authors, who can focus solely on selection logic without needing to account for external -// gating signals. -// -// Because filters are applied before the band is passed to a policy, they are inherently composable. Multiple filters -// can be chained to apply different viability criteria. For example, a future filter could be developed to temporarily -// exclude a "misbehaving" flow that is causing repeated errors, quarantining it from policy consideration. -// -// A nil returned `PriorityBandAccessor` indicates that no filtering was necessary and the original accessor should be -// used. This provides a zero-allocation fast path for the common case where no flows are being filtered. -type BandFilter func( - ctx context.Context, - band framework.PriorityBandAccessor, - logger logr.Logger, -) (filteredBand framework.PriorityBandAccessor, shouldPause bool) - -// NewSaturationFilter creates a `BandFilter` that uses the provided `contracts.SaturationDetector` to determine which -// flows are dispatchable. This is the standard filter used in the production `FlowController` for the dispatch -// operation. -func NewSaturationFilter(sd contracts.SaturationDetector) BandFilter { - return func( - ctx context.Context, - band framework.PriorityBandAccessor, - logger logr.Logger, - ) (framework.PriorityBandAccessor, bool) { - // Phase 1: Implement the current global saturation check. - if sd.IsSaturated(ctx) { - logger.V(logutil.VERBOSE).Info("System saturated, pausing dispatch for this shard.") - return nil, true // Pause dispatching for all bands. - } - - // Phase 2 (Future): This is where per-flow saturation logic would go. - // It would iterate `band`, call `IsSaturated(ctx, flowID)`, and build a filtered map of allowed flows, - // then return `newSubsetPriorityBandAccessor(band, allowedFlows)`. - // For now, no per-flow filtering is done. Return nil to signal the fast path. - return nil, false // Do not pause, and do not filter any flows. - } -} - -// subsetPriorityBandAccessor provides a view of a priority band that is restricted to a specific subset of flows. -// It implements the `framework.PriorityBandAccessor` interface, ensuring that any policy operating on it will only -// see the allowed flows, regardless of which accessor method is used. This provides correctness by construction. -// -// For performance, it pre-computes a slice of the allowed flows at creation time, making subsequent calls to -// `FlowKeys()` an O(1) operation with zero allocations. -type subsetPriorityBandAccessor struct { - originalAccessor framework.PriorityBandAccessor - allowedFlows map[types.FlowKey]struct{} - allowedFlowsSlice []types.FlowKey -} - -var _ framework.PriorityBandAccessor = &subsetPriorityBandAccessor{} - -// newSubsetPriorityBandAccessor creates a new filtered view of a priority band. -func newSubsetPriorityBandAccessor(original framework.PriorityBandAccessor, allowed []types.FlowKey) *subsetPriorityBandAccessor { - // Pre-compute the map for efficient lookups in `Queue()` and `IterateQueues()`. - allowedMap := make(map[types.FlowKey]struct{}, len(allowed)) - for _, k := range allowed { - allowedMap[k] = struct{}{} - } - - return &subsetPriorityBandAccessor{ - originalAccessor: original, - allowedFlows: allowedMap, - allowedFlowsSlice: allowed, - } -} - -// Priority returns the numerical priority level of this band. 
-func (s *subsetPriorityBandAccessor) Priority() int { - return s.originalAccessor.Priority() -} - -// PriorityName returns the human-readable name of this priority band. -func (s *subsetPriorityBandAccessor) PriorityName() string { - return s.originalAccessor.PriorityName() -} - -// FlowKeys returns a slice of the composite `types.FlowKey`s for every flow instance currently active within this -// priority band that are in the allowed subset. -// This is an O(1) operation because the slice is pre-computed at creation. -func (s *subsetPriorityBandAccessor) FlowKeys() []types.FlowKey { - return s.allowedFlowsSlice -} - -// Queue returns a `framework.FlowQueueAccessor` for the specified `ID` within this priority band, but only if it is -// in the allowed subset. This is an O(1) map lookup. If the flow is not in the allowed subset, it returns nil. -func (s *subsetPriorityBandAccessor) Queue(id string) framework.FlowQueueAccessor { - key := types.FlowKey{ID: id, Priority: s.Priority()} - if _, ok := s.allowedFlows[key]; !ok { - return nil - } - return s.originalAccessor.Queue(id) -} - -// IterateQueues executes the given `callback` for each `framework.FlowQueueAccessor` in the allowed subset of this -// priority band. The iteration stops if the callback returns false. -// This implementation delegates to the original accessor's iterator and applies the filter, which is more robust and -// efficient than iterating over a pre-computed slice of IDs. -func (s *subsetPriorityBandAccessor) IterateQueues(callback func(queue framework.FlowQueueAccessor) bool) { - s.originalAccessor.IterateQueues(func(queue framework.FlowQueueAccessor) bool { - if _, ok := s.allowedFlows[queue.FlowKey()]; ok { - // This queue is in the allowed set, so execute the callback. - if !callback(queue) { - return false // The callback requested to stop, so we stop the outer iteration too. - } - } - return true // Continue iterating over the original set. - }) -} diff --git a/pkg/epp/flowcontrol/controller/internal/filter_test.go b/pkg/epp/flowcontrol/controller/internal/filter_test.go deleted file mode 100644 index 7e51453c4..000000000 --- a/pkg/epp/flowcontrol/controller/internal/filter_test.go +++ /dev/null @@ -1,174 +0,0 @@ -/* -Copyright 2025 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. 
-*/ - -package internal - -import ( - "context" - "sort" - "testing" - - "github.com/go-logr/logr" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - - "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/contracts/mocks" - "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework" - frameworkmocks "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/mocks" - "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/types" -) - -func TestNewSaturationFilter(t *testing.T) { - t.Parallel() - - testCases := []struct { - name string - isSaturated bool - expectShouldPause bool - expectFilteredBandNil bool - }{ - { - name: "should not pause or filter when system is not saturated", - isSaturated: false, - expectShouldPause: false, - expectFilteredBandNil: true, // nil band signals the fast path - }, - { - name: "should pause when system is saturated", - isSaturated: true, - expectShouldPause: true, - expectFilteredBandNil: true, - }, - } - - for _, tc := range testCases { - t.Run(tc.name, func(t *testing.T) { - t.Parallel() - - // --- ARRANGE --- - mockSD := &mocks.MockSaturationDetector{IsSaturatedFunc: func(ctx context.Context) bool { return tc.isSaturated }} - filter := NewSaturationFilter(mockSD) - require.NotNil(t, filter, "NewSaturationFilter should not return nil") - - mockBand := &frameworkmocks.MockPriorityBandAccessor{} - - // --- ACT --- - filteredBand, shouldPause := filter(context.Background(), mockBand, logr.Discard()) - - // --- ASSERT --- - assert.Equal(t, tc.expectShouldPause, shouldPause, "The filter's pause signal should match the expected value") - - if tc.expectFilteredBandNil { - assert.Nil(t, filteredBand, "Expected filtered band to be nil") - } else { - assert.NotNil(t, filteredBand, "Expected a non-nil filtered band") - } - }) - } -} - -func TestSubsetPriorityBandAccessor(t *testing.T) { - t.Parallel() - - // --- ARRANGE --- - // Setup a mock original accessor that knows about three flows. - flowAKey := types.FlowKey{ID: "flow-a", Priority: 10} - flowBKey := types.FlowKey{ID: "flow-b", Priority: 10} - flowCKey := types.FlowKey{ID: "flow-c", Priority: 10} - - mockQueueA := &frameworkmocks.MockFlowQueueAccessor{FlowKeyV: flowAKey} - mockQueueB := &frameworkmocks.MockFlowQueueAccessor{FlowKeyV: flowBKey} - mockQueueC := &frameworkmocks.MockFlowQueueAccessor{FlowKeyV: flowCKey} - - originalAccessor := &frameworkmocks.MockPriorityBandAccessor{ - PriorityV: 10, - PriorityNameV: "High", - FlowKeysFunc: func() []types.FlowKey { - return []types.FlowKey{flowAKey, flowBKey, flowCKey} - }, - QueueFunc: func(id string) framework.FlowQueueAccessor { - switch id { - case "flow-a": - return mockQueueA - case "flow-b": - return mockQueueB - case "flow-c": - return mockQueueC - } - return nil - }, - IterateQueuesFunc: func(callback func(queue framework.FlowQueueAccessor) bool) { - if !callback(mockQueueA) { - return - } - if !callback(mockQueueB) { - return - } - callback(mockQueueC) - }, - } - - // Create a subset view that only allows two of the flows. 
- allowedFlows := []types.FlowKey{flowAKey, flowCKey} - subsetAccessor := newSubsetPriorityBandAccessor(originalAccessor, allowedFlows) - require.NotNil(t, subsetAccessor, "newSubsetPriorityBandAccessor should not return nil") - - t.Run("should pass through priority and name", func(t *testing.T) { - t.Parallel() - assert.Equal(t, 10, subsetAccessor.Priority(), "Priority() should pass through from the original accessor") - assert.Equal(t, "High", subsetAccessor.PriorityName(), - "PriorityName() should pass through from the original accessor") - }) - - t.Run("should only return allowed flow keys", func(t *testing.T) { - t.Parallel() - flowKeys := subsetAccessor.FlowKeys() - // Sort for consistent comparison, as the pre-computed slice order is not guaranteed. - sort.Slice(flowKeys, func(i, j int) bool { - return flowKeys[i].ID < flowKeys[j].ID - }) - assert.Equal(t, []types.FlowKey{flowAKey, flowCKey}, flowKeys, "FlowKeys() should only return the allowed subset") - }) - - t.Run("should only return queues for allowed flows", func(t *testing.T) { - t.Parallel() - assert.Same(t, mockQueueA, subsetAccessor.Queue("flow-a"), "Should return queue for allowed flow 'a'") - assert.Nil(t, subsetAccessor.Queue("flow-b"), "Should not return queue for disallowed flow 'b'") - assert.Same(t, mockQueueC, subsetAccessor.Queue("flow-c"), "Should return queue for allowed flow 'c'") - }) - - t.Run("should only iterate over allowed queues", func(t *testing.T) { - t.Parallel() - var iterated []string - subsetAccessor.IterateQueues(func(queue framework.FlowQueueAccessor) bool { - iterated = append(iterated, queue.FlowKey().ID) - return true - }) - // Sort for consistent comparison, as iteration order is not guaranteed. - sort.Strings(iterated) - assert.Equal(t, []string{"flow-a", "flow-c"}, iterated, "IterateQueues() should only visit allowed flows") - }) - - t.Run("should stop iteration if callback returns false", func(t *testing.T) { - t.Parallel() - var iterated []string - subsetAccessor.IterateQueues(func(queue framework.FlowQueueAccessor) bool { - iterated = append(iterated, queue.FlowKey().ID) - return false // Exit after the first item. - }) - assert.Len(t, iterated, 1, "Iteration should have stopped after one item") - }) -} diff --git a/pkg/epp/flowcontrol/controller/internal/processor.go b/pkg/epp/flowcontrol/controller/internal/processor.go index fa0118270..9b6bb705f 100644 --- a/pkg/epp/flowcontrol/controller/internal/processor.go +++ b/pkg/epp/flowcontrol/controller/internal/processor.go @@ -65,7 +65,7 @@ var ErrProcessorBusy = errors.New("shard processor is busy") // succeeds. type ShardProcessor struct { shard contracts.RegistryShard - dispatchFilter BandFilter + saturationDetector contracts.SaturationDetector clock clock.Clock expiryCleanupInterval time.Duration logger logr.Logger @@ -81,7 +81,7 @@ type ShardProcessor struct { // NewShardProcessor creates a new `ShardProcessor` instance. 
func NewShardProcessor( shard contracts.RegistryShard, - dispatchFilter BandFilter, + saturationDetector contracts.SaturationDetector, clock clock.Clock, expiryCleanupInterval time.Duration, enqueueChannelBufferSize int, @@ -89,7 +89,7 @@ func NewShardProcessor( ) *ShardProcessor { return &ShardProcessor{ shard: shard, - dispatchFilter: dispatchFilter, + saturationDetector: saturationDetector, clock: clock, expiryCleanupInterval: expiryCleanupInterval, logger: logger, @@ -290,25 +290,31 @@ func (sp *ShardProcessor) hasCapacity(priority int, itemByteSize uint64) bool { // dispatchCycle attempts to dispatch a single item by iterating through all priority bands from highest to lowest. // It applies the configured policies for each band to select an item and then attempts to dispatch it. -// It returns true if an item was successfully dispatched, and false otherwise, indicating that no work was done in this -// cycle. +// It returns true if an item was successfully dispatched, and false otherwise. // // # Error Handling Philosophy: Failure Isolation & Work Conservation // -// The engine is designed to be resilient to failures in individual policies or transient errors within a specific flow. -// The core principle is failure isolation. A problem in one priority band must not be allowed to halt processing for -// other, healthy bands. +// A problem in one priority band (e.g., a failing policy) must not halt processing for other, healthy bands. +// Therefore, any error during selection or dispatch for a given band is logged, and the processor immediately continues +// to the next-lower priority band to maximize system throughput. // -// To achieve this, any error encountered during the selection or dispatch process for a given priority band is treated -// as a non-critical failure for that band, in this cycle. The processor will log the error for observability and then -// immediately continue its attempt to find work in the next-lower priority band. This promotes work conservation and -// maximizes system throughput even in the presence of partial failures. +// # Strict Policy Adherence vs. Work Conservation +// +// This function's logic strictly adheres to the scheduling decisions made by the configured policies, even at the cost +// of work conservation. After the inter-flow (fairness) and intra-flow (ordering) policies select a request (e.g., +// `A_1` from flow `A`), a post-selection viability check is performed. +// +// If request `A_1` targets saturated backends, this function will stop the entire dispatch cycle for the current tick. +// It will NOT attempt to find other work (like request `B_1` or `A_2`). Instead, it respects the policy decision that +// `A_1` is next and enforces Head-of-Line blocking on it. +// +// # Future Extension Point +// +// The iteration over priority bands is currently a simple, strict-priority loop. This could be abstracted into a third +// policy tier (e.g., an `InterBandDispatchPolicy`) if more complex scheduling between bands, such as Weighted Fair +// Queuing (WFQ), is ever required. func (sp *ShardProcessor) dispatchCycle(ctx context.Context) bool { baseLogger := sp.logger.WithName("dispatchCycle") - - // FUTURE EXTENSION POINT: The iteration over priority bands is currently a simple, strict-priority loop. - // This could be abstracted into a third policy tier (e.g., an `InterBandDispatchPolicy`) if more complex scheduling - // between bands, such as Weighted Fair Queuing (WFQ), is ever required. For now, strict priority is sufficient. 
for _, priority := range sp.shard.AllOrderedPriorityLevels() { originalBand, err := sp.shard.PriorityBandAccessor(priority) if err != nil { @@ -317,27 +323,16 @@ func (sp *ShardProcessor) dispatchCycle(ctx context.Context) bool { } logger := baseLogger.WithValues("priority", priority, "priorityName", originalBand.PriorityName()) - // Apply the configured filter to get a view of only the dispatchable flows. - dispatchableBand, shouldPause := sp.dispatchFilter(ctx, originalBand, logger) - if shouldPause { - return false // A global gate told us to stop the entire cycle. - } - if dispatchableBand == nil { - // A nil return from the filter indicates the fast path: no filtering was needed. - dispatchableBand = originalBand - } - - // Pass the (potentially filtered) band to the policies. - item, err := sp.selectItem(dispatchableBand, logger) + item, err := sp.selectItem(originalBand, logger) if err != nil { logger.Error(err, "Failed to select item, skipping priority band for this cycle") continue } if item == nil { - // This is the common case where a priority band has no items to dispatch. logger.V(logutil.TRACE).Info("No item selected by dispatch policies, skipping band") continue } + logger = logger.WithValues( "flowKey", item.OriginalRequest().FlowKey(), "flowID", item.OriginalRequest().FlowKey().ID, @@ -345,14 +340,18 @@ func (sp *ShardProcessor) dispatchCycle(ctx context.Context) bool { "reqID", item.OriginalRequest().ID(), "reqByteSize", item.OriginalRequest().ByteSize()) + candidatePods := item.OriginalRequest().CandidatePodsForScheduling() + if sp.saturationDetector.IsSaturated(ctx, candidatePods) { + logger.V(logutil.VERBOSE).Info("Policy's chosen item is for a saturated flow; pausing dispatch and blocking on HoL") + return false + } + if err := sp.dispatchItem(item, logger); err != nil { logger.Error(err, "Failed to dispatch item, skipping priority band for this cycle") continue } - // A successful dispatch occurred, so we return true to signal that work was done. return true } - // No items were dispatched in this cycle across all priority bands. return false } diff --git a/pkg/epp/flowcontrol/controller/internal/processor_test.go b/pkg/epp/flowcontrol/controller/internal/processor_test.go index 461cb3440..4c31d7ae4 100644 --- a/pkg/epp/flowcontrol/controller/internal/processor_test.go +++ b/pkg/epp/flowcontrol/controller/internal/processor_test.go @@ -14,29 +14,6 @@ See the License for the specific language governing permissions and limitations under the License. */ -// -// A Note on the Testing Strategy for `ShardProcessor` -// -// The `ShardProcessor` is a complex concurrent orchestrator. Testing it with concrete implementations would lead to -// flaky, non-deterministic tests. Therefore, we use a high-fidelity `testHarness` with stateful mocks to enable -// reliable and deterministic testing. This is a deliberate and necessary choice for several key reasons: -// -// 1. Deterministic Race Simulation: The harness allows us to pause mock execution at critical moments, making it -// possible to deterministically simulate and verify the processor's behavior during race conditions (e.g., the -// dispatch vs. expiry race). This is impossible with concrete implementations without resorting to unreliable -// sleeps. -// -// 2. Failure Mode Simulation: We can trigger specific, on-demand errors from dependencies to verify the processor's -// resilience and complex error-handling logic (e.g., the `errIntraFlow` circuit breaker). -// -// 3. 
Interaction and Isolation Testing: Mocks allow us to isolate the `ShardProcessor` from its dependencies. This -// ensures that tests are verifying the processor's orchestration logic (i.e., that it calls its dependencies -// correctly) and are not affected by confounding bugs in those dependencies. -// -// In summary, this is a prerequisite for reliably testing a concurrent engine, not just a simple data -// structure. -// - package internal import ( @@ -57,6 +34,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/controller-runtime/pkg/log/zap" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/contracts" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/contracts/mocks" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework" @@ -93,9 +71,10 @@ type testHarness struct { startSignal chan struct{} // Core components under test - processor *ShardProcessor - clock *testclock.FakeClock - logger logr.Logger + processor *ShardProcessor + clock *testclock.FakeClock + logger logr.Logger + saturationDetector *mocks.MockSaturationDetector // --- Centralized Mock State --- // The harness's mutex protects the single source of truth for all mock state. @@ -112,13 +91,14 @@ type testHarness struct { func newTestHarness(t *testing.T, expiryCleanupInterval time.Duration) *testHarness { t.Helper() h := &testHarness{ - t: t, - MockRegistryShard: &mocks.MockRegistryShard{}, - clock: testclock.NewFakeClock(time.Now()), - logger: logr.Discard(), - startSignal: make(chan struct{}), - queues: make(map[types.FlowKey]*mocks.MockManagedQueue), - priorityFlows: make(map[int][]types.FlowKey), + t: t, + MockRegistryShard: &mocks.MockRegistryShard{}, + clock: testclock.NewFakeClock(time.Now()), + logger: logr.Discard(), + saturationDetector: &mocks.MockSaturationDetector{}, + startSignal: make(chan struct{}), + queues: make(map[types.FlowKey]*mocks.MockManagedQueue), + priorityFlows: make(map[int][]types.FlowKey), } // Wire up the harness to provide the mock implementations for the shard's dependencies. @@ -138,15 +118,7 @@ func newTestHarness(t *testing.T, expiryCleanupInterval time.Duration) *testHarn } } - // Use a default pass-through filter. - filter := func( - ctx context.Context, - band framework.PriorityBandAccessor, - logger logr.Logger, - ) (framework.PriorityBandAccessor, bool) { - return nil, false - } - h.processor = NewShardProcessor(h, filter, h.clock, expiryCleanupInterval, 100, h.logger) + h.processor = NewShardProcessor(h, h.saturationDetector, h.clock, expiryCleanupInterval, 100, h.logger) require.NotNil(t, h.processor, "NewShardProcessor should not return nil") t.Cleanup(func() { h.Stop() }) @@ -779,17 +751,19 @@ func TestShardProcessor(t *testing.T) { expectDidDispatch: false, }, { - name: "should stop dispatching when filter signals pause", + name: "should block dispatch on HoL saturation", setupHarness: func(h *testHarness) { - // Add an item that *could* be dispatched to prove the pause is effective. - q := h.addQueue(testFlow) - require.NoError(t, q.Add(h.newTestItem("item", testFlow, testTTL))) - h.processor.dispatchFilter = func( - _ context.Context, - _ framework.PriorityBandAccessor, - _ logr.Logger, - ) (framework.PriorityBandAccessor, bool) { - return nil, true // Signal pause. + // Add a high-priority item that will be selected but is saturated. 
+				qHigh := h.addQueue(testFlow) // priority 10
+				require.NoError(t, qHigh.Add(h.newTestItem("item-high", testFlow, testTTL)))
+
+				// Add a low-priority item; with HoL blocking, it must NOT be dispatched as fallback work.
+				keyLow := types.FlowKey{ID: "flow-low", Priority: 5}
+				qLow := h.addQueue(keyLow)
+				require.NoError(t, qLow.Add(h.newTestItem("item-low", keyLow, testTTL)))
+
+				h.saturationDetector.IsSaturatedFunc = func(_ context.Context, _ []metrics.PodMetrics) bool {
+					return true
 				}
 			},
 			expectDidDispatch: false,
 		},
@@ -898,50 +872,6 @@ func TestShardProcessor(t *testing.T) {
 		}
 	})
 
-	t.Run("should use filtered view of queues when filter is active", func(t *testing.T) {
-		t.Parallel()
-		// --- ARRANGE ---
-		h := newTestHarness(t, testCleanupTick)
-		flowB := types.FlowKey{ID: "flow-b", Priority: testFlow.Priority}
-		h.addQueue(testFlow)
-		qB := h.addQueue(flowB)
-		itemB := h.newTestItem("item-b", flowB, testTTL)
-		require.NoError(t, qB.Add(itemB))
-
-		// This filter only allows flow-b.
-		h.processor.dispatchFilter = func(
-			_ context.Context,
-			originalBand framework.PriorityBandAccessor,
-			_ logr.Logger,
-		) (framework.PriorityBandAccessor, bool) {
-			return newSubsetPriorityBandAccessor(originalBand, []types.FlowKey{flowB}), false
-		}
-
-		// This policy will be given the filtered view, so it should only see flow-b.
-		h.interFlowPolicySelectQueue = func(band framework.PriorityBandAccessor) (framework.FlowQueueAccessor, error) {
-			var flowIDs []string
-			band.IterateQueues(func(fqa framework.FlowQueueAccessor) bool {
-				flowIDs = append(flowIDs, fqa.FlowKey().ID)
-				return true
-			})
-			// This is the core assertion of the test.
-			require.ElementsMatch(t, []string{flowB.ID}, flowIDs, "Policy should only see the filtered flow")
-
-			// Select flow-b to prove the chain works.
-			q, _ := h.managedQueue(flowB)
-			return q.FlowQueueAccessor(), nil
-		}
-
-		// --- ACT ---
-		dispatched := h.processor.dispatchCycle(context.Background())
-
-		// --- ASSERT ---
-		assert.True(t, dispatched, "An item should have been dispatched from the filtered flow")
-		assert.Equal(t, types.QueueOutcomeDispatched, itemB.finalState.Outcome,
-			"The dispatched item's outcome should be correct")
-		assert.NoError(t, itemB.finalState.Err, "The dispatched item should not have an error")
-	})
-
 	t.Run("should guarantee strict priority by starving lower priority items", func(t *testing.T) {
 		t.Parallel()
 		// --- ARRANGE ---
diff --git a/pkg/epp/flowcontrol/types/mocks/mocks.go b/pkg/epp/flowcontrol/types/mocks/mocks.go
index dbef031d7..c52c5c2db 100644
--- a/pkg/epp/flowcontrol/types/mocks/mocks.go
+++ b/pkg/epp/flowcontrol/types/mocks/mocks.go
@@ -22,16 +22,18 @@ import (
 	"context"
 	"time"
 
+	"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics"
 	"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/types"
 )
 
 // MockFlowControlRequest provides a mock implementation of the `types.FlowControlRequest` interface.
 type MockFlowControlRequest struct {
-	Ctx                  context.Context
-	FlowKeyV             types.FlowKey
-	ByteSizeV            uint64
-	InitialEffectiveTTLV time.Duration
-	IDV                  string
+	Ctx                         context.Context
+	FlowKeyV                    types.FlowKey
+	ByteSizeV                   uint64
+	InitialEffectiveTTLV        time.Duration
+	IDV                         string
+	CandidatePodsForSchedulingV []*metrics.FakePodMetrics
 }
 
 // NewMockFlowControlRequest creates a new `MockFlowControlRequest` instance.
@@ -58,6 +60,15 @@ func (m *MockFlowControlRequest) ByteSize() uint64 { return m.
 func (m *MockFlowControlRequest) InitialEffectiveTTL() time.Duration { return m.InitialEffectiveTTLV }
 func (m *MockFlowControlRequest) ID() string                        { return m.IDV }
 
+func (m *MockFlowControlRequest) CandidatePodsForScheduling() []metrics.PodMetrics {
+	// Size the slice up front so the indexed assignment below stays in bounds.
+	pods := make([]metrics.PodMetrics, len(m.CandidatePodsForSchedulingV))
+	for i, pod := range m.CandidatePodsForSchedulingV {
+		pods[i] = pod
+	}
+	return pods
+}
+
 var _ types.FlowControlRequest = &MockFlowControlRequest{}
 
 // MockQueueItemHandle provides a mock implementation of the `types.QueueItemHandle` interface.
diff --git a/pkg/epp/flowcontrol/types/request.go b/pkg/epp/flowcontrol/types/request.go
index 5dfc18eb6..255d3bc45 100644
--- a/pkg/epp/flowcontrol/types/request.go
+++ b/pkg/epp/flowcontrol/types/request.go
@@ -19,6 +19,8 @@ package types
 import (
 	"context"
 	"time"
+
+	"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics"
 )
 
 // FlowControlRequest is the contract for an incoming request submitted to the `controller.FlowController`. It
@@ -49,6 +51,11 @@ type FlowControlRequest interface {
 	// applied.
 	InitialEffectiveTTL() time.Duration
 
+	// CandidatePodsForScheduling passes through a set of candidate pods a request may be admitted to.
+	// This is necessary for invoking `contracts.SaturationDetector.IsSaturated`, but it is otherwise unused in the Flow
+	// Control system.
+	CandidatePodsForScheduling() []metrics.PodMetrics
+
 	// ID returns an optional, user-facing unique identifier for this specific request. It is intended for logging,
 	// tracing, and observability. The `controller.FlowController` does not use this ID for dispatching decisions; it uses
 	// the internal, opaque `QueueItemHandle`.
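
Reviewer note: the reshaped contract means saturation is now judged per request, against its candidate pods, rather than as one global signal. A minimal sketch of what an implementation could look like follows. It is illustrative only: the type name and thresholds are invented here, and it assumes `metrics.PodMetrics` exposes a `GetMetrics()` accessor whose state carries `WaitingQueueSize` and `KVCacheUsagePercent`, as the EPP saturation detector consumes today.

package example

import (
	"context"

	"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics"
	"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/contracts"
)

// thresholdDetector is a hypothetical detector: a request is gated only when
// none of its candidate pods has headroom under both load thresholds.
type thresholdDetector struct {
	maxWaiting int     // assumed per-pod queue-depth threshold
	maxKVUsage float64 // assumed per-pod KV-cache utilization threshold
}

var _ contracts.SaturationDetector = (*thresholdDetector)(nil)

func (d *thresholdDetector) IsSaturated(_ context.Context, candidatePods []metrics.PodMetrics) bool {
	if len(candidatePods) == 0 {
		return true // No candidates to admit to; treat as saturated.
	}
	for _, pod := range candidatePods {
		// GetMetrics() is assumed here; a nil state is treated as no headroom.
		if s := pod.GetMetrics(); s != nil &&
			s.WaitingQueueSize <= d.maxWaiting && s.KVCacheUsagePercent <= d.maxKVUsage {
			return false // At least one candidate has headroom; do not pause dispatch.
		}
	}
	return true
}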
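
A companion sketch of how the updated mocks wire together in a test: the request mock's fake pods flow through `CandidatePodsForScheduling()` into the detector, mirroring what `dispatchCycle` now does after policy selection. The test name and the rule inside `IsSaturatedFunc` are made up for illustration.

package example

import (
	"context"
	"testing"

	"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics"
	sdmocks "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/contracts/mocks"
	typesmocks "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/types/mocks"
)

func TestSaturationGateWiring(t *testing.T) {
	// Two fake candidate pods; zero values are enough to exercise the wiring.
	req := &typesmocks.MockFlowControlRequest{
		CandidatePodsForSchedulingV: []*metrics.FakePodMetrics{{}, {}},
	}
	sd := &sdmocks.MockSaturationDetector{
		// Illustrative rule: saturated only when the candidate set is empty.
		IsSaturatedFunc: func(_ context.Context, pods []metrics.PodMetrics) bool {
			return len(pods) == 0
		},
	}
	if sd.IsSaturated(context.Background(), req.CandidatePodsForScheduling()) {
		t.Fatal("expected a non-empty candidate set to be reported unsaturated")
	}
}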