diff --git a/pkg/epp/flowcontrol/contracts/mocks/mocks.go b/pkg/epp/flowcontrol/contracts/mocks/mocks.go index 5431e2c83..c5c8d2e3b 100644 --- a/pkg/epp/flowcontrol/contracts/mocks/mocks.go +++ b/pkg/epp/flowcontrol/contracts/mocks/mocks.go @@ -46,9 +46,8 @@ import ( type MockRegistryShard struct { IDFunc func() string IsActiveFunc func() bool - ActiveManagedQueueFunc func(flowID string) (contracts.ManagedQueue, error) - ManagedQueueFunc func(flowID string, priority uint) (contracts.ManagedQueue, error) - IntraFlowDispatchPolicyFunc func(flowID string, priority uint) (framework.IntraFlowDispatchPolicy, error) + ManagedQueueFunc func(key types.FlowKey) (contracts.ManagedQueue, error) + IntraFlowDispatchPolicyFunc func(key types.FlowKey) (framework.IntraFlowDispatchPolicy, error) InterFlowDispatchPolicyFunc func(priority uint) (framework.InterFlowDispatchPolicy, error) PriorityBandAccessorFunc func(priority uint) (framework.PriorityBandAccessor, error) AllOrderedPriorityLevelsFunc func() []uint @@ -69,23 +68,16 @@ func (m *MockRegistryShard) IsActive() bool { return false } -func (m *MockRegistryShard) ActiveManagedQueue(flowID string) (contracts.ManagedQueue, error) { - if m.ActiveManagedQueueFunc != nil { - return m.ActiveManagedQueueFunc(flowID) - } - return nil, nil -} - -func (m *MockRegistryShard) ManagedQueue(flowID string, priority uint) (contracts.ManagedQueue, error) { +func (m *MockRegistryShard) ManagedQueue(key types.FlowKey) (contracts.ManagedQueue, error) { if m.ManagedQueueFunc != nil { - return m.ManagedQueueFunc(flowID, priority) + return m.ManagedQueueFunc(key) } return nil, nil } -func (m *MockRegistryShard) IntraFlowDispatchPolicy(flowID string, priority uint) (framework.IntraFlowDispatchPolicy, error) { +func (m *MockRegistryShard) IntraFlowDispatchPolicy(key types.FlowKey) (framework.IntraFlowDispatchPolicy, error) { if m.IntraFlowDispatchPolicyFunc != nil { - return m.IntraFlowDispatchPolicyFunc(flowID, priority) + return m.IntraFlowDispatchPolicyFunc(key) } return nil, nil } @@ -148,8 +140,8 @@ func (m *MockSaturationDetector) IsSaturated(ctx context.Context) bool { // 3. **Self-Wiring**: The `FlowQueueAccessor()` method returns the mock itself, ensuring the accessor is always // correctly connected to the queue's state without manual wiring in tests. type MockManagedQueue struct { - // FlowSpecV defines the flow specification for this mock queue. It should be set by the test. - FlowSpecV types.FlowSpecification + // FlowKeyV defines the flow specification for this mock queue. It should be set by the test. + FlowKeyV types.FlowKey // AddFunc allows a test to completely override the default Add behavior. AddFunc func(item types.QueueItemAccessor) error @@ -249,7 +241,7 @@ func (m *MockManagedQueue) Drain() ([]types.QueueItemAccessor, error) { return drained, nil } -func (m *MockManagedQueue) FlowSpec() types.FlowSpecification { return m.FlowSpecV } +func (m *MockManagedQueue) FlowKey() types.FlowKey { return m.FlowKeyV } func (m *MockManagedQueue) Name() string { return "" } func (m *MockManagedQueue) Capabilities() []framework.QueueCapability { return nil } func (m *MockManagedQueue) Comparator() framework.ItemComparator { return nil } diff --git a/pkg/epp/flowcontrol/contracts/registry.go b/pkg/epp/flowcontrol/contracts/registry.go index 843f501ff..b823ae06a 100644 --- a/pkg/epp/flowcontrol/contracts/registry.go +++ b/pkg/epp/flowcontrol/contracts/registry.go @@ -24,11 +24,13 @@ package contracts import ( "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/types" ) // RegistryShard defines the read-oriented interface that a `controller.FlowController` worker uses to access its -// specific slice (shard) of the `FlowRegistry`'s state. It provides the necessary methods for a worker to perform its -// dispatch operations by accessing queues and policies in a concurrent-safe manner. +// specific slice (shard) of the `FlowRegistry's` state. It provides a concurrent-safe view of all flow instances, which +// are uniquely identified by their composite `types.FlowKey`. It is the primary contract for performing dispatch +// operations. // // # Conformance // @@ -41,21 +43,19 @@ type RegistryShard interface { // being gracefully drained and should not be given new work. IsActive() bool - // ActiveManagedQueue returns the currently active `ManagedQueue` for a given flow on this shard. This is the queue to - // which new requests for the flow should be enqueued. - // Returns an error wrapping `ErrFlowInstanceNotFound` if no active instance exists for the given `flowID`. - ActiveManagedQueue(flowID string) (ManagedQueue, error) - - // ManagedQueue retrieves a specific (potentially draining) `ManagedQueue` instance from this shard. This allows a - // worker to continue dispatching items from queues that are draining as part of a flow update. - // Returns an error wrapping `ErrFlowInstanceNotFound` if no instance for the given flowID and priority exists. - ManagedQueue(flowID string, priority uint) (ManagedQueue, error) + // ManagedQueue retrieves the managed queue for the given, unique `types.FlowKey`. This is the primary method for + // accessing a specific flow's queue for either enqueueing or dispatching requests. + // + // Returns an error wrapping `ErrPriorityBandNotFound` if the priority specified in the key is not configured, or + // `ErrFlowInstanceNotFound` if no instance exists for the given `key`. + ManagedQueue(key types.FlowKey) (ManagedQueue, error) - // IntraFlowDispatchPolicy retrieves a flow's configured `framework.IntraFlowDispatchPolicy` for this shard. + // IntraFlowDispatchPolicy retrieves a flow's configured `framework.IntraFlowDispatchPolicy` for this shard, + // identified by its unique `FlowKey`. // The registry guarantees that a non-nil default policy (as configured at the priority-band level) is returned if - // none is specified on the flow itself. + // none is specified for the flow. // Returns an error wrapping `ErrFlowInstanceNotFound` if the flow instance does not exist. - IntraFlowDispatchPolicy(flowID string, priority uint) (framework.IntraFlowDispatchPolicy, error) + IntraFlowDispatchPolicy(key types.FlowKey) (framework.IntraFlowDispatchPolicy, error) // InterFlowDispatchPolicy retrieves a priority band's configured `framework.InterFlowDispatchPolicy` for this shard. // The registry guarantees that a non-nil default policy is returned if none is configured for the band. @@ -86,8 +86,6 @@ type RegistryShard interface { // # Conformance // // - All methods (including those embedded from `framework.SafeQueue`) MUST be goroutine-safe. -// - The `Add()` method MUST reject new items if the queue has been marked as "draining" by the `FlowRegistry`, -// ensuring that lifecycle changes are respected even by consumers holding a stale pointer to the queue. // - All mutating methods (`Add()`, `Remove()`, `Cleanup()`, `Drain()`) MUST atomically update relevant statistics // (e.g., queue length, byte size). type ManagedQueue interface { diff --git a/pkg/epp/flowcontrol/controller/internal/filter.go b/pkg/epp/flowcontrol/controller/internal/filter.go index f7e731379..0a0669224 100644 --- a/pkg/epp/flowcontrol/controller/internal/filter.go +++ b/pkg/epp/flowcontrol/controller/internal/filter.go @@ -23,12 +23,13 @@ import ( "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/contracts" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/types" logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging" ) // BandFilter is a function that acts as a pre-policy gate. It takes a complete view of a priority band and returns a -// subset of flow IDs that are considered viable candidates for a subsequent policy decision. It can also return a -// boolean signal to pause the entire operation for the band. +// potentially filtered `framework.PriorityBandAccessor` containing only the flows that are viable candidates for a +// subsequent policy decision. It can also return a boolean signal to pause the entire operation for the band. // // This abstraction decouples the logic of determining *viability* (e.g., is a flow subject to backpressure?) from the // logic of *selection* (e.g., which of the viable flows is the fairest to pick next?). This separation simplifies the @@ -39,13 +40,13 @@ import ( // can be chained to apply different viability criteria. For example, a future filter could be developed to temporarily // exclude a "misbehaving" flow that is causing repeated errors, quarantining it from policy consideration. // -// A nil `allowedFlows` map indicates that no filtering is necessary and all flows in the band are visible. -// This provides a zero-allocation fast path for the common case where no flows are being filtered. +// A nil returned `PriorityBandAccessor` indicates that no filtering was necessary and the original accessor should be +// used. This provides a zero-allocation fast path for the common case where no flows are being filtered. type BandFilter func( ctx context.Context, band framework.PriorityBandAccessor, logger logr.Logger, -) (allowedFlows map[string]struct{}, shouldPause bool) +) (filteredBand framework.PriorityBandAccessor, shouldPause bool) // NewSaturationFilter creates a `BandFilter` that uses the provided `contracts.SaturationDetector` to determine which // flows are dispatchable. This is the standard filter used in the production `FlowController` for the dispatch @@ -55,7 +56,7 @@ func NewSaturationFilter(sd contracts.SaturationDetector) BandFilter { ctx context.Context, band framework.PriorityBandAccessor, logger logr.Logger, - ) (map[string]struct{}, bool) { + ) (framework.PriorityBandAccessor, bool) { // Phase 1: Implement the current global saturation check. if sd.IsSaturated(ctx) { logger.V(logutil.VERBOSE).Info("System saturated, pausing dispatch for this shard.") @@ -63,7 +64,8 @@ func NewSaturationFilter(sd contracts.SaturationDetector) BandFilter { } // Phase 2 (Future): This is where per-flow saturation logic would go. - // It would iterate `band`, call `IsSaturated(ctx, flowID)`, and build a filtered map of allowed flows. + // It would iterate `band`, call `IsSaturated(ctx, flowID)`, and build a filtered map of allowed flows, + // then return `newSubsetPriorityBandAccessor(band, allowedFlows)`. // For now, no per-flow filtering is done. Return nil to signal the fast path. return nil, false // Do not pause, and do not filter any flows. } @@ -73,31 +75,28 @@ func NewSaturationFilter(sd contracts.SaturationDetector) BandFilter { // It implements the `framework.PriorityBandAccessor` interface, ensuring that any policy operating on it will only // see the allowed flows, regardless of which accessor method is used. This provides correctness by construction. // -// For performance, it pre-computes a slice of the allowed flow IDs at creation time, making subsequent calls to -// `FlowIDs()` an O(1) operation with zero allocations. +// For performance, it pre-computes a slice of the allowed flows at creation time, making subsequent calls to +// `FlowKeys()` an O(1) operation with zero allocations. type subsetPriorityBandAccessor struct { originalAccessor framework.PriorityBandAccessor - allowedFlows map[string]struct{} - allowedFlowsSlice []string + allowedFlows map[types.FlowKey]struct{} + allowedFlowsSlice []types.FlowKey } var _ framework.PriorityBandAccessor = &subsetPriorityBandAccessor{} // newSubsetPriorityBandAccessor creates a new filtered view of a priority band. -func newSubsetPriorityBandAccessor( - original framework.PriorityBandAccessor, - allowed map[string]struct{}, -) *subsetPriorityBandAccessor { - // Pre-compute the slice of flow IDs for performance. - ids := make([]string, 0, len(allowed)) - for id := range allowed { - ids = append(ids, id) +func newSubsetPriorityBandAccessor(original framework.PriorityBandAccessor, allowed []types.FlowKey) *subsetPriorityBandAccessor { + // Pre-compute the map for efficient lookups in `Queue()` and `IterateQueues()`. + allowedMap := make(map[types.FlowKey]struct{}, len(allowed)) + for _, k := range allowed { + allowedMap[k] = struct{}{} } return &subsetPriorityBandAccessor{ originalAccessor: original, - allowedFlows: allowed, - allowedFlowsSlice: ids, + allowedFlows: allowedMap, + allowedFlowsSlice: allowed, } } @@ -111,19 +110,21 @@ func (s *subsetPriorityBandAccessor) PriorityName() string { return s.originalAccessor.PriorityName() } -// FlowIDs returns a slice of all flow IDs within this priority band that are in the allowed subset. +// FlowKeys returns a slice of the composite `types.FlowKey`s for every flow instance currently active within this +// priority band that are in the allowed subset. // This is an O(1) operation because the slice is pre-computed at creation. -func (s *subsetPriorityBandAccessor) FlowIDs() []string { +func (s *subsetPriorityBandAccessor) FlowKeys() []types.FlowKey { return s.allowedFlowsSlice } -// Queue returns a `framework.FlowQueueAccessor` for the specified `flowID` within this priority band, but only if it is +// Queue returns a `framework.FlowQueueAccessor` for the specified `ID` within this priority band, but only if it is // in the allowed subset. This is an O(1) map lookup. If the flow is not in the allowed subset, it returns nil. -func (s *subsetPriorityBandAccessor) Queue(flowID string) framework.FlowQueueAccessor { - if _, ok := s.allowedFlows[flowID]; !ok { +func (s *subsetPriorityBandAccessor) Queue(id string) framework.FlowQueueAccessor { + key := types.FlowKey{ID: id, Priority: s.Priority()} + if _, ok := s.allowedFlows[key]; !ok { return nil } - return s.originalAccessor.Queue(flowID) + return s.originalAccessor.Queue(id) } // IterateQueues executes the given `callback` for each `framework.FlowQueueAccessor` in the allowed subset of this @@ -132,7 +133,7 @@ func (s *subsetPriorityBandAccessor) Queue(flowID string) framework.FlowQueueAcc // efficient than iterating over a pre-computed slice of IDs. func (s *subsetPriorityBandAccessor) IterateQueues(callback func(queue framework.FlowQueueAccessor) bool) { s.originalAccessor.IterateQueues(func(queue framework.FlowQueueAccessor) bool { - if _, ok := s.allowedFlows[queue.FlowSpec().ID]; ok { + if _, ok := s.allowedFlows[queue.FlowKey()]; ok { // This queue is in the allowed set, so execute the callback. if !callback(queue) { return false // The callback requested to stop, so we stop the outer iteration too. diff --git a/pkg/epp/flowcontrol/controller/internal/filter_test.go b/pkg/epp/flowcontrol/controller/internal/filter_test.go index f51581eaf..ceff9e83f 100644 --- a/pkg/epp/flowcontrol/controller/internal/filter_test.go +++ b/pkg/epp/flowcontrol/controller/internal/filter_test.go @@ -35,22 +35,22 @@ func TestNewSaturationFilter(t *testing.T) { t.Parallel() testCases := []struct { - name string - isSaturated bool - expectShouldPause bool - expectAllowed map[string]struct{} + name string + isSaturated bool + expectShouldPause bool + expectFilteredBandNil bool }{ { - name: "should not pause or filter when system is not saturated", - isSaturated: false, - expectShouldPause: false, - expectAllowed: nil, // nil map signals the fast path + name: "should not pause or filter when system is not saturated", + isSaturated: false, + expectShouldPause: false, + expectFilteredBandNil: true, // nil band signals the fast path }, { - name: "should pause when system is saturated", - isSaturated: true, - expectShouldPause: true, - expectAllowed: nil, + name: "should pause when system is saturated", + isSaturated: true, + expectShouldPause: true, + expectFilteredBandNil: true, }, } @@ -66,15 +66,15 @@ func TestNewSaturationFilter(t *testing.T) { mockBand := &frameworkmocks.MockPriorityBandAccessor{} // --- ACT --- - allowed, shouldPause := filter(context.Background(), mockBand, logr.Discard()) + filteredBand, shouldPause := filter(context.Background(), mockBand, logr.Discard()) // --- ASSERT --- assert.Equal(t, tc.expectShouldPause, shouldPause, "The filter's pause signal should match the expected value") - if tc.expectAllowed == nil { - assert.Nil(t, allowed, "Expected allowed map to be nil for the fast path") + if tc.expectFilteredBandNil { + assert.Nil(t, filteredBand, "Expected filtered band to be nil") } else { - assert.Equal(t, tc.expectAllowed, allowed, "The set of allowed flows should match the expected value") + assert.NotNil(t, filteredBand, "Expected a non-nil filtered band") } }) } @@ -85,15 +85,19 @@ func TestSubsetPriorityBandAccessor(t *testing.T) { // --- ARRANGE --- // Setup a mock original accessor that knows about three flows. - mockQueueA := &frameworkmocks.MockFlowQueueAccessor{FlowSpecV: types.FlowSpecification{ID: "flow-a"}} - mockQueueB := &frameworkmocks.MockFlowQueueAccessor{FlowSpecV: types.FlowSpecification{ID: "flow-b"}} - mockQueueC := &frameworkmocks.MockFlowQueueAccessor{FlowSpecV: types.FlowSpecification{ID: "flow-c"}} + flowAKey := types.FlowKey{ID: "flow-a", Priority: 10} + flowBKey := types.FlowKey{ID: "flow-b", Priority: 10} + flowCKey := types.FlowKey{ID: "flow-c", Priority: 10} + + mockQueueA := &frameworkmocks.MockFlowQueueAccessor{FlowKeyV: flowAKey} + mockQueueB := &frameworkmocks.MockFlowQueueAccessor{FlowKeyV: flowBKey} + mockQueueC := &frameworkmocks.MockFlowQueueAccessor{FlowKeyV: flowCKey} originalAccessor := &frameworkmocks.MockPriorityBandAccessor{ PriorityV: 10, PriorityNameV: "High", - FlowIDsFunc: func() []string { - return []string{"flow-a", "flow-b", "flow-c"} + FlowKeysFunc: func() []types.FlowKey { + return []types.FlowKey{flowAKey, flowBKey, flowCKey} }, QueueFunc: func(id string) framework.FlowQueueAccessor { switch id { @@ -118,10 +122,7 @@ func TestSubsetPriorityBandAccessor(t *testing.T) { } // Create a subset view that only allows two of the flows. - allowedFlows := map[string]struct{}{ - "flow-a": {}, - "flow-c": {}, - } + allowedFlows := []types.FlowKey{flowAKey, flowCKey} subsetAccessor := newSubsetPriorityBandAccessor(originalAccessor, allowedFlows) require.NotNil(t, subsetAccessor, "newSubsetPriorityBandAccessor should not return nil") @@ -132,12 +133,14 @@ func TestSubsetPriorityBandAccessor(t *testing.T) { "PriorityName() should pass through from the original accessor") }) - t.Run("should only return allowed flow IDs", func(t *testing.T) { + t.Run("should only return allowed flow keys", func(t *testing.T) { t.Parallel() - flowIDs := subsetAccessor.FlowIDs() + flowKeys := subsetAccessor.FlowKeys() // Sort for consistent comparison, as the pre-computed slice order is not guaranteed. - sort.Strings(flowIDs) - assert.Equal(t, []string{"flow-a", "flow-c"}, flowIDs, "FlowIDs() should only return the allowed subset") + sort.Slice(flowKeys, func(i, j int) bool { + return flowKeys[i].ID < flowKeys[j].ID + }) + assert.Equal(t, []types.FlowKey{flowAKey, flowCKey}, flowKeys, "FlowKeys() should only return the allowed subset") }) t.Run("should only return queues for allowed flows", func(t *testing.T) { @@ -151,7 +154,7 @@ func TestSubsetPriorityBandAccessor(t *testing.T) { t.Parallel() var iterated []string subsetAccessor.IterateQueues(func(queue framework.FlowQueueAccessor) bool { - iterated = append(iterated, queue.FlowSpec().ID) + iterated = append(iterated, queue.FlowKey().ID) return true }) // Sort for consistent comparison, as iteration order is not guaranteed. @@ -163,7 +166,7 @@ func TestSubsetPriorityBandAccessor(t *testing.T) { t.Parallel() var iterated []string subsetAccessor.IterateQueues(func(queue framework.FlowQueueAccessor) bool { - iterated = append(iterated, queue.FlowSpec().ID) + iterated = append(iterated, queue.FlowKey().ID) return false // Exit after the first item. }) assert.Len(t, iterated, 1, "Iteration should have stopped after one item") diff --git a/pkg/epp/flowcontrol/controller/internal/item_test.go b/pkg/epp/flowcontrol/controller/internal/item_test.go index 11e08ae56..d50aaed41 100644 --- a/pkg/epp/flowcontrol/controller/internal/item_test.go +++ b/pkg/epp/flowcontrol/controller/internal/item_test.go @@ -41,7 +41,8 @@ func TestItem(t *testing.T) { t.Run("should have a non-finalized state upon creation", func(t *testing.T) { t.Parallel() - req := typesmocks.NewMockFlowControlRequest(100, "req-1", "flow-a", context.Background()) + key := types.FlowKey{ID: "flow-a", Priority: 10} + req := typesmocks.NewMockFlowControlRequest(100, "req-1", key, context.Background()) item := NewItem(req, time.Minute, time.Now()) require.NotNil(t, item, "NewItem should not return nil") outcome, err := item.FinalState() diff --git a/pkg/epp/flowcontrol/controller/internal/processor.go b/pkg/epp/flowcontrol/controller/internal/processor.go index cef5b3562..7f9c8ee3a 100644 --- a/pkg/epp/flowcontrol/controller/internal/processor.go +++ b/pkg/epp/flowcontrol/controller/internal/processor.go @@ -199,36 +199,37 @@ func (sp *ShardProcessor) Enqueue(item *flowItem) { // main `Run` goroutine, making its "check-then-act" logic for capacity safe. func (sp *ShardProcessor) enqueue(item *flowItem) { req := item.OriginalRequest() + key := req.FlowKey() + logger := log.FromContext(req.Context()).WithName("enqueue").WithValues( - "flowID", req.FlowID(), + "flowKey", key, + "flowID", key.ID, + "priority", key.Priority, "reqID", req.ID(), "reqByteSize", req.ByteSize(), ) - managedQ, err := sp.shard.ActiveManagedQueue(req.FlowID()) + managedQ, err := sp.shard.ManagedQueue(key) if err != nil { - // This is a significant configuration error; an active queue should exist for a valid flow. - finalErr := fmt.Errorf("configuration error: failed to get active queue for flow %q: %w", req.FlowID(), err) + finalErr := fmt.Errorf("configuration error: failed to get queue for flow key %s: %w", key, err) logger.Error(finalErr, "Rejecting item.") item.finalize(types.QueueOutcomeRejectedOther, fmt.Errorf("%w: %w", types.ErrRejected, finalErr)) return } - priority := managedQ.FlowQueueAccessor().FlowSpec().Priority - logger = logger.WithValues("priority", priority) - band, err := sp.shard.PriorityBandAccessor(priority) + band, err := sp.shard.PriorityBandAccessor(key.Priority) if err != nil { - finalErr := fmt.Errorf("configuration error: failed to get priority band for priority %d: %w", priority, err) + finalErr := fmt.Errorf("configuration error: failed to get priority band for priority %d: %w", key.Priority, err) logger.Error(finalErr, "Rejecting item.") item.finalize(types.QueueOutcomeRejectedOther, fmt.Errorf("%w: %w", types.ErrRejected, finalErr)) return } logger = logger.WithValues("priorityName", band.PriorityName()) - if !sp.hasCapacity(priority, req.ByteSize()) { + if !sp.hasCapacity(key.Priority, req.ByteSize()) { // This is an expected outcome, not a system error. Log at the default level with rich context. stats := sp.shard.Stats() - bandStats := stats.PerPriorityBandStats[priority] + bandStats := stats.PerPriorityBandStats[key.Priority] logger.V(logutil.DEFAULT).Info("Rejecting request, queue at capacity", "outcome", types.QueueOutcomeRejectedCapacity, "shardTotalBytes", stats.TotalByteSize, @@ -250,15 +251,14 @@ func (sp *ShardProcessor) enqueue(item *flowItem) { // eventually find and evict any finalized item that slips through this check and is added to a queue. if item.isFinalized() { outcome, err := item.FinalState() - logger.V(logutil.VERBOSE).Info("Item finalized before adding to queue, ignoring.", - "outcome", outcome, "err", err) + logger.V(logutil.VERBOSE).Info("Item finalized before adding to queue, ignoring.", "outcome", outcome, "err", err) return } // This is the point of commitment. After this call, the item is officially in the queue and is the responsibility of // the dispatch or cleanup loops to finalize. if err := managedQ.Add(item); err != nil { - finalErr := fmt.Errorf("failed to add item to queue for flow %q: %w", req.FlowID(), err) + finalErr := fmt.Errorf("failed to add item to queue for flow key %s: %w", key, err) logger.Error(finalErr, "Rejecting item.") item.finalize(types.QueueOutcomeRejectedOther, fmt.Errorf("%w: %w", types.ErrRejected, finalErr)) return @@ -309,27 +309,25 @@ func (sp *ShardProcessor) dispatchCycle(ctx context.Context) bool { // This could be abstracted into a third policy tier (e.g., an `InterBandDispatchPolicy`) if more complex scheduling // between bands, such as Weighted Fair Queuing (WFQ), is ever required. For now, strict priority is sufficient. for _, priority := range sp.shard.AllOrderedPriorityLevels() { - band, err := sp.shard.PriorityBandAccessor(priority) + originalBand, err := sp.shard.PriorityBandAccessor(priority) if err != nil { baseLogger.Error(err, "Failed to get PriorityBandAccessor, skipping band", "priority", priority) continue } - logger := baseLogger.WithValues("priority", priority, "priorityName", band.PriorityName()) + logger := baseLogger.WithValues("priority", priority, "priorityName", originalBand.PriorityName()) // Apply the configured filter to get a view of only the dispatchable flows. - allowedFlows, shouldPause := sp.dispatchFilter(ctx, band, logger) + dispatchableBand, shouldPause := sp.dispatchFilter(ctx, originalBand, logger) if shouldPause { return false // A global gate told us to stop the entire cycle. } - - dispatchableBand := band - if allowedFlows != nil { - // An explicit subset of flows is allowed; create a filtered view. - dispatchableBand = newSubsetPriorityBandAccessor(band, allowedFlows) + if dispatchableBand == nil { + // A nil return from the filter indicates the fast path: no filtering was needed. + dispatchableBand = originalBand } // Pass the (potentially filtered) band to the policies. - item, dispatchPriority, err := sp.selectItem(dispatchableBand, logger) + item, err := sp.selectItem(dispatchableBand, logger) if err != nil { // The error handling strategy depends on the type of failure (inter- vs. intra-flow). if errors.Is(err, errIntraFlow) { @@ -344,9 +342,14 @@ func (sp *ShardProcessor) dispatchCycle(ctx context.Context) bool { logger.V(logutil.TRACE).Info("No item selected by dispatch policies, skipping band") continue } - logger = logger.WithValues("flowID", item.OriginalRequest().FlowID(), "reqID", item.OriginalRequest().ID()) - - if err := sp.dispatchItem(item, dispatchPriority, logger); err != nil { + logger = logger.WithValues( + "flowKey", item.OriginalRequest().FlowKey(), + "flowID", item.OriginalRequest().FlowKey().ID, + "flowPriority", item.OriginalRequest().FlowKey().Priority, + "reqID", item.OriginalRequest().ID(), + "reqByteSize", item.OriginalRequest().ByteSize()) + + if err := sp.dispatchItem(item, logger); err != nil { // All errors from dispatchItem are considered intra-flow and unrecoverable for this band in this cycle. logger.Error(err, "Failed to dispatch item, skipping priority band for this cycle") continue @@ -363,52 +366,52 @@ func (sp *ShardProcessor) dispatchCycle(ctx context.Context) bool { func (sp *ShardProcessor) selectItem( band framework.PriorityBandAccessor, logger logr.Logger, -) (types.QueueItemAccessor, uint, error) { +) (types.QueueItemAccessor, error) { interP, err := sp.shard.InterFlowDispatchPolicy(band.Priority()) if err != nil { - return nil, 0, fmt.Errorf("%w: could not get InterFlowDispatchPolicy: %w", errInterFlow, err) + return nil, fmt.Errorf("%w: could not get InterFlowDispatchPolicy: %w", errInterFlow, err) } queue, err := interP.SelectQueue(band) if err != nil { - return nil, 0, fmt.Errorf("%w: InterFlowDispatchPolicy %q failed to select queue: %w", + return nil, fmt.Errorf("%w: InterFlowDispatchPolicy %q failed to select queue: %w", errInterFlow, interP.Name(), err) } if queue == nil { logger.V(logutil.TRACE).Info("No queue selected by InterFlowDispatchPolicy") - return nil, 0, nil - } - logger = logger.WithValues("selectedFlowID", queue.FlowSpec().ID) - - priority := queue.FlowSpec().Priority - intraP, err := sp.shard.IntraFlowDispatchPolicy(queue.FlowSpec().ID, priority) + return nil, nil + } + key := queue.FlowKey() + logger = logger.WithValues( + "selectedFlowKey", key, + "selectedFlowID", key.ID, + "selectedFlowPriority", key.Priority) + intraP, err := sp.shard.IntraFlowDispatchPolicy(key) if err != nil { // This is an intra-flow failure because we have already successfully selected a queue. - return nil, 0, fmt.Errorf("%w: could not get IntraFlowDispatchPolicy for flow %q: %w", - errIntraFlow, queue.FlowSpec().ID, err) + return nil, fmt.Errorf("%w: could not get IntraFlowDispatchPolicy for flow %q: %w", errIntraFlow, key, err) } item, err := intraP.SelectItem(queue) if err != nil { - return nil, 0, fmt.Errorf("%w: IntraFlowDispatchPolicy %q failed to select item for flow %q: %w", - errIntraFlow, intraP.Name(), queue.FlowSpec().ID, err) + return nil, fmt.Errorf("%w: IntraFlowDispatchPolicy %q failed to select item for flow %q: %w", + errIntraFlow, intraP.Name(), key, err) } if item == nil { logger.V(logutil.TRACE).Info("No item selected by IntraFlowDispatchPolicy") - return nil, 0, nil + return nil, nil } - return item, priority, nil + return item, nil } // dispatchItem handles the final steps of dispatching an item after it has been selected by policies. This includes // removing it from its queue, checking for last-minute expiry, and finalizing its outcome. -func (sp *ShardProcessor) dispatchItem(itemAcc types.QueueItemAccessor, priority uint, logger logr.Logger) error { +func (sp *ShardProcessor) dispatchItem(itemAcc types.QueueItemAccessor, logger logr.Logger) error { logger = logger.WithName("dispatchItem") req := itemAcc.OriginalRequest() // We must look up the queue by its specific priority, as a flow might have draining queues at other levels. - managedQ, err := sp.shard.ManagedQueue(req.FlowID(), priority) + managedQ, err := sp.shard.ManagedQueue(req.FlowKey()) if err != nil { - return fmt.Errorf("%w: failed to get ManagedQueue for flow %q at priority %d: %w", - errIntraFlow, req.FlowID(), priority, err) + return fmt.Errorf("%w: failed to get ManagedQueue for flow %q: %w", errIntraFlow, req.FlowKey(), err) } // The core mutation: remove the item from the queue. @@ -418,7 +421,7 @@ func (sp *ShardProcessor) dispatchItem(itemAcc types.QueueItemAccessor, priority // selected by the policy and the time this function is called. logger.V(logutil.VERBOSE).Info("Item already removed from queue, likely by expiry cleanup", "err", err) return fmt.Errorf("%w: failed to remove item %q from queue for flow %q: %w", - errIntraFlow, req.ID(), req.FlowID(), err) + errIntraFlow, req.ID(), req.FlowKey(), err) } removedItem, ok := removedItemAcc.(*flowItem) @@ -625,8 +628,12 @@ func (sp *ShardProcessor) processAllQueuesConcurrently( go func() { defer wg.Done() for q := range tasks { - queueLogger := logger.WithValues("flowID", q.FlowSpec().ID, "priority", q.FlowSpec().Priority) - managedQ, err := sp.shard.ManagedQueue(q.FlowSpec().ID, q.FlowSpec().Priority) + key := q.FlowKey() + queueLogger := logger.WithValues( + "flowKey", key, + "flowID", key.ID, + "flowPriority", key.Priority) + managedQ, err := sp.shard.ManagedQueue(key) if err != nil { queueLogger.Error(err, "Failed to get ManagedQueue") continue diff --git a/pkg/epp/flowcontrol/controller/internal/processor_test.go b/pkg/epp/flowcontrol/controller/internal/processor_test.go index 09f1206f9..67657a9a4 100644 --- a/pkg/epp/flowcontrol/controller/internal/processor_test.go +++ b/pkg/epp/flowcontrol/controller/internal/processor_test.go @@ -33,7 +33,7 @@ limitations under the License. // ensures that tests are verifying the processor's orchestration logic (i.e., that it calls its dependencies // correctly) and are not affected by confounding bugs in those dependencies. // -// In summary, this strategy is a prerequisite for reliably testing a concurrent engine, not just a simple data +// In summary, this is a prerequisite for reliably testing a concurrent engine, not just a simple data // structure. // @@ -71,7 +71,7 @@ const ( testWaitTimeout = 1 * time.Second ) -var testFlow = types.FlowSpecification{ID: "flow-a", Priority: 10} +var testFlow = types.FlowKey{ID: "flow-a", Priority: 10} // TestMain sets up the logger for all tests in the package. func TestMain(m *testing.M) { @@ -121,8 +121,8 @@ type testHarness struct { // --- Centralized Mock State --- // The harness's mutex protects the single source of truth for all mock state. mu sync.Mutex - queues map[string]*mocks.MockManagedQueue // Key: `flowID` - priorityFlows map[uint][]string // Key: `priority`, Val: slice of `flowIDs` + queues map[types.FlowKey]*mocks.MockManagedQueue + priorityFlows map[uint][]types.FlowKey // Key: `priority` // Customizable policy logic for tests to override. interFlowPolicySelectQueue func(band framework.PriorityBandAccessor) (framework.FlowQueueAccessor, error) @@ -138,12 +138,11 @@ func newTestHarness(t *testing.T, expiryCleanupInterval time.Duration) *testHarn mockClock: newMockClock(), logger: logr.Discard(), startSignal: make(chan struct{}), - queues: make(map[string]*mocks.MockManagedQueue), - priorityFlows: make(map[uint][]string), + queues: make(map[types.FlowKey]*mocks.MockManagedQueue), + priorityFlows: make(map[uint][]types.FlowKey), } // Wire up the harness to provide the mock implementations for the shard's dependencies. - h.ActiveManagedQueueFunc = h.activeManagedQueue h.ManagedQueueFunc = h.managedQueue h.AllOrderedPriorityLevelsFunc = h.allOrderedPriorityLevels h.PriorityBandAccessorFunc = h.priorityBandAccessor @@ -165,7 +164,7 @@ func newTestHarness(t *testing.T, expiryCleanupInterval time.Duration) *testHarn ctx context.Context, band framework.PriorityBandAccessor, logger logr.Logger, - ) (map[string]struct{}, bool) { + ) (framework.PriorityBandAccessor, bool) { return nil, false } h.processor = NewShardProcessor(h, filter, h.mockClock, expiryCleanupInterval, h.logger) @@ -215,48 +214,38 @@ func (h *testHarness) waitForFinalization(item *flowItem) (types.QueueOutcome, e } // newTestItem creates a new flowItem for testing purposes. -func (h *testHarness) newTestItem(id, flowID string, ttl time.Duration) *flowItem { +func (h *testHarness) newTestItem(id string, key types.FlowKey, ttl time.Duration) *flowItem { h.t.Helper() ctx := log.IntoContext(context.Background(), h.logger) - req := typesmocks.NewMockFlowControlRequest(100, id, flowID, ctx) + req := typesmocks.NewMockFlowControlRequest(100, id, key, ctx) return NewItem(req, ttl, h.mockClock.Now()) } // addQueue centrally registers a new mock queue for a given flow, ensuring all harness components are aware of it. -func (h *testHarness) addQueue(spec types.FlowSpecification) *mocks.MockManagedQueue { +func (h *testHarness) addQueue(key types.FlowKey) *mocks.MockManagedQueue { h.t.Helper() h.mu.Lock() defer h.mu.Unlock() - mockQueue := &mocks.MockManagedQueue{FlowSpecV: spec} - h.queues[spec.ID] = mockQueue + mockQueue := &mocks.MockManagedQueue{FlowKeyV: key} + h.queues[key] = mockQueue - // Add the `flowID` to the correct priority band, creating the band if needed. - h.priorityFlows[spec.Priority] = append(h.priorityFlows[spec.Priority], spec.ID) + // Add the key to the correct priority band, creating the band if needed. + h.priorityFlows[key.Priority] = append(h.priorityFlows[key.Priority], key) return mockQueue } // --- Mock Interface Implementations --- -// activeManagedQueue provides the mock implementation for the `RegistryShard` interface. -func (h *testHarness) activeManagedQueue(flowID string) (contracts.ManagedQueue, error) { - h.mu.Lock() - defer h.mu.Unlock() - if q, ok := h.queues[flowID]; ok { - return q, nil - } - return nil, fmt.Errorf("test setup error: no active queue for flow %q", flowID) -} - // managedQueue provides the mock implementation for the `RegistryShard` interface. -func (h *testHarness) managedQueue(flowID string, priority uint) (contracts.ManagedQueue, error) { +func (h *testHarness) managedQueue(key types.FlowKey) (contracts.ManagedQueue, error) { h.mu.Lock() defer h.mu.Unlock() - if q, ok := h.queues[flowID]; ok && q.FlowSpec().Priority == priority { + if q, ok := h.queues[key]; ok { return q, nil } - return nil, fmt.Errorf("test setup error: no queue for %q at priority %d", flowID, priority) + return nil, fmt.Errorf("test setup error: no queue for %q", key) } // allOrderedPriorityLevels provides the mock implementation for the `RegistryShard` interface. @@ -278,15 +267,15 @@ func (h *testHarness) priorityBandAccessor(p uint) (framework.PriorityBandAccess // Safely get a snapshot of the flow IDs under a lock. h.mu.Lock() - flowIDsForPriority := h.priorityFlows[p] + flowKeysForPriority := h.priorityFlows[p] h.mu.Unlock() // Configure the mock's behavior with a closure that reads from the harness's centralized, thread-safe state. band.IterateQueuesFunc = func(cb func(fqa framework.FlowQueueAccessor) bool) { // This closure safely iterates over the snapshot of flow IDs. - for _, id := range flowIDsForPriority { + for _, key := range flowKeysForPriority { // Get the queue using the thread-safe `managedQueue` method. - q, err := h.managedQueue(id, p) + q, err := h.managedQueue(key) if err == nil && q != nil { mq := q.(*mocks.MockManagedQueue) if !cb(mq.FlowQueueAccessor()) { @@ -323,7 +312,7 @@ func (h *testHarness) interFlowDispatchPolicy(p uint) (framework.InterFlowDispat } // intraFlowDispatchPolicy provides the mock implementation for the `contracts.RegistryShard` interface. -func (h *testHarness) intraFlowDispatchPolicy(flowID string, priority uint) (framework.IntraFlowDispatchPolicy, error) { +func (h *testHarness) intraFlowDispatchPolicy(types.FlowKey) (framework.IntraFlowDispatchPolicy, error) { policy := &frameworkmocks.MockIntraFlowDispatchPolicy{} // If the test provided a custom implementation, use it. if h.intraFlowPolicySelectItem != nil { @@ -351,8 +340,8 @@ func TestShardProcessor(t *testing.T) { t.Parallel() // --- ARRANGE --- h := newTestHarness(t, testCleanupTick) - item := h.newTestItem("req-dispatch-success", testFlow.ID, testTTL) - h.addQueue(types.FlowSpecification{ID: testFlow.ID, Priority: testFlow.Priority}) + item := h.newTestItem("req-dispatch-success", testFlow, testTTL) + h.addQueue(types.FlowKey{ID: testFlow.ID, Priority: testFlow.Priority}) // --- ACT --- h.Start() @@ -370,8 +359,8 @@ func TestShardProcessor(t *testing.T) { t.Parallel() // --- ARRANGE --- h := newTestHarness(t, testCleanupTick) - item := h.newTestItem("req-capacity-reject", testFlow.ID, testTTL) - h.addQueue(types.FlowSpecification{ID: testFlow.ID, Priority: testFlow.Priority}) + item := h.newTestItem("req-capacity-reject", testFlow, testTTL) + h.addQueue(testFlow) h.StatsFunc = func() contracts.ShardStats { return contracts.ShardStats{PerPriorityBandStats: map[uint]contracts.PriorityBandStats{ testFlow.Priority: {CapacityBytes: 50}, // 50 is less than item size of 100 @@ -395,9 +384,9 @@ func TestShardProcessor(t *testing.T) { t.Parallel() // --- ARRANGE --- h := newTestHarness(t, testCleanupTick) - item := h.newTestItem("req-lookup-fail-reject", testFlow.ID, testTTL) + item := h.newTestItem("req-lookup-fail-reject", testFlow, testTTL) registryErr := errors.New("test registry lookup error") - h.ActiveManagedQueueFunc = func(flowID string) (contracts.ManagedQueue, error) { + h.ManagedQueueFunc = func(types.FlowKey) (contracts.ManagedQueue, error) { return nil, registryErr } @@ -418,8 +407,8 @@ func TestShardProcessor(t *testing.T) { t.Parallel() // --- ARRANGE --- h := newTestHarness(t, testCleanupTick) - item := h.newTestItem("req-shutdown-reject", testFlow.ID, testTTL) - h.addQueue(types.FlowSpecification{ID: testFlow.ID, Priority: testFlow.Priority}) + item := h.newTestItem("req-shutdown-reject", testFlow, testTTL) + h.addQueue(testFlow) // --- ACT --- h.Start() @@ -439,8 +428,8 @@ func TestShardProcessor(t *testing.T) { t.Parallel() // --- ARRANGE --- h := newTestHarness(t, testCleanupTick) - item := h.newTestItem("req-expired-evict", testFlow.ID, testShortTTL) - h.addQueue(types.FlowSpecification{ID: testFlow.ID, Priority: testFlow.Priority}) + item := h.newTestItem("req-expired-evict", testFlow, testShortTTL) + h.addQueue(testFlow) // --- ACT --- h.Start() @@ -463,8 +452,8 @@ func TestShardProcessor(t *testing.T) { t.Parallel() // --- ARRANGE --- h := newTestHarness(t, 1*time.Hour) // Disable background cleanup to isolate dispatch logic. - item := h.newTestItem("req-expired-dispatch-evict", testFlow.ID, testShortTTL) - mockQueue := h.addQueue(types.FlowSpecification{ID: testFlow.ID, Priority: testFlow.Priority}) + item := h.newTestItem("req-expired-dispatch-evict", testFlow, testShortTTL) + mockQueue := h.addQueue(testFlow) require.NoError(t, mockQueue.Add(item), "Adding item to mock queue should not fail") // Have the policy select the item, but then advance time so it's expired by the time dispatchItem actually runs. @@ -494,9 +483,9 @@ func TestShardProcessor(t *testing.T) { // --- ARRANGE --- h := newTestHarness(t, testCleanupTick) ctx, cancel := context.WithCancel(context.Background()) - req := typesmocks.NewMockFlowControlRequest(100, "req-ctx-cancel", testFlow.ID, ctx) + req := typesmocks.NewMockFlowControlRequest(100, "req-ctx-cancel", testFlow, ctx) item := NewItem(req, testTTL, h.mockClock.Now()) - h.addQueue(types.FlowSpecification{ID: testFlow.ID, Priority: testFlow.Priority}) + h.addQueue(testFlow) // --- ACT --- h.Start() @@ -518,8 +507,8 @@ func TestShardProcessor(t *testing.T) { t.Parallel() // --- ARRANGE --- h := newTestHarness(t, testCleanupTick) - item := h.newTestItem("req-shutdown-evict", testFlow.ID, testTTL) - mockQueue := h.addQueue(types.FlowSpecification{ID: testFlow.ID, Priority: testFlow.Priority}) + item := h.newTestItem("req-shutdown-evict", testFlow, testTTL) + mockQueue := h.addQueue(testFlow) require.NoError(t, mockQueue.Add(item), "Adding item to mock queue should not fail") // Prevent dispatch to ensure we test shutdown eviction, not a successful dispatch. @@ -544,10 +533,10 @@ func TestShardProcessor(t *testing.T) { // --- ARRANGE --- h := newTestHarness(t, testCleanupTick) const numConcurrentItems = 20 - q := h.addQueue(types.FlowSpecification{ID: testFlow.ID, Priority: testFlow.Priority}) + q := h.addQueue(testFlow) itemsToTest := make([]*flowItem, 0, numConcurrentItems) for i := 0; i < numConcurrentItems; i++ { - item := h.newTestItem(fmt.Sprintf("req-concurrent-%d", i), testFlow.ID, testTTL) + item := h.newTestItem(fmt.Sprintf("req-concurrent-%d", i), testFlow, testTTL) itemsToTest = append(itemsToTest, item) } @@ -581,8 +570,8 @@ func TestShardProcessor(t *testing.T) { // --- ARRANGE --- h := newTestHarness(t, 1*time.Hour) // Disable background cleanup to isolate the race. - item := h.newTestItem("req-race", testFlow.ID, testShortTTL) - q := h.addQueue(types.FlowSpecification{ID: testFlow.ID, Priority: testFlow.Priority}) + item := h.newTestItem("req-race", testFlow, testShortTTL) + q := h.addQueue(testFlow) // Use channels to pause the dispatch cycle right before it would remove the item. policyCanProceed := make(chan struct{}) @@ -683,7 +672,7 @@ func TestShardProcessor(t *testing.T) { { name: "should reject item on registry queue lookup failure", setupHarness: func(h *testHarness) { - h.ActiveManagedQueueFunc = func(string) (contracts.ManagedQueue, error) { return nil, testErr } + h.ManagedQueueFunc = func(types.FlowKey) (contracts.ManagedQueue, error) { return nil, testErr } }, assert: func(t *testing.T, h *testHarness, item *flowItem) { outcome, err := item.FinalState() @@ -695,7 +684,7 @@ func TestShardProcessor(t *testing.T) { { name: "should reject item on registry priority band lookup failure", setupHarness: func(h *testHarness) { - h.addQueue(types.FlowSpecification{ID: testFlow.ID, Priority: testFlow.Priority}) + h.addQueue(testFlow) h.PriorityBandAccessorFunc = func(uint) (framework.PriorityBandAccessor, error) { return nil, testErr } }, assert: func(t *testing.T, h *testHarness, item *flowItem) { @@ -708,7 +697,7 @@ func TestShardProcessor(t *testing.T) { { name: "should reject item on queue add failure", setupHarness: func(h *testHarness) { - mockQueue := h.addQueue(types.FlowSpecification{ID: testFlow.ID, Priority: testFlow.Priority}) + mockQueue := h.addQueue(testFlow) mockQueue.AddFunc = func(types.QueueItemAccessor) error { return testErr } }, assert: func(t *testing.T, h *testHarness, item *flowItem) { @@ -721,7 +710,7 @@ func TestShardProcessor(t *testing.T) { { name: "should ignore an already-finalized item", setupHarness: func(h *testHarness) { - mockQueue := h.addQueue(types.FlowSpecification{ID: testFlow.ID, Priority: testFlow.Priority}) + mockQueue := h.addQueue(testFlow) var addCallCount int mockQueue.AddFunc = func(item types.QueueItemAccessor) error { addCallCount++ @@ -734,7 +723,7 @@ func TestShardProcessor(t *testing.T) { }, item: func() *flowItem { // Create a pre-finalized item. - item := newTestHarness(t, 0).newTestItem("req-finalized", testFlow.ID, testTTL) + item := newTestHarness(t, 0).newTestItem("req-finalized", testFlow, testTTL) item.finalize(types.QueueOutcomeDispatched, nil) return item }(), @@ -754,7 +743,7 @@ func TestShardProcessor(t *testing.T) { tc.setupHarness(h) item := tc.item if item == nil { - item = h.newTestItem("req-enqueue-test", testFlow.ID, testTTL) + item = h.newTestItem("req-enqueue-test", testFlow, testTTL) } h.processor.enqueue(item) tc.assert(t, h, item) @@ -833,7 +822,6 @@ func TestShardProcessor(t *testing.T) { t.Parallel() policyErr := errors.New("policy failure") registryErr := errors.New("registry error") - specA := types.FlowSpecification{ID: testFlow.ID, Priority: testFlow.Priority} testCases := []struct { name string @@ -843,7 +831,7 @@ func TestShardProcessor(t *testing.T) { { name: "should do nothing if no items are queued", setupHarness: func(h *testHarness) { - h.addQueue(specA) // Add a queue, but no items. + h.addQueue(testFlow) // Add a queue, but no items. }, expectDidDispatch: false, }, @@ -851,13 +839,13 @@ func TestShardProcessor(t *testing.T) { name: "should stop dispatching when filter signals pause", setupHarness: func(h *testHarness) { // Add an item that *could* be dispatched to prove the pause is effective. - q := h.addQueue(types.FlowSpecification{ID: testFlow.ID, Priority: testFlow.Priority}) - require.NoError(t, q.Add(h.newTestItem("item", testFlow.ID, testTTL))) + q := h.addQueue(testFlow) + require.NoError(t, q.Add(h.newTestItem("item", testFlow, testTTL))) h.processor.dispatchFilter = func( _ context.Context, _ framework.PriorityBandAccessor, _ logr.Logger, - ) (map[string]struct{}, bool) { + ) (framework.PriorityBandAccessor, bool) { return nil, true // Signal pause. } }, @@ -875,7 +863,7 @@ func TestShardProcessor(t *testing.T) { { name: "should skip band on inter-flow policy error", setupHarness: func(h *testHarness) { - h.addQueue(specA) + h.addQueue(testFlow) h.interFlowPolicySelectQueue = func( _ framework.PriorityBandAccessor, ) (framework.FlowQueueAccessor, error) { @@ -887,8 +875,8 @@ func TestShardProcessor(t *testing.T) { { name: "should skip band if inter-flow policy returns no queue", setupHarness: func(h *testHarness) { - q := h.addQueue(specA) - require.NoError(t, q.Add(h.newTestItem("item", testFlow.ID, testTTL))) + q := h.addQueue(testFlow) + require.NoError(t, q.Add(h.newTestItem("item", testFlow, testTTL))) h.interFlowPolicySelectQueue = func( _ framework.PriorityBandAccessor, ) (framework.FlowQueueAccessor, error) { @@ -900,8 +888,8 @@ func TestShardProcessor(t *testing.T) { { name: "should skip band on intra-flow policy error", setupHarness: func(h *testHarness) { - q := h.addQueue(specA) - require.NoError(t, q.Add(h.newTestItem("item", testFlow.ID, testTTL))) + q := h.addQueue(testFlow) + require.NoError(t, q.Add(h.newTestItem("item", testFlow, testTTL))) h.interFlowPolicySelectQueue = func( _ framework.PriorityBandAccessor, ) (framework.FlowQueueAccessor, error) { @@ -916,8 +904,8 @@ func TestShardProcessor(t *testing.T) { { name: "should skip band if intra-flow policy returns no item", setupHarness: func(h *testHarness) { - q := h.addQueue(specA) - require.NoError(t, q.Add(h.newTestItem("item", testFlow.ID, testTTL))) + q := h.addQueue(testFlow) + require.NoError(t, q.Add(h.newTestItem("item", testFlow, testTTL))) h.interFlowPolicySelectQueue = func( _ framework.PriorityBandAccessor, ) (framework.FlowQueueAccessor, error) { @@ -933,12 +921,12 @@ func TestShardProcessor(t *testing.T) { name: "should continue to lower priority band on inter-flow policy error", setupHarness: func(h *testHarness) { // Create a failing high-priority queue and a working low-priority queue. - specHigh := types.FlowSpecification{ID: "flow-high", Priority: testFlow.Priority} - specLow := types.FlowSpecification{ID: "flow-low", Priority: 20} - h.addQueue(specHigh) - qLow := h.addQueue(specLow) + keyHigh := types.FlowKey{ID: "flow-high", Priority: testFlow.Priority} + keyLow := types.FlowKey{ID: "flow-low", Priority: 20} + h.addQueue(keyHigh) + qLow := h.addQueue(keyLow) - itemLow := h.newTestItem("item-low", specLow.ID, testTTL) + itemLow := h.newTestItem("item-low", keyLow, testTTL) require.NoError(t, qLow.Add(itemLow)) h.interFlowPolicySelectQueue = func( @@ -948,7 +936,7 @@ func TestShardProcessor(t *testing.T) { return nil, errors.New("policy failure") // Fail high-priority. } // Succeed for low-priority. - q, _ := h.managedQueue(specLow.ID, specLow.Priority) + q, _ := h.managedQueue(keyLow) return q.FlowQueueAccessor(), nil } }, @@ -971,34 +959,33 @@ func TestShardProcessor(t *testing.T) { t.Parallel() // --- ARRANGE --- h := newTestHarness(t, testCleanupTick) - specA := types.FlowSpecification{ID: testFlow.ID, Priority: testFlow.Priority} - specB := types.FlowSpecification{ID: "flow-b", Priority: testFlow.Priority} - h.addQueue(specA) - qB := h.addQueue(specB) - itemB := h.newTestItem("item-b", specB.ID, testTTL) + flowB := types.FlowKey{ID: "flow-b", Priority: testFlow.Priority} + h.addQueue(testFlow) + qB := h.addQueue(flowB) + itemB := h.newTestItem("item-b", flowB, testTTL) require.NoError(t, qB.Add(itemB)) // This filter only allows flow-b. h.processor.dispatchFilter = func( _ context.Context, - _ framework.PriorityBandAccessor, + originalBand framework.PriorityBandAccessor, _ logr.Logger, - ) (map[string]struct{}, bool) { - return map[string]struct{}{specB.ID: {}}, false + ) (framework.PriorityBandAccessor, bool) { + return newSubsetPriorityBandAccessor(originalBand, []types.FlowKey{flowB}), false } // This policy will be given the filtered view, so it should only see flow-b. h.interFlowPolicySelectQueue = func(band framework.PriorityBandAccessor) (framework.FlowQueueAccessor, error) { var flowIDs []string band.IterateQueues(func(fqa framework.FlowQueueAccessor) bool { - flowIDs = append(flowIDs, fqa.FlowSpec().ID) + flowIDs = append(flowIDs, fqa.FlowKey().ID) return true }) // This is the core assertion of the test. - require.ElementsMatch(t, []string{specB.ID}, flowIDs, "Policy should only see the filtered flow") + require.ElementsMatch(t, []string{flowB.ID}, flowIDs, "Policy should only see the filtered flow") // Select flow-b to prove the chain works. - q, _ := h.managedQueue(specB.ID, specB.Priority) + q, _ := h.managedQueue(flowB) return q.FlowQueueAccessor(), nil } @@ -1016,22 +1003,22 @@ func TestShardProcessor(t *testing.T) { t.Parallel() // --- ARRANGE --- h := newTestHarness(t, testCleanupTick) - specHigh := types.FlowSpecification{ID: "flow-high", Priority: 10} - specLow := types.FlowSpecification{ID: "flow-low", Priority: 20} - qHigh := h.addQueue(specHigh) - qLow := h.addQueue(specLow) + keyHigh := types.FlowKey{ID: "flow-high", Priority: 10} + keyLow := types.FlowKey{ID: "flow-low", Priority: 20} + qHigh := h.addQueue(keyHigh) + qLow := h.addQueue(keyLow) const numItems = 3 highPrioItems := make([]*flowItem, numItems) lowPrioItems := make([]*flowItem, numItems) for i := range numItems { // Add high priority items. - itemH := h.newTestItem(fmt.Sprintf("req-high-%d", i), specHigh.ID, testTTL) + itemH := h.newTestItem(fmt.Sprintf("req-high-%d", i), keyHigh, testTTL) require.NoError(t, qHigh.Add(itemH)) highPrioItems[i] = itemH // Add low priority items. - itemL := h.newTestItem(fmt.Sprintf("req-low-%d", i), specLow.ID, testTTL) + itemL := h.newTestItem(fmt.Sprintf("req-low-%d", i), keyLow, testTTL) require.NoError(t, qLow.Add(itemL)) lowPrioItems[i] = itemL } @@ -1075,14 +1062,14 @@ func TestShardProcessor(t *testing.T) { { name: "on ManagedQueue lookup failure", setupMocks: func(h *testHarness) { - h.ManagedQueueFunc = func(string, uint) (contracts.ManagedQueue, error) { return nil, registryErr } + h.ManagedQueueFunc = func(types.FlowKey) (contracts.ManagedQueue, error) { return nil, registryErr } }, expectedErr: registryErr, }, { name: "on queue remove failure", setupMocks: func(h *testHarness) { - h.ManagedQueueFunc = func(string, uint) (contracts.ManagedQueue, error) { + h.ManagedQueueFunc = func(types.FlowKey) (contracts.ManagedQueue, error) { return &mocks.MockManagedQueue{ RemoveFunc: func(types.QueueItemHandle) (types.QueueItemAccessor, error) { return nil, registryErr @@ -1099,8 +1086,8 @@ func TestShardProcessor(t *testing.T) { t.Parallel() h := newTestHarness(t, testCleanupTick) tc.setupMocks(h) - item := h.newTestItem("req-dispatch-fail", testFlow.ID, testTTL) - err := h.processor.dispatchItem(item, testFlow.Priority, h.logger) + item := h.newTestItem("req-dispatch-fail", testFlow, testTTL) + err := h.processor.dispatchItem(item, h.logger) require.Error(t, err, "dispatchItem should return an error") assert.ErrorIs(t, err, tc.expectedErr, "The underlying registry error should be preserved") }) @@ -1111,9 +1098,9 @@ func TestShardProcessor(t *testing.T) { t.Parallel() // --- ARRANGE --- h := newTestHarness(t, testCleanupTick) - item := h.newTestItem("req-expired-dispatch", testFlow.ID, testShortTTL) + item := h.newTestItem("req-expired-dispatch", testFlow, testShortTTL) - h.ManagedQueueFunc = func(string, uint) (contracts.ManagedQueue, error) { + h.ManagedQueueFunc = func(types.FlowKey) (contracts.ManagedQueue, error) { return &mocks.MockManagedQueue{ RemoveFunc: func(types.QueueItemHandle) (types.QueueItemAccessor, error) { return item, nil @@ -1123,7 +1110,7 @@ func TestShardProcessor(t *testing.T) { // --- ACT --- h.mockClock.Advance(testShortTTL * 2) // Make the item expire. - err := h.processor.dispatchItem(item, testFlow.Priority, h.logger) + err := h.processor.dispatchItem(item, h.logger) // --- ASSERT --- // First, check the error returned by `dispatchItem`. @@ -1142,10 +1129,10 @@ func TestShardProcessor(t *testing.T) { // --- ARRANGE --- h := newTestHarness(t, testCleanupTick) badItem := &typesmocks.MockQueueItemAccessor{ - OriginalRequestV: typesmocks.NewMockFlowControlRequest(0, "bad-item", "", context.Background()), + OriginalRequestV: typesmocks.NewMockFlowControlRequest(0, "bad-item", testFlow, context.Background()), } - h.ManagedQueueFunc = func(string, uint) (contracts.ManagedQueue, error) { + h.ManagedQueueFunc = func(types.FlowKey) (contracts.ManagedQueue, error) { return &mocks.MockManagedQueue{ RemoveFunc: func(types.QueueItemHandle) (types.QueueItemAccessor, error) { return badItem, nil @@ -1153,13 +1140,13 @@ func TestShardProcessor(t *testing.T) { }, nil } - itemToDispatch := h.newTestItem("req-dispatch-panic", testFlow.ID, testTTL) + itemToDispatch := h.newTestItem("req-dispatch-panic", testFlow, testTTL) expectedPanicMsg := fmt.Sprintf("%s: internal error: item %q of type %T is not a *flowItem", errIntraFlow, "bad-item", badItem) // --- ACT & ASSERT --- assert.PanicsWithError(t, expectedPanicMsg, func() { - _ = h.processor.dispatchItem(itemToDispatch, testFlow.Priority, h.logger) + _ = h.processor.dispatchItem(itemToDispatch, h.logger) }, "A type mismatch from a queue should cause a panic") }) }) @@ -1171,10 +1158,9 @@ func TestShardProcessor(t *testing.T) { t.Parallel() // --- ARRANGE --- h := newTestHarness(t, testCleanupTick) - spec := types.FlowSpecification{ID: testFlow.ID, Priority: testFlow.Priority} // Create an item that is already expired relative to the cleanup time. - item := h.newTestItem("req-expired", testFlow.ID, 1*time.Millisecond) - q := h.addQueue(spec) + item := h.newTestItem("req-expired", testFlow, 1*time.Millisecond) + q := h.addQueue(testFlow) require.NoError(t, q.Add(item)) cleanupTime := h.mockClock.Now().Add(10 * time.Millisecond) @@ -1192,9 +1178,8 @@ func TestShardProcessor(t *testing.T) { t.Parallel() // --- ARRANGE --- h := newTestHarness(t, testCleanupTick) - spec := types.FlowSpecification{ID: testFlow.ID, Priority: testFlow.Priority} - item := h.newTestItem("req-pending", testFlow.ID, testTTL) - q := h.addQueue(spec) + item := h.newTestItem("req-pending", testFlow, testTTL) + q := h.addQueue(testFlow) require.NoError(t, q.Add(item)) // --- ACT --- @@ -1228,7 +1213,7 @@ func TestShardProcessor(t *testing.T) { // --- ARRANGE --- h := newTestHarness(t, testCleanupTick) item := &typesmocks.MockQueueItemAccessor{ - OriginalRequestV: typesmocks.NewMockFlowControlRequest(0, "bad-item", testFlow.ID, context.Background()), + OriginalRequestV: typesmocks.NewMockFlowControlRequest(0, "bad-item", testFlow, context.Background()), } items := []types.QueueItemAccessor{item} @@ -1252,11 +1237,11 @@ func TestShardProcessor(t *testing.T) { var processedCount atomic.Int32 for i := range numQueues { - spec := types.FlowSpecification{ + key := types.FlowKey{ ID: fmt.Sprintf("flow-%d", i), Priority: testFlow.Priority, } - h.addQueue(spec) + h.addQueue(key) } processFn := func(mq contracts.ManagedQueue, logger logr.Logger) { @@ -1293,7 +1278,7 @@ func TestCheckItemExpiry(t *testing.T) { { name: "should not be expired if TTL is not reached and context is active", item: NewItem( - typesmocks.NewMockFlowControlRequest(100, "req-not-expired", "", context.Background()), + typesmocks.NewMockFlowControlRequest(100, "req-not-expired", testFlow, context.Background()), testTTL, now), now: now.Add(30 * time.Second), @@ -1304,7 +1289,7 @@ func TestCheckItemExpiry(t *testing.T) { { name: "should not be expired if TTL is disabled (0)", item: NewItem( - typesmocks.NewMockFlowControlRequest(100, "req-not-expired-no-ttl", "", context.Background()), + typesmocks.NewMockFlowControlRequest(100, "req-not-expired-no-ttl", testFlow, context.Background()), 0, now), now: now.Add(30 * time.Second), @@ -1315,7 +1300,7 @@ func TestCheckItemExpiry(t *testing.T) { { name: "should be expired if TTL is exceeded", item: NewItem( - typesmocks.NewMockFlowControlRequest(100, "req-ttl-expired", "", context.Background()), + typesmocks.NewMockFlowControlRequest(100, "req-ttl-expired", testFlow, context.Background()), time.Second, now), now: now.Add(2 * time.Second), @@ -1326,7 +1311,7 @@ func TestCheckItemExpiry(t *testing.T) { { name: "should be expired if context is cancelled", item: NewItem( - typesmocks.NewMockFlowControlRequest(100, "req-ctx-cancelled", "", ctxCancelled), + typesmocks.NewMockFlowControlRequest(100, "req-ctx-cancelled", testFlow, ctxCancelled), testTTL, now), now: now, @@ -1337,7 +1322,10 @@ func TestCheckItemExpiry(t *testing.T) { { name: "should be expired if already finalized", item: func() types.QueueItemAccessor { - i := NewItem(typesmocks.NewMockFlowControlRequest(100, "req-finalized", "", context.Background()), testTTL, now) + i := NewItem( + typesmocks.NewMockFlowControlRequest(100, "req-finalized", testFlow, context.Background()), + testTTL, + now) i.finalize(types.QueueOutcomeDispatched, nil) return i }(), @@ -1376,7 +1364,7 @@ func TestCheckItemExpiry(t *testing.T) { t.Parallel() // --- ARRANGE --- badItem := &typesmocks.MockQueueItemAccessor{ - OriginalRequestV: typesmocks.NewMockFlowControlRequest(0, "item-bad-type", "", context.Background()), + OriginalRequestV: typesmocks.NewMockFlowControlRequest(0, "item-bad-type", testFlow, context.Background()), } expectedPanicMsg := fmt.Sprintf("internal error: item %q of type %T is not a *flowItem", diff --git a/pkg/epp/flowcontrol/framework/mocks/mocks.go b/pkg/epp/flowcontrol/framework/mocks/mocks.go index 804ba71a7..b8715b779 100644 --- a/pkg/epp/flowcontrol/framework/mocks/mocks.go +++ b/pkg/epp/flowcontrol/framework/mocks/mocks.go @@ -41,7 +41,7 @@ type MockFlowQueueAccessor struct { PeekHeadErrV error PeekTailV types.QueueItemAccessor PeekTailErrV error - FlowSpecV types.FlowSpecification + FlowKeyV types.FlowKey ComparatorV framework.ItemComparator CapabilitiesV []framework.QueueCapability } @@ -50,7 +50,7 @@ func (m *MockFlowQueueAccessor) Name() string { ret func (m *MockFlowQueueAccessor) Len() int { return m.LenV } func (m *MockFlowQueueAccessor) ByteSize() uint64 { return m.ByteSizeV } func (m *MockFlowQueueAccessor) Comparator() framework.ItemComparator { return m.ComparatorV } -func (m *MockFlowQueueAccessor) FlowSpec() types.FlowSpecification { return m.FlowSpecV } +func (m *MockFlowQueueAccessor) FlowKey() types.FlowKey { return m.FlowKeyV } func (m *MockFlowQueueAccessor) Capabilities() []framework.QueueCapability { return m.CapabilitiesV } func (m *MockFlowQueueAccessor) PeekHead() (types.QueueItemAccessor, error) { @@ -69,7 +69,7 @@ var _ framework.FlowQueueAccessor = &MockFlowQueueAccessor{} type MockPriorityBandAccessor struct { PriorityV uint PriorityNameV string - FlowIDsFunc func() []string + FlowKeysFunc func() []types.FlowKey QueueFunc func(flowID string) framework.FlowQueueAccessor IterateQueuesFunc func(callback func(queue framework.FlowQueueAccessor) (keepIterating bool)) } @@ -77,16 +77,16 @@ type MockPriorityBandAccessor struct { func (m *MockPriorityBandAccessor) Priority() uint { return m.PriorityV } func (m *MockPriorityBandAccessor) PriorityName() string { return m.PriorityNameV } -func (m *MockPriorityBandAccessor) FlowIDs() []string { - if m.FlowIDsFunc != nil { - return m.FlowIDsFunc() +func (m *MockPriorityBandAccessor) FlowKeys() []types.FlowKey { + if m.FlowKeysFunc != nil { + return m.FlowKeysFunc() } return nil } -func (m *MockPriorityBandAccessor) Queue(flowID string) framework.FlowQueueAccessor { +func (m *MockPriorityBandAccessor) Queue(id string) framework.FlowQueueAccessor { if m.QueueFunc != nil { - return m.QueueFunc(flowID) + return m.QueueFunc(id) } return nil } diff --git a/pkg/epp/flowcontrol/framework/plugins/policies/interflow/dispatch/besthead/besthead_test.go b/pkg/epp/flowcontrol/framework/plugins/policies/interflow/dispatch/besthead/besthead_test.go index 4905a2157..5971d3f0c 100644 --- a/pkg/epp/flowcontrol/framework/plugins/policies/interflow/dispatch/besthead/besthead_test.go +++ b/pkg/epp/flowcontrol/framework/plugins/policies/interflow/dispatch/besthead/besthead_test.go @@ -31,11 +31,16 @@ import ( ) const ( - flow1 = "flow1" - flow2 = "flow2" + flow1ID = "flow1" + flow2ID = "flow2" commonScoreType = "enqueue_time_ns_asc" ) +var ( + flow1Key = types.FlowKey{ID: flow1ID, Priority: 0} + flow2Key = types.FlowKey{ID: flow2ID, Priority: 0} +) + // enqueueTimeComparatorFunc is a test utility. Lower enqueue time is better. func enqueueTimeComparatorFunc(a, b types.QueueItemAccessor) bool { return a.EnqueueTime().Before(b.EnqueueTime()) @@ -49,20 +54,21 @@ func newTestComparator() *frameworkmocks.MockItemComparator { } func newTestBand(queues ...framework.FlowQueueAccessor) *frameworkmocks.MockPriorityBandAccessor { - flowIDs := make([]string, 0, len(queues)) + flowKeys := make([]types.FlowKey, 0, len(queues)) queuesByID := make(map[string]framework.FlowQueueAccessor, len(queues)) for _, q := range queues { - flowIDs = append(flowIDs, q.FlowSpec().ID) - queuesByID[q.FlowSpec().ID] = q + key := q.FlowKey() + flowKeys = append(flowKeys, key) + queuesByID[key.ID] = q } return &frameworkmocks.MockPriorityBandAccessor{ - FlowIDsFunc: func() []string { return flowIDs }, + FlowKeysFunc: func() []types.FlowKey { return flowKeys }, QueueFunc: func(id string) framework.FlowQueueAccessor { return queuesByID[id] }, IterateQueuesFunc: func(iterator func(queue framework.FlowQueueAccessor) bool) { - for _, id := range flowIDs { - if !iterator(queuesByID[id]) { + for _, key := range flowKeys { + if !iterator(queuesByID[key.ID]) { break } } @@ -81,27 +87,27 @@ func TestBestHead_SelectQueue(t *testing.T) { policy := newBestHead() now := time.Now() - itemBetter := typesmocks.NewMockQueueItemAccessor(10, "itemBetter", flow1) + itemBetter := typesmocks.NewMockQueueItemAccessor(10, "itemBetter", flow1Key) itemBetter.EnqueueTimeV = now.Add(-10 * time.Second) - itemWorse := typesmocks.NewMockQueueItemAccessor(20, "itemWorse", flow2) + itemWorse := typesmocks.NewMockQueueItemAccessor(20, "itemWorse", flow2Key) itemWorse.EnqueueTimeV = now.Add(-5 * time.Second) queue1 := &frameworkmocks.MockFlowQueueAccessor{ LenV: 1, PeekHeadV: itemBetter, - FlowSpecV: types.FlowSpecification{ID: flow1}, + FlowKeyV: flow1Key, ComparatorV: newTestComparator(), } queue2 := &frameworkmocks.MockFlowQueueAccessor{ LenV: 1, PeekHeadV: itemWorse, - FlowSpecV: types.FlowSpecification{ID: flow2}, + FlowKeyV: flow2Key, ComparatorV: newTestComparator(), } queueEmpty := &frameworkmocks.MockFlowQueueAccessor{ LenV: 0, PeekHeadErrV: framework.ErrQueueEmpty, - FlowSpecV: types.FlowSpecification{ID: "flowEmpty"}, + FlowKeyV: types.FlowKey{ID: "flowEmpty"}, ComparatorV: newTestComparator(), } @@ -115,17 +121,17 @@ func TestBestHead_SelectQueue(t *testing.T) { { name: "BasicSelection_TwoQueues", band: newTestBand(queue1, queue2), - expectedQueueID: flow1, + expectedQueueID: flow1ID, }, { name: "IgnoresEmptyQueues", band: newTestBand(queue1, queueEmpty, queue2), - expectedQueueID: flow1, + expectedQueueID: flow1ID, }, { name: "SingleNonEmptyQueue", band: newTestBand(queue1), - expectedQueueID: flow1, + expectedQueueID: flow1ID, }, { name: "ComparatorCompatibility", @@ -133,13 +139,13 @@ func TestBestHead_SelectQueue(t *testing.T) { &frameworkmocks.MockFlowQueueAccessor{ LenV: 1, PeekHeadV: itemBetter, - FlowSpecV: types.FlowSpecification{ID: flow1}, + FlowKeyV: flow1Key, ComparatorV: &frameworkmocks.MockItemComparator{ScoreTypeV: "typeA", FuncV: enqueueTimeComparatorFunc}, }, &frameworkmocks.MockFlowQueueAccessor{ LenV: 1, PeekHeadV: itemWorse, - FlowSpecV: types.FlowSpecification{ID: flow2}, + FlowKeyV: flow2Key, ComparatorV: &frameworkmocks.MockItemComparator{ScoreTypeV: "typeB", FuncV: enqueueTimeComparatorFunc}, }, ), @@ -151,12 +157,12 @@ func TestBestHead_SelectQueue(t *testing.T) { &frameworkmocks.MockFlowQueueAccessor{ LenV: 1, PeekHeadErrV: errors.New("peek error"), - FlowSpecV: types.FlowSpecification{ID: flow1}, + FlowKeyV: flow1Key, ComparatorV: newTestComparator(), }, queue2, ), - expectedQueueID: flow2, + expectedQueueID: flow2ID, }, { name: "QueueComparatorIsNil", @@ -164,7 +170,7 @@ func TestBestHead_SelectQueue(t *testing.T) { &frameworkmocks.MockFlowQueueAccessor{ LenV: 1, PeekHeadV: itemBetter, - FlowSpecV: types.FlowSpecification{ID: flow1}, + FlowKeyV: flow1Key, ComparatorV: nil, }, queue2, @@ -177,7 +183,7 @@ func TestBestHead_SelectQueue(t *testing.T) { &frameworkmocks.MockFlowQueueAccessor{ LenV: 1, PeekHeadV: itemBetter, - FlowSpecV: types.FlowSpecification{ID: flow1}, + FlowKeyV: flow1Key, ComparatorV: &frameworkmocks.MockItemComparator{ScoreTypeV: commonScoreType, FuncV: nil}, }, queue2, @@ -191,7 +197,7 @@ func TestBestHead_SelectQueue(t *testing.T) { &frameworkmocks.MockFlowQueueAccessor{ LenV: 0, PeekHeadErrV: framework.ErrQueueEmpty, - FlowSpecV: types.FlowSpecification{ID: "flowEmpty2"}, + FlowKeyV: types.FlowKey{ID: "flowEmpty2"}, ComparatorV: newTestComparator(), }, ), @@ -223,7 +229,7 @@ func TestBestHead_SelectQueue(t *testing.T) { assert.Nil(t, selected, "No queue should be selected") } else { require.NotNil(t, selected, "A queue should have been selected") - assert.Equal(t, tc.expectedQueueID, selected.FlowSpec().ID, "The selected queue should have the expected ID") + assert.Equal(t, tc.expectedQueueID, selected.FlowKey().ID, "The selected queue should have the expected ID") } } }) diff --git a/pkg/epp/flowcontrol/framework/plugins/policies/interflow/dispatch/functional_test.go b/pkg/epp/flowcontrol/framework/plugins/policies/interflow/dispatch/functional_test.go index 9dacc2ad9..7649b324e 100644 --- a/pkg/epp/flowcontrol/framework/plugins/policies/interflow/dispatch/functional_test.go +++ b/pkg/epp/flowcontrol/framework/plugins/policies/interflow/dispatch/functional_test.go @@ -65,7 +65,7 @@ func runSelectQueueConformanceTests(t *testing.T, policy framework.InterFlowDisp mockQueueEmpty := &frameworkmocks.MockFlowQueueAccessor{ LenV: 0, PeekHeadErrV: framework.ErrQueueEmpty, - FlowSpecV: types.FlowSpecification{ID: flowIDEmpty}, + FlowKeyV: types.FlowKey{ID: flowIDEmpty}, } testCases := []struct { @@ -84,7 +84,7 @@ func runSelectQueueConformanceTests(t *testing.T, policy framework.InterFlowDisp { name: "With an empty priority band accessor", band: &frameworkmocks.MockPriorityBandAccessor{ - FlowIDsFunc: func() []string { return []string{} }, + FlowKeysFunc: func() []types.FlowKey { return []types.FlowKey{} }, IterateQueuesFunc: func(callback func(queue framework.FlowQueueAccessor) bool) { /* no-op */ }, }, expectErr: false, @@ -93,7 +93,7 @@ func runSelectQueueConformanceTests(t *testing.T, policy framework.InterFlowDisp { name: "With a band that has one empty queue", band: &frameworkmocks.MockPriorityBandAccessor{ - FlowIDsFunc: func() []string { return []string{flowIDEmpty} }, + FlowKeysFunc: func() []types.FlowKey { return []types.FlowKey{{ID: flowIDEmpty}} }, QueueFunc: func(fID string) framework.FlowQueueAccessor { if fID == flowIDEmpty { return mockQueueEmpty @@ -107,7 +107,7 @@ func runSelectQueueConformanceTests(t *testing.T, policy framework.InterFlowDisp { name: "With a band that has multiple empty queues", band: &frameworkmocks.MockPriorityBandAccessor{ - FlowIDsFunc: func() []string { return []string{flowIDEmpty, "flow-empty-2"} }, + FlowKeysFunc: func() []types.FlowKey { return []types.FlowKey{{ID: flowIDEmpty}, {ID: "flow-empty-2"}} }, QueueFunc: func(fID string) framework.FlowQueueAccessor { return mockQueueEmpty }, @@ -136,7 +136,7 @@ func runSelectQueueConformanceTests(t *testing.T, policy framework.InterFlowDisp } if tc.expectedQueue != nil { - assert.Equal(t, tc.expectedQueue.FlowSpec().ID, selectedQueue.FlowSpec().ID, + assert.Equal(t, tc.expectedQueue.FlowKey(), selectedQueue.FlowKey(), "SelectQueue for policy %s returned an unexpected queue", policy.Name()) } }) diff --git a/pkg/epp/flowcontrol/framework/plugins/policies/interflow/dispatch/roundrobin/roundrobin.go b/pkg/epp/flowcontrol/framework/plugins/policies/interflow/dispatch/roundrobin/roundrobin.go index 9906dc11f..c71b801dc 100644 --- a/pkg/epp/flowcontrol/framework/plugins/policies/interflow/dispatch/roundrobin/roundrobin.go +++ b/pkg/epp/flowcontrol/framework/plugins/policies/interflow/dispatch/roundrobin/roundrobin.go @@ -24,6 +24,7 @@ import ( "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/policies/interflow/dispatch" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/types" ) // RoundRobinPolicyName is the name of the Round Robin policy implementation. @@ -70,7 +71,7 @@ func (p *roundRobin) SelectQueue(band framework.PriorityBandAccessor) (framework // "RoundRobin" displacement policy is introduced, while keeping the dispatch policy's public API stable. type iterator struct { mu sync.Mutex - lastSelected string + lastSelected *types.FlowKey } // newIterator creates a new round-robin Iterator. @@ -85,34 +86,35 @@ func (r *iterator) selectNextQueue(band framework.PriorityBandAccessor) framewor r.mu.Lock() defer r.mu.Unlock() - flowIDs := band.FlowIDs() - if len(flowIDs) == 0 { - r.lastSelected = "" // Reset state if no flows are present + keys := band.FlowKeys() + if len(keys) == 0 { + r.lastSelected = nil // Reset state if no flows are present return nil } - slices.Sort(flowIDs) + // Sort for deterministic ordering. + slices.SortFunc(keys, func(a, b types.FlowKey) int { return a.Compare(b) }) startIndex := 0 - if r.lastSelected != "" { + if r.lastSelected != nil { // Find the index of the last selected flow. // If it's not found (e.g., the flow was removed), we'll start from the beginning. - if idx := slices.Index(flowIDs, r.lastSelected); idx != -1 { - startIndex = (idx + 1) % len(flowIDs) + if idx := slices.Index(keys, *r.lastSelected); idx != -1 { + startIndex = (idx + 1) % len(keys) } } - numFlows := len(flowIDs) + numFlows := len(keys) for i := 0; i < numFlows; i++ { currentIdx := (startIndex + i) % numFlows - currentFlowID := flowIDs[currentIdx] - queue := band.Queue(currentFlowID) + currentKey := keys[currentIdx] + queue := band.Queue(currentKey.ID) if queue != nil && queue.Len() > 0 { - r.lastSelected = currentFlowID + r.lastSelected = ¤tKey return queue } } // No non-empty queue was found. - r.lastSelected = "" + r.lastSelected = nil return nil } diff --git a/pkg/epp/flowcontrol/framework/plugins/policies/interflow/dispatch/roundrobin/roundrobin_test.go b/pkg/epp/flowcontrol/framework/plugins/policies/interflow/dispatch/roundrobin/roundrobin_test.go index f2255f009..dd4a02602 100644 --- a/pkg/epp/flowcontrol/framework/plugins/policies/interflow/dispatch/roundrobin/roundrobin_test.go +++ b/pkg/epp/flowcontrol/framework/plugins/policies/interflow/dispatch/roundrobin/roundrobin_test.go @@ -30,6 +30,12 @@ import ( "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/types" ) +var ( + flow1Key = types.FlowKey{ID: "flow1", Priority: 0} + flow2Key = types.FlowKey{ID: "flow2", Priority: 0} + flow3Key = types.FlowKey{ID: "flow3", Priority: 0} +) + func TestRoundRobin_Name(t *testing.T) { t.Parallel() policy := newRoundRobin() @@ -41,12 +47,12 @@ func TestRoundRobin_SelectQueue_Logic(t *testing.T) { policy := newRoundRobin() // Setup: Three non-empty queues - queue1 := &frameworkmocks.MockFlowQueueAccessor{LenV: 1, FlowSpecV: types.FlowSpecification{ID: "flow1"}} - queue2 := &frameworkmocks.MockFlowQueueAccessor{LenV: 2, FlowSpecV: types.FlowSpecification{ID: "flow2"}} - queue3 := &frameworkmocks.MockFlowQueueAccessor{LenV: 3, FlowSpecV: types.FlowSpecification{ID: "flow3"}} + queue1 := &frameworkmocks.MockFlowQueueAccessor{LenV: 1, FlowKeyV: flow1Key} + queue2 := &frameworkmocks.MockFlowQueueAccessor{LenV: 2, FlowKeyV: flow2Key} + queue3 := &frameworkmocks.MockFlowQueueAccessor{LenV: 3, FlowKeyV: flow3Key} mockBand := &frameworkmocks.MockPriorityBandAccessor{ - FlowIDsFunc: func() []string { return []string{"flow3", "flow1", "flow2"} }, // Unsorted to test sorting + FlowKeysFunc: func() []types.FlowKey { return []types.FlowKey{flow3Key, flow1Key, flow2Key} }, // Unsorted to test sorting QueueFunc: func(id string) framework.FlowQueueAccessor { switch id { case "flow1": @@ -60,7 +66,7 @@ func TestRoundRobin_SelectQueue_Logic(t *testing.T) { }, } - // Expected order is based on sorted FlowIDs: flow1, flow2, flow3 + // Expected order is based on sorted FlowKeys: flow1, flow2, flow3 expectedOrder := []string{"flow1", "flow2", "flow3"} // First cycle @@ -68,7 +74,7 @@ func TestRoundRobin_SelectQueue_Logic(t *testing.T) { selected, err := policy.SelectQueue(mockBand) require.NoError(t, err, "SelectQueue should not error on a valid band") require.NotNil(t, selected, "SelectQueue should have selected a queue") - assert.Equal(t, expectedOrder[i], selected.FlowSpec().ID, + assert.Equal(t, expectedOrder[i], selected.FlowKey().ID, "Cycle 1, selection %d should be %s", i+1, expectedOrder[i]) } @@ -77,7 +83,7 @@ func TestRoundRobin_SelectQueue_Logic(t *testing.T) { selected, err := policy.SelectQueue(mockBand) require.NoError(t, err, "SelectQueue should not error on a valid band") require.NotNil(t, selected, "SelectQueue should have selected a queue") - assert.Equal(t, expectedOrder[i], selected.FlowSpec().ID, + assert.Equal(t, expectedOrder[i], selected.FlowKey().ID, "Cycle 2, selection %d should be %s", i+1, expectedOrder[i]) } } @@ -87,12 +93,13 @@ func TestRoundRobin_SelectQueue_SkipsEmptyQueues(t *testing.T) { policy := newRoundRobin() // Setup: Two non-empty queues and one empty queue - queue1 := &frameworkmocks.MockFlowQueueAccessor{LenV: 1, FlowSpecV: types.FlowSpecification{ID: "flow1"}} - queueEmpty := &frameworkmocks.MockFlowQueueAccessor{LenV: 0, FlowSpecV: types.FlowSpecification{ID: "flowEmpty"}} - queue3 := &frameworkmocks.MockFlowQueueAccessor{LenV: 3, FlowSpecV: types.FlowSpecification{ID: "flow3"}} + flowEmptyKey := types.FlowKey{ID: "flowEmpty", Priority: 0} + queue1 := &frameworkmocks.MockFlowQueueAccessor{LenV: 1, FlowKeyV: flow1Key} + queueEmpty := &frameworkmocks.MockFlowQueueAccessor{LenV: 0, FlowKeyV: flowEmptyKey} + queue3 := &frameworkmocks.MockFlowQueueAccessor{LenV: 3, FlowKeyV: flow3Key} mockBand := &frameworkmocks.MockPriorityBandAccessor{ - FlowIDsFunc: func() []string { return []string{"flow1", "flowEmpty", "flow3"} }, + FlowKeysFunc: func() []types.FlowKey { return []types.FlowKey{flow1Key, flowEmptyKey, flow3Key} }, QueueFunc: func(id string) framework.FlowQueueAccessor { switch id { case "flow1": @@ -110,17 +117,17 @@ func TestRoundRobin_SelectQueue_SkipsEmptyQueues(t *testing.T) { selected, err := policy.SelectQueue(mockBand) require.NoError(t, err, "SelectQueue should not error when skipping queues") require.NotNil(t, selected, "SelectQueue should select the first non-empty queue") - assert.Equal(t, "flow1", selected.FlowSpec().ID, "First selection should be flow1") + assert.Equal(t, "flow1", selected.FlowKey().ID, "First selection should be flow1") selected, err = policy.SelectQueue(mockBand) require.NoError(t, err, "SelectQueue should not error when skipping queues") require.NotNil(t, selected, "SelectQueue should select the second non-empty queue") - assert.Equal(t, "flow3", selected.FlowSpec().ID, "Second selection should be flow3, skipping flowEmpty") + assert.Equal(t, "flow3", selected.FlowKey().ID, "Second selection should be flow3, skipping flowEmpty") selected, err = policy.SelectQueue(mockBand) require.NoError(t, err, "SelectQueue should not error when wrapping around") require.NotNil(t, selected, "SelectQueue should wrap around and select a queue") - assert.Equal(t, "flow1", selected.FlowSpec().ID, "Should wrap around and select flow1 again") + assert.Equal(t, "flow1", selected.FlowKey().ID, "Should wrap around and select flow1 again") } func TestRoundRobin_SelectQueue_HandlesDynamicFlows(t *testing.T) { @@ -128,10 +135,10 @@ func TestRoundRobin_SelectQueue_HandlesDynamicFlows(t *testing.T) { policy := newRoundRobin() // Initial setup - queue1 := &frameworkmocks.MockFlowQueueAccessor{LenV: 1, FlowSpecV: types.FlowSpecification{ID: "flow1"}} - queue2 := &frameworkmocks.MockFlowQueueAccessor{LenV: 1, FlowSpecV: types.FlowSpecification{ID: "flow2"}} + queue1 := &frameworkmocks.MockFlowQueueAccessor{LenV: 1, FlowKeyV: flow1Key} + queue2 := &frameworkmocks.MockFlowQueueAccessor{LenV: 1, FlowKeyV: flow2Key} mockBand := &frameworkmocks.MockPriorityBandAccessor{ - FlowIDsFunc: func() []string { return []string{"flow1", "flow2"} }, + FlowKeysFunc: func() []types.FlowKey { return []types.FlowKey{flow1Key, flow2Key} }, QueueFunc: func(id string) framework.FlowQueueAccessor { if id == "flow1" { return queue1 @@ -144,11 +151,11 @@ func TestRoundRobin_SelectQueue_HandlesDynamicFlows(t *testing.T) { selected, err := policy.SelectQueue(mockBand) require.NoError(t, err, "SelectQueue should not error on initial selection") require.NotNil(t, selected, "SelectQueue should select a queue initially") - assert.Equal(t, "flow1", selected.FlowSpec().ID, "First selection should be flow1") + assert.Equal(t, "flow1", selected.FlowKey().ID, "First selection should be flow1") // --- Simulate adding a flow --- - queue3 := &frameworkmocks.MockFlowQueueAccessor{LenV: 1, FlowSpecV: types.FlowSpecification{ID: "flow3"}} - mockBand.FlowIDsFunc = func() []string { return []string{"flow1", "flow2", "flow3"} } + queue3 := &frameworkmocks.MockFlowQueueAccessor{LenV: 1, FlowKeyV: flow3Key} + mockBand.FlowKeysFunc = func() []types.FlowKey { return []types.FlowKey{flow1Key, flow2Key, flow3Key} } mockBand.QueueFunc = func(id string) framework.FlowQueueAccessor { switch id { case "flow1": @@ -165,22 +172,22 @@ func TestRoundRobin_SelectQueue_HandlesDynamicFlows(t *testing.T) { selected, err = policy.SelectQueue(mockBand) require.NoError(t, err, "SelectQueue should not error after adding a flow") require.NotNil(t, selected, "SelectQueue should select a queue after adding a flow") - assert.Equal(t, "flow2", selected.FlowSpec().ID, "Next selection should be flow2") + assert.Equal(t, "flow2", selected.FlowKey().ID, "Next selection should be flow2") // Next selection should be flow3 selected, err = policy.SelectQueue(mockBand) require.NoError(t, err, "SelectQueue should not error on the third selection") require.NotNil(t, selected, "SelectQueue should select the new flow") - assert.Equal(t, "flow3", selected.FlowSpec().ID, "Next selection should be the new flow3") + assert.Equal(t, "flow3", selected.FlowKey().ID, "Next selection should be the new flow3") // --- Simulate removing a flow --- - mockBand.FlowIDsFunc = func() []string { return []string{"flow1", "flow3"} } // flow2 is removed + mockBand.FlowKeysFunc = func() []types.FlowKey { return []types.FlowKey{flow1Key, flow3Key} } // flow2 is removed // Next selection should wrap around and pick flow1 selected, err = policy.SelectQueue(mockBand) require.NoError(t, err, "SelectQueue should not error after removing a flow") require.NotNil(t, selected, "SelectQueue should select a queue after removing a flow") - assert.Equal(t, "flow1", selected.FlowSpec().ID, "Next selection should wrap around to flow1 after a removal") + assert.Equal(t, "flow1", selected.FlowKey().ID, "Next selection should wrap around to flow1 after a removal") } func TestRoundRobin_SelectQueue_Concurrency(t *testing.T) { @@ -193,17 +200,17 @@ func TestRoundRobin_SelectQueue_Concurrency(t *testing.T) { // Setup: Three non-empty queues queues := []*frameworkmocks.MockFlowQueueAccessor{ - {LenV: 1, FlowSpecV: types.FlowSpecification{ID: "flow1"}}, - {LenV: 2, FlowSpecV: types.FlowSpecification{ID: "flow2"}}, - {LenV: 3, FlowSpecV: types.FlowSpecification{ID: "flow3"}}, + {LenV: 1, FlowKeyV: flow1Key}, + {LenV: 2, FlowKeyV: flow2Key}, + {LenV: 3, FlowKeyV: flow3Key}, } numQueues := int64(len(queues)) mockBand := &frameworkmocks.MockPriorityBandAccessor{ - FlowIDsFunc: func() []string { return []string{"flow1", "flow2", "flow3"} }, + FlowKeysFunc: func() []types.FlowKey { return []types.FlowKey{flow1Key, flow2Key, flow3Key} }, QueueFunc: func(id string) framework.FlowQueueAccessor { for _, q := range queues { - if q.FlowSpec().ID == id { + if q.FlowKey().ID == id { return q } } @@ -225,7 +232,7 @@ func TestRoundRobin_SelectQueue_Concurrency(t *testing.T) { for range selectionsPerGoroutine { selected, err := policy.SelectQueue(mockBand) if err == nil && selected != nil { - val, _ := selectionCounts.LoadOrStore(selected.FlowSpec().ID, new(atomic.Int64)) + val, _ := selectionCounts.LoadOrStore(selected.FlowKey().ID, new(atomic.Int64)) val.(*atomic.Int64).Add(1) } } diff --git a/pkg/epp/flowcontrol/framework/plugins/policies/intraflow/dispatch/fcfs/fcfs_test.go b/pkg/epp/flowcontrol/framework/plugins/policies/intraflow/dispatch/fcfs/fcfs_test.go index 8a13c6c34..45c144238 100644 --- a/pkg/epp/flowcontrol/framework/plugins/policies/intraflow/dispatch/fcfs/fcfs_test.go +++ b/pkg/epp/flowcontrol/framework/plugins/policies/intraflow/dispatch/fcfs/fcfs_test.go @@ -29,6 +29,8 @@ import ( typesmocks "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/types/mocks" ) +var testFlowKey = types.FlowKey{ID: "test-flow", Priority: 0} + func TestFCFS_Name(t *testing.T) { t.Parallel() policy := newFCFS() @@ -49,7 +51,7 @@ func TestFCFS_SelectItem(t *testing.T) { // This unit test focuses on the policy-specific success path. policy := newFCFS() - mockItem := typesmocks.NewMockQueueItemAccessor(1, "item1", "flow1") + mockItem := typesmocks.NewMockQueueItemAccessor(1, "item1", testFlowKey) mockQueue := &frameworkmocks.MockFlowQueueAccessor{ PeekHeadV: mockItem, LenV: 1, @@ -67,13 +69,13 @@ func TestEnqueueTimeComparator_Func(t *testing.T) { require.NotNil(t, compareFunc) now := time.Now() - itemA := typesmocks.NewMockQueueItemAccessor(10, "itemA", "test-flow") + itemA := typesmocks.NewMockQueueItemAccessor(10, "itemA", testFlowKey) itemA.EnqueueTimeV = now - itemB := typesmocks.NewMockQueueItemAccessor(20, "itemB", "test-flow") + itemB := typesmocks.NewMockQueueItemAccessor(20, "itemB", testFlowKey) itemB.EnqueueTimeV = now.Add(time.Second) // B is later than A - itemC := typesmocks.NewMockQueueItemAccessor(30, "itemC", "test-flow") + itemC := typesmocks.NewMockQueueItemAccessor(30, "itemC", testFlowKey) itemC.EnqueueTimeV = now // C is same time as A testCases := []struct { diff --git a/pkg/epp/flowcontrol/framework/plugins/queue/benchmark_test.go b/pkg/epp/flowcontrol/framework/plugins/queue/benchmark_test.go index 916cacdd9..7d2027098 100644 --- a/pkg/epp/flowcontrol/framework/plugins/queue/benchmark_test.go +++ b/pkg/epp/flowcontrol/framework/plugins/queue/benchmark_test.go @@ -27,6 +27,8 @@ import ( "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/types/mocks" ) +var benchmarkFlowKey = types.FlowKey{ID: "benchmark-flow"} + // BenchmarkQueues runs a series of benchmarks against all registered queue implementations. func BenchmarkQueues(b *testing.B) { for queueName, constructor := range queue.RegisteredQueues { @@ -68,7 +70,7 @@ func benchmarkAddRemove(b *testing.B, q framework.SafeQueue) { b.RunParallel(func(pb *testing.PB) { for pb.Next() { - item := mocks.NewMockQueueItemAccessor(1, "item", "benchmark-flow") + item := mocks.NewMockQueueItemAccessor(1, "item", benchmarkFlowKey) err := q.Add(item) if err != nil { b.Fatalf("Add failed: %v", err) @@ -85,7 +87,7 @@ func benchmarkAddRemove(b *testing.B, q framework.SafeQueue) { // common consumer pattern where a single worker peeks at an item before deciding to process and remove it. func benchmarkAddPeekRemove(b *testing.B, q framework.SafeQueue) { // Pre-add one item so PeekHead doesn't fail on the first iteration. - initialItem := mocks.NewMockQueueItemAccessor(1, "initial", "benchmark-flow") + initialItem := mocks.NewMockQueueItemAccessor(1, "initial", benchmarkFlowKey) if err := q.Add(initialItem); err != nil { b.Fatalf("Failed to add initial item: %v", err) } @@ -93,7 +95,7 @@ func benchmarkAddPeekRemove(b *testing.B, q framework.SafeQueue) { b.ReportAllocs() for i := 0; i < b.N; i++ { - item := mocks.NewMockQueueItemAccessor(1, "item", "benchmark-flow") + item := mocks.NewMockQueueItemAccessor(1, "item", benchmarkFlowKey) err := q.Add(item) if err != nil { b.Fatalf("Add failed: %v", err) @@ -122,7 +124,7 @@ func benchmarkBulkAddThenBulkRemove(b *testing.B, q framework.SafeQueue) { // Add a batch of items items := make([]types.QueueItemAccessor, 100) for j := range items { - item := mocks.NewMockQueueItemAccessor(1, fmt.Sprintf("bulk-%d-%d", i, j), "benchmark-flow") + item := mocks.NewMockQueueItemAccessor(1, fmt.Sprintf("bulk-%d-%d", i, j), benchmarkFlowKey) items[j] = item if err := q.Add(item); err != nil { b.Fatalf("Add failed: %v", err) @@ -146,7 +148,7 @@ func benchmarkBulkAddThenBulkRemove(b *testing.B, q framework.SafeQueue) { // understanding the performance of accessing the lowest-priority item. func benchmarkAddPeekTailRemove(b *testing.B, q framework.SafeQueue) { // Pre-add one item so PeekTail doesn't fail on the first iteration. - initialItem := mocks.NewMockQueueItemAccessor(1, "initial", "benchmark-flow") + initialItem := mocks.NewMockQueueItemAccessor(1, "initial", benchmarkFlowKey) if err := q.Add(initialItem); err != nil { b.Fatalf("Failed to add initial item: %v", err) } @@ -154,7 +156,7 @@ func benchmarkAddPeekTailRemove(b *testing.B, q framework.SafeQueue) { b.ReportAllocs() for i := 0; i < b.N; i++ { - item := mocks.NewMockQueueItemAccessor(1, "item", "benchmark-flow") + item := mocks.NewMockQueueItemAccessor(1, "item", benchmarkFlowKey) err := q.Add(item) if err != nil { b.Fatalf("Add failed: %v", err) @@ -177,7 +179,7 @@ func benchmarkAddPeekTailRemove(b *testing.B, q framework.SafeQueue) { func benchmarkHighContention(b *testing.B, q framework.SafeQueue) { // Pre-fill the queue to ensure consumers have work to do immediately. for i := range 1000 { - item := mocks.NewMockQueueItemAccessor(1, fmt.Sprintf("prefill-%d", i), "benchmark-flow") + item := mocks.NewMockQueueItemAccessor(1, fmt.Sprintf("prefill-%d", i), benchmarkFlowKey) if err := q.Add(item); err != nil { b.Fatalf("Failed to pre-fill queue: %v", err) } @@ -196,7 +198,7 @@ func benchmarkHighContention(b *testing.B, q framework.SafeQueue) { case <-stopCh: return default: - item := mocks.NewMockQueueItemAccessor(1, "item", "benchmark-flow") + item := mocks.NewMockQueueItemAccessor(1, "item", benchmarkFlowKey) _ = q.Add(item) } } diff --git a/pkg/epp/flowcontrol/framework/plugins/queue/functional_test.go b/pkg/epp/flowcontrol/framework/plugins/queue/functional_test.go index fe0105313..d6ff8a266 100644 --- a/pkg/epp/flowcontrol/framework/plugins/queue/functional_test.go +++ b/pkg/epp/flowcontrol/framework/plugins/queue/functional_test.go @@ -184,7 +184,7 @@ func TestQueueConformance(t *testing.T) { for queueName, constructor := range queue.RegisteredQueues { t.Run(string(queueName), func(t *testing.T) { t.Parallel() - flowSpec := &types.FlowSpecification{ID: "test-flow-1", Priority: 0} + flowKey := types.FlowKey{ID: "test-flow-1", Priority: 0} t.Run("Initialization", func(t *testing.T) { t.Parallel() @@ -206,11 +206,11 @@ func TestQueueConformance(t *testing.T) { now := time.Now() - item1 := typesmocks.NewMockQueueItemAccessor(100, "item1_fifo", flowSpec.ID) + item1 := typesmocks.NewMockQueueItemAccessor(100, "item1_fifo", flowKey) item1.EnqueueTimeV = now.Add(-2 * time.Second) // Earliest - item2 := typesmocks.NewMockQueueItemAccessor(50, "item2_fifo", flowSpec.ID) + item2 := typesmocks.NewMockQueueItemAccessor(50, "item2_fifo", flowKey) item2.EnqueueTimeV = now.Add(-1 * time.Second) // Middle - item3 := typesmocks.NewMockQueueItemAccessor(20, "item3_fifo", flowSpec.ID) + item3 := typesmocks.NewMockQueueItemAccessor(20, "item3_fifo", flowKey) item3.EnqueueTimeV = now // Latest itemsInFIFOOrder := []*typesmocks.MockQueueItemAccessor{item1, item2, item3} @@ -224,9 +224,9 @@ func TestQueueConformance(t *testing.T) { q, err := constructor(byteSizeComparator) require.NoError(t, err, "Setup: creating queue with byteSizeComparator should not fail") - itemLarge := typesmocks.NewMockQueueItemAccessor(100, "itemLarge_prio", flowSpec.ID) - itemSmall := typesmocks.NewMockQueueItemAccessor(20, "itemSmall_prio", flowSpec.ID) - itemMedium := typesmocks.NewMockQueueItemAccessor(50, "itemMedium_prio", flowSpec.ID) + itemLarge := typesmocks.NewMockQueueItemAccessor(100, "itemLarge_prio", flowKey) + itemSmall := typesmocks.NewMockQueueItemAccessor(20, "itemSmall_prio", flowKey) + itemMedium := typesmocks.NewMockQueueItemAccessor(50, "itemMedium_prio", flowKey) itemsInByteSizeOrder := []*typesmocks.MockQueueItemAccessor{itemSmall, itemMedium, itemLarge} testLifecycleAndOrdering(t, q, itemsInByteSizeOrder, "PriorityByteSize") @@ -238,11 +238,11 @@ func TestQueueConformance(t *testing.T) { require.NoError(t, err, "Setup: creating queue with reverseEnqueueTimeComparator should not fail") now := time.Now() - item1 := typesmocks.NewMockQueueItemAccessor(100, "item1_lifo", flowSpec.ID) + item1 := typesmocks.NewMockQueueItemAccessor(100, "item1_lifo", flowKey) item1.EnqueueTimeV = now.Add(-2 * time.Second) // Earliest - item2 := typesmocks.NewMockQueueItemAccessor(50, "item2_lifo", flowSpec.ID) + item2 := typesmocks.NewMockQueueItemAccessor(50, "item2_lifo", flowKey) item2.EnqueueTimeV = now.Add(-1 * time.Second) // Middle - item3 := typesmocks.NewMockQueueItemAccessor(20, "item3_lifo", flowSpec.ID) + item3 := typesmocks.NewMockQueueItemAccessor(20, "item3_lifo", flowKey) item3.EnqueueTimeV = now // Latest itemsInLIFOOrder := []*typesmocks.MockQueueItemAccessor{item3, item2, item1} @@ -268,13 +268,13 @@ func TestQueueConformance(t *testing.T) { q, err := constructor(enqueueTimeComparator) require.NoError(t, err, "Setup: creating queue for test should not fail") - item := typesmocks.NewMockQueueItemAccessor(100, "item", flowSpec.ID) + item := typesmocks.NewMockQueueItemAccessor(100, "item", flowKey) err = q.Add(item) require.NoError(t, err, "Setup: adding an item should succeed") otherQ, err := constructor(enqueueTimeComparator) // A different queue instance require.NoError(t, err, "Setup: creating otherQ should succeed") - otherItem := typesmocks.NewMockQueueItemAccessor(10, "other_item", "other_flow") + otherItem := typesmocks.NewMockQueueItemAccessor(10, "other_item", types.FlowKey{ID: "other-flow"}) err = otherQ.Add(otherItem) require.NoError(t, err, "Setup: adding item to otherQ should succeed") alienHandle := otherItem.Handle() @@ -318,11 +318,11 @@ func TestQueueConformance(t *testing.T) { require.NoError(t, err, "Setup: creating queue for test should not fail") now := time.Now() - item1 := typesmocks.NewMockQueueItemAccessor(10, "item1_nonhead", flowSpec.ID) + item1 := typesmocks.NewMockQueueItemAccessor(10, "item1_nonhead", flowKey) item1.EnqueueTimeV = now.Add(-3 * time.Second) - item2 := typesmocks.NewMockQueueItemAccessor(20, "item2_nonhead_TARGET", flowSpec.ID) + item2 := typesmocks.NewMockQueueItemAccessor(20, "item2_nonhead_TARGET", flowKey) item2.EnqueueTimeV = now.Add(-2 * time.Second) - item3 := typesmocks.NewMockQueueItemAccessor(30, "item3_nonhead", flowSpec.ID) + item3 := typesmocks.NewMockQueueItemAccessor(30, "item3_nonhead", flowKey) item3.EnqueueTimeV = now.Add(-1 * time.Second) _ = q.Add(item1) @@ -364,8 +364,8 @@ func TestQueueConformance(t *testing.T) { t.Run("Cleanup_PredicateMatchesNone", func(t *testing.T) { t.Parallel() q, _ := constructor(enqueueTimeComparator) - itemK1 := typesmocks.NewMockQueueItemAccessor(10, "k1_matchNone", flowSpec.ID) - itemK2 := typesmocks.NewMockQueueItemAccessor(12, "k2_matchNone", flowSpec.ID) + itemK1 := typesmocks.NewMockQueueItemAccessor(10, "k1_matchNone", flowKey) + itemK2 := typesmocks.NewMockQueueItemAccessor(12, "k2_matchNone", flowKey) _ = q.Add(itemK1) _ = q.Add(itemK2) initialLen := q.Len() @@ -384,8 +384,8 @@ func TestQueueConformance(t *testing.T) { t.Run("Cleanup_PredicateMatchesAll", func(t *testing.T) { t.Parallel() q, _ := constructor(enqueueTimeComparator) - itemR1 := typesmocks.NewMockQueueItemAccessor(11, "r1_matchAll", flowSpec.ID) - itemR2 := typesmocks.NewMockQueueItemAccessor(13, "r2_matchAll", flowSpec.ID) + itemR1 := typesmocks.NewMockQueueItemAccessor(11, "r1_matchAll", flowKey) + itemR2 := typesmocks.NewMockQueueItemAccessor(13, "r2_matchAll", flowKey) _ = q.Add(itemR1) _ = q.Add(itemR2) @@ -401,10 +401,10 @@ func TestQueueConformance(t *testing.T) { t.Run("Cleanup_PredicateMatchesSubset_VerifyHandles", func(t *testing.T) { t.Parallel() q, _ := constructor(enqueueTimeComparator) - iK1 := typesmocks.NewMockQueueItemAccessor(20, "k1_subset", flowSpec.ID) - iR1 := typesmocks.NewMockQueueItemAccessor(11, "r1_subset", flowSpec.ID) - iK2 := typesmocks.NewMockQueueItemAccessor(22, "k2_subset", flowSpec.ID) - iR2 := typesmocks.NewMockQueueItemAccessor(33, "r2_subset", flowSpec.ID) + iK1 := typesmocks.NewMockQueueItemAccessor(20, "k1_subset", flowKey) + iR1 := typesmocks.NewMockQueueItemAccessor(11, "r1_subset", flowKey) + iK2 := typesmocks.NewMockQueueItemAccessor(22, "k2_subset", flowKey) + iR2 := typesmocks.NewMockQueueItemAccessor(33, "r2_subset", flowKey) _ = q.Add(iK1) _ = q.Add(iR1) _ = q.Add(iK2) @@ -453,8 +453,8 @@ func TestQueueConformance(t *testing.T) { q, err := constructor(enqueueTimeComparator) require.NoError(t, err, "Setup: creating queue for drain test should not fail") - itemD1 := typesmocks.NewMockQueueItemAccessor(10, "ditem1", flowSpec.ID) - itemD2 := typesmocks.NewMockQueueItemAccessor(20, "ditem2", flowSpec.ID) + itemD1 := typesmocks.NewMockQueueItemAccessor(10, "ditem1", flowKey) + itemD2 := typesmocks.NewMockQueueItemAccessor(20, "ditem2", flowKey) _ = q.Add(itemD1) _ = q.Add(itemD2) @@ -512,7 +512,7 @@ func TestQueueConformance(t *testing.T) { // Pre-populate the queue with an initial set of items. for i := 0; i < initialItems; i++ { - item := typesmocks.NewMockQueueItemAccessor(1, fmt.Sprintf("%s_conc_init_%d", flowSpec.ID, i), flowSpec.ID) + item := typesmocks.NewMockQueueItemAccessor(1, fmt.Sprintf("%s_conc_init_%d", flowKey, i), flowKey) err := q.Add(item) require.NoError(t, err, "Setup: pre-populating the queue should not fail") handleChan <- item.Handle() @@ -531,7 +531,7 @@ func TestQueueConformance(t *testing.T) { switch opType { case 0: // Add item := typesmocks.NewMockQueueItemAccessor(1, - fmt.Sprintf("%s_conc_init_%d_%d", flowSpec.ID, routineID, j), flowSpec.ID) + fmt.Sprintf("%s_conc_init_%d_%d", flowKey, routineID, j), flowKey) err := q.Add(item) if assert.NoError(t, err, "Add must be goroutine-safe") { successfulAdds.Add(1) diff --git a/pkg/epp/flowcontrol/framework/plugins/queue/maxminheap/maxminheap_test.go b/pkg/epp/flowcontrol/framework/plugins/queue/maxminheap/maxminheap_test.go index 39e82fb5d..22a6290cb 100644 --- a/pkg/epp/flowcontrol/framework/plugins/queue/maxminheap/maxminheap_test.go +++ b/pkg/epp/flowcontrol/framework/plugins/queue/maxminheap/maxminheap_test.go @@ -46,7 +46,7 @@ func TestMaxMinHeap_InternalProperty(t *testing.T) { now := time.Now() for i := range items { // Add items in a somewhat random order of enqueue times - items[i] = typesmocks.NewMockQueueItemAccessor(10, "item", "flow") + items[i] = typesmocks.NewMockQueueItemAccessor(10, "item", types.FlowKey{ID: "flow"}) items[i].EnqueueTimeV = now.Add(time.Duration((i%5-2)*10) * time.Second) err := q.Add(items[i]) require.NoError(t, err, "Add should not fail") diff --git a/pkg/epp/flowcontrol/framework/policies.go b/pkg/epp/flowcontrol/framework/policies.go index da6d5c445..5fc3e646d 100644 --- a/pkg/epp/flowcontrol/framework/policies.go +++ b/pkg/epp/flowcontrol/framework/policies.go @@ -144,8 +144,8 @@ type InterFlowDispatchPolicy interface { // FlowQueueAccessor provides a policy-facing, read-only view of a single flow's queue. // It combines general queue inspection methods (embedded via `QueueInspectionMethods`) with flow-specific metadata. // -// Instances of `FlowQueueAccessor` are vended by a `contracts.ManagedQueue` and are the primary means by which policies -// inspect individual queue state. +// Instances of `FlowQueueAccessor` are typically obtained via a `PriorityBandAccessor` and are the primary means by +// which policies inspect individual queue state. // // Conformance: Implementations MUST ensure all methods (including those embedded from `QueueInspectionMethods`) are // goroutine-safe for concurrent access. @@ -156,12 +156,12 @@ type FlowQueueAccessor interface { // This is determined by the `IntraFlowDispatchPolicy` associated with this queue's flow. Comparator() ItemComparator - // FlowSpec returns the `types.FlowSpecification` of the flow this queue accessor is associated with. - // This provides essential context (like `FlowID`) to policies. - FlowSpec() types.FlowSpecification + // FlowKey returns the unique, immutable `types.FlowKey` of the flow instance this queue accessor is associated with. + // This provides essential context (like the logical grouping `ID` and `Priority`) to policies. + FlowKey() types.FlowKey } -// PriorityBandAccessor provides a read-only view into a specific priority band within the `ports.FlowRegistry`. +// PriorityBandAccessor provides a read-only view into a specific priority band within the `contracts.FlowRegistry`. // It allows the `controller.FlowController` and inter-flow policies to inspect the state of all flow queues within that // band. // @@ -173,15 +173,20 @@ type PriorityBandAccessor interface { // PriorityName returns the human-readable name of this priority band. PriorityName() string - // FlowIDs returns a slice of all flow IDs within this priority band. - // The order of items in the slice is not guaranteed unless specified by the implementations (e.g., for deterministic - // testing scenarios). - FlowIDs() []string + // FlowKeys returns a slice of the composite `types.FlowKey`s for every flow instance currently active within this + // priority band. + // This method provides the complete, canonical identifiers for all flows in the band. The caller can use the `ID` + // field from each key to look up a specific queue via the `Queue(id string)` method. + // The order of keys in the returned slice is not guaranteed unless specified by the implementations (e.g., for + // deterministic testing scenarios). + FlowKeys() []types.FlowKey - // Queue returns a `FlowQueueAccessor` for the specified `flowID` within this priority band. + // Queue returns a `FlowQueueAccessor` for the specified logical grouping `ID` within this priority band. + // Note: This uses the logical `ID` string (`types.FlowKey.ID`), not the full `types.FlowKey`, as the priority is + // already defined by the accessor's scope. // - // Conformance: Implementations MUST return nil if the `flowID` is not found in this band. - Queue(flowID string) FlowQueueAccessor + // Conformance: Implementations MUST return nil if the `ID` is not found in this band. + Queue(id string) FlowQueueAccessor // IterateQueues executes the given `callback` for each `FlowQueueAccessor` in this priority band. // Iteration stops if the `callback` returns false. The order of iteration is not guaranteed unless specified by the diff --git a/pkg/epp/flowcontrol/registry/doc.go b/pkg/epp/flowcontrol/registry/doc.go index 521ad6e7b..a038d3dd3 100644 --- a/pkg/epp/flowcontrol/registry/doc.go +++ b/pkg/epp/flowcontrol/registry/doc.go @@ -32,7 +32,7 @@ limitations under the License. // // - `managedQueue`: A concrete implementation of the `contracts.ManagedQueue` interface. It acts as a stateful // decorator around a `framework.SafeQueue`, adding critical registry-level functionality such as atomic statistics -// tracking and lifecycle state enforcement (active vs. draining). +// tracking. // // - `Config`: The top-level configuration object that defines the structure and default behaviors of the registry, // including the definition of priority bands and default policy selections. This configuration is partitioned and diff --git a/pkg/epp/flowcontrol/registry/managedqueue.go b/pkg/epp/flowcontrol/registry/managedqueue.go index bea7e7e4f..b52adb191 100644 --- a/pkg/epp/flowcontrol/registry/managedqueue.go +++ b/pkg/epp/flowcontrol/registry/managedqueue.go @@ -17,7 +17,6 @@ limitations under the License. package registry import ( - "fmt" "sync/atomic" "github.com/go-logr/logr" @@ -33,12 +32,8 @@ import ( type parentStatsReconciler func(lenDelta, byteSizeDelta int64) // managedQueue implements `contracts.ManagedQueue`. It is a stateful decorator that wraps a `framework.SafeQueue`, -// augmenting it with two critical, registry-level responsibilities: -// 1. Atomic Statistics: It maintains its own `len` and `byteSize` counters, which are updated atomically. This allows -// the parent `registryShard` to aggregate statistics across many queues without locks. -// 2. Lifecycle Enforcement: It tracks the queue's lifecycle state (active vs. draining) via an `isActive` flag. This -// is crucial for graceful flow updates, as it allows the registry to stop new requests from being enqueued while -// allowing existing items to be drained. +// augmenting it with atomic statistics tracking. This allows the parent `registryShard` to aggregate statistics across +// many queues without locks. // // # Statistical Integrity // @@ -60,10 +55,9 @@ type managedQueue struct { // atomic operations on the stats fields. queue framework.SafeQueue dispatchPolicy framework.IntraFlowDispatchPolicy - flowSpec types.FlowSpecification + flowKey types.FlowKey byteSize atomic.Uint64 len atomic.Uint64 - isActive atomic.Bool reconcileShardStats parentStatsReconciler logger logr.Logger } @@ -72,48 +66,35 @@ type managedQueue struct { func newManagedQueue( queue framework.SafeQueue, dispatchPolicy framework.IntraFlowDispatchPolicy, - flowSpec types.FlowSpecification, + flowKey types.FlowKey, logger logr.Logger, reconcileShardStats parentStatsReconciler, ) *managedQueue { mqLogger := logger.WithName("managed-queue").WithValues( - "flowID", flowSpec.ID, - "priority", flowSpec.Priority, + "flowKey", flowKey, + "flowID", flowKey.ID, + "priority", flowKey.Priority, "queueType", queue.Name(), ) mq := &managedQueue{ queue: queue, dispatchPolicy: dispatchPolicy, - flowSpec: flowSpec, + flowKey: flowKey, reconcileShardStats: reconcileShardStats, logger: mqLogger, } - mq.isActive.Store(true) return mq } -// markAsDraining is an internal method called by the parent shard to transition this queue to a draining state. -// Once a queue is marked as draining, it will no longer accept new items via `Add`. -func (mq *managedQueue) markAsDraining() { - // Use CompareAndSwap to ensure we only log the transition once. - if mq.isActive.CompareAndSwap(true, false) { - mq.logger.V(logging.DEFAULT).Info("Queue marked as draining") - } -} - // FlowQueueAccessor returns a new `flowQueueAccessor` instance, which provides a read-only, policy-facing view of the // queue. func (mq *managedQueue) FlowQueueAccessor() framework.FlowQueueAccessor { return &flowQueueAccessor{mq: mq} } -// Add first checks if the queue is active. If it is, it wraps the underlying `framework.SafeQueue.Add` call and -// atomically updates the queue's and the parent shard's statistics. +// Add wraps the underlying `framework.SafeQueue.Add` call and atomically updates the queue's and the parent shard's +// statistics. func (mq *managedQueue) Add(item types.QueueItemAccessor) error { - if !mq.isActive.Load() { - return fmt.Errorf("flow instance %q is not active and cannot accept new requests: %w", - mq.flowSpec.ID, contracts.ErrFlowInstanceNotFound) - } if err := mq.queue.Add(item); err != nil { return err } @@ -240,7 +221,7 @@ func (a *flowQueueAccessor) PeekTail() (types.QueueItemAccessor, error) { return // Comparator returns the `framework.ItemComparator` that defines this queue's item ordering logic. func (a *flowQueueAccessor) Comparator() framework.ItemComparator { return a.mq.Comparator() } -// FlowSpec returns the `types.FlowSpecification` of the flow this queue accessor is associated with. -func (a *flowQueueAccessor) FlowSpec() types.FlowSpecification { return a.mq.flowSpec } +// FlowKey returns the `types.FlowKey` of the flow this queue accessor is associated with. +func (a *flowQueueAccessor) FlowKey() types.FlowKey { return a.mq.flowKey } var _ framework.FlowQueueAccessor = &flowQueueAccessor{} diff --git a/pkg/epp/flowcontrol/registry/managedqueue_test.go b/pkg/epp/flowcontrol/registry/managedqueue_test.go index 6fc172feb..ff734573e 100644 --- a/pkg/epp/flowcontrol/registry/managedqueue_test.go +++ b/pkg/epp/flowcontrol/registry/managedqueue_test.go @@ -26,7 +26,6 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/contracts" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework" frameworkmocks "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/mocks" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/queue" @@ -35,6 +34,8 @@ import ( typesmocks "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/types/mocks" ) +var testFlowKey = types.FlowKey{ID: "test-flow", Priority: 1} + // testStatsReconciler is a mock implementation of the `parentStatsReconciler` function. // It captures the deltas it's called with, allowing tests to assert on them. type testStatsReconciler struct { @@ -64,7 +65,7 @@ type testFixture struct { mockQueue *frameworkmocks.MockSafeQueue mockPolicy *frameworkmocks.MockIntraFlowDispatchPolicy reconciler *testStatsReconciler - flowSpec types.FlowSpecification + flowKey types.FlowKey mockComparator *frameworkmocks.MockItemComparator } @@ -74,7 +75,7 @@ func setupTestManagedQueue(t *testing.T) *testFixture { mockQueue := &frameworkmocks.MockSafeQueue{} reconciler := &testStatsReconciler{} - flowSpec := types.FlowSpecification{ID: "test-flow", Priority: 1} + flowKey := types.FlowKey{ID: "test-flow", Priority: 1} mockComparator := &frameworkmocks.MockItemComparator{} mockPolicy := &frameworkmocks.MockIntraFlowDispatchPolicy{ ComparatorV: mockComparator, @@ -83,7 +84,7 @@ func setupTestManagedQueue(t *testing.T) *testFixture { mq := newManagedQueue( mockQueue, mockPolicy, - flowSpec, + flowKey, logr.Discard(), reconciler.reconcile, ) @@ -94,7 +95,7 @@ func setupTestManagedQueue(t *testing.T) *testFixture { mockQueue: mockQueue, mockPolicy: mockPolicy, reconciler: reconciler, - flowSpec: flowSpec, + flowKey: flowKey, mockComparator: mockComparator, } } @@ -105,7 +106,6 @@ func TestManagedQueue_New(t *testing.T) { assert.Zero(t, f.mq.Len(), "A new managedQueue should have a length of 0") assert.Zero(t, f.mq.ByteSize(), "A new managedQueue should have a byte size of 0") - assert.True(t, f.mq.isActive.Load(), "A new managedQueue should be active") } func TestManagedQueue_Add(t *testing.T) { @@ -115,9 +115,7 @@ func TestManagedQueue_Add(t *testing.T) { name string itemByteSize uint64 mockAddError error - markAsDraining bool expectError bool - expectedErrorIs error expectedLen int expectedByteSize uint64 expectedLenDelta int64 @@ -145,18 +143,6 @@ func TestManagedQueue_Add(t *testing.T) { expectedByteSizeDelta: 0, expectedReconcile: false, }, - { - name: "Error on inactive queue", - itemByteSize: 100, - markAsDraining: true, - expectError: true, - expectedErrorIs: contracts.ErrFlowInstanceNotFound, - expectedLen: 0, - expectedByteSize: 0, - expectedLenDelta: 0, - expectedByteSizeDelta: 0, - expectedReconcile: false, - }, } for _, tc := range testCases { @@ -169,19 +155,11 @@ func TestManagedQueue_Add(t *testing.T) { return tc.mockAddError } - if tc.markAsDraining { - f.mq.markAsDraining() - assert.False(t, f.mq.isActive.Load(), "Setup: queue should be marked as inactive") - } - - item := typesmocks.NewMockQueueItemAccessor(tc.itemByteSize, "req-1", "test-flow") + item := typesmocks.NewMockQueueItemAccessor(tc.itemByteSize, "req-1", testFlowKey) err := f.mq.Add(item) if tc.expectError { require.Error(t, err, "Add should have returned an error") - if tc.expectedErrorIs != nil { - assert.ErrorIs(t, err, tc.expectedErrorIs, "Error should wrap the expected sentinel error") - } } else { require.NoError(t, err, "Add should not have returned an error") } @@ -208,7 +186,7 @@ func TestManagedQueue_Remove(t *testing.T) { f := setupTestManagedQueue(t) // Setup initial state - initialItem := typesmocks.NewMockQueueItemAccessor(100, "req-1", "test-flow") + initialItem := typesmocks.NewMockQueueItemAccessor(100, "req-1", testFlowKey) f.mockQueue.AddFunc = func(item types.QueueItemAccessor) error { return nil } err := f.mq.Add(initialItem) require.NoError(t, err, "Setup: Adding an item should not fail") @@ -263,9 +241,9 @@ func TestManagedQueue_Remove(t *testing.T) { func TestManagedQueue_CleanupAndDrain(t *testing.T) { t.Parallel() - item1 := typesmocks.NewMockQueueItemAccessor(10, "req-1", "test-flow") - item2 := typesmocks.NewMockQueueItemAccessor(20, "req-2", "test-flow") - item3 := typesmocks.NewMockQueueItemAccessor(30, "req-3", "test-flow") + item1 := typesmocks.NewMockQueueItemAccessor(10, "req-1", testFlowKey) + item2 := typesmocks.NewMockQueueItemAccessor(20, "req-2", testFlowKey) + item3 := typesmocks.NewMockQueueItemAccessor(30, "req-3", testFlowKey) // --- Test Cleanup --- t.Run("Cleanup", func(t *testing.T) { @@ -359,7 +337,7 @@ func TestManagedQueue_CleanupAndDrain(t *testing.T) { func TestManagedQueue_FlowQueueAccessor(t *testing.T) { t.Parallel() f := setupTestManagedQueue(t) - item := typesmocks.NewMockQueueItemAccessor(100, "req-1", "test-flow") + item := typesmocks.NewMockQueueItemAccessor(100, "req-1", testFlowKey) // Setup underlying queue state f.mockQueue.PeekHeadV = item @@ -379,7 +357,7 @@ func TestManagedQueue_FlowQueueAccessor(t *testing.T) { assert.Equal(t, f.mq.Capabilities(), accessor.Capabilities(), "Accessor Capabilities() should match managed queue") assert.Equal(t, f.mq.Len(), accessor.Len(), "Accessor Len() should match managed queue") assert.Equal(t, f.mq.ByteSize(), accessor.ByteSize(), "Accessor ByteSize() should match managed queue") - assert.Equal(t, f.flowSpec, accessor.FlowSpec(), "Accessor FlowSpec() should match managed queue") + assert.Equal(t, f.flowKey, accessor.FlowKey(), "Accessor FlowKey() should match managed queue") assert.Equal(t, f.mockComparator, accessor.Comparator(), "Accessor Comparator() should match the one from the policy") assert.Equal(t, f.mockComparator, f.mq.Comparator(), "ManagedQueue Comparator() should match the one from the policy") @@ -400,8 +378,8 @@ func TestManagedQueue_Concurrency(t *testing.T) { require.NoError(t, err, "Setup: creating a real listqueue should not fail") reconciler := &testStatsReconciler{} - flowSpec := types.FlowSpecification{ID: "conc-test-flow", Priority: 1} - mq := newManagedQueue(lq, nil, flowSpec, logr.Discard(), reconciler.reconcile) + flowKey := types.FlowKey{ID: "conc-test-flow", Priority: 1} + mq := newManagedQueue(lq, nil, flowKey, logr.Discard(), reconciler.reconcile) const ( numGoroutines = 20 @@ -418,7 +396,7 @@ func TestManagedQueue_Concurrency(t *testing.T) { // Pre-fill the queue to give removers something to do immediately. for range initialItems { - item := typesmocks.NewMockQueueItemAccessor(uint64(itemByteSize), "initial", "flow") + item := typesmocks.NewMockQueueItemAccessor(uint64(itemByteSize), "initial", testFlowKey) require.NoError(t, mq.Add(item), "Setup: pre-filling queue should not fail") handles <- item.Handle() } @@ -434,7 +412,7 @@ func TestManagedQueue_Concurrency(t *testing.T) { // Mix up operations between adding and removing. if (routineID+j)%2 == 0 { // Add operation - item := typesmocks.NewMockQueueItemAccessor(uint64(itemByteSize), "req", "flow") + item := typesmocks.NewMockQueueItemAccessor(uint64(itemByteSize), "req", testFlowKey) if err := mq.Add(item); err == nil { successfulAdds.Add(1) handles <- item.Handle() diff --git a/pkg/epp/flowcontrol/registry/shard.go b/pkg/epp/flowcontrol/registry/shard.go index 1e894f07a..eb11dc730 100644 --- a/pkg/epp/flowcontrol/registry/shard.go +++ b/pkg/epp/flowcontrol/registry/shard.go @@ -28,6 +28,7 @@ import ( "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework" inter "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/policies/interflow/dispatch" intra "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/policies/intraflow/dispatch" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/types" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging" ) @@ -43,8 +44,8 @@ import ( // # Concurrency // // The `registryShard` uses a combination of an `RWMutex` and atomic operations to manage concurrency. -// - The `mu` RWMutex protects the shard's internal maps (`priorityBands`, `activeFlows`) during administrative -// operations like flow registration or updates. This ensures that the set of active or draining queues appears +// - The `mu` RWMutex protects the shard's internal maps (`priorityBands`) during administrative +// operations like flow registration or updates. This ensures that the set of queues appears // atomic to a `controller.FlowController` worker. All read-oriented methods on the shard take a read lock. // - All statistics (`totalByteSize`, `totalLen`, etc.) are implemented as `atomic.Uint64` to allow for lock-free, // high-performance updates from many concurrent queue operations. @@ -55,18 +56,12 @@ type registryShard struct { isActive bool reconcileFun parentStatsReconciler - // mu protects the shard's internal maps (`priorityBands` and `activeFlows`). + // mu protects the shard's internal maps (`priorityBands`). mu sync.RWMutex - // priorityBands is the primary lookup table for all managed queues on this shard, organized by `priority`, then by - // `flowID`. This map contains BOTH active and draining queues. + // priorityBands is the primary lookup table for all managed queues on this shard, organized by `priority`. priorityBands map[uint]*priorityBand - // activeFlows is a flattened map for O(1) access to the SINGLE active queue for a given logical flow ID. - // This is the critical lookup for the `Enqueue` path. If a `flowID` is not in this map, it has no active queue on - // this shard. - activeFlows map[string]*managedQueue - // orderedPriorityLevels is a cached, sorted list of `priority` levels. // It is populated at initialization to avoid repeated map key iteration and sorting during the dispatch loop, // ensuring a deterministic, ordered traversal from highest to lowest priority. @@ -82,9 +77,12 @@ type priorityBand struct { // config holds the partitioned config for this specific band. config PriorityBandConfig - // queues holds all `managedQueue` instances within this band, keyed by `flowID`. This includes both active and - // draining queues. - queues map[string]*managedQueue + // queues holds all `managedQueue` instances within this band, keyed by their composite `FlowKey`. + queues map[types.FlowKey]*managedQueue + + // flowKeys is a cached slice of the keys from the `queues` map. This is an optimization to avoid repeated map key + // iteration in the `FlowKeys()` accessor method, making it an O(1) operation. + flowKeys []types.FlowKey // Band-level statistics, which are updated atomically. byteSize atomic.Uint64 @@ -110,7 +108,6 @@ func newShard( isActive: true, reconcileFun: reconcileFunc, priorityBands: make(map[uint]*priorityBand, len(partitionedConfig.PriorityBands)), - activeFlows: make(map[string]*managedQueue), } for _, bandConfig := range partitionedConfig.PriorityBands { @@ -128,7 +125,7 @@ func newShard( s.priorityBands[bandConfig.Priority] = &priorityBand{ config: bandConfig, - queues: make(map[string]*managedQueue), + queues: make(map[types.FlowKey]*managedQueue), interFlowDispatchPolicy: interPolicy, defaultIntraFlowDispatchPolicy: intraPolicy, } @@ -171,51 +168,36 @@ func (s *registryShard) IsActive() bool { return s.isActive } -// ActiveManagedQueue returns the currently active `ManagedQueue` for a given flow. -func (s *registryShard) ActiveManagedQueue(flowID string) (contracts.ManagedQueue, error) { - s.mu.RLock() - defer s.mu.RUnlock() - - mq, ok := s.activeFlows[flowID] - if !ok { - return nil, fmt.Errorf("failed to get active queue for flow %q: %w", flowID, contracts.ErrFlowInstanceNotFound) - } - return mq, nil -} - -// ManagedQueue retrieves a specific (potentially draining) `ManagedQueue` instance from this shard. -func (s *registryShard) ManagedQueue(flowID string, priority uint) (contracts.ManagedQueue, error) { +// ManagedQueue retrieves a specific `ManagedQueue` instance from this shard. +func (s *registryShard) ManagedQueue(key types.FlowKey) (contracts.ManagedQueue, error) { s.mu.RLock() defer s.mu.RUnlock() - band, ok := s.priorityBands[priority] + band, ok := s.priorityBands[key.Priority] if !ok { - return nil, fmt.Errorf("failed to get managed queue for flow %q: %w", flowID, contracts.ErrPriorityBandNotFound) + return nil, fmt.Errorf("failed to get managed queue for flow %q: %w", key, contracts.ErrPriorityBandNotFound) } - mq, ok := band.queues[flowID] + mq, ok := band.queues[key] if !ok { - return nil, fmt.Errorf("failed to get managed queue for flow %q at priority %d: %w", - flowID, priority, contracts.ErrFlowInstanceNotFound) + return nil, fmt.Errorf("failed to get managed queue for flow %q: %w", + key, contracts.ErrFlowInstanceNotFound) } return mq, nil } // IntraFlowDispatchPolicy retrieves a flow's configured `framework.IntraFlowDispatchPolicy`. -func (s *registryShard) IntraFlowDispatchPolicy( - flowID string, - priority uint, -) (framework.IntraFlowDispatchPolicy, error) { +func (s *registryShard) IntraFlowDispatchPolicy(key types.FlowKey) (framework.IntraFlowDispatchPolicy, error) { s.mu.RLock() defer s.mu.RUnlock() - band, ok := s.priorityBands[priority] + band, ok := s.priorityBands[key.Priority] if !ok { - return nil, fmt.Errorf("failed to get intra-flow policy for flow %q: %w", flowID, contracts.ErrPriorityBandNotFound) + return nil, fmt.Errorf("failed to get intra-flow policy for flow %q: %w", key, contracts.ErrPriorityBandNotFound) } - mq, ok := band.queues[flowID] + mq, ok := band.queues[key] if !ok { - return nil, fmt.Errorf("failed to get intra-flow policy for flow %q at priority %d: %w", - flowID, priority, contracts.ErrFlowInstanceNotFound) + return nil, fmt.Errorf("failed to get intra-flow policy for flow %q: %w", + key, contracts.ErrFlowInstanceNotFound) } // The policy is stored on the managed queue. return mq.dispatchPolicy, nil @@ -299,24 +281,20 @@ func (a *priorityBandAccessor) PriorityName() string { return a.band.config.PriorityName } -// FlowIDs returns a slice of all flow IDs within this priority band. -func (a *priorityBandAccessor) FlowIDs() []string { +// FlowKeys returns a slice of all flow keys within this priority band. +// This is an O(1) operation because the slice is pre-computed and cached. +func (a *priorityBandAccessor) FlowKeys() []types.FlowKey { a.shard.mu.RLock() defer a.shard.mu.RUnlock() - - flowIDs := make([]string, 0, len(a.band.queues)) - for id := range a.band.queues { - flowIDs = append(flowIDs, id) - } - return flowIDs + return a.band.flowKeys } -// Queue returns a `framework.FlowQueueAccessor` for the specified `flowID` within this priority band. -func (a *priorityBandAccessor) Queue(flowID string) framework.FlowQueueAccessor { +// Queue returns a `framework.FlowQueueAccessor` for the specified `ID` within this priority band. +func (a *priorityBandAccessor) Queue(id string) framework.FlowQueueAccessor { a.shard.mu.RLock() defer a.shard.mu.RUnlock() - mq, ok := a.band.queues[flowID] + mq, ok := a.band.queues[types.FlowKey{ID: id, Priority: a.Priority()}] if !ok { return nil } @@ -330,7 +308,8 @@ func (a *priorityBandAccessor) IterateQueues(callback func(queue framework.FlowQ a.shard.mu.RLock() defer a.shard.mu.RUnlock() - for _, mq := range a.band.queues { + for _, key := range a.band.flowKeys { + mq := a.band.queues[key] if !callback(mq.FlowQueueAccessor()) { return } diff --git a/pkg/epp/flowcontrol/registry/shard_test.go b/pkg/epp/flowcontrol/registry/shard_test.go index fe7f96e77..b04bde466 100644 --- a/pkg/epp/flowcontrol/registry/shard_test.go +++ b/pkg/epp/flowcontrol/registry/shard_test.go @@ -18,7 +18,7 @@ package registry import ( "errors" - "sort" + "slices" "testing" "github.com/go-logr/logr" @@ -75,17 +75,16 @@ func setupTestShard(t *testing.T) *shardTestFixture { } // _reconcileFlow_testOnly is a test helper that simulates the future admin logic for adding or updating a flow. -// It creates a `managedQueue` and correctly populates the `priorityBands` and `activeFlows` maps. +// It creates a `managedQueue` and correctly populates the `priorityBands` map and the `flowKeys` slice. // This helper is intended to be replaced by the real `reconcileFlow` method in a future PR. func (s *registryShard) _reconcileFlow_testOnly( t *testing.T, flowSpec types.FlowSpecification, - isActive bool, ) *managedQueue { t.Helper() - band, ok := s.priorityBands[flowSpec.Priority] - require.True(t, ok, "Setup: priority band %d should exist", flowSpec.Priority) + band, ok := s.priorityBands[flowSpec.Key.Priority] + require.True(t, ok, "Setup: priority band %d should exist", flowSpec.Key.Priority) lq, err := queue.NewQueueFromName(listqueue.ListQueueName, nil) require.NoError(t, err, "Setup: creating a real listqueue should not fail") @@ -93,16 +92,15 @@ func (s *registryShard) _reconcileFlow_testOnly( mq := newManagedQueue( lq, band.defaultIntraFlowDispatchPolicy, - flowSpec, + flowSpec.Key, logr.Discard(), - func(lenDelta, byteSizeDelta int64) { s.reconcileStats(flowSpec.Priority, lenDelta, byteSizeDelta) }, + func(lenDelta, byteSizeDelta int64) { s.reconcileStats(flowSpec.Key.Priority, lenDelta, byteSizeDelta) }, ) require.NotNil(t, mq, "Setup: newManagedQueue should not return nil") - band.queues[flowSpec.ID] = mq - if isActive { - s.activeFlows[flowSpec.ID] = mq - } + // Add the queue to the map and update the cached key slice. + band.queues[flowSpec.Key] = mq + band.flowKeys = append(band.flowKeys, flowSpec.Key) return mq } @@ -188,11 +186,12 @@ func TestShard_Stats(t *testing.T) { f := setupTestShard(t) // Add a queue and some items to test stats aggregation - mq := f.shard._reconcileFlow_testOnly(t, types.FlowSpecification{ID: "flow1", Priority: 10}, true) + flowKey := types.FlowKey{ID: "flow1", Priority: 10} + mq := f.shard._reconcileFlow_testOnly(t, types.FlowSpecification{Key: flowKey}) // Add items - require.NoError(t, mq.Add(mocks.NewMockQueueItemAccessor(100, "req1", "flow1")), "Adding item should not fail") - require.NoError(t, mq.Add(mocks.NewMockQueueItemAccessor(50, "req2", "flow1")), "Adding item should not fail") + require.NoError(t, mq.Add(mocks.NewMockQueueItemAccessor(100, "req1", flowKey)), "Adding item should not fail") + require.NoError(t, mq.Add(mocks.NewMockQueueItemAccessor(50, "req2", flowKey)), "Adding item should not fail") stats := f.shard.Stats() @@ -217,67 +216,44 @@ func TestShard_Accessors(t *testing.T) { t.Parallel() f := setupTestShard(t) - flowID := "test-flow" - activePriority := uint(10) - drainingPriority := uint(20) - - // Setup state with one active and one draining queue for the same flow - activeQueue := f.shard._reconcileFlow_testOnly(t, types.FlowSpecification{ - ID: flowID, - Priority: activePriority, - }, true) - drainingQueue := f.shard._reconcileFlow_testOnly(t, types.FlowSpecification{ - ID: flowID, - Priority: drainingPriority, - }, false) - - t.Run("ActiveManagedQueue", func(t *testing.T) { - t.Parallel() - retrievedActiveQueue, err := f.shard.ActiveManagedQueue(flowID) - require.NoError(t, err, "ActiveManagedQueue should not error for an existing flow") - assert.Same(t, activeQueue, retrievedActiveQueue, "Should return the correct active queue") + key1 := types.FlowKey{ID: "flow1", Priority: 10} + key2 := types.FlowKey{ID: "flow2", Priority: 20} - _, err = f.shard.ActiveManagedQueue("non-existent-flow") - require.Error(t, err, "ActiveManagedQueue should error for a non-existent flow") - assert.ErrorIs(t, err, contracts.ErrFlowInstanceNotFound, "Error should be ErrFlowInstanceNotFound") - }) + // Setup state + queue1 := f.shard._reconcileFlow_testOnly(t, types.FlowSpecification{Key: key1}) + f.shard._reconcileFlow_testOnly(t, types.FlowSpecification{Key: key2}) t.Run("ManagedQueue", func(t *testing.T) { t.Parallel() - retrievedDrainingQueue, err := f.shard.ManagedQueue(flowID, drainingPriority) - require.NoError(t, err, "ManagedQueue should not error for a draining queue") - assert.Same(t, drainingQueue, retrievedDrainingQueue, "Should return the correct draining queue") + retrievedQueue, err := f.shard.ManagedQueue(key1) + require.NoError(t, err, "ManagedQueue should not error for an existing key") + assert.Same(t, queue1, retrievedQueue, "Should return the correct queue") - _, err = f.shard.ManagedQueue(flowID, 99) // Non-existent priority + _, err = f.shard.ManagedQueue(types.FlowKey{ID: "non-existent", Priority: 10}) + require.Error(t, err, "ManagedQueue should error for a non-existent flow key") + assert.ErrorIs(t, err, contracts.ErrFlowInstanceNotFound, "Error should be ErrFlowInstanceNotFound") + + _, err = f.shard.ManagedQueue(types.FlowKey{ID: "flow1", Priority: 99}) // Non-existent priority require.Error(t, err, "ManagedQueue should error for a non-existent priority") assert.ErrorIs(t, err, contracts.ErrPriorityBandNotFound, "Error should be ErrPriorityBandNotFound") - - _, err = f.shard.ManagedQueue("non-existent-flow", activePriority) - require.Error(t, err, "ManagedQueue should error for a non-existent flow in an existing priority") - assert.ErrorIs(t, err, contracts.ErrFlowInstanceNotFound, "Error should be ErrFlowInstanceNotFound") }) t.Run("IntraFlowDispatchPolicy", func(t *testing.T) { t.Parallel() - retrievedActivePolicy, err := f.shard.IntraFlowDispatchPolicy(flowID, activePriority) - require.NoError(t, err, "IntraFlowDispatchPolicy should not error for an active instance") - assert.Same(t, activeQueue.dispatchPolicy, retrievedActivePolicy, - "Should return the policy from the active instance") + retrievedPolicy, err := f.shard.IntraFlowDispatchPolicy(key1) + require.NoError(t, err, "IntraFlowDispatchPolicy should not error for an existing key") + assert.Same(t, queue1.dispatchPolicy, retrievedPolicy, "Should return the policy from the correct instance") - _, err = f.shard.IntraFlowDispatchPolicy("non-existent-flow", activePriority) + _, err = f.shard.IntraFlowDispatchPolicy(types.FlowKey{ID: "non-existent", Priority: 10}) require.Error(t, err, "IntraFlowDispatchPolicy should error for a non-existent flow") assert.ErrorIs(t, err, contracts.ErrFlowInstanceNotFound, "Error should be ErrFlowInstanceNotFound") - - _, err = f.shard.IntraFlowDispatchPolicy(flowID, 99) // Non-existent priority - require.Error(t, err, "IntraFlowDispatchPolicy should error for a non-existent priority") - assert.ErrorIs(t, err, contracts.ErrPriorityBandNotFound, "Error should be ErrPriorityBandNotFound") }) t.Run("InterFlowDispatchPolicy", func(t *testing.T) { t.Parallel() - retrievedInterPolicy, err := f.shard.InterFlowDispatchPolicy(activePriority) + retrievedInterPolicy, err := f.shard.InterFlowDispatchPolicy(10) require.NoError(t, err, "InterFlowDispatchPolicy should not error for an existing priority") - assert.Same(t, f.shard.priorityBands[activePriority].interFlowDispatchPolicy, retrievedInterPolicy, + assert.Same(t, f.shard.priorityBands[10].interFlowDispatchPolicy, retrievedInterPolicy, "Should return the correct inter-flow policy") _, err = f.shard.InterFlowDispatchPolicy(99) // Non-existent priority @@ -292,57 +268,66 @@ func TestShard_PriorityBandAccessor(t *testing.T) { // Setup shard state for the tests p1, p2 := uint(10), uint(20) - f.shard._reconcileFlow_testOnly(t, types.FlowSpecification{ID: "flow1", Priority: p1}, true) - f.shard._reconcileFlow_testOnly(t, types.FlowSpecification{ID: "flow1", Priority: p2}, false) - f.shard._reconcileFlow_testOnly(t, types.FlowSpecification{ID: "flow2", Priority: p1}, true) + key1P1 := types.FlowKey{ID: "flow1", Priority: p1} + key1P2 := types.FlowKey{ID: "flow1", Priority: p2} + key2P2 := types.FlowKey{ID: "flow2", Priority: p2} + + f.shard._reconcileFlow_testOnly(t, types.FlowSpecification{Key: key1P1}) + f.shard._reconcileFlow_testOnly(t, types.FlowSpecification{Key: key1P2}) + f.shard._reconcileFlow_testOnly(t, types.FlowSpecification{Key: key2P2}) t.Run("Accessor for existing priority", func(t *testing.T) { t.Parallel() - accessor, err := f.shard.PriorityBandAccessor(p1) + accessor, err := f.shard.PriorityBandAccessor(p2) require.NoError(t, err, "PriorityBandAccessor should not fail for existing priority") require.NotNil(t, accessor, "Accessor should not be nil") t.Run("Properties", func(t *testing.T) { t.Parallel() - assert.Equal(t, p1, accessor.Priority(), "Accessor should have correct priority") - assert.Equal(t, "High", accessor.PriorityName(), "Accessor should have correct priority name") + assert.Equal(t, p2, accessor.Priority(), "Accessor should have correct priority") + assert.Equal(t, "Low", accessor.PriorityName(), "Accessor should have correct priority name") }) - t.Run("FlowIDs", func(t *testing.T) { + t.Run("FlowKeys", func(t *testing.T) { t.Parallel() - flowIDs := accessor.FlowIDs() - sort.Strings(flowIDs) - assert.Equal(t, []string{"flow1", "flow2"}, flowIDs, - "Accessor should return correct flow IDs for the priority band") + flowKeys := accessor.FlowKeys() + // Sort for consistent comparison as map iteration order is not guaranteed. + slices.SortFunc(flowKeys, func(a, b types.FlowKey) int { + return a.Compare(b) + }) + assert.Equal(t, []types.FlowKey{key1P2, key2P2}, flowKeys, + "Accessor should return correct flow keys for the priority band") }) t.Run("Queue", func(t *testing.T) { t.Parallel() q := accessor.Queue("flow1") require.NotNil(t, q, "Accessor should return queue for flow1") - assert.Equal(t, p1, q.FlowSpec().Priority, "Queue should have the correct priority") + assert.Equal(t, p2, q.FlowKey().Priority, "Queue should have the correct priority") assert.Nil(t, accessor.Queue("non-existent"), "Accessor should return nil for non-existent flow") }) t.Run("IterateQueues", func(t *testing.T) { t.Parallel() - var iteratedFlows []string + var iteratedKeys []types.FlowKey accessor.IterateQueues(func(queue framework.FlowQueueAccessor) bool { - iteratedFlows = append(iteratedFlows, queue.FlowSpec().ID) + iteratedKeys = append(iteratedKeys, queue.FlowKey()) return true }) - sort.Strings(iteratedFlows) - assert.Equal(t, []string{"flow1", "flow2"}, iteratedFlows, "IterateQueues should visit all flows in the band") + slices.SortFunc(iteratedKeys, func(a, b types.FlowKey) int { + return a.Compare(b) + }) + assert.Equal(t, []types.FlowKey{key1P2, key2P2}, iteratedKeys, "IterateQueues should visit all flows in the band") }) t.Run("IterateQueues with early exit", func(t *testing.T) { t.Parallel() - var iteratedFlows []string + var iteratedKeys []types.FlowKey accessor.IterateQueues(func(queue framework.FlowQueueAccessor) bool { - iteratedFlows = append(iteratedFlows, queue.FlowSpec().ID) + iteratedKeys = append(iteratedKeys, queue.FlowKey()) return false // Exit after first item }) - assert.Len(t, iteratedFlows, 1, "IterateQueues should exit early if callback returns false") + assert.Len(t, iteratedKeys, 1, "IterateQueues should exit early if callback returns false") }) }) diff --git a/pkg/epp/flowcontrol/types/flow.go b/pkg/epp/flowcontrol/types/flow.go index 4bdd50832..0d8a1b8b9 100644 --- a/pkg/epp/flowcontrol/types/flow.go +++ b/pkg/epp/flowcontrol/types/flow.go @@ -16,13 +16,63 @@ limitations under the License. package types -// FlowSpecification defines the complete configuration for a single logical flow. -// It is the data contract used by the `contracts.FlowRegistry` to create and manage the lifecycle of queues and -// policies. -type FlowSpecification struct { - // ID is the unique identifier for this flow (e.g., model name, tenant ID). +import ( + "fmt" + "strings" +) + +// FlowKey is the unique, immutable identifier for a single, independently managed flow instance within the +// `contracts.FlowRegistry`. +// It combines a logical grouping `ID` with a specific `Priority` level to form a composite primary key. +// +// The core architectural principle of this model is that each unique, immutable `FlowKey` represents a completely +// separate stream of traffic with its own queue, lifecycle and statistics. +type FlowKey struct { + // ID is the logical grouping identifier for a flow, such as a tenant ID or a model name. This ID can be shared across + // multiple `FlowKey`s to represent different classes of traffic for the same logical entity. It provides a way to + // group related traffic but does not uniquely identify a manageable flow instance on its own. ID string - // Priority is the numerical priority level for this flow. Lower values indicate higher priority. + // Priority is the numerical priority level that defines the priority band for this specific flow instance. + // + // A different Priority value for the same ID creates a distinct flow instance. For example, the keys + // `{ID: "TenantA", Priority: 10}` and `{ID: "TenantA", Priority: 100}` are treated as two completely separate, + // concurrently active flows. This allows a single tenant to serve traffic at multiple priority levels simultaneously. + // + // Because the `FlowKey` is immutable, changing the priority of traffic requires using a new `FlowKey`; the old flow + // instance will be automatically garbage collected by the registry when it becomes idle. Priority uint } + +// Compare provides a stable comparison function for two FlowKey instances, suitable for use with sorting algorithms. +// It returns -1 if the key is less than the other, 0 if they are equal, and 1 if the key is greater than the other. +// The comparison is performed first by Priority (descending) and then by ID (ascending). +func (k FlowKey) Compare(other FlowKey) int { + if k.Priority > other.Priority { + return -1 + } + if k.Priority < other.Priority { + return 1 + } + return strings.Compare(k.ID, other.ID) +} + +func (k FlowKey) String() string { + return fmt.Sprintf("%s::%d", k.ID, k.Priority) +} + +// FlowSpecification defines the complete configuration for a single logical flow instance, which is uniquely identified +// by its immutable `Key`. +// +// It is the primary data contract used by the `contracts.FlowRegistry` to register or update a flow instance. The +// registry manages one flow instance for each unique `FlowKey`. While the `Key` itself is immutable, the specification +// can be updated via `RegisterOrUpdateFlow` to modify configurable parameters (e.g., policies, capacity limits) for the +// existing flow instance (when these update stories are supported). +type FlowSpecification struct { + // Key is the unique, immutable identifier for the flow instance this specification describes. + Key FlowKey + + // TODO: Add other flow-scoped configuration fields here, such as: + // - IntraFlowDispatchPolicy intra.RegisteredPolicyName + // - CapacityBytes uint64 +} diff --git a/pkg/epp/flowcontrol/types/mocks/mocks.go b/pkg/epp/flowcontrol/types/mocks/mocks.go index 7eb6f3743..dbef031d7 100644 --- a/pkg/epp/flowcontrol/types/mocks/mocks.go +++ b/pkg/epp/flowcontrol/types/mocks/mocks.go @@ -28,27 +28,32 @@ import ( // MockFlowControlRequest provides a mock implementation of the `types.FlowControlRequest` interface. type MockFlowControlRequest struct { Ctx context.Context - FlowIDV string + FlowKeyV types.FlowKey ByteSizeV uint64 InitialEffectiveTTLV time.Duration IDV string } // NewMockFlowControlRequest creates a new `MockFlowControlRequest` instance. -func NewMockFlowControlRequest(byteSize uint64, id, flowID string, ctx context.Context) *MockFlowControlRequest { +func NewMockFlowControlRequest( + byteSize uint64, + id string, + key types.FlowKey, + ctx context.Context, +) *MockFlowControlRequest { if ctx == nil { ctx = context.Background() } return &MockFlowControlRequest{ ByteSizeV: byteSize, IDV: id, - FlowIDV: flowID, + FlowKeyV: key, Ctx: ctx, } } func (m *MockFlowControlRequest) Context() context.Context { return m.Ctx } -func (m *MockFlowControlRequest) FlowID() string { return m.FlowIDV } +func (m *MockFlowControlRequest) FlowKey() types.FlowKey { return m.FlowKeyV } func (m *MockFlowControlRequest) ByteSize() uint64 { return m.ByteSizeV } func (m *MockFlowControlRequest) InitialEffectiveTTL() time.Duration { return m.InitialEffectiveTTLV } func (m *MockFlowControlRequest) ID() string { return m.IDV } @@ -92,13 +97,13 @@ var _ types.QueueItemAccessor = &MockQueueItemAccessor{} // NewMockQueueItemAccessor is a constructor for `MockQueueItemAccessor` that initializes the mock with a default // `MockFlowControlRequest` and `MockQueueItemHandle` to prevent nil pointer dereferences in tests. -func NewMockQueueItemAccessor(byteSize uint64, reqID, flowID string) *MockQueueItemAccessor { +func NewMockQueueItemAccessor(byteSize uint64, reqID string, key types.FlowKey) *MockQueueItemAccessor { return &MockQueueItemAccessor{ EnqueueTimeV: time.Now(), OriginalRequestV: NewMockFlowControlRequest( byteSize, reqID, - flowID, + key, context.Background(), ), HandleV: &MockQueueItemHandle{}, diff --git a/pkg/epp/flowcontrol/types/request.go b/pkg/epp/flowcontrol/types/request.go index 8b5023667..756940704 100644 --- a/pkg/epp/flowcontrol/types/request.go +++ b/pkg/epp/flowcontrol/types/request.go @@ -33,10 +33,11 @@ type FlowControlRequest interface { // queue. Context() context.Context - // FlowID returns the unique identifier for the flow this request belongs to (e.g., model name, tenant ID). The - // `controller.FlowController` uses this ID to look up the active `contracts.ManagedQueue` and configured - // `framework.IntraFlowDispatchPolicy` from a `contracts.RegistryShard`. - FlowID() string + // FlowKey returns the composite key that uniquely identifies the flow instance this request belongs to. + // The `controller.FlowController` uses this key as the primary identifier to look up the correct + // `contracts.ManagedQueue` and configured `framework.IntraFlowDispatchPolicy` from a `contracts.RegistryShard`. + // The returned key is treated as an immutable value. + FlowKey() FlowKey // ByteSize returns the request's size in bytes (e.g., prompt size). This is used by the `controller.FlowController` // for managing byte-based capacity limits and for `contracts.FlowRegistry` statistics.