Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 22 additions & 1 deletion apix/config/v1alpha1/endpointpickerconfig_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,16 +54,20 @@ type EndpointPickerConfig struct {
// +optional
// Data configures the DataLayer. It is required if the new DataLayer is enabled.
Data *DataLayerConfig `json:"data"`
// +optional
// RequestControl configures the request control stage of the EPP pipeline.
RequestControl *RequestControlConfig `json:"requestControl,omitempty"`
}

func (cfg EndpointPickerConfig) String() string {
return fmt.Sprintf(
"{FeatureGates: %v, Plugins: %v, SchedulingProfiles: %v, Data: %v, SaturationDetector: %v}",
"{FeatureGates: %v, Plugins: %v, SchedulingProfiles: %v, Data: %v, SaturationDetector: %v, RequestControl: %v}",
cfg.FeatureGates,
cfg.Plugins,
cfg.SchedulingProfiles,
cfg.Data,
cfg.SaturationDetector,
cfg.RequestControl,
)
}

Expand Down Expand Up @@ -177,6 +181,23 @@ type SaturationDetector struct {
MetricsStalenessThreshold metav1.Duration `json:"metricsStalenessThreshold,omitempty"`
}

// RequestControlConfig configures the request control stage.
type RequestControlConfig struct {
// +optional
// PrepareDataTimeout defines the timeout for PrepareData plugins.
PrepareDataTimeout metav1.Duration `json:"prepareDataTimeout,omitempty"`
}

func (rc *RequestControlConfig) String() string {
if rc == nil {
return "{}"
}
if rc.PrepareDataTimeout.Duration == 0 {
return "{}"
}
return "{PrepareDataTimeout: " + rc.PrepareDataTimeout.String() + "}"
}

