Skip to content

Commit 83ba4aa

Browse files
authored
Merge branch 'kubernetes-sigs:main' into parallel-plugins2
2 parents 78783dc + 943e676 commit 83ba4aa

File tree

26 files changed

+999
-216
lines changed

26 files changed

+999
-216
lines changed

apix/config/v1alpha1/endpointpickerconfig_types.go

Lines changed: 74 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,13 +39,25 @@ type EndpointPickerConfig struct {
3939
// SchedulingProfiles is the list of named SchedulingProfiles
4040
// that will be created.
4141
SchedulingProfiles []SchedulingProfile `json:"schedulingProfiles"`
42+
43+
// +optional
44+
// FeatureGates is a set of flags that enable various experimental features with the EPP.
45+
// If omitted, none of these experimental features will be enabled.
46+
FeatureGates FeatureGates `json:"featureGates,omitempty"`
47+
48+
// +optional
49+
// SaturationDetector when present specifies the configuration of the
50+
// Saturation detector. If not present, default values are used.
51+
SaturationDetector *SaturationDetector `json:"saturationDetector,omitempty"`
4252
}
4353

4454
func (cfg EndpointPickerConfig) String() string {
4555
return fmt.Sprintf(
46-
"{Plugins: %v, SchedulingProfiles: %v}",
56+
"{Plugins: %v, SchedulingProfiles: %v, FeatureGates: %v, SaturationDetector: %v}",
4757
cfg.Plugins,
4858
cfg.SchedulingProfiles,
59+
cfg.FeatureGates,
60+
cfg.SaturationDetector,
4961
)
5062
}
5163

@@ -118,3 +130,64 @@ func (sp SchedulingPlugin) String() string {
118130
}
119131
return fmt.Sprintf("{PluginRef: %s%s}", sp.PluginRef, weight)
120132
}
133+
134+
// FeatureGates is a set of flags that enable various experimental features
// within the EPP.
type FeatureGates []string

// String returns the gate names separated by commas and wrapped in braces,
// for example "{gateA,gateB}". A nil or empty set is rendered as "{}".
func (fg FeatureGates) String() string {
	joined := ""
	for idx, name := range fg {
		// The separator goes between entries only, never after the last one.
		if idx > 0 {
			joined += ","
		}
		joined += name
	}
	return "{" + joined + "}"
}
152+
153+
// SaturationDetector specifies the configuration of the saturation detector.
154+
type SaturationDetector struct {
155+
// +optional
156+
// QueueDepthThreshold defines the backend waiting queue size above which a
157+
// pod is considered to have insufficient capacity for new requests.
158+
QueueDepthThreshold int `json:"queueDepthThreshold,omitempty"`
159+
160+
// +optional
161+
// KVCacheUtilThreshold defines the KV cache utilization (0.0 to 1.0) above
162+
// which a pod is considered to have insufficient capacity.
163+
KVCacheUtilThreshold float64 `json:"kvCacheUtilThreshold,omitempty"`
164+
165+
// +optional
166+
// MetricsStalenessThreshold defines how old a pod's metrics can be.
167+
// If a pod's metrics are older than this, it might be excluded from
168+
// "good capacity" considerations or treated as having no capacity for
169+
// safety.
170+
MetricsStalenessThreshold metav1.Duration `json:"metricsStalenessThreshold,omitempty"`
171+
}
172+
173+
func (sd *SaturationDetector) String() string {
174+
result := ""
175+
if sd != nil {
176+
if sd.QueueDepthThreshold != 0 {
177+
result += fmt.Sprintf("QueueDepthThreshold: %d", sd.QueueDepthThreshold)
178+
}
179+
if sd.KVCacheUtilThreshold != 0.0 {
180+
if len(result) != 0 {
181+
result += ", "
182+
}
183+
result += fmt.Sprintf("KVCacheUtilThreshold: %g", sd.KVCacheUtilThreshold)
184+
}
185+
if sd.MetricsStalenessThreshold.Duration != 0.0 {
186+
if len(result) != 0 {
187+
result += ", "
188+
}
189+
result += fmt.Sprintf("MetricsStalenessThreshold: %s", sd.MetricsStalenessThreshold)
190+
}
191+
}
192+
return "{" + result + "}"
193+
}

apix/config/v1alpha1/zz_generated.deepcopy.go

Lines changed: 45 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

cmd/epp/runner/runner.go

