Skip to content

Commit 943e676

Browse files
authored
feat: Extend the text based configuration to include feature flags and the SaturationDetector's configuration (#1492)
* Added feature gates and saturation config to YAML config Signed-off-by: Shmuel Kallner <[email protected]> * Added feature gates and saturation config to EPP config Signed-off-by: Shmuel Kallner <[email protected]> * Updated config loader for feature gates and saturation config Signed-off-by: Shmuel Kallner <[email protected]> * Moved validation into a separate file Signed-off-by: Shmuel Kallner <[email protected]> * Split Datastore initialization into two parts Signed-off-by: Shmuel Kallner <[email protected]> * Updated runner to use new config capabilities Signed-off-by: Shmuel Kallner <[email protected]> * Updates to tests Signed-off-by: Shmuel Kallner <[email protected]> * Updates to documentation Signed-off-by: Shmuel Kallner <[email protected]> * Made saturation detector config a pointer in epp config as per review Signed-off-by: Shmuel Kallner <[email protected]> * Split config loading into two parts and removed feature gates from config struct Signed-off-by: Shmuel Kallner <[email protected]> * Undid changes to Datastore Signed-off-by: Shmuel Kallner <[email protected]> * Updates due to config loading split Signed-off-by: Shmuel Kallner <[email protected]> * Test updates due to config loading split Signed-off-by: Shmuel Kallner <[email protected]> * Undid test changes due to Datastore restore Signed-off-by: Shmuel Kallner <[email protected]> --------- Signed-off-by: Shmuel Kallner <[email protected]>
1 parent 03b51da commit 943e676

File tree

16 files changed

+615
-193
lines changed

16 files changed

+615
-193
lines changed

apix/config/v1alpha1/endpointpickerconfig_types.go

Lines changed: 74 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,13 +39,25 @@ type EndpointPickerConfig struct {
3939
// SchedulingProfiles is the list of named SchedulingProfiles
4040
// that will be created.
4141
SchedulingProfiles []SchedulingProfile `json:"schedulingProfiles"`
42+
43+
// +optional
44+
// FeatureGates is a set of flags that enable various experimental features with the EPP.
45+
// If omitted non of these experimental features will be enabled.
46+
FeatureGates FeatureGates `json:"featureGates,omitempty"`
47+
48+
// +optional
49+
// SaturationDetector when present specifies the configuration of the
50+
// Saturation detector. If not present, default values are used.
51+
SaturationDetector *SaturationDetector `json:"saturationDetector,omitempty"`
4252
}
4353

4454
func (cfg EndpointPickerConfig) String() string {
4555
return fmt.Sprintf(
46-
"{Plugins: %v, SchedulingProfiles: %v}",
56+
"{Plugins: %v, SchedulingProfiles: %v, FeatureGates: %v, SaturationDetector: %v}",
4757
cfg.Plugins,
4858
cfg.SchedulingProfiles,
59+
cfg.FeatureGates,
60+
cfg.SaturationDetector,
4961
)
5062
}
5163

@@ -118,3 +130,64 @@ func (sp SchedulingPlugin) String() string {
118130
}
119131
return fmt.Sprintf("{PluginRef: %s%s}", sp.PluginRef, weight)
120132
}
133+
134+
// FeatureGates is a set of flags that enable various experimental features with the EPP
135+
type FeatureGates []string
136+
137+
func (fg FeatureGates) String() string {
138+
if fg == nil {
139+
return "{}"
140+
}
141+
142+
result := ""
143+
for _, gate := range fg {
144+
result += gate + ","
145+
}
146+
147+
if len(result) > 0 {
148+
result = result[:len(result)-1]
149+
}
150+
return "{" + result + "}"
151+
}
152+
153+
// SaturationDetector
154+
type SaturationDetector struct {
155+
// +optional
156+
// QueueDepthThreshold defines the backend waiting queue size above which a
157+
// pod is considered to have insufficient capacity for new requests.
158+
QueueDepthThreshold int `json:"queueDepthThreshold,omitempty"`
159+
160+
// +optional
161+
// KVCacheUtilThreshold defines the KV cache utilization (0.0 to 1.0) above
162+
// which a pod is considered to have insufficient capacity.
163+
KVCacheUtilThreshold float64 `json:"kvCacheUtilThreshold,omitempty"`
164+
165+
// +optional
166+
// MetricsStalenessThreshold defines how old a pod's metrics can be.
167+
// If a pod's metrics are older than this, it might be excluded from
168+
// "good capacity" considerations or treated as having no capacity for
169+
// safety.
170+
MetricsStalenessThreshold metav1.Duration `json:"metricsStalenessThreshold,omitempty"`
171+
}
172+
173+
func (sd *SaturationDetector) String() string {
174+
result := ""
175+
if sd != nil {
176+
if sd.QueueDepthThreshold != 0 {
177+
result += fmt.Sprintf("QueueDepthThreshold: %d", sd.QueueDepthThreshold)
178+
}
179+
if sd.KVCacheUtilThreshold != 0.0 {
180+
if len(result) != 0 {
181+
result += ", "
182+
}
183+
result += fmt.Sprintf("KVCacheUtilThreshold: %g", sd.KVCacheUtilThreshold)
184+
}
185+
if sd.MetricsStalenessThreshold.Duration != 0.0 {
186+
if len(result) != 0 {
187+
result += ", "
188+
}
189+
result += fmt.Sprintf("MetricsStalenessThreshold: %s", sd.MetricsStalenessThreshold)
190+
}
191+
}
192+
return "{" + result + "}"
193+
}

