Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 50 additions & 0 deletions pkg/epp/flowcontrol/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
Copyright 2025 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package flowcontrol

import (
"fmt"

"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/controller"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/registry"
)

// Config is the top-level configuration for the entire flow control module.
// It embeds the configurations for the controller and the registry, providing a single point of entry for validation
// and initialization.
type Config struct {
Controller controller.Config
Registry registry.Config
}

// ValidateAndApplyDefaults checks the configuration for validity and populates any empty fields with system defaults.
// It delegates validation to the underlying controller and registry configurations.
// It returns a new, validated `Config` object and does not mutate the receiver.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@kfswain Let's continue our discussion from the previous controller PR here regarding the config defaulting pattern.

Copy link
Contributor Author

@LukeAVanDrie LukeAVanDrie Sep 15, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

// It returns a new, validated Config object and does not mutate the receiver.

My preference is to keep functions pure. It's easier to test and has no side effects. You could argue that this is better expressed as ValidateAndApplyDefaults(cfg Config) (*Config, error). You can also argue that my existing implementation should mutate the receiver.

Or even that validation and defaulting should be split.

I am not strongly opinionated here. We should just decide on a consistent pattern.

func (c *Config) ValidateAndApplyDefaults() (*Config, error) {
validatedControllerCfg, err := c.Controller.ValidateAndApplyDefaults()
if err != nil {
return nil, fmt.Errorf("controller config validation failed: %w", err)
}
validatedRegistryCfg, err := c.Registry.ValidateAndApplyDefaults()
if err != nil {
return nil, fmt.Errorf("registry config validation failed: %w", err)
}
return &Config{
Controller: *validatedControllerCfg,
Registry: *validatedRegistryCfg,
}, nil
}
91 changes: 91 additions & 0 deletions pkg/epp/flowcontrol/config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
Copyright 2025 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package flowcontrol

import (
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/controller"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/registry"
)

func TestConfig_ValidateAndApplyDefaults(t *testing.T) {
t.Parallel()

// A minimal valid registry config, which is required for the success case.
validRegistryConfig := registry.Config{
PriorityBands: []registry.PriorityBandConfig{
{Priority: 1, PriorityName: "TestBand"},
},
}

testCases := []struct {
name string
input Config
expectErr bool
expectedErrIs error
}{
{
name: "ShouldSucceed_WhenSubConfigsAreValid",
input: Config{
Controller: controller.Config{},
Registry: validRegistryConfig,
},
expectErr: false,
},
{
name: "ShouldFail_WhenControllerConfigIsInvalid",
input: Config{
Controller: controller.Config{
DefaultRequestTTL: -1 * time.Second,
},
Registry: validRegistryConfig,
},
expectErr: true,
},
{
name: "ShouldFail_WhenRegistryConfigIsInvalid",
input: Config{
Controller: controller.Config{},
Registry: registry.Config{
PriorityBands: []registry.PriorityBandConfig{},
},
},
expectErr: true,
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
t.Parallel()
originalInput := tc.input
validatedCfg, err := tc.input.ValidateAndApplyDefaults()

if tc.expectErr {
require.Error(t, err, "expected an error but got nil")
} else {
require.NoError(t, err, "expected no error but got: %v", err)
require.NotNil(t, validatedCfg, "validatedCfg should not be nil on success")
}

assert.Equal(t, originalInput, tc.input, "input config should not be mutated")
})
}
}
50 changes: 21 additions & 29 deletions pkg/epp/flowcontrol/controller/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,46 +53,38 @@ type Config struct {
EnqueueChannelBufferSize int
}

