12 changes: 6 additions & 6 deletions pkg/epp/flowcontrol/contracts/mocks/mocks.go
@@ -48,9 +48,9 @@ type MockRegistryShard struct {
IsActiveFunc func() bool
ManagedQueueFunc func(key types.FlowKey) (contracts.ManagedQueue, error)
IntraFlowDispatchPolicyFunc func(key types.FlowKey) (framework.IntraFlowDispatchPolicy, error)
InterFlowDispatchPolicyFunc func(priority uint) (framework.InterFlowDispatchPolicy, error)
PriorityBandAccessorFunc func(priority uint) (framework.PriorityBandAccessor, error)
AllOrderedPriorityLevelsFunc func() []uint
InterFlowDispatchPolicyFunc func(priority int) (framework.InterFlowDispatchPolicy, error)
PriorityBandAccessorFunc func(priority int) (framework.PriorityBandAccessor, error)
AllOrderedPriorityLevelsFunc func() []int
StatsFunc func() contracts.ShardStats
}

@@ -82,21 +82,21 @@ func (m *MockRegistryShard) IntraFlowDispatchPolicy(key types.FlowKey) (framewor
return nil, nil
}

func (m *MockRegistryShard) InterFlowDispatchPolicy(priority uint) (framework.InterFlowDispatchPolicy, error) {
func (m *MockRegistryShard) InterFlowDispatchPolicy(priority int) (framework.InterFlowDispatchPolicy, error) {
if m.InterFlowDispatchPolicyFunc != nil {
return m.InterFlowDispatchPolicyFunc(priority)
}
return nil, nil
}

func (m *MockRegistryShard) PriorityBandAccessor(priority uint) (framework.PriorityBandAccessor, error) {
func (m *MockRegistryShard) PriorityBandAccessor(priority int) (framework.PriorityBandAccessor, error) {
if m.PriorityBandAccessorFunc != nil {
return m.PriorityBandAccessorFunc(priority)
}
return nil, nil
}

func (m *MockRegistryShard) AllOrderedPriorityLevels() []uint {
func (m *MockRegistryShard) AllOrderedPriorityLevels() []int {
if m.AllOrderedPriorityLevelsFunc != nil {
return m.AllOrderedPriorityLevelsFunc()
}
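Illustrative note (not part of the PR): the retyped function fields above are typically wired up in tests roughly like the sketch below. The import paths are inferred from the file paths in this diff, and the priority values and test name are invented.

```go
package mocks_test

import (
	"testing"

	"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/contracts/mocks"
	"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework"
	frameworkmocks "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/mocks"
)

func TestMockShardUsesIntPriorities(t *testing.T) {
	shard := &mocks.MockRegistryShard{
		AllOrderedPriorityLevelsFunc: func() []int { return []int{0, 10, 20} },
		PriorityBandAccessorFunc: func(priority int) (framework.PriorityBandAccessor, error) {
			// Echo the requested level back through a stub band.
			return &frameworkmocks.MockPriorityBandAccessor{PriorityV: priority}, nil
		},
	}

	for _, p := range shard.AllOrderedPriorityLevels() {
		band, err := shard.PriorityBandAccessor(p)
		if err != nil || band.Priority() != p {
			t.Fatalf("unexpected band for priority %d: %v", p, err)
		}
	}
}
```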
12 changes: 6 additions & 6 deletions pkg/epp/flowcontrol/contracts/registry.go
@@ -124,20 +124,20 @@ type RegistryShard interface {
// InterFlowDispatchPolicy retrieves a priority band's configured `framework.InterFlowDispatchPolicy` for this shard.
// The registry guarantees that a non-nil default policy is returned if none is configured for the band.
// Returns an error wrapping `ErrPriorityBandNotFound` if the priority level is not configured.
InterFlowDispatchPolicy(priority uint) (framework.InterFlowDispatchPolicy, error)
InterFlowDispatchPolicy(priority int) (framework.InterFlowDispatchPolicy, error)

// PriorityBandAccessor retrieves a read-only accessor for a given priority level, providing a view of the band's
// state as seen by this specific shard. This is the primary entry point for inter-flow dispatch policies that need to
// inspect and compare multiple flow queues within the same priority band.
// Returns an error wrapping `ErrPriorityBandNotFound` if the priority level is not configured.
PriorityBandAccessor(priority uint) (framework.PriorityBandAccessor, error)
PriorityBandAccessor(priority int) (framework.PriorityBandAccessor, error)

// AllOrderedPriorityLevels returns all configured priority levels that this shard is aware of, sorted in ascending
Contributor:

> sorted in ascending numerical order. This order corresponds to highest priority (lowest numeric value) to lowest priority (highest numeric value).

We should either remove this or update it to reflect the new sorting order (highest priority is highest numeric value).

Contributor (Author):

Updated the comment, thanks!

// numerical order. This order corresponds to highest priority (lowest numeric value) to lowest priority (highest
// numeric value).
// The returned slice provides a definitive, ordered list of priority levels for iteration, for example, by a
// `controller.FlowController` worker's dispatch loop.
AllOrderedPriorityLevels() []uint
AllOrderedPriorityLevels() []int

// Stats returns a snapshot of the statistics for this specific shard.
Stats() ShardStats
@@ -170,7 +170,7 @@ type AggregateStats struct {
// TotalLen is the total number of items currently queued across the entire system.
TotalLen uint64
// PerPriorityBandStats maps each configured priority level to its globally aggregated statistics.
PerPriorityBandStats map[uint]PriorityBandStats
PerPriorityBandStats map[int]PriorityBandStats
}

// ShardStats holds statistics for a single internal shard within the `FlowRegistry`.
@@ -188,13 +188,13 @@ type ShardStats struct {
// The capacity values within represent this shard's partition of the global band capacity.
// The key is the numerical priority level.
// All configured priority levels are guaranteed to be represented.
PerPriorityBandStats map[uint]PriorityBandStats
PerPriorityBandStats map[int]PriorityBandStats
}

// PriorityBandStats holds aggregated statistics for a single priority band.
type PriorityBandStats struct {
// Priority is the numerical priority level this struct describes.
Priority uint
Priority int
// PriorityName is a human-readable name for the priority band (e.g., "Critical", "Sheddable").
// The registry configuration requires this field, so it is guaranteed to be non-empty.
PriorityName string
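As a hedged illustration of why the ordering discussed in the review thread above matters to callers, here is a minimal consumer sketch (not code from this PR); `drainShard` and `dispatchFrom` are invented names, and the import paths are inferred from the file paths in the diff.

```go
// Hypothetical consumer of the RegistryShard contract shown above.
package sketch

import (
	"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/contracts"
	"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework"
)

// dispatchFrom is an invented stand-in for inter-/intra-flow policy selection.
func dispatchFrom(band framework.PriorityBandAccessor) { _ = band }

func drainShard(shard contracts.RegistryShard) error {
	// AllOrderedPriorityLevels now yields []int. Which end of that ordering is
	// "highest priority" is exactly what the review thread asks the doc comment
	// to pin down.
	for _, priority := range shard.AllOrderedPriorityLevels() {
		band, err := shard.PriorityBandAccessor(priority)
		if err != nil {
			return err // may wrap contracts.ErrPriorityBandNotFound
		}
		dispatchFrom(band)
	}
	return nil
}
```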
2 changes: 1 addition & 1 deletion pkg/epp/flowcontrol/controller/internal/filter.go
@@ -101,7 +101,7 @@ func newSubsetPriorityBandAccessor(original framework.PriorityBandAccessor, allo
}

// Priority returns the numerical priority level of this band.
func (s *subsetPriorityBandAccessor) Priority() uint {
func (s *subsetPriorityBandAccessor) Priority() int {
return s.originalAccessor.Priority()
}

2 changes: 1 addition & 1 deletion pkg/epp/flowcontrol/controller/internal/filter_test.go
@@ -128,7 +128,7 @@ func TestSubsetPriorityBandAccessor(t *testing.T) {

t.Run("should pass through priority and name", func(t *testing.T) {
t.Parallel()
assert.Equal(t, uint(10), subsetAccessor.Priority(), "Priority() should pass through from the original accessor")
assert.Equal(t, 10, subsetAccessor.Priority(), "Priority() should pass through from the original accessor")
assert.Equal(t, "High", subsetAccessor.PriorityName(),
"PriorityName() should pass through from the original accessor")
})
2 changes: 1 addition & 1 deletion pkg/epp/flowcontrol/controller/internal/processor.go
@@ -268,7 +268,7 @@ func (sp *ShardProcessor) enqueue(item *flowItem) {

// hasCapacity checks if the shard and the specific priority band have enough capacity to accommodate an item of a given
// size.
func (sp *ShardProcessor) hasCapacity(priority uint, itemByteSize uint64) bool {
func (sp *ShardProcessor) hasCapacity(priority int, itemByteSize uint64) bool {
if itemByteSize == 0 {
return true
}
30 changes: 15 additions & 15 deletions pkg/epp/flowcontrol/controller/internal/processor_test.go
@@ -122,7 +122,7 @@ type testHarness struct {
// The harness's mutex protects the single source of truth for all mock state.
mu sync.Mutex
queues map[types.FlowKey]*mocks.MockManagedQueue
priorityFlows map[uint][]types.FlowKey // Key: `priority`
priorityFlows map[int][]types.FlowKey // Key: `priority`

// Customizable policy logic for tests to override.
interFlowPolicySelectQueue func(band framework.PriorityBandAccessor) (framework.FlowQueueAccessor, error)
@@ -139,7 +139,7 @@ func newTestHarness(t *testing.T, expiryCleanupInterval time.Duration) *testHarn
logger: logr.Discard(),
startSignal: make(chan struct{}),
queues: make(map[types.FlowKey]*mocks.MockManagedQueue),
priorityFlows: make(map[uint][]types.FlowKey),
priorityFlows: make(map[int][]types.FlowKey),
}

// Wire up the harness to provide the mock implementations for the shard's dependencies.
@@ -153,7 +153,7 @@ func newTestHarness(t *testing.T, expiryCleanupInterval time.Duration) *testHarn
h.StatsFunc = func() contracts.ShardStats {
return contracts.ShardStats{
TotalCapacityBytes: 1e9,
PerPriorityBandStats: map[uint]contracts.PriorityBandStats{
PerPriorityBandStats: map[int]contracts.PriorityBandStats{
testFlow.Priority: {CapacityBytes: 1e9},
},
}
@@ -249,10 +249,10 @@ func (h *testHarness) managedQueue(key types.FlowKey) (contracts.ManagedQueue, e
}

// allOrderedPriorityLevels provides the mock implementation for the `RegistryShard` interface.
func (h *testHarness) allOrderedPriorityLevels() []uint {
func (h *testHarness) allOrderedPriorityLevels() []int {
h.mu.Lock()
defer h.mu.Unlock()
prios := make([]uint, 0, len(h.priorityFlows))
prios := make([]int, 0, len(h.priorityFlows))
for p := range h.priorityFlows {
prios = append(prios, p)
}
@@ -262,7 +262,7 @@ func (h *testHarness) allOrderedPriorityLevels() []uint {

// priorityBandAccessor provides the mock implementation for the `RegistryShard` interface. It acts as a factory for a
// fully-configured, stateless mock that is safe for concurrent use.
func (h *testHarness) priorityBandAccessor(p uint) (framework.PriorityBandAccessor, error) {
func (h *testHarness) priorityBandAccessor(p int) (framework.PriorityBandAccessor, error) {
band := &frameworkmocks.MockPriorityBandAccessor{PriorityV: p}

// Safely get a snapshot of the flow IDs under a lock.
@@ -288,7 +288,7 @@ func (h *testHarness) priorityBandAccessor(p uint) (framework.PriorityBandAccess
}

// interFlowDispatchPolicy provides the mock implementation for the `contracts.RegistryShard` interface.
func (h *testHarness) interFlowDispatchPolicy(p uint) (framework.InterFlowDispatchPolicy, error) {
func (h *testHarness) interFlowDispatchPolicy(p int) (framework.InterFlowDispatchPolicy, error) {
policy := &frameworkmocks.MockInterFlowDispatchPolicy{}
// If the test provided a custom implementation, use it.
if h.interFlowPolicySelectQueue != nil {
@@ -362,7 +362,7 @@ func TestShardProcessor(t *testing.T) {
item := h.newTestItem("req-capacity-reject", testFlow, testTTL)
h.addQueue(testFlow)
h.StatsFunc = func() contracts.ShardStats {
return contracts.ShardStats{PerPriorityBandStats: map[uint]contracts.PriorityBandStats{
return contracts.ShardStats{PerPriorityBandStats: map[int]contracts.PriorityBandStats{
testFlow.Priority: {CapacityBytes: 50}, // 50 is less than item size of 100
}}
}
@@ -685,7 +685,7 @@ func TestShardProcessor(t *testing.T) {
name: "should reject item on registry priority band lookup failure",
setupHarness: func(h *testHarness) {
h.addQueue(testFlow)
h.PriorityBandAccessorFunc = func(uint) (framework.PriorityBandAccessor, error) { return nil, testErr }
h.PriorityBandAccessorFunc = func(int) (framework.PriorityBandAccessor, error) { return nil, testErr }
},
assert: func(t *testing.T, h *testHarness, item *flowItem) {
outcome, err := item.FinalState()
@@ -776,7 +776,7 @@ func TestShardProcessor(t *testing.T) {
itemByteSize: 1,
stats: contracts.ShardStats{
TotalCapacityBytes: 200, TotalByteSize: 100,
PerPriorityBandStats: map[uint]contracts.PriorityBandStats{
PerPriorityBandStats: map[int]contracts.PriorityBandStats{
testFlow.Priority: {ByteSize: 50, CapacityBytes: 50},
},
},
@@ -787,7 +787,7 @@ func TestShardProcessor(t *testing.T) {
itemByteSize: 1,
stats: contracts.ShardStats{
TotalCapacityBytes: 200, TotalByteSize: 100,
PerPriorityBandStats: map[uint]contracts.PriorityBandStats{}, // Missing stats for priority 10
PerPriorityBandStats: map[int]contracts.PriorityBandStats{}, // Missing stats for priority 10
},
expectHasCap: false,
},
@@ -796,7 +796,7 @@ func TestShardProcessor(t *testing.T) {
itemByteSize: 10,
stats: contracts.ShardStats{
TotalCapacityBytes: 200, TotalByteSize: 100,
PerPriorityBandStats: map[uint]contracts.PriorityBandStats{
PerPriorityBandStats: map[int]contracts.PriorityBandStats{
testFlow.Priority: {ByteSize: 50, CapacityBytes: 100},
},
},
@@ -854,7 +854,7 @@ func TestShardProcessor(t *testing.T) {
{
name: "should skip band on priority band accessor error",
setupHarness: func(h *testHarness) {
h.PriorityBandAccessorFunc = func(uint) (framework.PriorityBandAccessor, error) {
h.PriorityBandAccessorFunc = func(int) (framework.PriorityBandAccessor, error) {
return nil, registryErr
}
},
@@ -1196,8 +1196,8 @@ func TestShardProcessor(t *testing.T) {
t.Parallel()
// --- ARRANGE ---
h := newTestHarness(t, testCleanupTick)
h.AllOrderedPriorityLevelsFunc = func() []uint { return []uint{testFlow.Priority} }
h.PriorityBandAccessorFunc = func(p uint) (framework.PriorityBandAccessor, error) {
h.AllOrderedPriorityLevelsFunc = func() []int { return []int{testFlow.Priority} }
h.PriorityBandAccessorFunc = func(p int) (framework.PriorityBandAccessor, error) {
return nil, errors.New("registry error")
}

4 changes: 2 additions & 2 deletions pkg/epp/flowcontrol/framework/mocks/mocks.go
@@ -67,14 +67,14 @@ var _ framework.FlowQueueAccessor = &MockFlowQueueAccessor{}
// Simple accessors are configured with public value fields (e.g., `PriorityV`).
// Complex methods with logic are configured with function fields (e.g., `IterateQueuesFunc`).
type MockPriorityBandAccessor struct {
PriorityV uint
PriorityV int
PriorityNameV string
FlowKeysFunc func() []types.FlowKey
QueueFunc func(flowID string) framework.FlowQueueAccessor
IterateQueuesFunc func(callback func(queue framework.FlowQueueAccessor) (keepIterating bool))
}

func (m *MockPriorityBandAccessor) Priority() uint { return m.PriorityV }
func (m *MockPriorityBandAccessor) Priority() int { return m.PriorityV }
func (m *MockPriorityBandAccessor) PriorityName() string { return m.PriorityNameV }

func (m *MockPriorityBandAccessor) FlowKeys() []types.FlowKey {
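The doc comment above this mock states its convention: plain value fields for simple accessors, function fields for methods with logic. A small usage fragment under that convention follows; it assumes the `frameworkmocks` and `types` imports used elsewhere in this diff, and the `FlowKey` field names are assumptions for illustration.

```go
// Usage fragment only; FlowKey field names are assumed, not taken from the PR.
band := &frameworkmocks.MockPriorityBandAccessor{
	PriorityV:     5,         // simple accessor: value field
	PriorityNameV: "Example", // simple accessor: value field
	FlowKeysFunc: func() []types.FlowKey { // method with logic: function field
		return []types.FlowKey{{ID: "flow-a", Priority: 5}}
	},
}
_ = band.Priority() // returns 5 via PriorityV
```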
2 changes: 1 addition & 1 deletion pkg/epp/flowcontrol/framework/policies.go
@@ -168,7 +168,7 @@ type FlowQueueAccessor interface {
// Conformance: Implementations MUST ensure all methods are goroutine-safe for concurrent access.
type PriorityBandAccessor interface {
// Priority returns the numerical priority level of this band.
Priority() uint
Priority() int

// PriorityName returns the human-readable name of this priority band.
PriorityName() string
20 changes: 10 additions & 10 deletions pkg/epp/flowcontrol/registry/config.go
@@ -98,7 +98,7 @@ type Config struct {
// the correct element within this specific configuration instance, preventing common "pointer-to-loop-variable"
// errors, especially across deep copies or partitioning.
// It is populated during validation and when the config is copied or partitioned.
priorityBandMap map[uint]*PriorityBandConfig
priorityBandMap map[int]*PriorityBandConfig

// Factory functions used for plugin instantiation during configuration validation.
// These enable dependency injection for unit testing the validation logic.
@@ -114,7 +114,7 @@ type PriorityBandConfig struct {
// Priority is the unique numerical priority level for this band.
// Convention: Lower numerical values indicate higher priority (e.g., 0 is highest).
// Required.
Priority uint
Priority int

// PriorityName is a human-readable name for this priority band (e.g., "Critical", "Standard").
// It must be unique across all priority bands in the configuration.
@@ -170,13 +170,13 @@ type ShardConfig struct {
// priorityBandMap provides O(1) lookups of `ShardPriorityBandConfig` by priority level.
// It serves as a correctness mechanism, ensuring that accessors return a safe, stable pointer to the correct element
// within this specific shard configuration instance.
priorityBandMap map[uint]*ShardPriorityBandConfig
priorityBandMap map[int]*ShardPriorityBandConfig
}

// ShardPriorityBandConfig holds the partitioned configuration for a single priority band within a single shard.
type ShardPriorityBandConfig struct {
// Priority is the unique numerical priority level for this band.
Priority uint
Priority int
// PriorityName is a unique human-readable name for this priority band.
PriorityName string
// IntraFlowDispatchPolicy is the name of the policy for dispatch within a flow's queue.
@@ -192,7 +192,7 @@

// getBandConfig finds and returns the shard-level configuration for a specific priority level.
// Returns an error wrapping `contracts.ErrPriorityBandNotFound` if the priority is not configured.
func (sc *ShardConfig) getBandConfig(priority uint) (*ShardPriorityBandConfig, error) {
func (sc *ShardConfig) getBandConfig(priority int) (*ShardPriorityBandConfig, error) {
if band, ok := sc.priorityBandMap[priority]; ok {
return band, nil
}
Expand Down Expand Up @@ -235,9 +235,9 @@ func (c *Config) validateAndApplyDefaults() error {
}

// Validate and default each priority band.
priorities := make(map[uint]struct{})
priorities := make(map[int]struct{})
priorityNames := make(map[string]struct{})
c.priorityBandMap = make(map[uint]*PriorityBandConfig, len(c.PriorityBands))
c.priorityBandMap = make(map[int]*PriorityBandConfig, len(c.PriorityBands))

for i := range c.PriorityBands {
band := &c.PriorityBands[i]
Expand Down Expand Up @@ -326,7 +326,7 @@ func (c *Config) partition(shardIndex, totalShards int) *ShardConfig {
shardCfg := &ShardConfig{
MaxBytes: partitionUint64(c.MaxBytes, shardIndex, totalShards),
PriorityBands: make([]ShardPriorityBandConfig, len(c.PriorityBands)),
priorityBandMap: make(map[uint]*ShardPriorityBandConfig, len(c.PriorityBands)),
priorityBandMap: make(map[int]*ShardPriorityBandConfig, len(c.PriorityBands)),
}

for i, template := range c.PriorityBands {
Expand Down Expand Up @@ -436,7 +436,7 @@ func (c *Config) deepCopy() *Config {
FlowGCTimeout: c.FlowGCTimeout,
EventChannelBufferSize: c.EventChannelBufferSize,
PriorityBands: make([]PriorityBandConfig, len(c.PriorityBands)),
priorityBandMap: make(map[uint]*PriorityBandConfig, len(c.PriorityBands)),
priorityBandMap: make(map[int]*PriorityBandConfig, len(c.PriorityBands)),
interFlowDispatchPolicyFactory: c.interFlowDispatchPolicyFactory,
intraFlowDispatchPolicyFactory: c.intraFlowDispatchPolicyFactory,
queueFactory: c.queueFactory,
Expand All @@ -456,7 +456,7 @@ func (c *Config) deepCopy() *Config {

// getBandConfig finds and returns the global configuration template for a specific priority level.
// Returns an error wrapping `contracts.ErrPriorityBandNotFound` if the priority is not configured.
func (c *Config) getBandConfig(priority uint) (*PriorityBandConfig, error) {
func (c *Config) getBandConfig(priority int) (*PriorityBandConfig, error) {
if band, ok := c.priorityBandMap[priority]; ok {
return band, nil
}
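A standalone sketch of the lookup-map pattern that `priorityBandMap` implements, using the now-signed `int` key. This mirrors the shape described in the comments above but is not the registry's actual code; the band names and values are invented.

```go
package main

import (
	"fmt"
	"sort"
)

type bandConfig struct {
	Priority     int // signed after this PR, so negative levels are representable
	PriorityName string
}

func main() {
	bands := []bandConfig{{10, "Standard"}, {0, "Critical"}, {20, "Sheddable"}}

	// O(1) lookup keyed by the signed priority level, indexing into the slice
	// to avoid the "pointer-to-loop-variable" pitfall the config comment mentions.
	byPriority := make(map[int]*bandConfig, len(bands))
	levels := make([]int, 0, len(bands))
	for i := range bands {
		byPriority[bands[i].Priority] = &bands[i]
		levels = append(levels, bands[i].Priority)
	}

	// Ascending numeric order; which end counts as "highest priority" is the
	// convention debated in the review thread earlier in this diff.
	sort.Ints(levels)
	for _, p := range levels {
		fmt.Println(p, byPriority[p].PriorityName)
	}
}
```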
2 changes: 1 addition & 1 deletion pkg/epp/flowcontrol/registry/managedqueue_test.go
@@ -98,7 +98,7 @@ type mockStatsPropagator struct {
byteSizeDelta atomic.Int64
}

func (p *mockStatsPropagator) propagate(_ uint, lenDelta, byteSizeDelta int64) {
func (p *mockStatsPropagator) propagate(_ int, lenDelta, byteSizeDelta int64) {
p.lenDelta.Add(lenDelta)
p.byteSizeDelta.Add(byteSizeDelta)
}