apix/config/v1alpha1/zz_generated.deepcopy.go

Lines changed: 45 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

cmd/epp/runner/runner.go

Lines changed: 82 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -44,9 +44,11 @@ import (
4444
"sigs.k8s.io/controller-runtime/pkg/metrics/filters"
4545
metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"
4646

47+
configapi "sigs.k8s.io/gateway-api-inference-extension/apix/config/v1alpha1"
4748
"sigs.k8s.io/gateway-api-inference-extension/internal/runnable"
4849
"sigs.k8s.io/gateway-api-inference-extension/pkg/common"
4950
backendmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics"
51+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/config"
5052
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/config/loader"
5153
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datalayer"
5254
dlmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datalayer/metrics"
@@ -77,6 +79,11 @@ const (
7779
// enableExperimentalFlowControlLayer defines the environment variable used as a feature flag for the pluggable flow
7880
// control layer.
7981
enableExperimentalFlowControlLayer = "ENABLE_EXPERIMENTAL_FLOW_CONTROL_LAYER"
82+
83+
// Saturation Detector deprecated configuration environment variables
84+
EnvSdQueueDepthThreshold = "SD_QUEUE_DEPTH_THRESHOLD"
85+
EnvSdKVCacheUtilThreshold = "SD_KV_CACHE_UTIL_THRESHOLD"
86+
EnvSdMetricsStalenessThreshold = "SD_METRICS_STALENESS_THRESHOLD"
8087
)
8188

8289
// TODO: this is hardcoded for POC only. This needs to be hooked up to our text-based config story.
@@ -143,6 +150,7 @@ func NewRunner() *Runner {
143150
// Runner is used to run epp with its plugins
144151
type Runner struct {
145152
eppExecutableName string // the EPP executable name
153+
featureGates config.FeatureConfig
146154
requestControlConfig *requestcontrol.Config
147155
schedulerConfig *scheduling.SchedulerConfig
148156
customCollectors []prometheus.Collector
@@ -200,24 +208,33 @@ func (r *Runner) Run(ctx context.Context) error {
200208
})
201209
setupLog.Info("Flags processed", "flags", flags)
202210

203-
// --- Load Configurations from Environment Variables ---
204-
sdConfig := saturationdetector.LoadConfigFromEnv()
205-
206211
// --- Get Kubernetes Config ---
207212
cfg, err := ctrl.GetConfig()
208213
if err != nil {
209214
setupLog.Error(err, "Failed to get Kubernetes rest config")
210215
return err
211216
}
212217

218+
rawConfig, featureGates, err := r.parseConfigurationPhaseOne(ctx)
219+
if err != nil {
220+
setupLog.Error(err, "Failed to parse configuration")
221+
return err
222+
}
223+
r.featureGates = featureGates
224+
213225
// --- Setup Datastore ---
214-
useDatalayerV2 := env.GetEnvBool(enableExperimentalDatalayerV2, false, setupLog)
215-
epf, err := r.setupMetricsCollection(setupLog, useDatalayerV2)
226+
epf, err := r.setupMetricsCollection(setupLog, r.featureGates[datalayer.FeatureGate])
216227
if err != nil {
217228
return err
218229
}
219230
datastore := datastore.NewDatastore(ctx, epf, int32(*modelServerMetricsPort))
220231

232+
eppConfig, err := r.parseConfigurationPhaseTwo(ctx, rawConfig, datastore)
233+
if err != nil {
234+
setupLog.Error(err, "Failed to parse configuration")
235+
return err
236+
}
237+
221238
// --- Setup Metrics Server ---
222239
customCollectors := []prometheus.Collector{collectors.NewInferencePoolMetricsCollector(datastore)}
223240
if r.customCollectors != nil {
@@ -297,12 +314,6 @@ func (r *Runner) Run(ctx context.Context) error {
297314
runtime.SetBlockProfileRate(1)
298315
}
299316

300-
err = r.parsePluginsConfiguration(ctx, datastore)
301-
if err != nil {
302-
setupLog.Error(err, "Failed to parse plugins configuration")
303-
return err
304-
}
305-
306317
// --- Initialize Core EPP Components ---
307318
if r.schedulerConfig == nil {
308319
err := errors.New("scheduler config must be set either by config api or through code")
@@ -314,12 +325,11 @@ func (r *Runner) Run(ctx context.Context) error {
314325

315326
scheduler := scheduling.NewSchedulerWithConfig(r.schedulerConfig)
316327

317-
saturationDetector := saturationdetector.NewDetector(sdConfig, setupLog)
328+
saturationDetector := saturationdetector.NewDetector(eppConfig.SaturationDetectorConfig, setupLog)
318329

319330
// --- Admission Control Initialization ---
320-
enableFlowControl := env.GetEnvBool(enableExperimentalFlowControlLayer, false, setupLog)
321331
var admissionController requestcontrol.AdmissionController
322-
if enableFlowControl {
332+
if r.featureGates[flowcontrol.FeatureGate] {
323333
setupLog.Info("Initializing experimental Flow Control layer")
324334
fcCfg, err := flowControlConfig.ValidateAndApplyDefaults()
325335
if err != nil {
@@ -367,7 +377,7 @@ func (r *Runner) Run(ctx context.Context) error {
367377
MetricsStalenessThreshold: *metricsStalenessThreshold,
368378
Director: director,
369379
SaturationDetector: saturationDetector,
370-
UseExperimentalDatalayerV2: useDatalayerV2, // pluggable data layer feature flag
380+
UseExperimentalDatalayerV2: r.featureGates[datalayer.FeatureGate], // pluggable data layer feature flag
371381
}
372382
if err := serverRunner.SetupWithManager(ctx, mgr); err != nil {
373383
setupLog.Error(err, "Failed to setup EPP controllers")
@@ -410,9 +420,9 @@ func (r *Runner) registerInTreePlugins() {
410420
plugins.Register(testfilter.HeaderBasedTestingFilterType, testfilter.HeaderBasedTestingFilterFactory)
411421
}
412422

413-
func (r *Runner) parsePluginsConfiguration(ctx context.Context, ds datastore.Datastore) error {
423+
func (r *Runner) parseConfigurationPhaseOne(ctx context.Context) (*configapi.EndpointPickerConfig, config.FeatureConfig, error) {
414424
if *configText == "" && *configFile == "" {
415-
return nil // configuring through code, not through file
425+
return nil, nil, nil // configuring through code, not through file
416426
}
417427

418428
logger := log.FromContext(ctx)
@@ -424,25 +434,75 @@ func (r *Runner) parsePluginsConfiguration(ctx context.Context, ds datastore.Dat
424434
var err error
425435
configBytes, err = os.ReadFile(*configFile)
426436
if err != nil {
427-
return fmt.Errorf("failed to load config from a file '%s' - %w", *configFile, err)
437+
return nil, nil, fmt.Errorf("failed to load config from a file '%s' - %w", *configFile, err)
428438
}
429439
}
430440

441+
loader.RegisterFeatureGate(datalayer.FeatureGate)
442+
loader.RegisterFeatureGate(flowcontrol.FeatureGate)
443+
431444
r.registerInTreePlugins()
445+
446+
return loader.LoadConfigPhaseOne(configBytes, logger)
447+
}
448+
449+
func (r *Runner) parseConfigurationPhaseTwo(ctx context.Context, rawConfig *configapi.EndpointPickerConfig, ds datastore.Datastore) (*config.Config, error) {
450+
logger := log.FromContext(ctx)
432451
handle := plugins.NewEppHandle(ctx, ds.PodList)
433-
config, err := loader.LoadConfig(configBytes, handle, logger)
452+
cfg, err := loader.LoadConfigPhaseTwo(rawConfig, handle, logger)
434453

435454
if err != nil {
436-
return fmt.Errorf("failed to load the configuration - %w", err)
455+
return nil, fmt.Errorf("failed to load the configuration - %w", err)
437456
}
438457

439-
r.schedulerConfig = config.SchedulerConfig
458+
r.schedulerConfig = cfg.SchedulerConfig
440459

441460
// Add requestControl plugins
442461
r.requestControlConfig.AddPlugins(handle.GetAllPlugins()...)
443462

463+
// Handler deprecated configuration options
464+
r.deprecatedConfigurationHelper(cfg, logger)
465+
444466
logger.Info("loaded configuration from file/text successfully")
445-
return nil
467+
return cfg, nil
468+
}
469+
470+
func (r *Runner) deprecatedConfigurationHelper(cfg *config.Config, logger logr.Logger) {
471+
// Handle deprecated environment variable based feature flags
472+
473+
if _, ok := os.LookupEnv(enableExperimentalDatalayerV2); ok {
474+
logger.Info("Enabling the experimental Data Layer V2 using environment variables is deprecated")
475+
r.featureGates[datalayer.FeatureGate] = env.GetEnvBool(enableExperimentalDatalayerV2, false, logger)
476+
}
477+
if _, ok := os.LookupEnv(enableExperimentalFlowControlLayer); ok {
478+
logger.Info("Enabling the experimental Flow Control layer using environment variables is deprecated")
479+
r.featureGates[flowcontrol.FeatureGate] = env.GetEnvBool(enableExperimentalFlowControlLayer, false, setupLog)
480+
}
481+
482+
// Handle deprecated environment variable base Saturation Detector configuration
483+
484+
if _, ok := os.LookupEnv(EnvSdQueueDepthThreshold); ok {
485+
logger.Info("Configuring Saturation Detector using environment variables is deprecated")
486+
cfg.SaturationDetectorConfig.QueueDepthThreshold =
487+
env.GetEnvInt(EnvSdQueueDepthThreshold, saturationdetector.DefaultQueueDepthThreshold, logger)
488+
if cfg.SaturationDetectorConfig.QueueDepthThreshold <= 0 {
489+
cfg.SaturationDetectorConfig.QueueDepthThreshold = saturationdetector.DefaultQueueDepthThreshold
490+
}
491+
}
492+
if _, ok := os.LookupEnv(EnvSdKVCacheUtilThreshold); ok {
493+
logger.Info("Configuring Saturation Detector using environment variables is deprecated")
494+
cfg.SaturationDetectorConfig.KVCacheUtilThreshold = env.GetEnvFloat(EnvSdKVCacheUtilThreshold, saturationdetector.DefaultKVCacheUtilThreshold, logger)
495+
if cfg.SaturationDetectorConfig.KVCacheUtilThreshold <= 0 || cfg.SaturationDetectorConfig.KVCacheUtilThreshold >= 1 {
496+
cfg.SaturationDetectorConfig.KVCacheUtilThreshold = saturationdetector.DefaultKVCacheUtilThreshold
497+
}
498+
}
499+
if _, ok := os.LookupEnv(EnvSdMetricsStalenessThreshold); ok {
500+
logger.Info("Configuring Saturation Detector using environment variables is deprecated")
501+
cfg.SaturationDetectorConfig.MetricsStalenessThreshold = env.GetEnvDuration(EnvSdMetricsStalenessThreshold, saturationdetector.DefaultMetricsStalenessThreshold, logger)
502+
if cfg.SaturationDetectorConfig.MetricsStalenessThreshold <= 0 {
503+
cfg.SaturationDetectorConfig.MetricsStalenessThreshold = saturationdetector.DefaultMetricsStalenessThreshold
504+
}
505+
}
446506
}
447507

448508
func (r *Runner) setupMetricsCollection(setupLog logr.Logger, useExperimentalDatalayer bool) (datalayer.EndpointFactory, error) {

mkdocs.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ nav:
7373
- InferencePool Rollout: guides/inferencepool-rollout.md
7474
- Metrics and Observability: guides/metrics-and-observability.md
7575
- Configuration Guide:
76-
- Configuring the plugins via configuration YAML file: guides/epp-configuration/config-text.md
76+
- Configuring the EndPoint Picker via configuration YAML file: guides/epp-configuration/config-text.md
7777
- Prefix Cache Aware Plugin: guides/epp-configuration/prefix-aware.md
7878
- Migration Guide: guides/ga-migration.md
7979
- Troubleshooting Guide: guides/troubleshooting.md

pkg/epp/config/config.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,15 @@ limitations under the License.
1616

1717
package config
1818

19-
import "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling"
19+
import (
20+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/saturationdetector"
21+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling"
22+
)
2023

2124
// Config is the configuration loaded from the text based configuration
2225
type Config struct {
23-
SchedulerConfig *scheduling.SchedulerConfig
26+
SchedulerConfig *scheduling.SchedulerConfig
27+
SaturationDetectorConfig *saturationdetector.Config
2428
}
29+
30+
type FeatureConfig map[string]bool

0 commit comments

Comments
 (0)