func (sd *SaturationDetector) String() string {
result := ""
if sd != nil {
Expand Down
21 changes: 21 additions & 0 deletions apix/config/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions cmd/epp/runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -458,6 +458,10 @@ func (r *Runner) parseConfigurationPhaseTwo(ctx context.Context, rawConfig *conf

r.schedulerConfig = cfg.SchedulerConfig

if cfg.RequestControlConfig != nil && cfg.RequestControlConfig.PrepareDataTimeout > 0 {
r.requestControlConfig.WithPrepareDataTimeout(cfg.RequestControlConfig.PrepareDataTimeout)
}

// Add requestControl plugins
r.requestControlConfig.AddPlugins(handle.GetAllPlugins()...)

Expand Down
67 changes: 67 additions & 0 deletions cmd/epp/runner/runner_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why aren't these tests simply part of the config_loader tests?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fair point. I grouped them here for organization, but I’m happy to move them into the config_loader tests if that’s preferred.

Copyright 2025 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package runner

import (
"context"
"reflect"
"testing"
"time"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

configapi "sigs.k8s.io/gateway-api-inference-extension/apix/config/v1alpha1"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datalayer"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datastore"
)

func TestParseConfigurationPhaseTwoAppliesPrepareDataTimeout(t *testing.T) {
t.Parallel()

r := NewRunner()
r.registerInTreePlugins()

rawConfig := &configapi.EndpointPickerConfig{
RequestControl: &configapi.RequestControlConfig{
PrepareDataTimeout: metav1.Duration{Duration: 125 * time.Millisecond},
},
}

ctx := context.Background()
epFactory := datalayer.NewEndpointFactory(nil, time.Millisecond)
ds := datastore.NewDatastore(ctx, epFactory, 0)

if _, err := r.parseConfigurationPhaseTwo(ctx, rawConfig, ds); err != nil {
t.Fatalf("parseConfigurationPhaseTwo failed: %v", err)
}

got := readPrepareDataTimeout(t, r.requestControlConfig)
want := 125 * time.Millisecond
if got != want {
t.Fatalf("prepareDataTimeout = %v, want %v", got, want)
}
}

func readPrepareDataTimeout(t *testing.T, cfg any) time.Duration {
t.Helper()

v := reflect.ValueOf(cfg).Elem().FieldByName("prepareDataTimeout")
if v.Kind() != reflect.Int64 {
t.Fatalf("unexpected kind for prepareDataTimeout: %v", v.Kind())
}
return time.Duration(v.Int())
}
8 changes: 8 additions & 0 deletions pkg/epp/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ limitations under the License.
package config

import (
"time"

"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datalayer"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/saturationdetector/framework/plugins/utilizationdetector"
Expand All @@ -29,4 +31,10 @@ type Config struct {
SaturationDetectorConfig *utilizationdetector.Config
DataConfig *datalayer.Config
FlowControlConfig *flowcontrol.Config
RequestControlConfig *RequestControlConfig
}

// RequestControlConfig holds configuration for request control behaviors.
type RequestControlConfig struct {
PrepareDataTimeout time.Duration
}
12 changes: 12 additions & 0 deletions pkg/epp/config/loader/configloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ func InstantiateAndConfigure(
SaturationDetectorConfig: buildSaturationConfig(rawConfig.SaturationDetector),
DataConfig: dataConfig,
FlowControlConfig: flowControlConfig,
RequestControlConfig: buildRequestControlConfig(rawConfig.RequestControl),
}, nil
}

Expand Down Expand Up @@ -253,6 +254,17 @@ func buildSaturationConfig(apiConfig *configapi.SaturationDetector) *utilization
return cfg
}

func buildRequestControlConfig(apiConfig *configapi.RequestControlConfig) *config.RequestControlConfig {
if apiConfig == nil {
return nil
}
cfg := &config.RequestControlConfig{}
if apiConfig.PrepareDataTimeout.Duration > 0 {
cfg.PrepareDataTimeout = apiConfig.PrepareDataTimeout.Duration
}
return cfg
}

func buildDataLayerConfig(rawDataConfig *configapi.DataLayerConfig, dataLayerEnabled bool, handle plugins.Handle) (*datalayer.Config, error) {
if !dataLayerEnabled {
if rawDataConfig != nil {
Expand Down
15 changes: 7 additions & 8 deletions pkg/epp/requestcontrol/director.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,6 @@ import (
requtil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/request"
)

const (
// TODO(https://github.com/kubernetes-sigs/gateway-api-inference-extension/issues/2081):
// Make this timeout configurable per-plugin or globally via the Director configuration to support plugins with
// varying latency profiles.
prepareDataTimeout = 400 * time.Millisecond
)

// Datastore defines the interface required by the Director.
type Datastore interface {
PoolGet() (*datalayer.EndpointPool, error)
Expand Down Expand Up @@ -351,7 +344,13 @@ func (d *Director) runPrepareDataPlugins(ctx context.Context,
if len(d.requestControlPlugins.prepareDataPlugins) == 0 {
return nil
}
return prepareDataPluginsWithTimeout(prepareDataTimeout, d.requestControlPlugins.prepareDataPlugins, ctx, request, endpoints)
return prepareDataPluginsWithTimeout(
d.requestControlPlugins.prepareDataTimeout,
d.requestControlPlugins.prepareDataPlugins,
ctx,
request,
endpoints,
)
}

func (d *Director) runAdmissionPlugins(ctx context.Context,
Expand Down
12 changes: 12 additions & 0 deletions pkg/epp/requestcontrol/request_control_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,13 @@ limitations under the License.
package requestcontrol

import (
"time"

"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/plugins"
)

const defaultPrepareDataTimeout = 400 * time.Millisecond

// NewConfig creates a new Config object and returns its pointer.
func NewConfig() *Config {
return &Config{
Expand All @@ -29,6 +33,7 @@ func NewConfig() *Config {
responseReceivedPlugins: []ResponseReceived{},
responseStreamingPlugins: []ResponseStreaming{},
responseCompletePlugins: []ResponseComplete{},
prepareDataTimeout: defaultPrepareDataTimeout,
}
}

Expand All @@ -40,6 +45,7 @@ type Config struct {
responseReceivedPlugins []ResponseReceived
responseStreamingPlugins []ResponseStreaming
responseCompletePlugins []ResponseComplete
prepareDataTimeout time.Duration
}

// WithPreRequestPlugins sets the given plugins as the PreRequest plugins.
Expand Down Expand Up @@ -76,6 +82,12 @@ func (c *Config) WithPrepareDataPlugins(plugins ...PrepareDataPlugin) *Config {
return c
}

// WithPrepareDataTimeout sets the timeout for PrepareData plugins.
func (c *Config) WithPrepareDataTimeout(timeout time.Duration) *Config {
c.prepareDataTimeout = timeout
return c
}

// WithAdmissionPlugins sets the given plugins as the AdmitRequest plugins.
func (c *Config) WithAdmissionPlugins(plugins ...AdmissionPlugin) *Config {
c.admissionPlugins = plugins
Expand Down