Skip to content

Commit 1457f63

Browse files
authored
Update priority in EPP flow control from uint to int (#1518)
* Update type for priority from uint to int in EPP flow control * Update tests to accomodate for priority changes * Avoid using Reverse to sort in descending order, update comments
1 parent 71de48f commit 1457f63

File tree

15 files changed

+81
-77
lines changed

15 files changed

+81
-77
lines changed

pkg/epp/flowcontrol/contracts/mocks/mocks.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -48,9 +48,9 @@ type MockRegistryShard struct {
4848
IsActiveFunc func() bool
4949
ManagedQueueFunc func(key types.FlowKey) (contracts.ManagedQueue, error)
5050
IntraFlowDispatchPolicyFunc func(key types.FlowKey) (framework.IntraFlowDispatchPolicy, error)
51-
InterFlowDispatchPolicyFunc func(priority uint) (framework.InterFlowDispatchPolicy, error)
52-
PriorityBandAccessorFunc func(priority uint) (framework.PriorityBandAccessor, error)
53-
AllOrderedPriorityLevelsFunc func() []uint
51+
InterFlowDispatchPolicyFunc func(priority int) (framework.InterFlowDispatchPolicy, error)
52+
PriorityBandAccessorFunc func(priority int) (framework.PriorityBandAccessor, error)
53+
AllOrderedPriorityLevelsFunc func() []int
5454
StatsFunc func() contracts.ShardStats
5555
}
5656

@@ -82,21 +82,21 @@ func (m *MockRegistryShard) IntraFlowDispatchPolicy(key types.FlowKey) (framewor
8282
return nil, nil
8383
}
8484

85-
func (m *MockRegistryShard) InterFlowDispatchPolicy(priority uint) (framework.InterFlowDispatchPolicy, error) {
85+
func (m *MockRegistryShard) InterFlowDispatchPolicy(priority int) (framework.InterFlowDispatchPolicy, error) {
8686
if m.InterFlowDispatchPolicyFunc != nil {
8787
return m.InterFlowDispatchPolicyFunc(priority)
8888
}
8989
return nil, nil
9090
}
9191

92-
func (m *MockRegistryShard) PriorityBandAccessor(priority uint) (framework.PriorityBandAccessor, error) {
92+
func (m *MockRegistryShard) PriorityBandAccessor(priority int) (framework.PriorityBandAccessor, error) {
9393
if m.PriorityBandAccessorFunc != nil {
9494
return m.PriorityBandAccessorFunc(priority)
9595
}
9696
return nil, nil
9797
}
9898

99-
func (m *MockRegistryShard) AllOrderedPriorityLevels() []uint {
99+
func (m *MockRegistryShard) AllOrderedPriorityLevels() []int {
100100
if m.AllOrderedPriorityLevelsFunc != nil {
101101
return m.AllOrderedPriorityLevelsFunc()
102102
}

pkg/epp/flowcontrol/contracts/registry.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -124,20 +124,20 @@ type RegistryShard interface {
124124
// InterFlowDispatchPolicy retrieves a priority band's configured `framework.InterFlowDispatchPolicy` for this shard.
125125
// The registry guarantees that a non-nil default policy is returned if none is configured for the band.
126126
// Returns an error wrapping `ErrPriorityBandNotFound` if the priority level is not configured.
127-
InterFlowDispatchPolicy(priority uint) (framework.InterFlowDispatchPolicy, error)
127+
InterFlowDispatchPolicy(priority int) (framework.InterFlowDispatchPolicy, error)
128128

129129
// PriorityBandAccessor retrieves a read-only accessor for a given priority level, providing a view of the band's
130130
// state as seen by this specific shard. This is the primary entry point for inter-flow dispatch policies that need to
131131
// inspect and compare multiple flow queues within the same priority band.
132132
// Returns an error wrapping `ErrPriorityBandNotFound` if the priority level is not configured.
133-
PriorityBandAccessor(priority uint) (framework.PriorityBandAccessor, error)
133+
PriorityBandAccessor(priority int) (framework.PriorityBandAccessor, error)
134134

135-
// AllOrderedPriorityLevels returns all configured priority levels that this shard is aware of, sorted in ascending
136-
// numerical order. This order corresponds to highest priority (lowest numeric value) to lowest priority (highest
135+
// AllOrderedPriorityLevels returns all configured priority levels that this shard is aware of, sorted in descending
136+
// numerical order. This order corresponds to highest priority (highest numeric value) to lowest priority (lowest
137137
// numeric value).
138138
// The returned slice provides a definitive, ordered list of priority levels for iteration, for example, by a
139139
// `controller.FlowController` worker's dispatch loop.
140-
AllOrderedPriorityLevels() []uint
140+
AllOrderedPriorityLevels() []int
141141

142142
// Stats returns a snapshot of the statistics for this specific shard.
143143
Stats() ShardStats
@@ -170,7 +170,7 @@ type AggregateStats struct {
170170
// TotalLen is the total number of items currently queued across the entire system.
171171
TotalLen uint64
172172
// PerPriorityBandStats maps each configured priority level to its globally aggregated statistics.
173-
PerPriorityBandStats map[uint]PriorityBandStats
173+
PerPriorityBandStats map[int]PriorityBandStats
174174
}
175175

176176
// ShardStats holds statistics for a single internal shard within the `FlowRegistry`.
@@ -188,13 +188,13 @@ type ShardStats struct {
188188
// The capacity values within represent this shard's partition of the global band capacity.
189189
// The key is the numerical priority level.
190190
// All configured priority levels are guaranteed to be represented.
191-
PerPriorityBandStats map[uint]PriorityBandStats
191+
PerPriorityBandStats map[int]PriorityBandStats
192192
}
193193

194194
// PriorityBandStats holds aggregated statistics for a single priority band.
195195
type PriorityBandStats struct {
196196
// Priority is the numerical priority level this struct describes.
197-
Priority uint
197+
Priority int
198198
// PriorityName is a human-readable name for the priority band (e.g., "Critical", "Sheddable").
199199
// The registry configuration requires this field, so it is guaranteed to be non-empty.
200200
PriorityName string

pkg/epp/flowcontrol/controller/internal/filter.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ func newSubsetPriorityBandAccessor(original framework.PriorityBandAccessor, allo
101101
}
102102

103103
// Priority returns the numerical priority level of this band.
104-
func (s *subsetPriorityBandAccessor) Priority() uint {
104+
func (s *subsetPriorityBandAccessor) Priority() int {
105105
return s.originalAccessor.Priority()
106106
}
107107

pkg/epp/flowcontrol/controller/internal/filter_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,7 @@ func TestSubsetPriorityBandAccessor(t *testing.T) {
128128

129129
t.Run("should pass through priority and name", func(t *testing.T) {
130130
t.Parallel()
131-
assert.Equal(t, uint(10), subsetAccessor.Priority(), "Priority() should pass through from the original accessor")
131+
assert.Equal(t, 10, subsetAccessor.Priority(), "Priority() should pass through from the original accessor")
132132
assert.Equal(t, "High", subsetAccessor.PriorityName(),
133133
"PriorityName() should pass through from the original accessor")
134134
})

pkg/epp/flowcontrol/controller/internal/processor.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -268,7 +268,7 @@ func (sp *ShardProcessor) enqueue(item *flowItem) {
268268

269269
// hasCapacity checks if the shard and the specific priority band have enough capacity to accommodate an item of a given
270270
// size.
271-
func (sp *ShardProcessor) hasCapacity(priority uint, itemByteSize uint64) bool {
271+
func (sp *ShardProcessor) hasCapacity(priority int, itemByteSize uint64) bool {
272272
if itemByteSize == 0 {
273273
return true
274274
}

pkg/epp/flowcontrol/controller/internal/processor_test.go

Lines changed: 22 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ import (
4444
"errors"
4545
"fmt"
4646
"os"
47-
"slices"
47+
"sort"
4848
"sync"
4949
"sync/atomic"
5050
"testing"
@@ -122,7 +122,7 @@ type testHarness struct {
122122
// The harness's mutex protects the single source of truth for all mock state.
123123
mu sync.Mutex
124124
queues map[types.FlowKey]*mocks.MockManagedQueue
125-
priorityFlows map[uint][]types.FlowKey // Key: `priority`
125+
priorityFlows map[int][]types.FlowKey // Key: `priority`
126126

127127
// Customizable policy logic for tests to override.
128128
interFlowPolicySelectQueue func(band framework.PriorityBandAccessor) (framework.FlowQueueAccessor, error)
@@ -139,7 +139,7 @@ func newTestHarness(t *testing.T, expiryCleanupInterval time.Duration) *testHarn
139139
logger: logr.Discard(),
140140
startSignal: make(chan struct{}),
141141
queues: make(map[types.FlowKey]*mocks.MockManagedQueue),
142-
priorityFlows: make(map[uint][]types.FlowKey),
142+
priorityFlows: make(map[int][]types.FlowKey),
143143
}
144144

145145
// Wire up the harness to provide the mock implementations for the shard's dependencies.
@@ -153,7 +153,7 @@ func newTestHarness(t *testing.T, expiryCleanupInterval time.Duration) *testHarn
153153
h.StatsFunc = func() contracts.ShardStats {
154154
return contracts.ShardStats{
155155
TotalCapacityBytes: 1e9,
156-
PerPriorityBandStats: map[uint]contracts.PriorityBandStats{
156+
PerPriorityBandStats: map[int]contracts.PriorityBandStats{
157157
testFlow.Priority: {CapacityBytes: 1e9},
158158
},
159159
}
@@ -249,20 +249,23 @@ func (h *testHarness) managedQueue(key types.FlowKey) (contracts.ManagedQueue, e
249249
}
250250

251251
// allOrderedPriorityLevels provides the mock implementation for the `RegistryShard` interface.
252-
func (h *testHarness) allOrderedPriorityLevels() []uint {
252+
func (h *testHarness) allOrderedPriorityLevels() []int {
253253
h.mu.Lock()
254254
defer h.mu.Unlock()
255-
prios := make([]uint, 0, len(h.priorityFlows))
255+
prios := make([]int, 0, len(h.priorityFlows))
256256
for p := range h.priorityFlows {
257257
prios = append(prios, p)
258258
}
259-
slices.Sort(prios)
259+
sort.Slice(prios, func(i, j int) bool {
260+
return prios[i] > prios[j]
261+
})
262+
260263
return prios
261264
}
262265

263266
// priorityBandAccessor provides the mock implementation for the `RegistryShard` interface. It acts as a factory for a
264267
// fully-configured, stateless mock that is safe for concurrent use.
265-
func (h *testHarness) priorityBandAccessor(p uint) (framework.PriorityBandAccessor, error) {
268+
func (h *testHarness) priorityBandAccessor(p int) (framework.PriorityBandAccessor, error) {
266269
band := &frameworkmocks.MockPriorityBandAccessor{PriorityV: p}
267270

268271
// Safely get a snapshot of the flow IDs under a lock.
@@ -288,7 +291,7 @@ func (h *testHarness) priorityBandAccessor(p uint) (framework.PriorityBandAccess
288291
}
289292

290293
// interFlowDispatchPolicy provides the mock implementation for the `contracts.RegistryShard` interface.
291-
func (h *testHarness) interFlowDispatchPolicy(p uint) (framework.InterFlowDispatchPolicy, error) {
294+
func (h *testHarness) interFlowDispatchPolicy(p int) (framework.InterFlowDispatchPolicy, error) {
292295
policy := &frameworkmocks.MockInterFlowDispatchPolicy{}
293296
// If the test provided a custom implementation, use it.
294297
if h.interFlowPolicySelectQueue != nil {
@@ -362,7 +365,7 @@ func TestShardProcessor(t *testing.T) {
362365
item := h.newTestItem("req-capacity-reject", testFlow, testTTL)
363366
h.addQueue(testFlow)
364367
h.StatsFunc = func() contracts.ShardStats {
365-
return contracts.ShardStats{PerPriorityBandStats: map[uint]contracts.PriorityBandStats{
368+
return contracts.ShardStats{PerPriorityBandStats: map[int]contracts.PriorityBandStats{
366369
testFlow.Priority: {CapacityBytes: 50}, // 50 is less than item size of 100
367370
}}
368371
}
@@ -685,7 +688,7 @@ func TestShardProcessor(t *testing.T) {
685688
name: "should reject item on registry priority band lookup failure",
686689
setupHarness: func(h *testHarness) {
687690
h.addQueue(testFlow)
688-
h.PriorityBandAccessorFunc = func(uint) (framework.PriorityBandAccessor, error) { return nil, testErr }
691+
h.PriorityBandAccessorFunc = func(int) (framework.PriorityBandAccessor, error) { return nil, testErr }
689692
},
690693
assert: func(t *testing.T, h *testHarness, item *flowItem) {
691694
outcome, err := item.FinalState()
@@ -776,7 +779,7 @@ func TestShardProcessor(t *testing.T) {
776779
itemByteSize: 1,
777780
stats: contracts.ShardStats{
778781
TotalCapacityBytes: 200, TotalByteSize: 100,
779-
PerPriorityBandStats: map[uint]contracts.PriorityBandStats{
782+
PerPriorityBandStats: map[int]contracts.PriorityBandStats{
780783
testFlow.Priority: {ByteSize: 50, CapacityBytes: 50},
781784
},
782785
},
@@ -787,7 +790,7 @@ func TestShardProcessor(t *testing.T) {
787790
itemByteSize: 1,
788791
stats: contracts.ShardStats{
789792
TotalCapacityBytes: 200, TotalByteSize: 100,
790-
PerPriorityBandStats: map[uint]contracts.PriorityBandStats{}, // Missing stats for priority 10
793+
PerPriorityBandStats: map[int]contracts.PriorityBandStats{}, // Missing stats for priority 10
791794
},
792795
expectHasCap: false,
793796
},
@@ -796,7 +799,7 @@ func TestShardProcessor(t *testing.T) {
796799
itemByteSize: 10,
797800
stats: contracts.ShardStats{
798801
TotalCapacityBytes: 200, TotalByteSize: 100,
799-
PerPriorityBandStats: map[uint]contracts.PriorityBandStats{
802+
PerPriorityBandStats: map[int]contracts.PriorityBandStats{
800803
testFlow.Priority: {ByteSize: 50, CapacityBytes: 100},
801804
},
802805
},
@@ -854,7 +857,7 @@ func TestShardProcessor(t *testing.T) {
854857
{
855858
name: "should skip band on priority band accessor error",
856859
setupHarness: func(h *testHarness) {
857-
h.PriorityBandAccessorFunc = func(uint) (framework.PriorityBandAccessor, error) {
860+
h.PriorityBandAccessorFunc = func(int) (framework.PriorityBandAccessor, error) {
858861
return nil, registryErr
859862
}
860863
},
@@ -1003,8 +1006,8 @@ func TestShardProcessor(t *testing.T) {
10031006
t.Parallel()
10041007
// --- ARRANGE ---
10051008
h := newTestHarness(t, testCleanupTick)
1006-
keyHigh := types.FlowKey{ID: "flow-high", Priority: 10}
1007-
keyLow := types.FlowKey{ID: "flow-low", Priority: 20}
1009+
keyHigh := types.FlowKey{ID: "flow-high", Priority: 20}
1010+
keyLow := types.FlowKey{ID: "flow-low", Priority: 10}
10081011
qHigh := h.addQueue(keyHigh)
10091012
qLow := h.addQueue(keyLow)
10101013

@@ -1196,8 +1199,8 @@ func TestShardProcessor(t *testing.T) {
11961199
t.Parallel()
11971200
// --- ARRANGE ---
11981201
h := newTestHarness(t, testCleanupTick)
1199-
h.AllOrderedPriorityLevelsFunc = func() []uint { return []uint{testFlow.Priority} }
1200-
h.PriorityBandAccessorFunc = func(p uint) (framework.PriorityBandAccessor, error) {
1202+
h.AllOrderedPriorityLevelsFunc = func() []int { return []int{testFlow.Priority} }
1203+
h.PriorityBandAccessorFunc = func(p int) (framework.PriorityBandAccessor, error) {
12011204
return nil, errors.New("registry error")
12021205
}
12031206

pkg/epp/flowcontrol/framework/mocks/mocks.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -67,14 +67,14 @@ var _ framework.FlowQueueAccessor = &MockFlowQueueAccessor{}
6767
// Simple accessors are configured with public value fields (e.g., `PriorityV`).
6868
// Complex methods with logic are configured with function fields (e.g., `IterateQueuesFunc`).
6969
type MockPriorityBandAccessor struct {
70-
PriorityV uint
70+
PriorityV int
7171
PriorityNameV string
7272
FlowKeysFunc func() []types.FlowKey
7373
QueueFunc func(flowID string) framework.FlowQueueAccessor
7474
IterateQueuesFunc func(callback func(queue framework.FlowQueueAccessor) (keepIterating bool))
7575
}
7676

77-
func (m *MockPriorityBandAccessor) Priority() uint { return m.PriorityV }
77+
func (m *MockPriorityBandAccessor) Priority() int { return m.PriorityV }
7878
func (m *MockPriorityBandAccessor) PriorityName() string { return m.PriorityNameV }
7979

8080
func (m *MockPriorityBandAccessor) FlowKeys() []types.FlowKey {

pkg/epp/flowcontrol/framework/policies.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -168,7 +168,7 @@ type FlowQueueAccessor interface {
168168
// Conformance: Implementations MUST ensure all methods are goroutine-safe for concurrent access.
169169
type PriorityBandAccessor interface {
170170
// Priority returns the numerical priority level of this band.
171-
Priority() uint
171+
Priority() int
172172

173173
// PriorityName returns the human-readable name of this priority band.
174174
PriorityName() string

pkg/epp/flowcontrol/registry/config.go

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ type Config struct {
9898
// the correct element within this specific configuration instance, preventing common "pointer-to-loop-variable"
9999
// errors, especially across deep copies or partitioning.
100100
// It is populated during validation and when the config is copied or partitioned.
101-
priorityBandMap map[uint]*PriorityBandConfig
101+
priorityBandMap map[int]*PriorityBandConfig
102102

103103
// Factory functions used for plugin instantiation during configuration validation.
104104
// These enable dependency injection for unit testing the validation logic.
@@ -112,9 +112,9 @@ type Config struct {
112112
// that operate at this priority level.
113113
type PriorityBandConfig struct {
114114
// Priority is the unique numerical priority level for this band.
115-
// Convention: Lower numerical values indicate higher priority (e.g., 0 is highest).
115+
// Convention: Lower numerical values indicate lower priority.
116116
// Required.
117-
Priority uint
117+
Priority int
118118

119119
// PriorityName is a human-readable name for this priority band (e.g., "Critical", "Standard").
120120
// It must be unique across all priority bands in the configuration.
@@ -170,13 +170,13 @@ type ShardConfig struct {
170170
// priorityBandMap provides O(1) lookups of `ShardPriorityBandConfig` by priority level.
171171
// It serves as a correctness mechanism, ensuring that accessors return a safe, stable pointer to the correct element
172172
// within this specific shard configuration instance.
173-
priorityBandMap map[uint]*ShardPriorityBandConfig
173+
priorityBandMap map[int]*ShardPriorityBandConfig
174174
}
175175

176176
// ShardPriorityBandConfig holds the partitioned configuration for a single priority band within a single shard.
177177
type ShardPriorityBandConfig struct {
178178
// Priority is the unique numerical priority level for this band.
179-
Priority uint
179+
Priority int
180180
// PriorityName is a unique human-readable name for this priority band.
181181
PriorityName string
182182
// IntraFlowDispatchPolicy is the name of the policy for dispatch within a flow's queue.
@@ -192,7 +192,7 @@ type ShardPriorityBandConfig struct {
192192

193193
// getBandConfig finds and returns the shard-level configuration for a specific priority level.
194194
// Returns an error wrapping `contracts.ErrPriorityBandNotFound` if the priority is not configured.
195-
func (sc *ShardConfig) getBandConfig(priority uint) (*ShardPriorityBandConfig, error) {
195+
func (sc *ShardConfig) getBandConfig(priority int) (*ShardPriorityBandConfig, error) {
196196
if band, ok := sc.priorityBandMap[priority]; ok {
197197
return band, nil
198198
}
@@ -235,9 +235,9 @@ func (c *Config) validateAndApplyDefaults() error {
235235
}
236236

237237
// Validate and default each priority band.
238-
priorities := make(map[uint]struct{})
238+
priorities := make(map[int]struct{})
239239
priorityNames := make(map[string]struct{})
240-
c.priorityBandMap = make(map[uint]*PriorityBandConfig, len(c.PriorityBands))
240+
c.priorityBandMap = make(map[int]*PriorityBandConfig, len(c.PriorityBands))
241241

242242
for i := range c.PriorityBands {
243243
band := &c.PriorityBands[i]
@@ -326,7 +326,7 @@ func (c *Config) partition(shardIndex, totalShards int) *ShardConfig {
326326
shardCfg := &ShardConfig{
327327
MaxBytes: partitionUint64(c.MaxBytes, shardIndex, totalShards),
328328
PriorityBands: make([]ShardPriorityBandConfig, len(c.PriorityBands)),
329-
priorityBandMap: make(map[uint]*ShardPriorityBandConfig, len(c.PriorityBands)),
329+
priorityBandMap: make(map[int]*ShardPriorityBandConfig, len(c.PriorityBands)),
330330
}
331331

332332
for i, template := range c.PriorityBands {
@@ -436,7 +436,7 @@ func (c *Config) deepCopy() *Config {
436436
FlowGCTimeout: c.FlowGCTimeout,
437437
EventChannelBufferSize: c.EventChannelBufferSize,
438438
PriorityBands: make([]PriorityBandConfig, len(c.PriorityBands)),
439-
priorityBandMap: make(map[uint]*PriorityBandConfig, len(c.PriorityBands)),
439+
priorityBandMap: make(map[int]*PriorityBandConfig, len(c.PriorityBands)),
440440
interFlowDispatchPolicyFactory: c.interFlowDispatchPolicyFactory,
441441
intraFlowDispatchPolicyFactory: c.intraFlowDispatchPolicyFactory,
442442
queueFactory: c.queueFactory,
@@ -456,7 +456,7 @@ func (c *Config) deepCopy() *Config {
456456

457457
// getBandConfig finds and returns the global configuration template for a specific priority level.
458458
// Returns an error wrapping `contracts.ErrPriorityBandNotFound` if the priority is not configured.
459-
func (c *Config) getBandConfig(priority uint) (*PriorityBandConfig, error) {
459+
func (c *Config) getBandConfig(priority int) (*PriorityBandConfig, error) {
460460
if band, ok := c.priorityBandMap[priority]; ok {
461461
return band, nil
462462
}

pkg/epp/flowcontrol/registry/managedqueue_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ type mockStatsPropagator struct {
9898
byteSizeDelta atomic.Int64
9999
}
100100

101-
func (p *mockStatsPropagator) propagate(_ uint, lenDelta, byteSizeDelta int64) {
101+
func (p *mockStatsPropagator) propagate(_ int, lenDelta, byteSizeDelta int64) {
102102
p.lenDelta.Add(lenDelta)
103103
p.byteSizeDelta.Add(byteSizeDelta)
104104
}

0 commit comments

Comments
 (0)