Skip to content

Commit 9111ca7

Browse files
committed
refactor: Standardize FC config validation pattern
Introduces a consistent `ValidateAndApplyDefaults` method to the configuration structs in the `flowcontrol/controller` and `flowcontrol/registry` packages. This change refactors the configuration handling to follow a unified pattern: - Configuration structs now have a `ValidateAndApplyDefaults` method that returns a new, validated config object without mutating the original. - Constructors for `FlowController` and `FlowRegistry` now assume they receive a valid configuration, simplifying their logic and pushing the responsibility of validation to the caller. - The `deepCopy` logic is corrected to ensure test assertions for immutability pass reliably. This improves the clarity and robustness of configuration management within the flow control module, creating a consistent foundation for future wiring work.
1 parent d03d3b6 commit 9111ca7

File tree

9 files changed

+103
-195
lines changed

9 files changed

+103
-195
lines changed

pkg/epp/flowcontrol/controller/config.go

Lines changed: 21 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -53,46 +53,38 @@ type Config struct {
5353
EnqueueChannelBufferSize int
5454
}
5555

56-
// newConfig performs validation and initialization, returning a guaranteed-valid `Config` object.
57-
// This is the required constructor for creating a new configuration.
58-
// It does not mutate the input `cfg`.
59-
func newConfig(cfg Config) (*Config, error) {
60-
newCfg := cfg.deepCopy()
61-
if err := newCfg.validateAndApplyDefaults(); err != nil {
62-
return nil, err
63-
}
64-
return newCfg, nil
65-
}
56+
// ValidateAndApplyDefaults checks the global configuration for validity and then creates a new `Config` object,
57+
// populating any empty fields with system defaults.
58+
// It does not mutate the receiver.
59+
func (c *Config) ValidateAndApplyDefaults() (*Config, error) {
60+
cfg := c.deepCopy()
6661

67-
// validateAndApplyDefaults checks the global configuration for validity and then mutates the receiver to populate any
68-
// empty fields with system defaults.
69-
func (c *Config) validateAndApplyDefaults() error {
7062
// --- Validation ---
71-
if c.DefaultRequestTTL < 0 {
72-
return fmt.Errorf("DefaultRequestTTL cannot be negative, but got %v", c.DefaultRequestTTL)
63+
if cfg.DefaultRequestTTL < 0 {
64+
return nil, fmt.Errorf("DefaultRequestTTL cannot be negative, but got %v", cfg.DefaultRequestTTL)
7365
}
74-
if c.ExpiryCleanupInterval < 0 {
75-
return fmt.Errorf("ExpiryCleanupInterval cannot be negative, but got %v", c.ExpiryCleanupInterval)
66+
if cfg.ExpiryCleanupInterval < 0 {
67+
return nil, fmt.Errorf("ExpiryCleanupInterval cannot be negative, but got %v", cfg.ExpiryCleanupInterval)
7668
}
77-
if c.ProcessorReconciliationInterval < 0 {
78-
return fmt.Errorf("ProcessorReconciliationInterval cannot be negative, but got %v",
79-
c.ProcessorReconciliationInterval)
69+
if cfg.ProcessorReconciliationInterval < 0 {
70+
return nil, fmt.Errorf("ProcessorReconciliationInterval cannot be negative, but got %v",
71+
cfg.ProcessorReconciliationInterval)
8072
}
81-
if c.EnqueueChannelBufferSize < 0 {
82-
return fmt.Errorf("EnqueueChannelBufferSize cannot be negative, but got %d", c.EnqueueChannelBufferSize)
73+
if cfg.EnqueueChannelBufferSize < 0 {
74+
return nil, fmt.Errorf("EnqueueChannelBufferSize cannot be negative, but got %d", cfg.EnqueueChannelBufferSize)
8375
}
8476

8577
// --- Defaulting ---
86-
if c.ExpiryCleanupInterval == 0 {
87-
c.ExpiryCleanupInterval = defaultExpiryCleanupInterval
78+
if cfg.ExpiryCleanupInterval == 0 {
79+
cfg.ExpiryCleanupInterval = defaultExpiryCleanupInterval
8880
}
89-
if c.ProcessorReconciliationInterval == 0 {
90-
c.ProcessorReconciliationInterval = defaultProcessorReconciliationInterval
81+
if cfg.ProcessorReconciliationInterval == 0 {
82+
cfg.ProcessorReconciliationInterval = defaultProcessorReconciliationInterval
9183
}
92-
if c.EnqueueChannelBufferSize == 0 {
93-
c.EnqueueChannelBufferSize = defaultEnqueueChannelBufferSize
84+
if cfg.EnqueueChannelBufferSize == 0 {
85+
cfg.EnqueueChannelBufferSize = defaultEnqueueChannelBufferSize
9486
}
95-
return nil
87+
return cfg, nil
9688
}
9789

9890
// deepCopy creates a deep copy of the `Config` object.

pkg/epp/flowcontrol/controller/config_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ import (
2424
"github.com/stretchr/testify/require"
2525
)
2626

27-
func TestNewConfig(t *testing.T) {
27+
func TestConfig_ValidateAndApplyDefaults(t *testing.T) {
2828
t.Parallel()
2929

3030
testCases := []struct {
@@ -88,7 +88,7 @@ func TestNewConfig(t *testing.T) {
8888
t.Run(tc.name, func(t *testing.T) {
8989
t.Parallel()
9090
originalInput := tc.input.deepCopy()
91-
validatedCfg, err := newConfig(tc.input)
91+
validatedCfg, err := tc.input.ValidateAndApplyDefaults()
9292

9393
if tc.expectErr {
9494
require.Error(t, err, "expected an error but got nil")

pkg/epp/flowcontrol/controller/controller.go

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -118,13 +118,8 @@ func NewFlowController(
118118
logger logr.Logger,
119119
opts ...flowControllerOption,
120120
) (*FlowController, error) {
121-
validatedConfig, err := newConfig(config)
122-
if err != nil {
123-
return nil, fmt.Errorf("invalid flow controller configuration: %w", err)
124-
}
125-
126121
fc := &FlowController{
127-
config: *validatedConfig,
122+
config: *config.deepCopy(),
128123
registry: registry,
129124
saturationDetector: sd,
130125
clock: clock.RealClock{},

pkg/epp/flowcontrol/controller/controller_test.go

Lines changed: 0 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -273,23 +273,6 @@ func newTestRequest(ctx context.Context, key types.FlowKey) *typesmocks.MockFlow
273273

274274
// --- Test Cases ---
275275

276-
func TestNewFlowController(t *testing.T) {
277-
t.Parallel()
278-
279-
t.Run("ErrorOnInvalidConfig", func(t *testing.T) {
280-
t.Parallel()
281-
invalidCfg := Config{ProcessorReconciliationInterval: -1 * time.Second}
282-
_, err := NewFlowController(
283-
context.Background(),
284-
invalidCfg,
285-
&mockRegistryClient{},
286-
&mocks.MockSaturationDetector{},
287-
logr.Discard(),
288-
)
289-
require.Error(t, err, "NewFlowController must return an error for invalid configuration")
290-
})
291-
}
292-
293276
func TestFlowController_EnqueueAndWait(t *testing.T) {
294277
t.Parallel()
295278

pkg/epp/flowcontrol/registry/config.go

Lines changed: 51 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,7 @@ type Config struct {
112112
// that operate at this priority level.
113113
type PriorityBandConfig struct {
114114
// Priority is the unique numerical priority level for this band.
115-
// Convention: Lower numerical values indicate lower priority.
115+
// Convention: Highest numeric value corresponds to highest priority (centered on 0).
116116
// Required.
117117
Priority int
118118

@@ -140,20 +140,6 @@ type PriorityBandConfig struct {
140140
MaxBytes uint64
141141
}
142142

143-
// NewConfig performs validation and initialization, returning a guaranteed-valid `Config` object.
144-
// This is the required constructor for creating a new configuration. It applies provided functional options (primarily
145-
// for testing) and does not mutate the input `cfg`.
146-
func NewConfig(cfg Config, opts ...configOption) (*Config, error) {
147-
newCfg := cfg.deepCopy()
148-
for _, opt := range opts {
149-
opt(newCfg)
150-
}
151-
if err := newCfg.validateAndApplyDefaults(); err != nil {
152-
return nil, err
153-
}
154-
return newCfg, nil
155-
}
156-
157143
// =============================================================================
158144
// Shard-Level Configuration
159145
// =============================================================================
@@ -205,52 +191,55 @@ func (sc *ShardConfig) getBandConfig(priority int) (*ShardPriorityBandConfig, er
205191

206192
// --- Validation and Defaulting ---
207193

208-
// validateAndApplyDefaults checks the global configuration for validity (including plugin compatibility) and mutates
209-
// the receiver to populate any empty fields with system defaults. It also initializes internal lookup maps.
210-
func (c *Config) validateAndApplyDefaults() error {
194+
// ValidateAndApplyDefaults checks the global configuration for validity and then creates a new `Config` object,
195+
// populating any empty fields with system defaults.
196+
// It does not mutate the receiver.
197+
func (c *Config) ValidateAndApplyDefaults() (*Config, error) {
198+
cfg := c.deepCopy()
199+
211200
// Apply defaults to top-level fields.
212-
if c.InitialShardCount <= 0 {
213-
c.InitialShardCount = defaultInitialShardCount
201+
if cfg.InitialShardCount <= 0 {
202+
cfg.InitialShardCount = defaultInitialShardCount
214203
}
215-
if c.FlowGCTimeout <= 0 {
216-
c.FlowGCTimeout = defaultFlowGCTimeout
204+
if cfg.FlowGCTimeout <= 0 {
205+
cfg.FlowGCTimeout = defaultFlowGCTimeout
217206
}
218-
if c.EventChannelBufferSize <= 0 {
219-
c.EventChannelBufferSize = defaultEventChannelBufferSize
207+
if cfg.EventChannelBufferSize <= 0 {
208+
cfg.EventChannelBufferSize = defaultEventChannelBufferSize
220209
}
221210

222211
// Ensure the DI factories are initialized for production use if `NewConfig` was called without options.
223-
if c.interFlowDispatchPolicyFactory == nil {
224-
c.interFlowDispatchPolicyFactory = inter.NewPolicyFromName
212+
if cfg.interFlowDispatchPolicyFactory == nil {
213+
cfg.interFlowDispatchPolicyFactory = inter.NewPolicyFromName
225214
}
226-
if c.intraFlowDispatchPolicyFactory == nil {
227-
c.intraFlowDispatchPolicyFactory = intra.NewPolicyFromName
215+
if cfg.intraFlowDispatchPolicyFactory == nil {
216+
cfg.intraFlowDispatchPolicyFactory = intra.NewPolicyFromName
228217
}
229-
if c.queueFactory == nil {
230-
c.queueFactory = queue.NewQueueFromName
218+
if cfg.queueFactory == nil {
219+
cfg.queueFactory = queue.NewQueueFromName
231220
}
232221

233-
if len(c.PriorityBands) == 0 {
234-
return errors.New("config validation failed: at least one priority band must be defined")
222+
if len(cfg.PriorityBands) == 0 {
223+
return nil, errors.New("config validation failed: at least one priority band must be defined")
235224
}
236225

237226
// Validate and default each priority band.
238227
priorities := make(map[int]struct{})
239228
priorityNames := make(map[string]struct{})
240-
c.priorityBandMap = make(map[int]*PriorityBandConfig, len(c.PriorityBands))
229+
cfg.priorityBandMap = make(map[int]*PriorityBandConfig, len(cfg.PriorityBands))
241230

242-
for i := range c.PriorityBands {
243-
band := &c.PriorityBands[i]
231+
for i := range cfg.PriorityBands {
232+
band := &cfg.PriorityBands[i]
244233
if _, exists := priorities[band.Priority]; exists {
245-
return fmt.Errorf("config validation failed: duplicate priority level %d found", band.Priority)
234+
return nil, fmt.Errorf("config validation failed: duplicate priority level %d found", band.Priority)
246235
}
247236
priorities[band.Priority] = struct{}{}
248237

249238
if band.PriorityName == "" {
250-
return fmt.Errorf("config validation failed: PriorityName is required for priority band %d", band.Priority)
239+
return nil, fmt.Errorf("config validation failed: PriorityName is required for priority band %d", band.Priority)
251240
}
252241
if _, exists := priorityNames[band.PriorityName]; exists {
253-
return fmt.Errorf("config validation failed: duplicate priority name %q found", band.PriorityName)
242+
return nil, fmt.Errorf("config validation failed: duplicate priority name %q found", band.PriorityName)
254243
}
255244
priorityNames[band.PriorityName] = struct{}{}
256245

@@ -267,12 +256,12 @@ func (c *Config) validateAndApplyDefaults() error {
267256
band.MaxBytes = defaultPriorityBandMaxBytes
268257
}
269258

270-
if err := c.validateBandCompatibility(*band); err != nil {
271-
return err
259+
if err := cfg.validateBandCompatibility(*band); err != nil {
260+
return nil, err
272261
}
273-
c.priorityBandMap[band.Priority] = band
262+
cfg.priorityBandMap[band.Priority] = band
274263
}
275-
return nil
264+
return cfg, nil
276265
}
277266

278267
// validateBandCompatibility verifies that a band's configured queue type has the necessary capabilities.
@@ -423,6 +412,18 @@ func withQueueFactory(factory queueFactory) configOption {
423412
}
424413
}
425414

415+
// newConfig creates a new validated and defaulted `Config` object.
416+
// It applies provided test-only functional options before validation and defaulting.
417+
// It does not mutate the input `cfg`.
418+
// test-only
419+
func newConfig(cfg Config, opts ...configOption) (*Config, error) {
420+
newCfg := cfg.deepCopy()
421+
for _, opt := range opts {
422+
opt(newCfg)
423+
}
424+
return newCfg.ValidateAndApplyDefaults()
425+
}
426+
426427
// --- Internal Utilities ---
427428

428429
// deepCopy creates a deep copy of the `Config` object.
@@ -436,7 +437,6 @@ func (c *Config) deepCopy() *Config {
436437
FlowGCTimeout: c.FlowGCTimeout,
437438
EventChannelBufferSize: c.EventChannelBufferSize,
438439
PriorityBands: make([]PriorityBandConfig, len(c.PriorityBands)),
439-
priorityBandMap: make(map[int]*PriorityBandConfig, len(c.PriorityBands)),
440440
interFlowDispatchPolicyFactory: c.interFlowDispatchPolicyFactory,
441441
intraFlowDispatchPolicyFactory: c.intraFlowDispatchPolicyFactory,
442442
queueFactory: c.queueFactory,
@@ -445,11 +445,14 @@ func (c *Config) deepCopy() *Config {
445445
// PriorityBandConfig contains only value types, so a slice copy is sufficient for a deep copy.
446446
copy(newCfg.PriorityBands, c.PriorityBands)
447447

448-
// Crucial: We must rebuild the map and take the address of the elements within the new slice (`newCfg.PriorityBands`)
449-
// to ensure the map pointers are correct for the newly created `Config` instance.
450-
for i := range newCfg.PriorityBands {
451-
band := &newCfg.PriorityBands[i]
452-
newCfg.priorityBandMap[band.Priority] = band
448+
if c.priorityBandMap != nil {
449+
newCfg.priorityBandMap = make(map[int]*PriorityBandConfig, len(c.PriorityBands))
450+
// Crucial: We must rebuild the map and take the address of the elements within the new slice (`newCfg.PriorityBands`)
451+
// to ensure the map pointers are correct for the newly created `Config` instance.
452+
for i := range newCfg.PriorityBands {
453+
band := &newCfg.PriorityBands[i]
454+
newCfg.priorityBandMap[band.Priority] = band
455+
}
453456
}
454457
return newCfg
455458
}

pkg/epp/flowcontrol/registry/config_test.go

Lines changed: 17 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ import (
3535
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/queue/listqueue"
3636
)
3737

38-
func TestConfig_NewConfig(t *testing.T) {
38+
func TestConfig_ValidateAndApplyDefaults(t *testing.T) {
3939
t.Parallel()
4040

4141
testCases := []struct {
@@ -286,28 +286,32 @@ func TestConfig_NewConfig(t *testing.T) {
286286
for _, tc := range testCases {
287287
t.Run(tc.name, func(t *testing.T) {
288288
t.Parallel()
289-
originalInputCopy := tc.input.deepCopy()
290-
newCfg, err := NewConfig(tc.input, tc.opts...)
289+
originalInput := tc.input.deepCopy()
290+
validatedCfg, err := newConfig(tc.input, tc.opts...)
291+
291292
if tc.expectErr {
292-
require.Error(t, err, "NewConfig should have returned an error")
293+
require.Error(t, err, "expected an error but got nil")
293294
if tc.expectedErrIs != nil {
294-
assert.ErrorIs(t, err, tc.expectedErrIs, "Error should wrap the expected error type")
295+
assert.ErrorIs(t, err, tc.expectedErrIs, "error should wrap the expected error type")
295296
}
296-
assert.Nil(t, newCfg, "On error, the returned config should be nil")
297+
assert.Nil(t, validatedCfg, "validatedCfg should be nil on error")
297298
} else {
298-
require.NoError(t, err, "NewConfig should not have returned an error")
299-
require.NotNil(t, newCfg, "On success, the returned config should not be nil")
299+
require.NoError(t, err, "expected no error but got: %v", err)
300+
require.NotNil(t, validatedCfg, "validatedCfg should not be nil on success")
300301
if tc.assertion != nil {
301-
tc.assertion(t, *originalInputCopy, newCfg)
302+
tc.assertion(t, *originalInput, validatedCfg)
302303
}
304+
305+
// Ensure the original config is not mutated.
306+
assert.Equal(t, *originalInput, tc.input, "input config should not be mutated")
303307
}
304308
})
305309
}
306310
}
307311

308312
func TestConfig_Partition(t *testing.T) {
309313
t.Parallel()
310-
baseCfg, err := NewConfig(Config{
314+
baseCfg, err := newConfig(Config{
311315
MaxBytes: 103, // Will not distribute evenly
312316
PriorityBands: []PriorityBandConfig{
313317
{Priority: 1, PriorityName: "High", MaxBytes: 55}, // Will not distribute evenly
@@ -391,7 +395,7 @@ func TestConfig_Partition(t *testing.T) {
391395

392396
func TestConfig_GetBandConfig(t *testing.T) {
393397
t.Parallel()
394-
cfg, err := NewConfig(Config{
398+
cfg, err := newConfig(Config{
395399
PriorityBands: []PriorityBandConfig{
396400
{Priority: 10, PriorityName: "High"},
397401
},
@@ -427,7 +431,7 @@ func TestConfig_DeepCopy(t *testing.T) {
427431
},
428432
}
429433
// Create a fully initialized "original" config to be the source of the copy.
430-
original, err := NewConfig(baseCfg)
434+
original, err := newConfig(baseCfg)
431435
require.NoError(t, err, "Setup for deep copy should not fail")
432436

433437
t.Run("ShouldReturnNil_ForNilReceiver", func(t *testing.T) {
@@ -481,7 +485,7 @@ func TestConfig_DeepCopy(t *testing.T) {
481485

482486
func TestShardConfig_GetBandConfig(t *testing.T) {
483487
t.Parallel()
484-
baseCfg, err := NewConfig(Config{
488+
baseCfg, err := newConfig(Config{
485489
PriorityBands: []PriorityBandConfig{
486490
{Priority: 10, PriorityName: "High"},
487491
{Priority: 20, PriorityName: "Low"},

0 commit comments

Comments
 (0)