Skip to content

Commit a5fe612

Browse files
some cleanup in runner and config loading + deprecation notes (#1880)
* some cleanup in runner and config loading + deprecation notes Signed-off-by: Nir Rozenbaum <[email protected]> * Apply suggestions from code review Co-authored-by: Shmuel Kallner <[email protected]> --------- Signed-off-by: Nir Rozenbaum <[email protected]> Co-authored-by: Shmuel Kallner <[email protected]>
1 parent bef80ca commit a5fe612

File tree

4 files changed

+30
-35
lines changed

4 files changed

+30
-35
lines changed

cmd/epp/runner/runner.go

Lines changed: 26 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -76,12 +76,15 @@ import (
7676

7777
const (
7878
// enableExperimentalDatalayerV2 defines the environment variable used as feature flag for the pluggable data layer.
79+
// DEPRECATION NOTICE - this env var will be removed in the next version as we switch to configuring the EPP using FeatureGates in the config file.
7980
enableExperimentalDatalayerV2 = "ENABLE_EXPERIMENTAL_DATALAYER_V2"
8081
// enableExperimentalFlowControlLayer defines the environment variable used as a feature flag for the pluggable flow
8182
// control layer.
83+
// DEPRECATION NOTICE - this env var will be removed in the next version as we switch to configuring the EPP using FeatureGates in the config file.
8284
enableExperimentalFlowControlLayer = "ENABLE_EXPERIMENTAL_FLOW_CONTROL_LAYER"
8385

8486
// Saturation Detector deprecated configuration environment variables
87+
// DEPRECATION NOTICE - these env vars will be removed in the next version as we switch to configuring the EPP using the config file.
8588
EnvSdQueueDepthThreshold = "SD_QUEUE_DEPTH_THRESHOLD"
8689
EnvSdKVCacheUtilThreshold = "SD_KV_CACHE_UTIL_THRESHOLD"
8790
EnvSdMetricsStalenessThreshold = "SD_METRICS_STALENESS_THRESHOLD"
@@ -145,13 +148,14 @@ func NewRunner() *Runner {
145148
return &Runner{
146149
eppExecutableName: "GIE",
147150
requestControlConfig: requestcontrol.NewConfig(), // default requestcontrol config has empty plugin list
151+
customCollectors: []prometheus.Collector{},
148152
}
149153
}
150154

151155
// Runner is used to run epp with its plugins
152156
type Runner struct {
153157
eppExecutableName string // the EPP executable name
154-
featureGates config.FeatureConfig
158+
featureGates map[string]bool
155159
requestControlConfig *requestcontrol.Config
156160
schedulerConfig *scheduling.SchedulerConfig
157161
customCollectors []prometheus.Collector
@@ -216,12 +220,11 @@ func (r *Runner) Run(ctx context.Context) error {
216220
return err
217221
}
218222

219-
rawConfig, featureGates, err := r.parseConfigurationPhaseOne(ctx)
223+
rawConfig, err := r.parseConfigurationPhaseOne(ctx)
220224
if err != nil {
221225
setupLog.Error(err, "Failed to parse configuration")
222226
return err
223227
}
224-
r.featureGates = featureGates
225228

226229
// --- Setup Datastore ---
227230
epf, err := r.setupMetricsCollection(setupLog, r.featureGates[datalayer.FeatureGate])
@@ -237,11 +240,8 @@ func (r *Runner) Run(ctx context.Context) error {
237240
}
238241

239242
// --- Setup Metrics Server ---
240-
customCollectors := []prometheus.Collector{collectors.NewInferencePoolMetricsCollector(datastore)}
241-
if r.customCollectors != nil {
242-
customCollectors = append(customCollectors, r.customCollectors...)
243-
}
244-
metrics.Register(customCollectors...)
243+
r.customCollectors = append(r.customCollectors, collectors.NewInferencePoolMetricsCollector(datastore))
244+
metrics.Register(r.customCollectors...)
245245
metrics.RecordInferenceExtensionInfo(version.CommitSHA, version.BuildRef)
246246
// Register metrics handler.
247247
// Metrics endpoint is enabled in 'config/default/kustomization.yaml'. The Metrics options configure the server.
@@ -342,13 +342,7 @@ func (r *Runner) Run(ctx context.Context) error {
342342
if err != nil {
343343
return fmt.Errorf("failed to initialize Flow Registry: %w", err)
344344
}
345-
fc, err := fccontroller.NewFlowController(
346-
ctx,
347-
fcCfg.Controller,
348-
registry,
349-
saturationDetector,
350-
setupLog,
351-
)
345+
fc, err := fccontroller.NewFlowController(ctx, fcCfg.Controller, registry, saturationDetector, setupLog)
352346
if err != nil {
353347
return fmt.Errorf("failed to initialize Flow Controller: %w", err)
354348
}
@@ -359,11 +353,7 @@ func (r *Runner) Run(ctx context.Context) error {
359353
admissionController = requestcontrol.NewLegacyAdmissionController(saturationDetector)
360354
}
361355

362-
director := requestcontrol.NewDirectorWithConfig(
363-
datastore,
364-
scheduler,
365-
admissionController,
366-
r.requestControlConfig)
356+
director := requestcontrol.NewDirectorWithConfig(datastore, scheduler, admissionController, r.requestControlConfig)
367357

368358
// --- Setup ExtProc Server Runner ---
369359
serverRunner := &runserver.ExtProcServerRunner{
@@ -423,9 +413,9 @@ func (r *Runner) registerInTreePlugins() {
423413
plugins.Register(testresponsereceived.DestinationEndpointServedVerifierType, testresponsereceived.DestinationEndpointServedVerifierFactory)
424414
}
425415

426-
func (r *Runner) parseConfigurationPhaseOne(ctx context.Context) (*configapi.EndpointPickerConfig, config.FeatureConfig, error) {
416+
func (r *Runner) parseConfigurationPhaseOne(ctx context.Context) (*configapi.EndpointPickerConfig, error) {
427417
if *configText == "" && *configFile == "" {
428-
return nil, nil, nil // configuring through code, not through file
418+
return nil, nil // configuring through code, not through file
429419
}
430420

431421
logger := log.FromContext(ctx)
@@ -437,7 +427,7 @@ func (r *Runner) parseConfigurationPhaseOne(ctx context.Context) (*configapi.End
437427
var err error
438428
configBytes, err = os.ReadFile(*configFile)
439429
if err != nil {
440-
return nil, nil, fmt.Errorf("failed to load config from a file '%s' - %w", *configFile, err)
430+
return nil, fmt.Errorf("failed to load config from a file '%s' - %w", *configFile, err)
441431
}
442432
}
443433

@@ -446,7 +436,14 @@ func (r *Runner) parseConfigurationPhaseOne(ctx context.Context) (*configapi.End
446436

447437
r.registerInTreePlugins()
448438

449-
return loader.LoadConfigPhaseOne(configBytes, logger)
439+
rawConfig, featureGates, err := loader.LoadConfigPhaseOne(configBytes, logger)
440+
if err != nil {
441+
return nil, fmt.Errorf("failed to parse config - %w", err)
442+
}
443+
444+
r.featureGates = featureGates
445+
446+
return rawConfig, nil
450447
}
451448

452449
func (r *Runner) parseConfigurationPhaseTwo(ctx context.Context, rawConfig *configapi.EndpointPickerConfig, ds datastore.Datastore) (*config.Config, error) {
@@ -474,33 +471,33 @@ func (r *Runner) deprecatedConfigurationHelper(cfg *config.Config, logger logr.L
474471
// Handle deprecated environment variable based feature flags
475472

476473
if _, ok := os.LookupEnv(enableExperimentalDatalayerV2); ok {
477-
logger.Info("Enabling the experimental Data Layer V2 using environment variables is deprecated")
474+
logger.Info("Enabling the experimental Data Layer V2 using environment variables is deprecated and will be removed in next version")
478475
r.featureGates[datalayer.FeatureGate] = env.GetEnvBool(enableExperimentalDatalayerV2, false, logger)
479476
}
480477
if _, ok := os.LookupEnv(enableExperimentalFlowControlLayer); ok {
481-
logger.Info("Enabling the experimental Flow Control layer using environment variables is deprecated")
478+
logger.Info("Enabling the experimental Flow Control layer using environment variables is deprecated and will be removed in next version")
482479
r.featureGates[flowcontrol.FeatureGate] = env.GetEnvBool(enableExperimentalFlowControlLayer, false, setupLog)
483480
}
484481

485482
// Handle deprecated environment variable base Saturation Detector configuration
486483

487484
if _, ok := os.LookupEnv(EnvSdQueueDepthThreshold); ok {
488-
logger.Info("Configuring Saturation Detector using environment variables is deprecated")
485+
logger.Info("Configuring Saturation Detector using environment variables is deprecated and will be removed in next version")
489486
cfg.SaturationDetectorConfig.QueueDepthThreshold =
490487
env.GetEnvInt(EnvSdQueueDepthThreshold, saturationdetector.DefaultQueueDepthThreshold, logger)
491488
if cfg.SaturationDetectorConfig.QueueDepthThreshold <= 0 {
492489
cfg.SaturationDetectorConfig.QueueDepthThreshold = saturationdetector.DefaultQueueDepthThreshold
493490
}
494491
}
495492
if _, ok := os.LookupEnv(EnvSdKVCacheUtilThreshold); ok {
496-
logger.Info("Configuring Saturation Detector using environment variables is deprecated")
493+
logger.Info("Configuring Saturation Detector using environment variables is deprecated and will be removed in next version")
497494
cfg.SaturationDetectorConfig.KVCacheUtilThreshold = env.GetEnvFloat(EnvSdKVCacheUtilThreshold, saturationdetector.DefaultKVCacheUtilThreshold, logger)
498495
if cfg.SaturationDetectorConfig.KVCacheUtilThreshold <= 0 || cfg.SaturationDetectorConfig.KVCacheUtilThreshold >= 1 {
499496
cfg.SaturationDetectorConfig.KVCacheUtilThreshold = saturationdetector.DefaultKVCacheUtilThreshold
500497
}
501498
}
502499
if _, ok := os.LookupEnv(EnvSdMetricsStalenessThreshold); ok {
503-
logger.Info("Configuring Saturation Detector using environment variables is deprecated")
500+
logger.Info("Configuring Saturation Detector using environment variables is deprecated and will be removed in next version")
504501
cfg.SaturationDetectorConfig.MetricsStalenessThreshold = env.GetEnvDuration(EnvSdMetricsStalenessThreshold, saturationdetector.DefaultMetricsStalenessThreshold, logger)
505502
if cfg.SaturationDetectorConfig.MetricsStalenessThreshold <= 0 {
506503
cfg.SaturationDetectorConfig.MetricsStalenessThreshold = saturationdetector.DefaultMetricsStalenessThreshold

pkg/epp/config/config.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,5 +26,3 @@ type Config struct {
2626
SchedulerConfig *scheduling.SchedulerConfig
2727
SaturationDetectorConfig *saturationdetector.Config
2828
}
29-
30-
type FeatureConfig map[string]bool

pkg/epp/config/loader/configloader.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,14 +37,14 @@ import (
3737

3838
var scheme = runtime.NewScheme()
3939

40-
var registeredFeatureGates = map[string]struct{}{}
40+
var registeredFeatureGates = sets.New[string]() // set of feature gates names, a name must be unique
4141

4242
func init() {
4343
utilruntime.Must(configapi.Install(scheme))
4444
}
4545

4646
// LoadConfigPhaseOne first phase of loading configuration from supplied text that was converted to []byte
47-
func LoadConfigPhaseOne(configBytes []byte, logger logr.Logger) (*configapi.EndpointPickerConfig, config.FeatureConfig, error) {
47+
func LoadConfigPhaseOne(configBytes []byte, logger logr.Logger) (*configapi.EndpointPickerConfig, map[string]bool, error) {
4848
rawConfig, err := loadRawConfig(configBytes)
4949
if err != nil {
5050
return nil, nil, err
@@ -201,5 +201,5 @@ func instantiatePlugins(configuredPlugins []configapi.PluginSpec, handle plugins
201201

202202
// RegisterFeatureGate registers feature gate keys for validation
203203
func RegisterFeatureGate(gate string) {
204-
registeredFeatureGates[gate] = struct{}{}
204+
registeredFeatureGates.Insert(gate)
205205
}

pkg/epp/config/loader/validation.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ func validateFeatureGates(fg configapi.FeatureGates) error {
6262
}
6363

6464
for _, gate := range fg {
65-
if _, ok := registeredFeatureGates[gate]; !ok {
65+
if !registeredFeatureGates.Has(gate) {
6666
return errors.New(gate + " is an unregistered Feature Gate")
6767
}
6868
}

0 commit comments

Comments
 (0)