Lines changed: 82 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -44,9 +44,11 @@ import (
4444
"sigs.k8s.io/controller-runtime/pkg/metrics/filters"
4545
metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"
4646

47+
configapi "sigs.k8s.io/gateway-api-inference-extension/apix/config/v1alpha1"
4748
"sigs.k8s.io/gateway-api-inference-extension/internal/runnable"
4849
"sigs.k8s.io/gateway-api-inference-extension/pkg/common"
4950
backendmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics"
51+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/config"
5052
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/config/loader"
5153
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datalayer"
5254
dlmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datalayer/metrics"
@@ -77,6 +79,11 @@ const (
7779
// enableExperimentalFlowControlLayer defines the environment variable used as a feature flag for the pluggable flow
7880
// control layer.
7981
enableExperimentalFlowControlLayer = "ENABLE_EXPERIMENTAL_FLOW_CONTROL_LAYER"
82+
83+
// Saturation Detector deprecated configuration environment variables
84+
EnvSdQueueDepthThreshold = "SD_QUEUE_DEPTH_THRESHOLD"
85+
EnvSdKVCacheUtilThreshold = "SD_KV_CACHE_UTIL_THRESHOLD"
86+
EnvSdMetricsStalenessThreshold = "SD_METRICS_STALENESS_THRESHOLD"
8087
)
8188

8289
// TODO: this is hardcoded for POC only. This needs to be hooked up to our text-based config story.
@@ -143,6 +150,7 @@ func NewRunner() *Runner {
143150
// Runner is used to run epp with its plugins
144151
type Runner struct {
145152
eppExecutableName string // the EPP executable name
153+
featureGates config.FeatureConfig
146154
requestControlConfig *requestcontrol.Config
147155
schedulerConfig *scheduling.SchedulerConfig
148156
customCollectors []prometheus.Collector
@@ -200,24 +208,33 @@ func (r *Runner) Run(ctx context.Context) error {
200208
})
201209
setupLog.Info("Flags processed", "flags", flags)
202210

203-
// --- Load Configurations from Environment Variables ---
204-
sdConfig := saturationdetector.LoadConfigFromEnv()
205-
206211
// --- Get Kubernetes Config ---
207212
cfg, err := ctrl.GetConfig()
208213
if err != nil {
209214
setupLog.Error(err, "Failed to get Kubernetes rest config")
210215
return err
211216
}
212217

218+
rawConfig, featureGates, err := r.parseConfigurationPhaseOne(ctx)
219+
if err != nil {
220+
setupLog.Error(err, "Failed to parse configuration")
221+
return err
222+
}
223+
r.featureGates = featureGates
224+
213225
// --- Setup Datastore ---
214-
useDatalayerV2 := env.GetEnvBool(enableExperimentalDatalayerV2, false, setupLog)
215-
epf, err := r.setupMetricsCollection(setupLog, useDatalayerV2)
226+
epf, err := r.setupMetricsCollection(setupLog, r.featureGates[datalayer.FeatureGate])
216227
if err != nil {
217228
return err
218229
}
219230
datastore := datastore.NewDatastore(ctx, epf, int32(*modelServerMetricsPort))
220231

232+
eppConfig, err := r.parseConfigurationPhaseTwo(ctx, rawConfig, datastore)
233+
if err != nil {
234+
setupLog.Error(err, "Failed to parse configuration")
235+
return err
236+
}
237+
221238
// --- Setup Metrics Server ---
222239
customCollectors := []prometheus.Collector{collectors.NewInferencePoolMetricsCollector(datastore)}
223240
if r.customCollectors != nil {
@@ -297,12 +314,6 @@ func (r *Runner) Run(ctx context.Context) error {
297314
runtime.SetBlockProfileRate(1)
298315
}
299316

300-
err = r.parsePluginsConfiguration(ctx, datastore)
301-
if err != nil {
302-
setupLog.Error(err, "Failed to parse plugins configuration")
303-
return err
304-
}
305-
306317
// --- Initialize Core EPP Components ---
307318
if r.schedulerConfig == nil {
308319
err := errors.New("scheduler config must be set either by config api or through code")
@@ -314,12 +325,11 @@ func (r *Runner) Run(ctx context.Context) error {
314325

315326
scheduler := scheduling.NewSchedulerWithConfig(r.schedulerConfig)
316327

317-
saturationDetector := saturationdetector.NewDetector(sdConfig, setupLog)
328+
saturationDetector := saturationdetector.NewDetector(eppConfig.SaturationDetectorConfig, setupLog)
318329

319330
// --- Admission Control Initialization ---
320-
enableFlowControl := env.GetEnvBool(enableExperimentalFlowControlLayer, false, setupLog)
321331
var admissionController requestcontrol.AdmissionController
322-
if enableFlowControl {
332+
if r.featureGates[flowcontrol.FeatureGate] {
323333
setupLog.Info("Initializing experimental Flow Control layer")
324334
fcCfg, err := flowControlConfig.ValidateAndApplyDefaults()
325335
if err != nil {
@@ -367,7 +377,7 @@ func (r *Runner) Run(ctx context.Context) error {
367377
MetricsStalenessThreshold: *metricsStalenessThreshold,
368378
Director: director,
369379
SaturationDetector: saturationDetector,
370-
UseExperimentalDatalayerV2: useDatalayerV2, // pluggable data layer feature flag
380+
UseExperimentalDatalayerV2: r.featureGates[datalayer.FeatureGate], // pluggable data layer feature flag
371381
}
372382
if err := serverRunner.SetupWithManager(ctx, mgr); err != nil {
373383
setupLog.Error(err, "Failed to setup EPP controllers")
@@ -410,9 +420,9 @@ func (r *Runner) registerInTreePlugins() {
410420
plugins.Register(testfilter.HeaderBasedTestingFilterType, testfilter.HeaderBasedTestingFilterFactory)
411421
}
412422

413-
func (r *Runner) parsePluginsConfiguration(ctx context.Context, ds datastore.Datastore) error {
423+
func (r *Runner) parseConfigurationPhaseOne(ctx context.Context) (*configapi.EndpointPickerConfig, config.FeatureConfig, error) {
414424
if *configText == "" && *configFile == "" {
415-
return nil // configuring through code, not through file
425+
return nil, nil, nil // configuring through code, not through file
416426
}
417427

418428
logger := log.FromContext(ctx)
@@ -424,19 +434,28 @@ func (r *Runner) parsePluginsConfiguration(ctx context.Context, ds datastore.Dat
424434
var err error
425435
configBytes, err = os.ReadFile(*configFile)
426436
if err != nil {
427-
return fmt.Errorf("failed to load config from a file '%s' - %w", *configFile, err)
437+
return nil, nil, fmt.Errorf("failed to load config from a file '%s' - %w", *configFile, err)
428438
}
429439
}
430440

441+
loader.RegisterFeatureGate(datalayer.FeatureGate)
442+
loader.RegisterFeatureGate(flowcontrol.FeatureGate)
443+
431444
r.registerInTreePlugins()
445+
446+
return loader.LoadConfigPhaseOne(configBytes, logger)
447+
}
448+
449+
func (r *Runner) parseConfigurationPhaseTwo(ctx context.Context, rawConfig *configapi.EndpointPickerConfig, ds datastore.Datastore) (*config.Config, error) {
450+
logger := log.FromContext(ctx)
432451
handle := plugins.NewEppHandle(ctx, ds.PodList)
433-
config, err := loader.LoadConfig(configBytes, handle, logger)
452+
cfg, err := loader.LoadConfigPhaseTwo(rawConfig, handle, logger)
434453

435454
if err != nil {
436-
return fmt.Errorf("failed to load the configuration - %w", err)
455+
return nil, fmt.Errorf("failed to load the configuration - %w", err)
437456
}
438457

439-
r.schedulerConfig = config.SchedulerConfig
458+
r.schedulerConfig = cfg.SchedulerConfig
440459

441460
// Add requestControl plugins
442461
r.requestControlConfig.AddPlugins(handle.GetAllPlugins()...)
@@ -445,8 +464,49 @@ func (r *Runner) parsePluginsConfiguration(ctx context.Context, ds datastore.Dat
445464
return errors.New("failed to load the configuration - prepare data plugins have cyclic dependencies")
446465
}
447466

467+
// Handle deprecated configuration options
468+
r.deprecatedConfigurationHelper(cfg, logger)
469+
448470
logger.Info("loaded configuration from file/text successfully")
449-
return nil
471+
return cfg, nil
472+
}
473+
474+
func (r *Runner) deprecatedConfigurationHelper(cfg *config.Config, logger logr.Logger) {
475+
// Handle deprecated environment variable based feature flags
476+
477+
if _, ok := os.LookupEnv(enableExperimentalDatalayerV2); ok {
478+
logger.Info("Enabling the experimental Data Layer V2 using environment variables is deprecated")
479+
r.featureGates[datalayer.FeatureGate] = env.GetEnvBool(enableExperimentalDatalayerV2, false, logger)
480+
}
481+
if _, ok := os.LookupEnv(enableExperimentalFlowControlLayer); ok {
482+
logger.Info("Enabling the experimental Flow Control layer using environment variables is deprecated")
483+
r.featureGates[flowcontrol.FeatureGate] = env.GetEnvBool(enableExperimentalFlowControlLayer, false, setupLog)
484+
}
485+
486+
// Handle deprecated environment variable base Saturation Detector configuration
487+
488+
if _, ok := os.LookupEnv(EnvSdQueueDepthThreshold); ok {
489+
logger.Info("Configuring Saturation Detector using environment variables is deprecated")
490+
cfg.SaturationDetectorConfig.QueueDepthThreshold =
491+
env.GetEnvInt(EnvSdQueueDepthThreshold, saturationdetector.DefaultQueueDepthThreshold, logger)
492+
if cfg.SaturationDetectorConfig.QueueDepthThreshold <= 0 {
493+
cfg.SaturationDetectorConfig.QueueDepthThreshold = saturationdetector.DefaultQueueDepthThreshold
494+
}
495+
}
496+
if _, ok := os.LookupEnv(EnvSdKVCacheUtilThreshold); ok {
497+
logger.Info("Configuring Saturation Detector using environment variables is deprecated")
498+
cfg.SaturationDetectorConfig.KVCacheUtilThreshold = env.GetEnvFloat(EnvSdKVCacheUtilThreshold, saturationdetector.DefaultKVCacheUtilThreshold, logger)
499+
if cfg.SaturationDetectorConfig.KVCacheUtilThreshold <= 0 || cfg.SaturationDetectorConfig.KVCacheUtilThreshold >= 1 {
500+
cfg.SaturationDetectorConfig.KVCacheUtilThreshold = saturationdetector.DefaultKVCacheUtilThreshold
501+
}
502+
}
503+
if _, ok := os.LookupEnv(EnvSdMetricsStalenessThreshold); ok {
504+
logger.Info("Configuring Saturation Detector using environment variables is deprecated")
505+
cfg.SaturationDetectorConfig.MetricsStalenessThreshold = env.GetEnvDuration(EnvSdMetricsStalenessThreshold, saturationdetector.DefaultMetricsStalenessThreshold, logger)
506+
if cfg.SaturationDetectorConfig.MetricsStalenessThreshold <= 0 {
507+
cfg.SaturationDetectorConfig.MetricsStalenessThreshold = saturationdetector.DefaultMetricsStalenessThreshold
508+
}
509+
}
450510
}
451511

452512
func (r *Runner) setupMetricsCollection(setupLog logr.Logger, useExperimentalDatalayer bool) (datalayer.EndpointFactory, error) {

go.mod

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -26,12 +26,12 @@ require (
2626
golang.org/x/sync v0.18.0
2727
google.golang.org/grpc v1.76.0
2828
google.golang.org/protobuf v1.36.10
29-
k8s.io/api v0.34.1
30-
k8s.io/apiextensions-apiserver v0.34.1
31-
k8s.io/apimachinery v0.34.1
32-
k8s.io/client-go v0.34.1
33-
k8s.io/code-generator v0.34.1
34-
k8s.io/component-base v0.34.1
29+
k8s.io/api v0.34.2
30+
k8s.io/apiextensions-apiserver v0.34.2
31+
k8s.io/apimachinery v0.34.2
32+
k8s.io/client-go v0.34.2
33+
k8s.io/code-generator v0.34.2
34+
k8s.io/component-base v0.34.2
3535
k8s.io/utils v0.0.0-20250820121507-0af2bda4dd1d
3636
sigs.k8s.io/controller-runtime v0.22.4
3737
// Update the CONTROLLER_TOOLS_VERSION in Makefile when bumping controller-tools.
@@ -124,7 +124,7 @@ require (
124124
gopkg.in/inf.v0 v0.9.1 // indirect
125125
gopkg.in/yaml.v2 v2.4.0 // indirect
126126
gopkg.in/yaml.v3 v3.0.1 // indirect
127-
k8s.io/apiserver v0.34.1 // indirect
127+
k8s.io/apiserver v0.34.2 // indirect
128128
k8s.io/gengo/v2 v2.0.0-20250820003526-c297c0c1eb9d // indirect
129129
k8s.io/klog/v2 v2.130.1 // indirect
130130
k8s.io/kube-openapi v0.0.0-20250814151709-d7b6acb124c3 // indirect

0 commit comments

Comments
 (0)