From 30421ce8c2829a9dc7fd1b15ee75150ed69abf05 Mon Sep 17 00:00:00 2001 From: Luke Van Drie Date: Sat, 6 Sep 2025 19:14:41 +0000 Subject: [PATCH 1/3] refactor: Standardize FC config validation pattern Introduces a consistent `ValidateAndApplyDefaults` method to the configuration structs in the `flowcontrol/controller` and `flowcontrol/registry` packages. This change refactors the configuration handling to follow a unified pattern: - Configuration structs now have a `ValidateAndApplyDefaults` method that returns a new, validated config object without mutating the original. - Constructors for `FlowController` and `FlowRegistry` now assume they receive a valid configuration, simplifying their logic and pushing the responsibility of validation to the caller. - The `deepCopy` logic is corrected to ensure test assertions for immutability pass reliably. This improves the clarity and robustness of configuration management within the flow control module, creating a consistent foundation for future wiring work. --- pkg/epp/flowcontrol/controller/config.go | 50 ++++------ pkg/epp/flowcontrol/controller/config_test.go | 4 +- pkg/epp/flowcontrol/controller/controller.go | 7 +- .../flowcontrol/controller/controller_test.go | 17 ---- pkg/epp/flowcontrol/registry/config.go | 99 ++++++++++--------- pkg/epp/flowcontrol/registry/config_test.go | 30 +++--- pkg/epp/flowcontrol/registry/registry.go | 12 +-- pkg/epp/flowcontrol/registry/registry_test.go | 75 +------------- pkg/epp/flowcontrol/registry/shard_test.go | 4 +- 9 files changed, 103 insertions(+), 195 deletions(-) diff --git a/pkg/epp/flowcontrol/controller/config.go b/pkg/epp/flowcontrol/controller/config.go index 17c1429aa..e542c4d6b 100644 --- a/pkg/epp/flowcontrol/controller/config.go +++ b/pkg/epp/flowcontrol/controller/config.go @@ -53,46 +53,38 @@ type Config struct { EnqueueChannelBufferSize int } -// newConfig performs validation and initialization, returning a guaranteed-valid `Config` object. -// This is the required constructor for creating a new configuration. -// It does not mutate the input `cfg`. -func newConfig(cfg Config) (*Config, error) { - newCfg := cfg.deepCopy() - if err := newCfg.validateAndApplyDefaults(); err != nil { - return nil, err - } - return newCfg, nil -} +// ValidateAndApplyDefaults checks the global configuration for validity and then creates a new `Config` object, +// populating any empty fields with system defaults. +// It does not mutate the receiver. +func (c *Config) ValidateAndApplyDefaults() (*Config, error) { + cfg := c.deepCopy() -// validateAndApplyDefaults checks the global configuration for validity and then mutates the receiver to populate any -// empty fields with system defaults. -func (c *Config) validateAndApplyDefaults() error { // --- Validation --- - if c.DefaultRequestTTL < 0 { - return fmt.Errorf("DefaultRequestTTL cannot be negative, but got %v", c.DefaultRequestTTL) + if cfg.DefaultRequestTTL < 0 { + return nil, fmt.Errorf("DefaultRequestTTL cannot be negative, but got %v", cfg.DefaultRequestTTL) } - if c.ExpiryCleanupInterval < 0 { - return fmt.Errorf("ExpiryCleanupInterval cannot be negative, but got %v", c.ExpiryCleanupInterval) + if cfg.ExpiryCleanupInterval < 0 { + return nil, fmt.Errorf("ExpiryCleanupInterval cannot be negative, but got %v", cfg.ExpiryCleanupInterval) } - if c.ProcessorReconciliationInterval < 0 { - return fmt.Errorf("ProcessorReconciliationInterval cannot be negative, but got %v", - c.ProcessorReconciliationInterval) + if cfg.ProcessorReconciliationInterval < 0 { + return nil, fmt.Errorf("ProcessorReconciliationInterval cannot be negative, but got %v", + cfg.ProcessorReconciliationInterval) } - if c.EnqueueChannelBufferSize < 0 { - return fmt.Errorf("EnqueueChannelBufferSize cannot be negative, but got %d", c.EnqueueChannelBufferSize) + if cfg.EnqueueChannelBufferSize < 0 { + return nil, fmt.Errorf("EnqueueChannelBufferSize cannot be negative, but got %d", cfg.EnqueueChannelBufferSize) } // --- Defaulting --- - if c.ExpiryCleanupInterval == 0 { - c.ExpiryCleanupInterval = defaultExpiryCleanupInterval + if cfg.ExpiryCleanupInterval == 0 { + cfg.ExpiryCleanupInterval = defaultExpiryCleanupInterval } - if c.ProcessorReconciliationInterval == 0 { - c.ProcessorReconciliationInterval = defaultProcessorReconciliationInterval + if cfg.ProcessorReconciliationInterval == 0 { + cfg.ProcessorReconciliationInterval = defaultProcessorReconciliationInterval } - if c.EnqueueChannelBufferSize == 0 { - c.EnqueueChannelBufferSize = defaultEnqueueChannelBufferSize + if cfg.EnqueueChannelBufferSize == 0 { + cfg.EnqueueChannelBufferSize = defaultEnqueueChannelBufferSize } - return nil + return cfg, nil } // deepCopy creates a deep copy of the `Config` object. diff --git a/pkg/epp/flowcontrol/controller/config_test.go b/pkg/epp/flowcontrol/controller/config_test.go index a89db25ff..710df9fa7 100644 --- a/pkg/epp/flowcontrol/controller/config_test.go +++ b/pkg/epp/flowcontrol/controller/config_test.go @@ -24,7 +24,7 @@ import ( "github.com/stretchr/testify/require" ) -func TestNewConfig(t *testing.T) { +func TestConfig_ValidateAndApplyDefaults(t *testing.T) { t.Parallel() testCases := []struct { @@ -88,7 +88,7 @@ func TestNewConfig(t *testing.T) { t.Run(tc.name, func(t *testing.T) { t.Parallel() originalInput := tc.input.deepCopy() - validatedCfg, err := newConfig(tc.input) + validatedCfg, err := tc.input.ValidateAndApplyDefaults() if tc.expectErr { require.Error(t, err, "expected an error but got nil") diff --git a/pkg/epp/flowcontrol/controller/controller.go b/pkg/epp/flowcontrol/controller/controller.go index e25eda969..8449159b1 100644 --- a/pkg/epp/flowcontrol/controller/controller.go +++ b/pkg/epp/flowcontrol/controller/controller.go @@ -118,13 +118,8 @@ func NewFlowController( logger logr.Logger, opts ...flowControllerOption, ) (*FlowController, error) { - validatedConfig, err := newConfig(config) - if err != nil { - return nil, fmt.Errorf("invalid flow controller configuration: %w", err) - } - fc := &FlowController{ - config: *validatedConfig, + config: *config.deepCopy(), registry: registry, saturationDetector: sd, clock: clock.RealClock{}, diff --git a/pkg/epp/flowcontrol/controller/controller_test.go b/pkg/epp/flowcontrol/controller/controller_test.go index 511bae8ce..89376c73c 100644 --- a/pkg/epp/flowcontrol/controller/controller_test.go +++ b/pkg/epp/flowcontrol/controller/controller_test.go @@ -273,23 +273,6 @@ func newTestRequest(ctx context.Context, key types.FlowKey) *typesmocks.MockFlow // --- Test Cases --- -func TestNewFlowController(t *testing.T) { - t.Parallel() - - t.Run("ErrorOnInvalidConfig", func(t *testing.T) { - t.Parallel() - invalidCfg := Config{ProcessorReconciliationInterval: -1 * time.Second} - _, err := NewFlowController( - context.Background(), - invalidCfg, - &mockRegistryClient{}, - &mocks.MockSaturationDetector{}, - logr.Discard(), - ) - require.Error(t, err, "NewFlowController must return an error for invalid configuration") - }) -} - func TestFlowController_EnqueueAndWait(t *testing.T) { t.Parallel() diff --git a/pkg/epp/flowcontrol/registry/config.go b/pkg/epp/flowcontrol/registry/config.go index f9ad8e637..af345a665 100644 --- a/pkg/epp/flowcontrol/registry/config.go +++ b/pkg/epp/flowcontrol/registry/config.go @@ -112,7 +112,7 @@ type Config struct { // that operate at this priority level. type PriorityBandConfig struct { // Priority is the unique numerical priority level for this band. - // Convention: Lower numerical values indicate lower priority. + // Convention: Highest numeric value corresponds to highest priority (centered on 0). // Required. Priority int @@ -140,20 +140,6 @@ type PriorityBandConfig struct { MaxBytes uint64 } -// NewConfig performs validation and initialization, returning a guaranteed-valid `Config` object. -// This is the required constructor for creating a new configuration. It applies provided functional options (primarily -// for testing) and does not mutate the input `cfg`. -func NewConfig(cfg Config, opts ...configOption) (*Config, error) { - newCfg := cfg.deepCopy() - for _, opt := range opts { - opt(newCfg) - } - if err := newCfg.validateAndApplyDefaults(); err != nil { - return nil, err - } - return newCfg, nil -} - // ============================================================================= // Shard-Level Configuration // ============================================================================= @@ -205,52 +191,55 @@ func (sc *ShardConfig) getBandConfig(priority int) (*ShardPriorityBandConfig, er // --- Validation and Defaulting --- -// validateAndApplyDefaults checks the global configuration for validity (including plugin compatibility) and mutates -// the receiver to populate any empty fields with system defaults. It also initializes internal lookup maps. -func (c *Config) validateAndApplyDefaults() error { +// ValidateAndApplyDefaults checks the global configuration for validity and then creates a new `Config` object, +// populating any empty fields with system defaults. +// It does not mutate the receiver. +func (c *Config) ValidateAndApplyDefaults() (*Config, error) { + cfg := c.deepCopy() + // Apply defaults to top-level fields. - if c.InitialShardCount <= 0 { - c.InitialShardCount = defaultInitialShardCount + if cfg.InitialShardCount <= 0 { + cfg.InitialShardCount = defaultInitialShardCount } - if c.FlowGCTimeout <= 0 { - c.FlowGCTimeout = defaultFlowGCTimeout + if cfg.FlowGCTimeout <= 0 { + cfg.FlowGCTimeout = defaultFlowGCTimeout } - if c.EventChannelBufferSize <= 0 { - c.EventChannelBufferSize = defaultEventChannelBufferSize + if cfg.EventChannelBufferSize <= 0 { + cfg.EventChannelBufferSize = defaultEventChannelBufferSize } // Ensure the DI factories are initialized for production use if `NewConfig` was called without options. - if c.interFlowDispatchPolicyFactory == nil { - c.interFlowDispatchPolicyFactory = inter.NewPolicyFromName + if cfg.interFlowDispatchPolicyFactory == nil { + cfg.interFlowDispatchPolicyFactory = inter.NewPolicyFromName } - if c.intraFlowDispatchPolicyFactory == nil { - c.intraFlowDispatchPolicyFactory = intra.NewPolicyFromName + if cfg.intraFlowDispatchPolicyFactory == nil { + cfg.intraFlowDispatchPolicyFactory = intra.NewPolicyFromName } - if c.queueFactory == nil { - c.queueFactory = queue.NewQueueFromName + if cfg.queueFactory == nil { + cfg.queueFactory = queue.NewQueueFromName } - if len(c.PriorityBands) == 0 { - return errors.New("config validation failed: at least one priority band must be defined") + if len(cfg.PriorityBands) == 0 { + return nil, errors.New("config validation failed: at least one priority band must be defined") } // Validate and default each priority band. priorities := make(map[int]struct{}) priorityNames := make(map[string]struct{}) - c.priorityBandMap = make(map[int]*PriorityBandConfig, len(c.PriorityBands)) + cfg.priorityBandMap = make(map[int]*PriorityBandConfig, len(cfg.PriorityBands)) - for i := range c.PriorityBands { - band := &c.PriorityBands[i] + for i := range cfg.PriorityBands { + band := &cfg.PriorityBands[i] if _, exists := priorities[band.Priority]; exists { - return fmt.Errorf("config validation failed: duplicate priority level %d found", band.Priority) + return nil, fmt.Errorf("config validation failed: duplicate priority level %d found", band.Priority) } priorities[band.Priority] = struct{}{} if band.PriorityName == "" { - return fmt.Errorf("config validation failed: PriorityName is required for priority band %d", band.Priority) + return nil, fmt.Errorf("config validation failed: PriorityName is required for priority band %d", band.Priority) } if _, exists := priorityNames[band.PriorityName]; exists { - return fmt.Errorf("config validation failed: duplicate priority name %q found", band.PriorityName) + return nil, fmt.Errorf("config validation failed: duplicate priority name %q found", band.PriorityName) } priorityNames[band.PriorityName] = struct{}{} @@ -267,12 +256,12 @@ func (c *Config) validateAndApplyDefaults() error { band.MaxBytes = defaultPriorityBandMaxBytes } - if err := c.validateBandCompatibility(*band); err != nil { - return err + if err := cfg.validateBandCompatibility(*band); err != nil { + return nil, err } - c.priorityBandMap[band.Priority] = band + cfg.priorityBandMap[band.Priority] = band } - return nil + return cfg, nil } // validateBandCompatibility verifies that a band's configured queue type has the necessary capabilities. @@ -423,6 +412,18 @@ func withQueueFactory(factory queueFactory) configOption { } } +// newConfig creates a new validated and defaulted `Config` object. +// It applies provided test-only functional options before validation and defaulting. +// It does not mutate the input `cfg`. +// test-only +func newConfig(cfg Config, opts ...configOption) (*Config, error) { + newCfg := cfg.deepCopy() + for _, opt := range opts { + opt(newCfg) + } + return newCfg.ValidateAndApplyDefaults() +} + // --- Internal Utilities --- // deepCopy creates a deep copy of the `Config` object. @@ -436,7 +437,6 @@ func (c *Config) deepCopy() *Config { FlowGCTimeout: c.FlowGCTimeout, EventChannelBufferSize: c.EventChannelBufferSize, PriorityBands: make([]PriorityBandConfig, len(c.PriorityBands)), - priorityBandMap: make(map[int]*PriorityBandConfig, len(c.PriorityBands)), interFlowDispatchPolicyFactory: c.interFlowDispatchPolicyFactory, intraFlowDispatchPolicyFactory: c.intraFlowDispatchPolicyFactory, queueFactory: c.queueFactory, @@ -445,11 +445,14 @@ func (c *Config) deepCopy() *Config { // PriorityBandConfig contains only value types, so a slice copy is sufficient for a deep copy. copy(newCfg.PriorityBands, c.PriorityBands) - // Crucial: We must rebuild the map and take the address of the elements within the new slice (`newCfg.PriorityBands`) - // to ensure the map pointers are correct for the newly created `Config` instance. - for i := range newCfg.PriorityBands { - band := &newCfg.PriorityBands[i] - newCfg.priorityBandMap[band.Priority] = band + if c.priorityBandMap != nil { + newCfg.priorityBandMap = make(map[int]*PriorityBandConfig, len(c.PriorityBands)) + // Crucial: We must rebuild the map and take the address of the elements within the new slice (`newCfg.PriorityBands`) + // to ensure the map pointers are correct for the newly created `Config` instance. + for i := range newCfg.PriorityBands { + band := &newCfg.PriorityBands[i] + newCfg.priorityBandMap[band.Priority] = band + } } return newCfg } diff --git a/pkg/epp/flowcontrol/registry/config_test.go b/pkg/epp/flowcontrol/registry/config_test.go index ea9c19b7d..47814ae6e 100644 --- a/pkg/epp/flowcontrol/registry/config_test.go +++ b/pkg/epp/flowcontrol/registry/config_test.go @@ -35,7 +35,7 @@ import ( "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/queue/listqueue" ) -func TestConfig_NewConfig(t *testing.T) { +func TestConfig_ValidateAndApplyDefaults(t *testing.T) { t.Parallel() testCases := []struct { @@ -286,20 +286,24 @@ func TestConfig_NewConfig(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { t.Parallel() - originalInputCopy := tc.input.deepCopy() - newCfg, err := NewConfig(tc.input, tc.opts...) + originalInput := tc.input.deepCopy() + validatedCfg, err := newConfig(tc.input, tc.opts...) + if tc.expectErr { - require.Error(t, err, "NewConfig should have returned an error") + require.Error(t, err, "expected an error but got nil") if tc.expectedErrIs != nil { - assert.ErrorIs(t, err, tc.expectedErrIs, "Error should wrap the expected error type") + assert.ErrorIs(t, err, tc.expectedErrIs, "error should wrap the expected error type") } - assert.Nil(t, newCfg, "On error, the returned config should be nil") + assert.Nil(t, validatedCfg, "validatedCfg should be nil on error") } else { - require.NoError(t, err, "NewConfig should not have returned an error") - require.NotNil(t, newCfg, "On success, the returned config should not be nil") + require.NoError(t, err, "expected no error but got: %v", err) + require.NotNil(t, validatedCfg, "validatedCfg should not be nil on success") if tc.assertion != nil { - tc.assertion(t, *originalInputCopy, newCfg) + tc.assertion(t, *originalInput, validatedCfg) } + + // Ensure the original config is not mutated. + assert.Equal(t, *originalInput, tc.input, "input config should not be mutated") } }) } @@ -307,7 +311,7 @@ func TestConfig_NewConfig(t *testing.T) { func TestConfig_Partition(t *testing.T) { t.Parallel() - baseCfg, err := NewConfig(Config{ + baseCfg, err := newConfig(Config{ MaxBytes: 103, // Will not distribute evenly PriorityBands: []PriorityBandConfig{ {Priority: 1, PriorityName: "High", MaxBytes: 55}, // Will not distribute evenly @@ -391,7 +395,7 @@ func TestConfig_Partition(t *testing.T) { func TestConfig_GetBandConfig(t *testing.T) { t.Parallel() - cfg, err := NewConfig(Config{ + cfg, err := newConfig(Config{ PriorityBands: []PriorityBandConfig{ {Priority: 10, PriorityName: "High"}, }, @@ -427,7 +431,7 @@ func TestConfig_DeepCopy(t *testing.T) { }, } // Create a fully initialized "original" config to be the source of the copy. - original, err := NewConfig(baseCfg) + original, err := newConfig(baseCfg) require.NoError(t, err, "Setup for deep copy should not fail") t.Run("ShouldReturnNil_ForNilReceiver", func(t *testing.T) { @@ -481,7 +485,7 @@ func TestConfig_DeepCopy(t *testing.T) { func TestShardConfig_GetBandConfig(t *testing.T) { t.Parallel() - baseCfg, err := NewConfig(Config{ + baseCfg, err := newConfig(Config{ PriorityBands: []PriorityBandConfig{ {Priority: 10, PriorityName: "High"}, {Priority: 20, PriorityName: "Low"}, diff --git a/pkg/epp/flowcontrol/registry/registry.go b/pkg/epp/flowcontrol/registry/registry.go index 5f295541d..3a73ef706 100644 --- a/pkg/epp/flowcontrol/registry/registry.go +++ b/pkg/epp/flowcontrol/registry/registry.go @@ -148,17 +148,13 @@ func withClock(clk clock.WithTickerAndDelayedExecution) RegistryOption { // NewFlowRegistry creates and initializes a new `FlowRegistry` instance. func NewFlowRegistry(config Config, logger logr.Logger, opts ...RegistryOption) (*FlowRegistry, error) { - validatedConfig, err := NewConfig(config) - if err != nil { - return nil, fmt.Errorf("master configuration is invalid: %w", err) - } - + cfg := config.deepCopy() fr := &FlowRegistry{ - config: validatedConfig, + config: cfg, logger: logger.WithName("flow-registry"), activeShards: []*registryShard{}, drainingShards: make(map[string]*registryShard), - perPriorityBandStats: make(map[int]*bandStats, len(validatedConfig.PriorityBands)), + perPriorityBandStats: make(map[int]*bandStats, len(cfg.PriorityBands)), } for _, opt := range opts { @@ -173,7 +169,7 @@ func NewFlowRegistry(config Config, logger logr.Logger, opts ...RegistryOption) fr.perPriorityBandStats[band.Priority] = &bandStats{} } - if err := fr.updateShardCount(validatedConfig.InitialShardCount); err != nil { + if err := fr.updateShardCount(cfg.InitialShardCount); err != nil { return nil, fmt.Errorf("failed to initialize shards: %w", err) } fr.logger.V(logging.DEFAULT).Info("FlowRegistry initialized successfully") diff --git a/pkg/epp/flowcontrol/registry/registry_test.go b/pkg/epp/flowcontrol/registry/registry_test.go index b63b151c2..b5bc322cb 100644 --- a/pkg/epp/flowcontrol/registry/registry_test.go +++ b/pkg/epp/flowcontrol/registry/registry_test.go @@ -73,9 +73,12 @@ func newRegistryTestHarness(t *testing.T, opts harnessOptions) *registryTestHarn config.InitialShardCount = opts.initialShardCount } + validatedCfg, err := config.ValidateAndApplyDefaults() + require.NoError(t, err, "Test setup: validating config should not fail") + fakeClock := testclock.NewFakeClock(time.Now()) registryOpts := []RegistryOption{withClock(fakeClock)} - fr, err := NewFlowRegistry(config, logr.Discard(), registryOpts...) + fr, err := NewFlowRegistry(*validatedCfg, logr.Discard(), registryOpts...) require.NoError(t, err, "Test setup: NewFlowRegistry should not fail") // Start the GC loop in the background. @@ -132,69 +135,9 @@ func (h *registryTestHarness) openConnectionOnFlow(key types.FlowKey) { func TestFlowRegistry_New(t *testing.T) { t.Parallel() - t.Run("ShouldApplyDefaults_WhenInitialized", func(t *testing.T) { - t.Parallel() - config := Config{PriorityBands: []PriorityBandConfig{{Priority: highPriority, PriorityName: "DefaultedBand"}}} - fr, err := NewFlowRegistry(config, logr.Discard()) - require.NoError(t, err, "Creating a valid registry with defaults should not fail") - assert.Equal(t, defaultInitialShardCount, fr.config.InitialShardCount, "InitialShardCount should be defaulted") - assert.Equal(t, defaultFlowGCTimeout, fr.config.FlowGCTimeout, "FlowGCTimeout should be defaulted") - assert.Equal(t, defaultEventChannelBufferSize, fr.config.EventChannelBufferSize, - "EventChannelBufferSize should be defaulted") - assert.Len(t, fr.allShards, defaultInitialShardCount, - "Registry should be initialized with the default number of shards") - bandConf, err := fr.config.getBandConfig(highPriority) - require.NoError(t, err, "Getting the defaulted band config should not fail") - assert.Equal(t, defaultPriorityBandMaxBytes, bandConf.MaxBytes, "Priority band MaxBytes should be defaulted") - }) - - t.Run("ShouldFail_OnInvalidConfiguration", func(t *testing.T) { - t.Parallel() - testCases := []struct { - name string - config Config - expectErrSubStr string - }{ - { - name: "WhenNoPriorityBandsAreDefined", - config: Config{}, - expectErrSubStr: "at least one priority band must be defined", - }, - { - name: "WhenPriorityLevelsAreDuplicated", - config: Config{ - PriorityBands: []PriorityBandConfig{ - {Priority: highPriority, PriorityName: "A"}, - {Priority: highPriority, PriorityName: "B"}, - }, - }, - expectErrSubStr: fmt.Sprintf("duplicate priority level %d", highPriority), - }, - { - name: "WhenPriorityNamesAreDuplicated", - config: Config{ - PriorityBands: []PriorityBandConfig{ - {Priority: highPriority, PriorityName: "A"}, - {Priority: lowPriority, PriorityName: "A"}, - }, - }, - expectErrSubStr: `duplicate priority name "A"`, - }, - } - - for _, tc := range testCases { - t.Run(tc.name, func(t *testing.T) { - t.Parallel() - _, err := NewFlowRegistry(tc.config, logr.Discard()) - require.Error(t, err, "NewFlowRegistry should fail with an invalid config") - assert.Contains(t, err.Error(), tc.expectErrSubStr, "Error message should contain the expected reason") - }) - } - }) - t.Run("ShouldFail_WhenInitialShardCreationFails", func(t *testing.T) { t.Parallel() - config, err := NewConfig( + config, err := newConfig( Config{PriorityBands: []PriorityBandConfig{{Priority: highPriority, PriorityName: "A"}}}, withInterFlowDispatchPolicyFactory(func(inter.RegisteredPolicyName) (framework.InterFlowDispatchPolicy, error) { return nil, errors.New("injected factory failure") @@ -544,14 +487,6 @@ func TestFlowRegistry_UpdateShardCount(t *testing.T) { expectedPartitionedGlobalCapacities: map[uint64]int{25: 4}, expectedPartitionedBandCapacities: map[uint64]int{12: 2, 13: 2}, }, - { - name: "Succeeds_ScaleUp_FromZero", - initialShardCount: 0, - targetShardCount: 4, - expectedActiveCount: 4, - expectedPartitionedGlobalCapacities: map[uint64]int{25: 4}, - expectedPartitionedBandCapacities: map[uint64]int{12: 2, 13: 2}, - }, { name: "Succeeds_ScaleDown_ToOne", initialShardCount: 3, diff --git a/pkg/epp/flowcontrol/registry/shard_test.go b/pkg/epp/flowcontrol/registry/shard_test.go index afb2fac34..23bf81325 100644 --- a/pkg/epp/flowcontrol/registry/shard_test.go +++ b/pkg/epp/flowcontrol/registry/shard_test.go @@ -59,7 +59,7 @@ type shardTestHarness struct { // newShardTestHarness initializes a `shardTestHarness` with a default configuration. func newShardTestHarness(t *testing.T) *shardTestHarness { t.Helper() - globalConfig, err := NewConfig(Config{ + globalConfig, err := newConfig(Config{ PriorityBands: []PriorityBandConfig{ {Priority: highPriority, PriorityName: "High"}, {Priority: lowPriority, PriorityName: "Low"}, @@ -146,7 +146,7 @@ func TestShard_New(t *testing.T) { t.Run("ShouldFail_WhenInterFlowPolicyFactoryFails", func(t *testing.T) { t.Parallel() - shardConfig, _ := NewConfig(Config{PriorityBands: []PriorityBandConfig{ + shardConfig, _ := newConfig(Config{PriorityBands: []PriorityBandConfig{ {Priority: highPriority, PriorityName: "High"}, }}) failingFactory := func(inter.RegisteredPolicyName) (framework.InterFlowDispatchPolicy, error) { From 4fad1a925ea646c091aadd0ae9a91dfe72558ec5 Mon Sep 17 00:00:00 2001 From: Luke Van Drie Date: Sat, 6 Sep 2025 19:18:07 +0000 Subject: [PATCH 2/3] feat(flowcontrol): Add bundled Flow Control config Introduces a new top-level `Config` for the Flow Control layer. This config bundles the configurations for the `controller` and `registry` packages, providing a single, unified point of entry fo validation and default application. This simplifies the management and initialization of the flow control system by centralizing its configuration. --- pkg/epp/flowcontrol/config.go | 50 ++++++++++++++++ pkg/epp/flowcontrol/config_test.go | 91 ++++++++++++++++++++++++++++++ 2 files changed, 141 insertions(+) create mode 100644 pkg/epp/flowcontrol/config.go create mode 100644 pkg/epp/flowcontrol/config_test.go diff --git a/pkg/epp/flowcontrol/config.go b/pkg/epp/flowcontrol/config.go new file mode 100644 index 000000000..edc23abad --- /dev/null +++ b/pkg/epp/flowcontrol/config.go @@ -0,0 +1,50 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package flowcontrol + +import ( + "fmt" + + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/controller" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/registry" +) + +// Config is the top-level configuration for the entire flow control module. +// It embeds the configurations for the controller and the registry, providing a single point of entry for validation +// and initialization. +type Config struct { + Controller controller.Config + Registry registry.Config +} + +// ValidateAndApplyDefaults checks the configuration for validity and populates any empty fields with system defaults. +// It delegates validation to the underlying controller and registry configurations. +// It returns a new, validated `Config` object and does not mutate the receiver. +func (c *Config) ValidateAndApplyDefaults() (*Config, error) { + validatedControllerCfg, err := c.Controller.ValidateAndApplyDefaults() + if err != nil { + return nil, fmt.Errorf("controller config validation failed: %w", err) + } + validatedRegistryCfg, err := c.Registry.ValidateAndApplyDefaults() + if err != nil { + return nil, fmt.Errorf("registry config validation failed: %w", err) + } + return &Config{ + Controller: *validatedControllerCfg, + Registry: *validatedRegistryCfg, + }, nil +} diff --git a/pkg/epp/flowcontrol/config_test.go b/pkg/epp/flowcontrol/config_test.go new file mode 100644 index 000000000..713abee77 --- /dev/null +++ b/pkg/epp/flowcontrol/config_test.go @@ -0,0 +1,91 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package flowcontrol + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/controller" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/registry" +) + +func TestConfig_ValidateAndApplyDefaults(t *testing.T) { + t.Parallel() + + // A minimal valid registry config, which is required for the success case. + validRegistryConfig := registry.Config{ + PriorityBands: []registry.PriorityBandConfig{ + {Priority: 1, PriorityName: "TestBand"}, + }, + } + + testCases := []struct { + name string + input Config + expectErr bool + expectedErrIs error + }{ + { + name: "ShouldSucceed_WhenSubConfigsAreValid", + input: Config{ + Controller: controller.Config{}, + Registry: validRegistryConfig, + }, + expectErr: false, + }, + { + name: "ShouldFail_WhenControllerConfigIsInvalid", + input: Config{ + Controller: controller.Config{ + DefaultRequestTTL: -1 * time.Second, + }, + Registry: validRegistryConfig, + }, + expectErr: true, + }, + { + name: "ShouldFail_WhenRegistryConfigIsInvalid", + input: Config{ + Controller: controller.Config{}, + Registry: registry.Config{ + PriorityBands: []registry.PriorityBandConfig{}, + }, + }, + expectErr: true, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + originalInput := tc.input + validatedCfg, err := tc.input.ValidateAndApplyDefaults() + + if tc.expectErr { + require.Error(t, err, "expected an error but got nil") + } else { + require.NoError(t, err, "expected no error but got: %v", err) + require.NotNil(t, validatedCfg, "validatedCfg should not be nil on success") + } + + assert.Equal(t, originalInput, tc.input, "input config should not be mutated") + }) + } +} From b94dd82cfe18b6b88807c53f3e6efdc8da4428f0 Mon Sep 17 00:00:00 2001 From: Luke Van Drie Date: Mon, 15 Sep 2025 16:39:38 +0000 Subject: [PATCH 3/3] fix: Update controller tests with valid config The previous commit introduced a unified and validated configuration for the flow control system, requiring callers to pass a pre-validated cofig to the controller and registry respectively. This change updates the controller tests to provide a valid configuration instead of relying on the now-removed defaulting logic in the constructor. --- pkg/epp/flowcontrol/controller/controller_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/epp/flowcontrol/controller/controller_test.go b/pkg/epp/flowcontrol/controller/controller_test.go index 89376c73c..435cdb510 100644 --- a/pkg/epp/flowcontrol/controller/controller_test.go +++ b/pkg/epp/flowcontrol/controller/controller_test.go @@ -796,6 +796,7 @@ func TestFlowController_Concurrency(t *testing.T) { // Use a generous buffer to prevent flakes in the test due to transient queuing delays. EnqueueChannelBufferSize: numRequests, DefaultRequestTTL: 1 * time.Second, + ExpiryCleanupInterval: 100 * time.Millisecond, }, mockRegistry) var wg sync.WaitGroup