// newConfig performs validation and initialization, returning a guaranteed-valid `Config` object.
// This is the required constructor for creating a new configuration.
// It does not mutate the input `cfg`.
func newConfig(cfg Config) (*Config, error) {
newCfg := cfg.deepCopy()
if err := newCfg.validateAndApplyDefaults(); err != nil {
return nil, err
}
return newCfg, nil
}
// ValidateAndApplyDefaults checks the global configuration for validity and then creates a new `Config` object,
// populating any empty fields with system defaults.
// It does not mutate the receiver.
func (c *Config) ValidateAndApplyDefaults() (*Config, error) {
cfg := c.deepCopy()

// validateAndApplyDefaults checks the global configuration for validity and then mutates the receiver to populate any
// empty fields with system defaults.
func (c *Config) validateAndApplyDefaults() error {
// --- Validation ---
if c.DefaultRequestTTL < 0 {
return fmt.Errorf("DefaultRequestTTL cannot be negative, but got %v", c.DefaultRequestTTL)
if cfg.DefaultRequestTTL < 0 {
return nil, fmt.Errorf("DefaultRequestTTL cannot be negative, but got %v", cfg.DefaultRequestTTL)
}
if c.ExpiryCleanupInterval < 0 {
return fmt.Errorf("ExpiryCleanupInterval cannot be negative, but got %v", c.ExpiryCleanupInterval)
if cfg.ExpiryCleanupInterval < 0 {
return nil, fmt.Errorf("ExpiryCleanupInterval cannot be negative, but got %v", cfg.ExpiryCleanupInterval)
}
if c.ProcessorReconciliationInterval < 0 {
return fmt.Errorf("ProcessorReconciliationInterval cannot be negative, but got %v",
c.ProcessorReconciliationInterval)
if cfg.ProcessorReconciliationInterval < 0 {
return nil, fmt.Errorf("ProcessorReconciliationInterval cannot be negative, but got %v",
cfg.ProcessorReconciliationInterval)
}
if c.EnqueueChannelBufferSize < 0 {
return fmt.Errorf("EnqueueChannelBufferSize cannot be negative, but got %d", c.EnqueueChannelBufferSize)
if cfg.EnqueueChannelBufferSize < 0 {
return nil, fmt.Errorf("EnqueueChannelBufferSize cannot be negative, but got %d", cfg.EnqueueChannelBufferSize)
}

// --- Defaulting ---
if c.ExpiryCleanupInterval == 0 {
c.ExpiryCleanupInterval = defaultExpiryCleanupInterval
if cfg.ExpiryCleanupInterval == 0 {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: in k8s, object defaulting is done before validation, consider having a similar sequence here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In this case, would we default invalid values (e.g., a negative duration) to the default silently?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if we run validation after defaulting, we protect against that, right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah; I am just thinking about UX. Say we wire this up to the text-based config. A user specified an invalid byte limit for queue capacity (e.g., outside of a min-max range of 10mb to 1gb -- not something we enforce yet, but meant as an illustrative example). Is it better to default an invalid value (e.g., 5 mb) to the min (possibly logging a warning) or just fail immediately?

Either way is safe. It's just a matter of failing fast and loud vs silently (or transparently) correcting.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Whenever I connect this to the config surface, I can revisit this based on your feedback. Either approach sgtm. For now, there is no way to specify non-default values anyways.

cfg.ExpiryCleanupInterval = defaultExpiryCleanupInterval
}
if c.ProcessorReconciliationInterval == 0 {
c.ProcessorReconciliationInterval = defaultProcessorReconciliationInterval
if cfg.ProcessorReconciliationInterval == 0 {
cfg.ProcessorReconciliationInterval = defaultProcessorReconciliationInterval
}
if c.EnqueueChannelBufferSize == 0 {
c.EnqueueChannelBufferSize = defaultEnqueueChannelBufferSize
if cfg.EnqueueChannelBufferSize == 0 {
cfg.EnqueueChannelBufferSize = defaultEnqueueChannelBufferSize
}
return nil
return cfg, nil
}

// deepCopy creates a deep copy of the `Config` object.
Expand Down
4 changes: 2 additions & 2 deletions pkg/epp/flowcontrol/controller/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
"github.com/stretchr/testify/require"
)

func TestNewConfig(t *testing.T) {
func TestConfig_ValidateAndApplyDefaults(t *testing.T) {
t.Parallel()

testCases := []struct {
Expand Down Expand Up @@ -88,7 +88,7 @@ func TestNewConfig(t *testing.T) {
t.Run(tc.name, func(t *testing.T) {
t.Parallel()
originalInput := tc.input.deepCopy()
validatedCfg, err := newConfig(tc.input)
validatedCfg, err := tc.input.ValidateAndApplyDefaults()

if tc.expectErr {
require.Error(t, err, "expected an error but got nil")
Expand Down
7 changes: 1 addition & 6 deletions pkg/epp/flowcontrol/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,13 +118,8 @@ func NewFlowController(
logger logr.Logger,
opts ...flowControllerOption,
) (*FlowController, error) {
validatedConfig, err := newConfig(config)
if err != nil {
return nil, fmt.Errorf("invalid flow controller configuration: %w", err)
}

fc := &FlowController{
config: *validatedConfig,
config: *config.deepCopy(),
registry: registry,
saturationDetector: sd,
clock: clock.RealClock{},
Expand Down
18 changes: 1 addition & 17 deletions pkg/epp/flowcontrol/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,23 +273,6 @@ func newTestRequest(ctx context.Context, key types.FlowKey) *typesmocks.MockFlow

// --- Test Cases ---

func TestNewFlowController(t *testing.T) {
t.Parallel()

t.Run("ErrorOnInvalidConfig", func(t *testing.T) {
t.Parallel()
invalidCfg := Config{ProcessorReconciliationInterval: -1 * time.Second}
_, err := NewFlowController(
context.Background(),
invalidCfg,
&mockRegistryClient{},
&mocks.MockSaturationDetector{},
logr.Discard(),
)
require.Error(t, err, "NewFlowController must return an error for invalid configuration")
})
}

func TestFlowController_EnqueueAndWait(t *testing.T) {
t.Parallel()

Expand Down Expand Up @@ -813,6 +796,7 @@ func TestFlowController_Concurrency(t *testing.T) {
// Use a generous buffer to prevent flakes in the test due to transient queuing delays.
EnqueueChannelBufferSize: numRequests,
DefaultRequestTTL: 1 * time.Second,
ExpiryCleanupInterval: 100 * time.Millisecond,
}, mockRegistry)

var wg sync.WaitGroup
Expand Down
Loading