diff --git a/pkg/epp/flowcontrol/contracts/mocks/mocks.go b/pkg/epp/flowcontrol/contracts/mocks/mocks.go index c5c8d2e3b..02f9863fb 100644 --- a/pkg/epp/flowcontrol/contracts/mocks/mocks.go +++ b/pkg/epp/flowcontrol/contracts/mocks/mocks.go @@ -48,9 +48,9 @@ type MockRegistryShard struct { IsActiveFunc func() bool ManagedQueueFunc func(key types.FlowKey) (contracts.ManagedQueue, error) IntraFlowDispatchPolicyFunc func(key types.FlowKey) (framework.IntraFlowDispatchPolicy, error) - InterFlowDispatchPolicyFunc func(priority uint) (framework.InterFlowDispatchPolicy, error) - PriorityBandAccessorFunc func(priority uint) (framework.PriorityBandAccessor, error) - AllOrderedPriorityLevelsFunc func() []uint + InterFlowDispatchPolicyFunc func(priority int) (framework.InterFlowDispatchPolicy, error) + PriorityBandAccessorFunc func(priority int) (framework.PriorityBandAccessor, error) + AllOrderedPriorityLevelsFunc func() []int StatsFunc func() contracts.ShardStats } @@ -82,21 +82,21 @@ func (m *MockRegistryShard) IntraFlowDispatchPolicy(key types.FlowKey) (framewor return nil, nil } -func (m *MockRegistryShard) InterFlowDispatchPolicy(priority uint) (framework.InterFlowDispatchPolicy, error) { +func (m *MockRegistryShard) InterFlowDispatchPolicy(priority int) (framework.InterFlowDispatchPolicy, error) { if m.InterFlowDispatchPolicyFunc != nil { return m.InterFlowDispatchPolicyFunc(priority) } return nil, nil } -func (m *MockRegistryShard) PriorityBandAccessor(priority uint) (framework.PriorityBandAccessor, error) { +func (m *MockRegistryShard) PriorityBandAccessor(priority int) (framework.PriorityBandAccessor, error) { if m.PriorityBandAccessorFunc != nil { return m.PriorityBandAccessorFunc(priority) } return nil, nil } -func (m *MockRegistryShard) AllOrderedPriorityLevels() []uint { +func (m *MockRegistryShard) AllOrderedPriorityLevels() []int { if m.AllOrderedPriorityLevelsFunc != nil { return m.AllOrderedPriorityLevelsFunc() } diff --git a/pkg/epp/flowcontrol/contracts/registry.go b/pkg/epp/flowcontrol/contracts/registry.go index de1b89ae6..e2c42fdbc 100644 --- a/pkg/epp/flowcontrol/contracts/registry.go +++ b/pkg/epp/flowcontrol/contracts/registry.go @@ -124,20 +124,20 @@ type RegistryShard interface { // InterFlowDispatchPolicy retrieves a priority band's configured `framework.InterFlowDispatchPolicy` for this shard. // The registry guarantees that a non-nil default policy is returned if none is configured for the band. // Returns an error wrapping `ErrPriorityBandNotFound` if the priority level is not configured. - InterFlowDispatchPolicy(priority uint) (framework.InterFlowDispatchPolicy, error) + InterFlowDispatchPolicy(priority int) (framework.InterFlowDispatchPolicy, error) // PriorityBandAccessor retrieves a read-only accessor for a given priority level, providing a view of the band's // state as seen by this specific shard. This is the primary entry point for inter-flow dispatch policies that need to // inspect and compare multiple flow queues within the same priority band. // Returns an error wrapping `ErrPriorityBandNotFound` if the priority level is not configured. - PriorityBandAccessor(priority uint) (framework.PriorityBandAccessor, error) + PriorityBandAccessor(priority int) (framework.PriorityBandAccessor, error) - // AllOrderedPriorityLevels returns all configured priority levels that this shard is aware of, sorted in ascending - // numerical order. This order corresponds to highest priority (lowest numeric value) to lowest priority (highest + // AllOrderedPriorityLevels returns all configured priority levels that this shard is aware of, sorted in descending + // numerical order. This order corresponds to highest priority (highest numeric value) to lowest priority (lowest // numeric value). // The returned slice provides a definitive, ordered list of priority levels for iteration, for example, by a // `controller.FlowController` worker's dispatch loop. - AllOrderedPriorityLevels() []uint + AllOrderedPriorityLevels() []int // Stats returns a snapshot of the statistics for this specific shard. Stats() ShardStats @@ -170,7 +170,7 @@ type AggregateStats struct { // TotalLen is the total number of items currently queued across the entire system. TotalLen uint64 // PerPriorityBandStats maps each configured priority level to its globally aggregated statistics. - PerPriorityBandStats map[uint]PriorityBandStats + PerPriorityBandStats map[int]PriorityBandStats } // ShardStats holds statistics for a single internal shard within the `FlowRegistry`. @@ -188,13 +188,13 @@ type ShardStats struct { // The capacity values within represent this shard's partition of the global band capacity. // The key is the numerical priority level. // All configured priority levels are guaranteed to be represented. - PerPriorityBandStats map[uint]PriorityBandStats + PerPriorityBandStats map[int]PriorityBandStats } // PriorityBandStats holds aggregated statistics for a single priority band. type PriorityBandStats struct { // Priority is the numerical priority level this struct describes. - Priority uint + Priority int // PriorityName is a human-readable name for the priority band (e.g., "Critical", "Sheddable"). // The registry configuration requires this field, so it is guaranteed to be non-empty. PriorityName string diff --git a/pkg/epp/flowcontrol/controller/internal/filter.go b/pkg/epp/flowcontrol/controller/internal/filter.go index 0a0669224..12788d4cb 100644 --- a/pkg/epp/flowcontrol/controller/internal/filter.go +++ b/pkg/epp/flowcontrol/controller/internal/filter.go @@ -101,7 +101,7 @@ func newSubsetPriorityBandAccessor(original framework.PriorityBandAccessor, allo } // Priority returns the numerical priority level of this band. -func (s *subsetPriorityBandAccessor) Priority() uint { +func (s *subsetPriorityBandAccessor) Priority() int { return s.originalAccessor.Priority() } diff --git a/pkg/epp/flowcontrol/controller/internal/filter_test.go b/pkg/epp/flowcontrol/controller/internal/filter_test.go index ceff9e83f..7e51453c4 100644 --- a/pkg/epp/flowcontrol/controller/internal/filter_test.go +++ b/pkg/epp/flowcontrol/controller/internal/filter_test.go @@ -128,7 +128,7 @@ func TestSubsetPriorityBandAccessor(t *testing.T) { t.Run("should pass through priority and name", func(t *testing.T) { t.Parallel() - assert.Equal(t, uint(10), subsetAccessor.Priority(), "Priority() should pass through from the original accessor") + assert.Equal(t, 10, subsetAccessor.Priority(), "Priority() should pass through from the original accessor") assert.Equal(t, "High", subsetAccessor.PriorityName(), "PriorityName() should pass through from the original accessor") }) diff --git a/pkg/epp/flowcontrol/controller/internal/processor.go b/pkg/epp/flowcontrol/controller/internal/processor.go index 7f9c8ee3a..abcd9f2bc 100644 --- a/pkg/epp/flowcontrol/controller/internal/processor.go +++ b/pkg/epp/flowcontrol/controller/internal/processor.go @@ -268,7 +268,7 @@ func (sp *ShardProcessor) enqueue(item *flowItem) { // hasCapacity checks if the shard and the specific priority band have enough capacity to accommodate an item of a given // size. -func (sp *ShardProcessor) hasCapacity(priority uint, itemByteSize uint64) bool { +func (sp *ShardProcessor) hasCapacity(priority int, itemByteSize uint64) bool { if itemByteSize == 0 { return true } diff --git a/pkg/epp/flowcontrol/controller/internal/processor_test.go b/pkg/epp/flowcontrol/controller/internal/processor_test.go index 67657a9a4..81807e0e3 100644 --- a/pkg/epp/flowcontrol/controller/internal/processor_test.go +++ b/pkg/epp/flowcontrol/controller/internal/processor_test.go @@ -44,7 +44,7 @@ import ( "errors" "fmt" "os" - "slices" + "sort" "sync" "sync/atomic" "testing" @@ -122,7 +122,7 @@ type testHarness struct { // The harness's mutex protects the single source of truth for all mock state. mu sync.Mutex queues map[types.FlowKey]*mocks.MockManagedQueue - priorityFlows map[uint][]types.FlowKey // Key: `priority` + priorityFlows map[int][]types.FlowKey // Key: `priority` // Customizable policy logic for tests to override. interFlowPolicySelectQueue func(band framework.PriorityBandAccessor) (framework.FlowQueueAccessor, error) @@ -139,7 +139,7 @@ func newTestHarness(t *testing.T, expiryCleanupInterval time.Duration) *testHarn logger: logr.Discard(), startSignal: make(chan struct{}), queues: make(map[types.FlowKey]*mocks.MockManagedQueue), - priorityFlows: make(map[uint][]types.FlowKey), + priorityFlows: make(map[int][]types.FlowKey), } // Wire up the harness to provide the mock implementations for the shard's dependencies. @@ -153,7 +153,7 @@ func newTestHarness(t *testing.T, expiryCleanupInterval time.Duration) *testHarn h.StatsFunc = func() contracts.ShardStats { return contracts.ShardStats{ TotalCapacityBytes: 1e9, - PerPriorityBandStats: map[uint]contracts.PriorityBandStats{ + PerPriorityBandStats: map[int]contracts.PriorityBandStats{ testFlow.Priority: {CapacityBytes: 1e9}, }, } @@ -249,20 +249,23 @@ func (h *testHarness) managedQueue(key types.FlowKey) (contracts.ManagedQueue, e } // allOrderedPriorityLevels provides the mock implementation for the `RegistryShard` interface. -func (h *testHarness) allOrderedPriorityLevels() []uint { +func (h *testHarness) allOrderedPriorityLevels() []int { h.mu.Lock() defer h.mu.Unlock() - prios := make([]uint, 0, len(h.priorityFlows)) + prios := make([]int, 0, len(h.priorityFlows)) for p := range h.priorityFlows { prios = append(prios, p) } - slices.Sort(prios) + sort.Slice(prios, func(i, j int) bool { + return prios[i] > prios[j] + }) + return prios } // priorityBandAccessor provides the mock implementation for the `RegistryShard` interface. It acts as a factory for a // fully-configured, stateless mock that is safe for concurrent use. -func (h *testHarness) priorityBandAccessor(p uint) (framework.PriorityBandAccessor, error) { +func (h *testHarness) priorityBandAccessor(p int) (framework.PriorityBandAccessor, error) { band := &frameworkmocks.MockPriorityBandAccessor{PriorityV: p} // Safely get a snapshot of the flow IDs under a lock. @@ -288,7 +291,7 @@ func (h *testHarness) priorityBandAccessor(p uint) (framework.PriorityBandAccess } // interFlowDispatchPolicy provides the mock implementation for the `contracts.RegistryShard` interface. -func (h *testHarness) interFlowDispatchPolicy(p uint) (framework.InterFlowDispatchPolicy, error) { +func (h *testHarness) interFlowDispatchPolicy(p int) (framework.InterFlowDispatchPolicy, error) { policy := &frameworkmocks.MockInterFlowDispatchPolicy{} // If the test provided a custom implementation, use it. if h.interFlowPolicySelectQueue != nil { @@ -362,7 +365,7 @@ func TestShardProcessor(t *testing.T) { item := h.newTestItem("req-capacity-reject", testFlow, testTTL) h.addQueue(testFlow) h.StatsFunc = func() contracts.ShardStats { - return contracts.ShardStats{PerPriorityBandStats: map[uint]contracts.PriorityBandStats{ + return contracts.ShardStats{PerPriorityBandStats: map[int]contracts.PriorityBandStats{ testFlow.Priority: {CapacityBytes: 50}, // 50 is less than item size of 100 }} } @@ -685,7 +688,7 @@ func TestShardProcessor(t *testing.T) { name: "should reject item on registry priority band lookup failure", setupHarness: func(h *testHarness) { h.addQueue(testFlow) - h.PriorityBandAccessorFunc = func(uint) (framework.PriorityBandAccessor, error) { return nil, testErr } + h.PriorityBandAccessorFunc = func(int) (framework.PriorityBandAccessor, error) { return nil, testErr } }, assert: func(t *testing.T, h *testHarness, item *flowItem) { outcome, err := item.FinalState() @@ -776,7 +779,7 @@ func TestShardProcessor(t *testing.T) { itemByteSize: 1, stats: contracts.ShardStats{ TotalCapacityBytes: 200, TotalByteSize: 100, - PerPriorityBandStats: map[uint]contracts.PriorityBandStats{ + PerPriorityBandStats: map[int]contracts.PriorityBandStats{ testFlow.Priority: {ByteSize: 50, CapacityBytes: 50}, }, }, @@ -787,7 +790,7 @@ func TestShardProcessor(t *testing.T) { itemByteSize: 1, stats: contracts.ShardStats{ TotalCapacityBytes: 200, TotalByteSize: 100, - PerPriorityBandStats: map[uint]contracts.PriorityBandStats{}, // Missing stats for priority 10 + PerPriorityBandStats: map[int]contracts.PriorityBandStats{}, // Missing stats for priority 10 }, expectHasCap: false, }, @@ -796,7 +799,7 @@ func TestShardProcessor(t *testing.T) { itemByteSize: 10, stats: contracts.ShardStats{ TotalCapacityBytes: 200, TotalByteSize: 100, - PerPriorityBandStats: map[uint]contracts.PriorityBandStats{ + PerPriorityBandStats: map[int]contracts.PriorityBandStats{ testFlow.Priority: {ByteSize: 50, CapacityBytes: 100}, }, }, @@ -854,7 +857,7 @@ func TestShardProcessor(t *testing.T) { { name: "should skip band on priority band accessor error", setupHarness: func(h *testHarness) { - h.PriorityBandAccessorFunc = func(uint) (framework.PriorityBandAccessor, error) { + h.PriorityBandAccessorFunc = func(int) (framework.PriorityBandAccessor, error) { return nil, registryErr } }, @@ -1003,8 +1006,8 @@ func TestShardProcessor(t *testing.T) { t.Parallel() // --- ARRANGE --- h := newTestHarness(t, testCleanupTick) - keyHigh := types.FlowKey{ID: "flow-high", Priority: 10} - keyLow := types.FlowKey{ID: "flow-low", Priority: 20} + keyHigh := types.FlowKey{ID: "flow-high", Priority: 20} + keyLow := types.FlowKey{ID: "flow-low", Priority: 10} qHigh := h.addQueue(keyHigh) qLow := h.addQueue(keyLow) @@ -1196,8 +1199,8 @@ func TestShardProcessor(t *testing.T) { t.Parallel() // --- ARRANGE --- h := newTestHarness(t, testCleanupTick) - h.AllOrderedPriorityLevelsFunc = func() []uint { return []uint{testFlow.Priority} } - h.PriorityBandAccessorFunc = func(p uint) (framework.PriorityBandAccessor, error) { + h.AllOrderedPriorityLevelsFunc = func() []int { return []int{testFlow.Priority} } + h.PriorityBandAccessorFunc = func(p int) (framework.PriorityBandAccessor, error) { return nil, errors.New("registry error") } diff --git a/pkg/epp/flowcontrol/framework/mocks/mocks.go b/pkg/epp/flowcontrol/framework/mocks/mocks.go index b8715b779..ff8441fde 100644 --- a/pkg/epp/flowcontrol/framework/mocks/mocks.go +++ b/pkg/epp/flowcontrol/framework/mocks/mocks.go @@ -67,14 +67,14 @@ var _ framework.FlowQueueAccessor = &MockFlowQueueAccessor{} // Simple accessors are configured with public value fields (e.g., `PriorityV`). // Complex methods with logic are configured with function fields (e.g., `IterateQueuesFunc`). type MockPriorityBandAccessor struct { - PriorityV uint + PriorityV int PriorityNameV string FlowKeysFunc func() []types.FlowKey QueueFunc func(flowID string) framework.FlowQueueAccessor IterateQueuesFunc func(callback func(queue framework.FlowQueueAccessor) (keepIterating bool)) } -func (m *MockPriorityBandAccessor) Priority() uint { return m.PriorityV } +func (m *MockPriorityBandAccessor) Priority() int { return m.PriorityV } func (m *MockPriorityBandAccessor) PriorityName() string { return m.PriorityNameV } func (m *MockPriorityBandAccessor) FlowKeys() []types.FlowKey { diff --git a/pkg/epp/flowcontrol/framework/policies.go b/pkg/epp/flowcontrol/framework/policies.go index 5fc3e646d..eeea034eb 100644 --- a/pkg/epp/flowcontrol/framework/policies.go +++ b/pkg/epp/flowcontrol/framework/policies.go @@ -168,7 +168,7 @@ type FlowQueueAccessor interface { // Conformance: Implementations MUST ensure all methods are goroutine-safe for concurrent access. type PriorityBandAccessor interface { // Priority returns the numerical priority level of this band. - Priority() uint + Priority() int // PriorityName returns the human-readable name of this priority band. PriorityName() string diff --git a/pkg/epp/flowcontrol/registry/config.go b/pkg/epp/flowcontrol/registry/config.go index d404e78f7..f9ad8e637 100644 --- a/pkg/epp/flowcontrol/registry/config.go +++ b/pkg/epp/flowcontrol/registry/config.go @@ -98,7 +98,7 @@ type Config struct { // the correct element within this specific configuration instance, preventing common "pointer-to-loop-variable" // errors, especially across deep copies or partitioning. // It is populated during validation and when the config is copied or partitioned. - priorityBandMap map[uint]*PriorityBandConfig + priorityBandMap map[int]*PriorityBandConfig // Factory functions used for plugin instantiation during configuration validation. // These enable dependency injection for unit testing the validation logic. @@ -112,9 +112,9 @@ type Config struct { // that operate at this priority level. type PriorityBandConfig struct { // Priority is the unique numerical priority level for this band. - // Convention: Lower numerical values indicate higher priority (e.g., 0 is highest). + // Convention: Lower numerical values indicate lower priority. // Required. - Priority uint + Priority int // PriorityName is a human-readable name for this priority band (e.g., "Critical", "Standard"). // It must be unique across all priority bands in the configuration. @@ -170,13 +170,13 @@ type ShardConfig struct { // priorityBandMap provides O(1) lookups of `ShardPriorityBandConfig` by priority level. // It serves as a correctness mechanism, ensuring that accessors return a safe, stable pointer to the correct element // within this specific shard configuration instance. - priorityBandMap map[uint]*ShardPriorityBandConfig + priorityBandMap map[int]*ShardPriorityBandConfig } // ShardPriorityBandConfig holds the partitioned configuration for a single priority band within a single shard. type ShardPriorityBandConfig struct { // Priority is the unique numerical priority level for this band. - Priority uint + Priority int // PriorityName is a unique human-readable name for this priority band. PriorityName string // IntraFlowDispatchPolicy is the name of the policy for dispatch within a flow's queue. @@ -192,7 +192,7 @@ type ShardPriorityBandConfig struct { // getBandConfig finds and returns the shard-level configuration for a specific priority level. // Returns an error wrapping `contracts.ErrPriorityBandNotFound` if the priority is not configured. -func (sc *ShardConfig) getBandConfig(priority uint) (*ShardPriorityBandConfig, error) { +func (sc *ShardConfig) getBandConfig(priority int) (*ShardPriorityBandConfig, error) { if band, ok := sc.priorityBandMap[priority]; ok { return band, nil } @@ -235,9 +235,9 @@ func (c *Config) validateAndApplyDefaults() error { } // Validate and default each priority band. - priorities := make(map[uint]struct{}) + priorities := make(map[int]struct{}) priorityNames := make(map[string]struct{}) - c.priorityBandMap = make(map[uint]*PriorityBandConfig, len(c.PriorityBands)) + c.priorityBandMap = make(map[int]*PriorityBandConfig, len(c.PriorityBands)) for i := range c.PriorityBands { band := &c.PriorityBands[i] @@ -326,7 +326,7 @@ func (c *Config) partition(shardIndex, totalShards int) *ShardConfig { shardCfg := &ShardConfig{ MaxBytes: partitionUint64(c.MaxBytes, shardIndex, totalShards), PriorityBands: make([]ShardPriorityBandConfig, len(c.PriorityBands)), - priorityBandMap: make(map[uint]*ShardPriorityBandConfig, len(c.PriorityBands)), + priorityBandMap: make(map[int]*ShardPriorityBandConfig, len(c.PriorityBands)), } for i, template := range c.PriorityBands { @@ -436,7 +436,7 @@ func (c *Config) deepCopy() *Config { FlowGCTimeout: c.FlowGCTimeout, EventChannelBufferSize: c.EventChannelBufferSize, PriorityBands: make([]PriorityBandConfig, len(c.PriorityBands)), - priorityBandMap: make(map[uint]*PriorityBandConfig, len(c.PriorityBands)), + priorityBandMap: make(map[int]*PriorityBandConfig, len(c.PriorityBands)), interFlowDispatchPolicyFactory: c.interFlowDispatchPolicyFactory, intraFlowDispatchPolicyFactory: c.intraFlowDispatchPolicyFactory, queueFactory: c.queueFactory, @@ -456,7 +456,7 @@ func (c *Config) deepCopy() *Config { // getBandConfig finds and returns the global configuration template for a specific priority level. // Returns an error wrapping `contracts.ErrPriorityBandNotFound` if the priority is not configured. -func (c *Config) getBandConfig(priority uint) (*PriorityBandConfig, error) { +func (c *Config) getBandConfig(priority int) (*PriorityBandConfig, error) { if band, ok := c.priorityBandMap[priority]; ok { return band, nil } diff --git a/pkg/epp/flowcontrol/registry/managedqueue_test.go b/pkg/epp/flowcontrol/registry/managedqueue_test.go index f5e3d7fa7..64e5ab80c 100644 --- a/pkg/epp/flowcontrol/registry/managedqueue_test.go +++ b/pkg/epp/flowcontrol/registry/managedqueue_test.go @@ -98,7 +98,7 @@ type mockStatsPropagator struct { byteSizeDelta atomic.Int64 } -func (p *mockStatsPropagator) propagate(_ uint, lenDelta, byteSizeDelta int64) { +func (p *mockStatsPropagator) propagate(_ int, lenDelta, byteSizeDelta int64) { p.lenDelta.Add(lenDelta) p.byteSizeDelta.Add(byteSizeDelta) } diff --git a/pkg/epp/flowcontrol/registry/registry.go b/pkg/epp/flowcontrol/registry/registry.go index 95d604ede..12f83e2ad 100644 --- a/pkg/epp/flowcontrol/registry/registry.go +++ b/pkg/epp/flowcontrol/registry/registry.go @@ -37,7 +37,7 @@ import ( // propagateStatsDeltaFunc defines the callback function used to propagate statistics changes (deltas) up the hierarchy // (Queue -> Shard -> Registry). // Implementations MUST be non-blocking (relying on atomics). -type propagateStatsDeltaFunc func(priority uint, lenDelta, byteSizeDelta int64) +type propagateStatsDeltaFunc func(priority int, lenDelta, byteSizeDelta int64) // bandStats holds the aggregated atomic statistics for a single priority band across all shards. type bandStats struct { @@ -120,7 +120,7 @@ type FlowRegistry struct { // Globally aggregated statistics, updated atomically via lock-free propagation. totalByteSize atomic.Int64 totalLen atomic.Int64 - perPriorityBandStats map[uint]*bandStats // Keyed by priority. + perPriorityBandStats map[int]*bandStats // Keyed by priority. // --- Administrative state (protected by `mu`) --- @@ -158,7 +158,7 @@ func NewFlowRegistry(config Config, logger logr.Logger, opts ...RegistryOption) logger: logger.WithName("flow-registry"), activeShards: []*registryShard{}, drainingShards: make(map[string]*registryShard), - perPriorityBandStats: make(map[uint]*bandStats, len(validatedConfig.PriorityBands)), + perPriorityBandStats: make(map[int]*bandStats, len(validatedConfig.PriorityBands)), } for _, opt := range opts { @@ -289,7 +289,7 @@ func (fr *FlowRegistry) Stats() contracts.AggregateStats { TotalCapacityBytes: fr.config.MaxBytes, TotalByteSize: uint64(fr.totalByteSize.Load()), TotalLen: uint64(fr.totalLen.Load()), - PerPriorityBandStats: make(map[uint]contracts.PriorityBandStats, len(fr.config.PriorityBands)), + PerPriorityBandStats: make(map[int]contracts.PriorityBandStats, len(fr.config.PriorityBands)), } for p, s := range fr.perPriorityBandStats { @@ -592,7 +592,7 @@ func (fr *FlowRegistry) updateAllShardsCacheLocked() { } // propagateStatsDelta is the top-level, lock-free aggregator for all statistics. -func (fr *FlowRegistry) propagateStatsDelta(priority uint, lenDelta, byteSizeDelta int64) { +func (fr *FlowRegistry) propagateStatsDelta(priority int, lenDelta, byteSizeDelta int64) { stats, ok := fr.perPriorityBandStats[priority] if !ok { panic(fmt.Sprintf("invariant violation: priority band (%d) stats missing during propagation", priority)) diff --git a/pkg/epp/flowcontrol/registry/registry_test.go b/pkg/epp/flowcontrol/registry/registry_test.go index 5ffa600d0..0e1bee194 100644 --- a/pkg/epp/flowcontrol/registry/registry_test.go +++ b/pkg/epp/flowcontrol/registry/registry_test.go @@ -589,7 +589,7 @@ func TestFlowRegistry_UpdateShardCount(t *testing.T) { } h := newRegistryTestHarness(t, harnessOptions{config: &config}) - key := types.FlowKey{ID: "flow", Priority: 10} + key := types.FlowKey{ID: "flow", Priority: highPriority} h.openConnectionOnFlow(key) err := h.fr.updateShardCount(tc.targetShardCount) diff --git a/pkg/epp/flowcontrol/registry/shard.go b/pkg/epp/flowcontrol/registry/shard.go index 36032e42c..c87a8e485 100644 --- a/pkg/epp/flowcontrol/registry/shard.go +++ b/pkg/epp/flowcontrol/registry/shard.go @@ -18,7 +18,7 @@ package registry import ( "fmt" - "slices" + "sort" "sync" "sync/atomic" @@ -76,7 +76,7 @@ type registryShard struct { // onStatsDelta is the callback used to propagate statistics changes up to the parent registry. onStatsDelta propagateStatsDeltaFunc // orderedPriorityLevels is a cached, sorted list of priority levels. - orderedPriorityLevels []uint + orderedPriorityLevels []int // --- State Protected by `mu` --- @@ -88,7 +88,7 @@ type registryShard struct { // config holds the partitioned configuration for this shard, derived from the `FlowRegistry`'s global `Config`. config *ShardConfig // priorityBands is the primary lookup table for all managed queues on this shard. - priorityBands map[uint]*priorityBand + priorityBands map[int]*priorityBand // --- Concurrent-Safe State (Atomics) --- @@ -116,7 +116,7 @@ func newShard( logger: shardLogger, config: config, onStatsDelta: onStatsDelta, - priorityBands: make(map[uint]*priorityBand, len(config.PriorityBands)), + priorityBands: make(map[int]*priorityBand, len(config.PriorityBands)), } for _, bandConfig := range config.PriorityBands { @@ -133,8 +133,9 @@ func newShard( } s.orderedPriorityLevels = append(s.orderedPriorityLevels, bandConfig.Priority) } - - slices.Sort(s.orderedPriorityLevels) + sort.Slice(s.orderedPriorityLevels, func(i, j int) bool { + return s.orderedPriorityLevels[i] > s.orderedPriorityLevels[j] + }) s.logger.V(logging.DEFAULT).Info("Registry shard initialized successfully", "priorityBandCount", len(s.priorityBands), "orderedPriorities", s.orderedPriorityLevels) return s, nil @@ -184,7 +185,7 @@ func (s *registryShard) IntraFlowDispatchPolicy(key types.FlowKey) (framework.In // InterFlowDispatchPolicy retrieves a priority band's configured `framework.InterFlowDispatchPolicy`. // This read is lock-free as the policy instance is immutable after the shard is initialized. -func (s *registryShard) InterFlowDispatchPolicy(priority uint) (framework.InterFlowDispatchPolicy, error) { +func (s *registryShard) InterFlowDispatchPolicy(priority int) (framework.InterFlowDispatchPolicy, error) { // This read is safe because the `priorityBands` map structure is immutable after initialization. band, ok := s.priorityBands[priority] if !ok { @@ -195,7 +196,7 @@ func (s *registryShard) InterFlowDispatchPolicy(priority uint) (framework.InterF } // PriorityBandAccessor retrieves a read-only view for a given priority level. -func (s *registryShard) PriorityBandAccessor(priority uint) (framework.PriorityBandAccessor, error) { +func (s *registryShard) PriorityBandAccessor(priority int) (framework.PriorityBandAccessor, error) { s.mu.RLock() defer s.mu.RUnlock() @@ -209,7 +210,7 @@ func (s *registryShard) PriorityBandAccessor(priority uint) (framework.PriorityB // AllOrderedPriorityLevels returns a cached, sorted slice of all configured priority levels for this shard. // This is a lock-free read. -func (s *registryShard) AllOrderedPriorityLevels() []uint { +func (s *registryShard) AllOrderedPriorityLevels() []int { return s.orderedPriorityLevels } @@ -230,7 +231,7 @@ func (s *registryShard) Stats() contracts.ShardStats { TotalCapacityBytes: s.config.MaxBytes, TotalByteSize: uint64(s.totalByteSize.Load()), TotalLen: uint64(s.totalLen.Load()), - PerPriorityBandStats: make(map[uint]contracts.PriorityBandStats, len(s.priorityBands)), + PerPriorityBandStats: make(map[int]contracts.PriorityBandStats, len(s.priorityBands)), } for priority, band := range s.priorityBands { @@ -325,7 +326,7 @@ func (s *registryShard) updateConfig(newConfig *ShardConfig) { // propagateStatsDelta is the single point of entry for all statistics changes within the shard. // It atomically updates the relevant band's stats, the shard's total stats, and propagates the delta to the parent // registry. -func (s *registryShard) propagateStatsDelta(priority uint, lenDelta, byteSizeDelta int64) { +func (s *registryShard) propagateStatsDelta(priority int, lenDelta, byteSizeDelta int64) { // This read is safe because the `priorityBands` map structure is immutable after initialization. band, ok := s.priorityBands[priority] if !ok { @@ -355,7 +356,7 @@ type priorityBandAccessor struct { var _ framework.PriorityBandAccessor = &priorityBandAccessor{} // Priority returns the numerical priority level of this band. -func (a *priorityBandAccessor) Priority() uint { return a.band.config.Priority } +func (a *priorityBandAccessor) Priority() int { return a.band.config.Priority } // PriorityName returns the human-readable name of this priority band. func (a *priorityBandAccessor) PriorityName() string { return a.band.config.PriorityName } diff --git a/pkg/epp/flowcontrol/registry/shard_test.go b/pkg/epp/flowcontrol/registry/shard_test.go index 214497e41..7fbda572e 100644 --- a/pkg/epp/flowcontrol/registry/shard_test.go +++ b/pkg/epp/flowcontrol/registry/shard_test.go @@ -37,11 +37,11 @@ import ( const ( // highPriority is the priority level for the "High" priority band in the test harness config. - highPriority uint = 10 + highPriority int = 20 // lowPriority is the priority level for the "Low" priority band in the test harness config. - lowPriority uint = 20 + lowPriority int = 10 // nonExistentPriority is a priority that is known not to exist in the test harness config. - nonExistentPriority uint = 99 + nonExistentPriority int = 99 ) // --- Test Harness and Mocks --- @@ -133,7 +133,7 @@ func TestShard_New(t *testing.T) { assert.Equal(t, "test-shard-1", h.shard.ID(), "Shard ID must match the value provided during construction") assert.True(t, h.shard.IsActive(), "A newly created shard must be initialized in the Active state") - assert.Equal(t, []uint{highPriority, lowPriority}, h.shard.AllOrderedPriorityLevels(), + assert.Equal(t, []int{highPriority, lowPriority}, h.shard.AllOrderedPriorityLevels(), "Shard must report configured priority levels sorted numerically (highest priority first)") bandHigh, ok := h.shard.priorityBands[highPriority] diff --git a/pkg/epp/flowcontrol/types/flow.go b/pkg/epp/flowcontrol/types/flow.go index 71017c13c..2af2d5bd0 100644 --- a/pkg/epp/flowcontrol/types/flow.go +++ b/pkg/epp/flowcontrol/types/flow.go @@ -41,7 +41,7 @@ type FlowKey struct { // // Because the `FlowKey` is immutable, changing the priority of traffic requires using a new `FlowKey`; the old flow // instance will be automatically garbage collected by the registry when it becomes idle. - Priority uint + Priority int } func (k FlowKey) String() string { @@ -49,13 +49,13 @@ func (k FlowKey) String() string { } // Compare provides a stable comparison function for two FlowKey instances, suitable for use with sorting algorithms. -// It returns -1 if the key is less than the other, 0 if they are equal, and 1 if the key is greater than the other. +// It returns 1 if the key is less than the other, 0 if they are equal, and -1 if the key is greater than the other. // The comparison is performed first by `Priority` (ascending, higher priority first) and then by `ID` (ascending). func (k FlowKey) Compare(other FlowKey) int { - if k.Priority < other.Priority { // Lower number means higher priority + if k.Priority > other.Priority { // Higher number means higher priority return -1 } - if k.Priority > other.Priority { + if k.Priority < other.Priority { return 1 } return strings.Compare(k.ID, other.ID)