@@ -32,6 +32,7 @@ import (
32
32
"go.uber.org/zap/zapcore"
33
33
"google.golang.org/grpc"
34
34
healthPb "google.golang.org/grpc/health/grpc_health_v1"
35
+ "k8s.io/apimachinery/pkg/runtime/schema"
35
36
"k8s.io/apimachinery/pkg/types"
36
37
ctrl "sigs.k8s.io/controller-runtime"
37
38
"sigs.k8s.io/controller-runtime/pkg/log"
@@ -41,6 +42,7 @@ import (
41
42
metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"
42
43
43
44
"sigs.k8s.io/gateway-api-inference-extension/internal/runnable"
45
+ "sigs.k8s.io/gateway-api-inference-extension/pkg/common"
44
46
backendmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics"
45
47
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/config/loader"
46
48
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datastore"
@@ -50,7 +52,6 @@ import (
50
52
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/requestcontrol"
51
53
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/saturationdetector"
52
54
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling"
53
- "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework/plugins/filter"
54
55
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework/plugins/multi/prefix"
55
56
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework/plugins/picker"
56
57
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework/plugins/profile"
82
83
"pool-name" ,
83
84
runserver .DefaultPoolName ,
84
85
"Name of the InferencePool this Endpoint Picker is associated with." )
86
+ poolGroup = flag .String (
87
+ "pool-group" ,
88
+ runserver .DefaultPoolGroup ,
89
+ "group of the InferencePool this Endpoint Picker is associated with." )
85
90
poolNamespace = flag .String (
86
91
"pool-namespace" ,
87
92
runserver .DefaultPoolNamespace ,
@@ -104,20 +109,6 @@ var (
104
109
"The path to the certificate for secure serving. The certificate and private key files " +
105
110
"are assumed to be named tls.crt and tls.key, respectively. If not set, and secureServing is enabled, " +
106
111
"then a self-signed certificate is used." )
107
- // header/metadata flags
108
- destinationEndpointHintKey = flag .String (
109
- "destination-endpoint-hint-key" ,
110
- runserver .DefaultDestinationEndpointHintKey ,
111
- "Header and response metadata key used by Envoy to route to the appropriate pod. This must match Envoy configuration." )
112
- destinationEndpointHintMetadataNamespace = flag .String (
113
- "destination-endpoint-hint-metadata-namespace" ,
114
- runserver .DefaultDestinationEndpointHintMetadataNamespace ,
115
- "The key for the outer namespace struct in the metadata field of the extproc response that is used to wrap the" +
116
- "target endpoint. If not set, then an outer namespace struct should not be created." )
117
- fairnessIDHeaderKey = flag .String (
118
- "fairness-id-header-key" ,
119
- runserver .DefaultFairnessIDHeaderKey ,
120
- "The header key used to pass the fairness ID to be used in Flow Control." )
121
112
// metric flags
122
113
totalQueuedRequestsMetric = flag .String (
123
114
"total-queued-requests-metric" ,
@@ -196,7 +187,6 @@ func bindEnvToFlags() {
196
187
"MODEL_SERVER_METRICS_PATH" : "model-server-metrics-path" ,
197
188
"MODEL_SERVER_METRICS_SCHEME" : "model-server-metrics-scheme" ,
198
189
"MODEL_SERVER_METRICS_HTTPS_INSECURE_SKIP_VERIFY" : "model-server-metrics-https-insecure-skip-verify" ,
199
- "DESTINATION_ENDPOINT_HINT_KEY" : "destination-endpoint-hint-key" ,
200
190
"POOL_NAME" : "pool-name" ,
201
191
"POOL_NAMESPACE" : "pool-namespace" ,
202
192
// durations & bools work too; flag.Set expects the *string* form
@@ -301,7 +291,15 @@ func (r *Runner) Run(ctx context.Context) error {
301
291
Name : * poolName ,
302
292
Namespace : * poolNamespace ,
303
293
}
304
- mgr , err := runserver .NewDefaultManager (poolNamespacedName , cfg , metricsServerOptions )
294
+ poolGroupKind := schema.GroupKind {
295
+ Group : * poolGroup ,
296
+ Kind : "InferencePool" ,
297
+ }
298
+ poolGKNN := common.GKNN {
299
+ NamespacedName : poolNamespacedName ,
300
+ GroupKind : poolGroupKind ,
301
+ }
302
+ mgr , err := runserver .NewDefaultManager (poolGKNN , cfg , metricsServerOptions )
305
303
if err != nil {
306
304
setupLog .Error (err , "Failed to create controller manager" )
307
305
return err
@@ -339,19 +337,17 @@ func (r *Runner) Run(ctx context.Context) error {
339
337
340
338
// --- Setup ExtProc Server Runner ---
341
339
serverRunner := & runserver.ExtProcServerRunner {
342
- GrpcPort : * grpcPort ,
343
- DestinationEndpointHintMetadataNamespace : * destinationEndpointHintMetadataNamespace ,
344
- DestinationEndpointHintKey : * destinationEndpointHintKey ,
345
- FairnessIDHeaderKey : * fairnessIDHeaderKey ,
346
- PoolNamespacedName : poolNamespacedName ,
347
- Datastore : datastore ,
348
- SecureServing : * secureServing ,
349
- HealthChecking : * healthChecking ,
350
- CertPath : * certPath ,
351
- RefreshPrometheusMetricsInterval : * refreshPrometheusMetricsInterval ,
352
- MetricsStalenessThreshold : * metricsStalenessThreshold ,
353
- Director : director ,
354
- SaturationDetector : saturationDetector ,
340
+ GrpcPort : * grpcPort ,
341
+ PoolNamespacedName : poolNamespacedName ,
342
+ PoolGKNN : poolGKNN ,
343
+ Datastore : datastore ,
344
+ SecureServing : * secureServing ,
345
+ HealthChecking : * healthChecking ,
346
+ CertPath : * certPath ,
347
+ RefreshPrometheusMetricsInterval : * refreshPrometheusMetricsInterval ,
348
+ MetricsStalenessThreshold : * metricsStalenessThreshold ,
349
+ Director : director ,
350
+ SaturationDetector : saturationDetector ,
355
351
}
356
352
if err := serverRunner .SetupWithManager (ctx , mgr ); err != nil {
357
353
setupLog .Error (err , "Failed to setup EPP controllers" )
@@ -382,11 +378,6 @@ func (r *Runner) Run(ctx context.Context) error {
382
378
383
379
// registerInTreePlugins registers the factory functions of all known plugins
384
380
func (r * Runner ) registerInTreePlugins () {
385
- plugins .Register (filter .DecisionTreeFilterType , filter .DecisionTreeFilterFactory )
386
- plugins .Register (filter .LeastKVCacheFilterType , filter .LeastKVCacheFilterFactory )
387
- plugins .Register (filter .LeastQueueFilterType , filter .LeastQueueFilterFactory )
388
- plugins .Register (filter .LoraAffinityFilterType , filter .LoraAffinityFilterFactory )
389
- plugins .Register (filter .LowQueueFilterType , filter .LowQueueFilterFactory )
390
381
plugins .Register (prefix .PrefixCachePluginType , prefix .PrefixCachePluginFactory )
391
382
plugins .Register (picker .MaxScorePickerType , picker .MaxScorePickerFactory )
392
383
plugins .Register (picker .RandomPickerType , picker .RandomPickerFactory )
@@ -403,6 +394,8 @@ func (r *Runner) parsePluginsConfiguration(ctx context.Context) error {
403
394
return nil // configuring through code, not through file
404
395
}
405
396
397
+ logger := log .FromContext (ctx )
398
+
406
399
var configBytes []byte
407
400
if * configText != "" {
408
401
configBytes = []byte (* configText )
@@ -416,20 +409,17 @@ func (r *Runner) parsePluginsConfiguration(ctx context.Context) error {
416
409
417
410
r .registerInTreePlugins ()
418
411
handle := plugins .NewEppHandle (ctx )
419
- config , err := loader .LoadConfig (configBytes , handle )
412
+ config , err := loader .LoadConfig (configBytes , handle , logger )
420
413
if err != nil {
421
414
return fmt .Errorf ("failed to load the configuration - %w" , err )
422
415
}
423
416
424
- r .schedulerConfig , err = loader .LoadSchedulerConfig (config .SchedulingProfiles , handle )
425
- if err != nil {
426
- return fmt .Errorf ("failed to create Scheduler configuration - %w" , err )
427
- }
417
+ r .schedulerConfig = config .SchedulerConfig
428
418
429
419
// Add requestControl plugins
430
420
r .requestControlConfig .AddPlugins (handle .GetAllPlugins ()... )
431
421
432
- log . FromContext ( ctx ) .Info ("loaded configuration from file/text successfully" )
422
+ logger .Info ("loaded configuration from file/text successfully" )
433
423
return nil
434
424
}
435
425
0 commit comments