Skip to content

refactor(flowcontrol): Adopt Composite FlowKey as Primary Identifier #1340

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 9 additions & 17 deletions pkg/epp/flowcontrol/contracts/mocks/mocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,8 @@ import (
type MockRegistryShard struct {
IDFunc func() string
IsActiveFunc func() bool
ActiveManagedQueueFunc func(flowID string) (contracts.ManagedQueue, error)
ManagedQueueFunc func(flowID string, priority uint) (contracts.ManagedQueue, error)
IntraFlowDispatchPolicyFunc func(flowID string, priority uint) (framework.IntraFlowDispatchPolicy, error)
ManagedQueueFunc func(key types.FlowKey) (contracts.ManagedQueue, error)
IntraFlowDispatchPolicyFunc func(key types.FlowKey) (framework.IntraFlowDispatchPolicy, error)
InterFlowDispatchPolicyFunc func(priority uint) (framework.InterFlowDispatchPolicy, error)
PriorityBandAccessorFunc func(priority uint) (framework.PriorityBandAccessor, error)
AllOrderedPriorityLevelsFunc func() []uint
Expand All @@ -69,23 +68,16 @@ func (m *MockRegistryShard) IsActive() bool {
return false
}

func (m *MockRegistryShard) ActiveManagedQueue(flowID string) (contracts.ManagedQueue, error) {
if m.ActiveManagedQueueFunc != nil {
return m.ActiveManagedQueueFunc(flowID)
}
return nil, nil
}

func (m *MockRegistryShard) ManagedQueue(flowID string, priority uint) (contracts.ManagedQueue, error) {
func (m *MockRegistryShard) ManagedQueue(key types.FlowKey) (contracts.ManagedQueue, error) {
if m.ManagedQueueFunc != nil {
return m.ManagedQueueFunc(flowID, priority)
return m.ManagedQueueFunc(key)
}
return nil, nil
}

func (m *MockRegistryShard) IntraFlowDispatchPolicy(flowID string, priority uint) (framework.IntraFlowDispatchPolicy, error) {
func (m *MockRegistryShard) IntraFlowDispatchPolicy(key types.FlowKey) (framework.IntraFlowDispatchPolicy, error) {
if m.IntraFlowDispatchPolicyFunc != nil {
return m.IntraFlowDispatchPolicyFunc(flowID, priority)
return m.IntraFlowDispatchPolicyFunc(key)
}
return nil, nil
}
Expand Down Expand Up @@ -148,8 +140,8 @@ func (m *MockSaturationDetector) IsSaturated(ctx context.Context) bool {
// 3. **Self-Wiring**: The `FlowQueueAccessor()` method returns the mock itself, ensuring the accessor is always
// correctly connected to the queue's state without manual wiring in tests.
type MockManagedQueue struct {
// FlowSpecV defines the flow specification for this mock queue. It should be set by the test.
FlowSpecV types.FlowSpecification
// FlowKeyV defines the flow specification for this mock queue. It should be set by the test.
FlowKeyV types.FlowKey

// AddFunc allows a test to completely override the default Add behavior.
AddFunc func(item types.QueueItemAccessor) error
Expand Down Expand Up @@ -249,7 +241,7 @@ func (m *MockManagedQueue) Drain() ([]types.QueueItemAccessor, error) {
return drained, nil
}

func (m *MockManagedQueue) FlowSpec() types.FlowSpecification { return m.FlowSpecV }
func (m *MockManagedQueue) FlowKey() types.FlowKey { return m.FlowKeyV }
func (m *MockManagedQueue) Name() string { return "" }
func (m *MockManagedQueue) Capabilities() []framework.QueueCapability { return nil }
func (m *MockManagedQueue) Comparator() framework.ItemComparator { return nil }
Expand Down
30 changes: 14 additions & 16 deletions pkg/epp/flowcontrol/contracts/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,13 @@ package contracts

import (
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/types"
)

// RegistryShard defines the read-oriented interface that a `controller.FlowController` worker uses to access its
// specific slice (shard) of the `FlowRegistry`'s state. It provides the necessary methods for a worker to perform its
// dispatch operations by accessing queues and policies in a concurrent-safe manner.
// specific slice (shard) of the `FlowRegistry's` state. It provides a concurrent-safe view of all flow instances, which
// are uniquely identified by their composite `types.FlowKey`. It is the primary contract for performing dispatch
// operations.
//
// # Conformance
//
Expand All @@ -41,21 +43,19 @@ type RegistryShard interface {
// being gracefully drained and should not be given new work.
IsActive() bool

// ActiveManagedQueue returns the currently active `ManagedQueue` for a given flow on this shard. This is the queue to
// which new requests for the flow should be enqueued.
// Returns an error wrapping `ErrFlowInstanceNotFound` if no active instance exists for the given `flowID`.
ActiveManagedQueue(flowID string) (ManagedQueue, error)

// ManagedQueue retrieves a specific (potentially draining) `ManagedQueue` instance from this shard. This allows a
// worker to continue dispatching items from queues that are draining as part of a flow update.
// Returns an error wrapping `ErrFlowInstanceNotFound` if no instance for the given flowID and priority exists.
ManagedQueue(flowID string, priority uint) (ManagedQueue, error)
// ManagedQueue retrieves the managed queue for the given, unique `types.FlowKey`. This is the primary method for
// accessing a specific flow's queue for either enqueueing or dispatching requests.
//
// Returns an error wrapping `ErrPriorityBandNotFound` if the priority specified in the key is not configured, or
// `ErrFlowInstanceNotFound` if no instance exists for the given `key`.
ManagedQueue(key types.FlowKey) (ManagedQueue, error)

// IntraFlowDispatchPolicy retrieves a flow's configured `framework.IntraFlowDispatchPolicy` for this shard.
// IntraFlowDispatchPolicy retrieves a flow's configured `framework.IntraFlowDispatchPolicy` for this shard,
// identified by its unique `FlowKey`.
// The registry guarantees that a non-nil default policy (as configured at the priority-band level) is returned if
// none is specified on the flow itself.
// none is specified for the flow.
// Returns an error wrapping `ErrFlowInstanceNotFound` if the flow instance does not exist.
IntraFlowDispatchPolicy(flowID string, priority uint) (framework.IntraFlowDispatchPolicy, error)
IntraFlowDispatchPolicy(key types.FlowKey) (framework.IntraFlowDispatchPolicy, error)

// InterFlowDispatchPolicy retrieves a priority band's configured `framework.InterFlowDispatchPolicy` for this shard.
// The registry guarantees that a non-nil default policy is returned if none is configured for the band.
Expand Down Expand Up @@ -86,8 +86,6 @@ type RegistryShard interface {
// # Conformance
//
// - All methods (including those embedded from `framework.SafeQueue`) MUST be goroutine-safe.
// - The `Add()` method MUST reject new items if the queue has been marked as "draining" by the `FlowRegistry`,
// ensuring that lifecycle changes are respected even by consumers holding a stale pointer to the queue.
// - All mutating methods (`Add()`, `Remove()`, `Cleanup()`, `Drain()`) MUST atomically update relevant statistics
// (e.g., queue length, byte size).
type ManagedQueue interface {
Expand Down
57 changes: 29 additions & 28 deletions pkg/epp/flowcontrol/controller/internal/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,13 @@ import (

"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/contracts"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/types"
logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging"
)

// BandFilter is a function that acts as a pre-policy gate. It takes a complete view of a priority band and returns a
// subset of flow IDs that are considered viable candidates for a subsequent policy decision. It can also return a
// boolean signal to pause the entire operation for the band.
// potentially filtered `framework.PriorityBandAccessor` containing only the flows that are viable candidates for a
// subsequent policy decision. It can also return a boolean signal to pause the entire operation for the band.
//
// This abstraction decouples the logic of determining *viability* (e.g., is a flow subject to backpressure?) from the
// logic of *selection* (e.g., which of the viable flows is the fairest to pick next?). This separation simplifies the
Expand All @@ -39,13 +40,13 @@ import (
// can be chained to apply different viability criteria. For example, a future filter could be developed to temporarily
// exclude a "misbehaving" flow that is causing repeated errors, quarantining it from policy consideration.
//
// A nil `allowedFlows` map indicates that no filtering is necessary and all flows in the band are visible.
// This provides a zero-allocation fast path for the common case where no flows are being filtered.
// A nil returned `PriorityBandAccessor` indicates that no filtering was necessary and the original accessor should be
// used. This provides a zero-allocation fast path for the common case where no flows are being filtered.
type BandFilter func(
ctx context.Context,
band framework.PriorityBandAccessor,
logger logr.Logger,
) (allowedFlows map[string]struct{}, shouldPause bool)
) (filteredBand framework.PriorityBandAccessor, shouldPause bool)

// NewSaturationFilter creates a `BandFilter` that uses the provided `contracts.SaturationDetector` to determine which
// flows are dispatchable. This is the standard filter used in the production `FlowController` for the dispatch
Expand All @@ -55,15 +56,16 @@ func NewSaturationFilter(sd contracts.SaturationDetector) BandFilter {
ctx context.Context,
band framework.PriorityBandAccessor,
logger logr.Logger,
) (map[string]struct{}, bool) {
) (framework.PriorityBandAccessor, bool) {
// Phase 1: Implement the current global saturation check.
if sd.IsSaturated(ctx) {
logger.V(logutil.VERBOSE).Info("System saturated, pausing dispatch for this shard.")
return nil, true // Pause dispatching for all bands.
}

// Phase 2 (Future): This is where per-flow saturation logic would go.
// It would iterate `band`, call `IsSaturated(ctx, flowID)`, and build a filtered map of allowed flows.
// It would iterate `band`, call `IsSaturated(ctx, flowID)`, and build a filtered map of allowed flows,
// then return `newSubsetPriorityBandAccessor(band, allowedFlows)`.
// For now, no per-flow filtering is done. Return nil to signal the fast path.
return nil, false // Do not pause, and do not filter any flows.
}
Expand All @@ -73,31 +75,28 @@ func NewSaturationFilter(sd contracts.SaturationDetector) BandFilter {
// It implements the `framework.PriorityBandAccessor` interface, ensuring that any policy operating on it will only
// see the allowed flows, regardless of which accessor method is used. This provides correctness by construction.
//
// For performance, it pre-computes a slice of the allowed flow IDs at creation time, making subsequent calls to
// `FlowIDs()` an O(1) operation with zero allocations.
// For performance, it pre-computes a slice of the allowed flows at creation time, making subsequent calls to
// `FlowKeys()` an O(1) operation with zero allocations.
type subsetPriorityBandAccessor struct {
originalAccessor framework.PriorityBandAccessor
allowedFlows map[string]struct{}
allowedFlowsSlice []string
allowedFlows map[types.FlowKey]struct{}
allowedFlowsSlice []types.FlowKey
}

var _ framework.PriorityBandAccessor = &subsetPriorityBandAccessor{}

// newSubsetPriorityBandAccessor creates a new filtered view of a priority band.
func newSubsetPriorityBandAccessor(
original framework.PriorityBandAccessor,
allowed map[string]struct{},
) *subsetPriorityBandAccessor {
// Pre-compute the slice of flow IDs for performance.
ids := make([]string, 0, len(allowed))
for id := range allowed {
ids = append(ids, id)
func newSubsetPriorityBandAccessor(original framework.PriorityBandAccessor, allowed []types.FlowKey) *subsetPriorityBandAccessor {
// Pre-compute the map for efficient lookups in `Queue()` and `IterateQueues()`.
allowedMap := make(map[types.FlowKey]struct{}, len(allowed))
for _, k := range allowed {
allowedMap[k] = struct{}{}
}

return &subsetPriorityBandAccessor{
originalAccessor: original,
allowedFlows: allowed,
allowedFlowsSlice: ids,
allowedFlows: allowedMap,
allowedFlowsSlice: allowed,
}
}

Expand All @@ -111,19 +110,21 @@ func (s *subsetPriorityBandAccessor) PriorityName() string {
return s.originalAccessor.PriorityName()
}

// FlowIDs returns a slice of all flow IDs within this priority band that are in the allowed subset.
// FlowKeys returns a slice of the composite `types.FlowKey`s for every flow instance currently active within this
// priority band that are in the allowed subset.
// This is an O(1) operation because the slice is pre-computed at creation.
func (s *subsetPriorityBandAccessor) FlowIDs() []string {
func (s *subsetPriorityBandAccessor) FlowKeys() []types.FlowKey {
return s.allowedFlowsSlice
}

// Queue returns a `framework.FlowQueueAccessor` for the specified `flowID` within this priority band, but only if it is
// Queue returns a `framework.FlowQueueAccessor` for the specified `ID` within this priority band, but only if it is
// in the allowed subset. This is an O(1) map lookup. If the flow is not in the allowed subset, it returns nil.
func (s *subsetPriorityBandAccessor) Queue(flowID string) framework.FlowQueueAccessor {
if _, ok := s.allowedFlows[flowID]; !ok {
func (s *subsetPriorityBandAccessor) Queue(id string) framework.FlowQueueAccessor {
key := types.FlowKey{ID: id, Priority: s.Priority()}
if _, ok := s.allowedFlows[key]; !ok {
return nil
}
return s.originalAccessor.Queue(flowID)
return s.originalAccessor.Queue(id)
}

// IterateQueues executes the given `callback` for each `framework.FlowQueueAccessor` in the allowed subset of this
Expand All @@ -132,7 +133,7 @@ func (s *subsetPriorityBandAccessor) Queue(flowID string) framework.FlowQueueAcc
// efficient than iterating over a pre-computed slice of IDs.
func (s *subsetPriorityBandAccessor) IterateQueues(callback func(queue framework.FlowQueueAccessor) bool) {
s.originalAccessor.IterateQueues(func(queue framework.FlowQueueAccessor) bool {
if _, ok := s.allowedFlows[queue.FlowSpec().ID]; ok {
if _, ok := s.allowedFlows[queue.FlowKey()]; ok {
// This queue is in the allowed set, so execute the callback.
if !callback(queue) {
return false // The callback requested to stop, so we stop the outer iteration too.
Expand Down
65 changes: 34 additions & 31 deletions pkg/epp/flowcontrol/controller/internal/filter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,22 +35,22 @@ func TestNewSaturationFilter(t *testing.T) {
t.Parallel()

testCases := []struct {
name string
isSaturated bool
expectShouldPause bool
expectAllowed map[string]struct{}
name string
isSaturated bool
expectShouldPause bool
expectFilteredBandNil bool
}{
{
name: "should not pause or filter when system is not saturated",
isSaturated: false,
expectShouldPause: false,
expectAllowed: nil, // nil map signals the fast path
name: "should not pause or filter when system is not saturated",
isSaturated: false,
expectShouldPause: false,
expectFilteredBandNil: true, // nil band signals the fast path
},
{
name: "should pause when system is saturated",
isSaturated: true,
expectShouldPause: true,
expectAllowed: nil,
name: "should pause when system is saturated",
isSaturated: true,
expectShouldPause: true,
expectFilteredBandNil: true,
},
}

Expand All @@ -66,15 +66,15 @@ func TestNewSaturationFilter(t *testing.T) {
mockBand := &frameworkmocks.MockPriorityBandAccessor{}

// --- ACT ---
allowed, shouldPause := filter(context.Background(), mockBand, logr.Discard())
filteredBand, shouldPause := filter(context.Background(), mockBand, logr.Discard())

// --- ASSERT ---
assert.Equal(t, tc.expectShouldPause, shouldPause, "The filter's pause signal should match the expected value")

if tc.expectAllowed == nil {
assert.Nil(t, allowed, "Expected allowed map to be nil for the fast path")
if tc.expectFilteredBandNil {
assert.Nil(t, filteredBand, "Expected filtered band to be nil")
} else {
assert.Equal(t, tc.expectAllowed, allowed, "The set of allowed flows should match the expected value")
assert.NotNil(t, filteredBand, "Expected a non-nil filtered band")
}
})
}
Expand All @@ -85,15 +85,19 @@ func TestSubsetPriorityBandAccessor(t *testing.T) {

// --- ARRANGE ---
// Setup a mock original accessor that knows about three flows.
mockQueueA := &frameworkmocks.MockFlowQueueAccessor{FlowSpecV: types.FlowSpecification{ID: "flow-a"}}
mockQueueB := &frameworkmocks.MockFlowQueueAccessor{FlowSpecV: types.FlowSpecification{ID: "flow-b"}}
mockQueueC := &frameworkmocks.MockFlowQueueAccessor{FlowSpecV: types.FlowSpecification{ID: "flow-c"}}
flowAKey := types.FlowKey{ID: "flow-a", Priority: 10}
flowBKey := types.FlowKey{ID: "flow-b", Priority: 10}
flowCKey := types.FlowKey{ID: "flow-c", Priority: 10}

mockQueueA := &frameworkmocks.MockFlowQueueAccessor{FlowKeyV: flowAKey}
mockQueueB := &frameworkmocks.MockFlowQueueAccessor{FlowKeyV: flowBKey}
mockQueueC := &frameworkmocks.MockFlowQueueAccessor{FlowKeyV: flowCKey}

originalAccessor := &frameworkmocks.MockPriorityBandAccessor{
PriorityV: 10,
PriorityNameV: "High",
FlowIDsFunc: func() []string {
return []string{"flow-a", "flow-b", "flow-c"}
FlowKeysFunc: func() []types.FlowKey {
return []types.FlowKey{flowAKey, flowBKey, flowCKey}
},
QueueFunc: func(id string) framework.FlowQueueAccessor {
switch id {
Expand All @@ -118,10 +122,7 @@ func TestSubsetPriorityBandAccessor(t *testing.T) {
}

// Create a subset view that only allows two of the flows.
allowedFlows := map[string]struct{}{
"flow-a": {},
"flow-c": {},
}
allowedFlows := []types.FlowKey{flowAKey, flowCKey}
subsetAccessor := newSubsetPriorityBandAccessor(originalAccessor, allowedFlows)
require.NotNil(t, subsetAccessor, "newSubsetPriorityBandAccessor should not return nil")

Expand All @@ -132,12 +133,14 @@ func TestSubsetPriorityBandAccessor(t *testing.T) {
"PriorityName() should pass through from the original accessor")
})

t.Run("should only return allowed flow IDs", func(t *testing.T) {
t.Run("should only return allowed flow keys", func(t *testing.T) {
t.Parallel()
flowIDs := subsetAccessor.FlowIDs()
flowKeys := subsetAccessor.FlowKeys()
// Sort for consistent comparison, as the pre-computed slice order is not guaranteed.
sort.Strings(flowIDs)
assert.Equal(t, []string{"flow-a", "flow-c"}, flowIDs, "FlowIDs() should only return the allowed subset")
sort.Slice(flowKeys, func(i, j int) bool {
return flowKeys[i].ID < flowKeys[j].ID
})
assert.Equal(t, []types.FlowKey{flowAKey, flowCKey}, flowKeys, "FlowKeys() should only return the allowed subset")
})

t.Run("should only return queues for allowed flows", func(t *testing.T) {
Expand All @@ -151,7 +154,7 @@ func TestSubsetPriorityBandAccessor(t *testing.T) {
t.Parallel()
var iterated []string
subsetAccessor.IterateQueues(func(queue framework.FlowQueueAccessor) bool {
iterated = append(iterated, queue.FlowSpec().ID)
iterated = append(iterated, queue.FlowKey().ID)
return true
})
// Sort for consistent comparison, as iteration order is not guaranteed.
Expand All @@ -163,7 +166,7 @@ func TestSubsetPriorityBandAccessor(t *testing.T) {
t.Parallel()
var iterated []string
subsetAccessor.IterateQueues(func(queue framework.FlowQueueAccessor) bool {
iterated = append(iterated, queue.FlowSpec().ID)
iterated = append(iterated, queue.FlowKey().ID)
return false // Exit after the first item.
})
assert.Len(t, iterated, 1, "Iteration should have stopped after one item")
Expand Down
3 changes: 2 additions & 1 deletion pkg/epp/flowcontrol/controller/internal/item_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ func TestItem(t *testing.T) {

t.Run("should have a non-finalized state upon creation", func(t *testing.T) {
t.Parallel()
req := typesmocks.NewMockFlowControlRequest(100, "req-1", "flow-a", context.Background())
key := types.FlowKey{ID: "flow-a", Priority: 10}
req := typesmocks.NewMockFlowControlRequest(100, "req-1", key, context.Background())
item := NewItem(req, time.Minute, time.Now())
require.NotNil(t, item, "NewItem should not return nil")
outcome, err := item.FinalState()
Expand Down
Loading