@@ -19,6 +19,7 @@ package runner
1919import (
2020 "context"
2121 "crypto/tls"
22+ "encoding/json"
2223 "errors"
2324 "flag"
2425 "fmt"
@@ -54,13 +55,15 @@ import (
5455 "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol"
5556 fccontroller "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/controller"
5657 fcregistry "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/registry"
58+ latencypredictor "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/latencypredictorasync"
5759 "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics"
5860 "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics/collectors"
5961 "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/plugins"
6062 "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/requestcontrol"
6163 "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/saturationdetector"
6264 "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling"
6365 "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework/plugins/multi/prefix"
66+ "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework/plugins/multi/slo_aware_router"
6467 "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework/plugins/picker"
6568 "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework/plugins/profile"
6669 "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework/plugins/scorer"
@@ -108,6 +111,7 @@ var (
108111 "then a self-signed certificate is used." )
109112 // metric flags
110113 totalQueuedRequestsMetric = flag .String ("total-queued-requests-metric" , runserver .DefaultTotalQueuedRequestsMetric , "Prometheus metric for the number of queued requests." )
114+ totalRunningRequestsMetric = flag .String ("total-running-requests-metric" , runserver .DefaultTotalRunningRequestsMetric , "Prometheus metric for the number of running requests." )
111115 kvCacheUsagePercentageMetric = flag .String ("kv-cache-usage-percentage-metric" , runserver .DefaultKvCacheUsagePercentageMetric , "Prometheus metric for the fraction of KV-cache blocks currently in use (from 0 to 1)." )
112116 // LoRA metrics
113117 loraInfoMetric = flag .String ("lora-info-metric" , runserver .DefaultLoraInfoMetric , "Prometheus metric for the LoRA info metrics (must be in vLLM label format)." )
@@ -127,7 +131,10 @@ var (
127131 modelServerMetricsScheme = flag .String ("model-server-metrics-scheme" , "http" , "Scheme to scrape metrics from pods" )
128132 modelServerMetricsHttpsInsecureSkipVerify = flag .Bool ("model-server-metrics-https-insecure-skip-verify" , true , "When using 'https' scheme for 'model-server-metrics-scheme', configure 'InsecureSkipVerify' (default to true)" )
129133 haEnableLeaderElection = flag .Bool ("ha-enable-leader-election" , false , "Enables leader election for high availability. When enabled, readiness probes will only pass on the leader." )
130- tracing = flag .Bool ("tracing" , true , "Enables emitting traces" )
134+
135+ // Latency Predictor Flag
136+ enableLatencyPredictor = flag .Bool ("enable-latency-predictor" , false , "Enable the regression-based latency predictor and scheduler scorer." )
137+ tracing = flag .Bool ("tracing" , true , "Enables emitting traces" )
131138
132139 setupLog = ctrl .Log .WithName ("setup" )
133140)
@@ -288,9 +295,29 @@ func (r *Runner) Run(ctx context.Context) error {
288295 runtime .SetBlockProfileRate (1 )
289296 }
290297
291- err = r .parsePluginsConfiguration (ctx , datastore )
298+ // ===================================================================
299+ // == Latency Predictor Integration
300+ // ===================================================================
301+ var predictor latencypredictor.PredictorInterface // Use the interface type
302+ if * enableLatencyPredictor {
303+ setupLog .Info ("Latency predictor is enabled. Initializing..." )
304+ predictor = latencypredictor .New (latencypredictor .ConfigFromEnv (), ctrl .Log .WithName ("latency-predictor" ))
305+
306+ // For the runnable, you'll need to type assert back to the concrete type
307+ concretePredictor := predictor .(* latencypredictor.Predictor )
308+ if err := mgr .Add (runnable .NoLeaderElection (& predictorRunnable {predictor : concretePredictor })); err != nil {
309+ setupLog .Error (err , "Failed to register latency predictor runnable" )
310+ return err
311+ }
312+ } else {
313+ setupLog .Info ("Latency predictor is disabled." )
314+ predictor = nil // This will be a true nil interface
315+ }
316+ // ===================================================================
317+
318+ err = r .parsePluginsConfiguration (ctx , predictor , datastore )
292319 if err != nil {
293- setupLog .Error (err , "Failed to parse plugins configuration" )
320+ setupLog .Error (err , "Failed to parse the configuration" )
294321 return err
295322 }
296323
@@ -359,6 +386,7 @@ func (r *Runner) Run(ctx context.Context) error {
359386 Director : director ,
360387 SaturationDetector : saturationDetector ,
361388 UseExperimentalDatalayerV2 : useDatalayerV2 , // pluggable data layer feature flag
389+ LatencyPredictor : predictor ,
362390 }
363391 if err := serverRunner .SetupWithManager (ctx , mgr ); err != nil {
364392 setupLog .Error (err , "Failed to setup EPP controllers" )
@@ -401,7 +429,14 @@ func (r *Runner) registerInTreePlugins() {
401429 plugins .Register (testfilter .HeaderBasedTestingFilterType , testfilter .HeaderBasedTestingFilterFactory )
402430}
403431
404- func (r * Runner ) parsePluginsConfiguration (ctx context.Context , ds datastore.Datastore ) error {
432+ func (r * Runner ) registerLatencyPredictorPlugins (predictor latencypredictor.PredictorInterface ) {
433+ plugins .Register (slo_aware_router .SLOAwareRouterPluginType , func (name string , _ json.RawMessage , _ plugins.Handle ) (plugins.Plugin , error ) {
434+ return slo_aware_router .NewSLOAwareRouter (predictor , slo_aware_router .HeadroomSelectionStrategy ).WithName (name ), nil
435+ })
436+ plugins .Register (profile .SLOAwareProfileHandlerType , profile .SLOAwareProfileHandlerFactory )
437+ }
438+
439+ func (r * Runner ) parsePluginsConfiguration (ctx context.Context , predictor latencypredictor.PredictorInterface , ds datastore.Datastore ) error {
405440 if * configText == "" && * configFile == "" {
406441 return nil // configuring through code, not through file
407442 }
@@ -420,6 +455,12 @@ func (r *Runner) parsePluginsConfiguration(ctx context.Context, ds datastore.Dat
420455 }
421456
422457 r .registerInTreePlugins ()
458+ // If the latency predictor is enabled and a predictor instance was created,
459+ // register the latency predictor plugins (the SLO-aware router and the SLO-aware profile handler).
460+ if * enableLatencyPredictor && predictor != nil {
461+ setupLog .Info ("Registering latency predictor plugins" )
462+ r .registerLatencyPredictorPlugins (predictor )
463+ }
423464 handle := plugins .NewEppHandle (ctx , ds .PodList )
424465 config , err := loader .LoadConfig (configBytes , handle , logger )
425466
@@ -450,6 +491,7 @@ func (r *Runner) setupMetricsCollection(setupLog logr.Logger, useExperimentalDat
450491func setupMetricsV1 (setupLog logr.Logger ) (datalayer.EndpointFactory , error ) {
451492 mapping , err := backendmetrics .NewMetricMapping (
452493 * totalQueuedRequestsMetric ,
494+ * totalRunningRequestsMetric ,
453495 * kvCacheUsagePercentageMetric ,
454496 * loraInfoMetric ,
455497 * cacheInfoMetric ,
@@ -493,6 +535,7 @@ func setupDatalayer() (datalayer.EndpointFactory, error) {
493535 * modelServerMetricsHttpsInsecureSkipVerify ,
494536 nil )
495537 extractor , err := dlmetrics .NewExtractor (* totalQueuedRequestsMetric ,
538+ * totalRunningRequestsMetric ,
496539 * kvCacheUsagePercentageMetric ,
497540 * loraInfoMetric , * cacheInfoMetric )
498541
@@ -604,3 +647,21 @@ func setupPprofHandlers(mgr ctrl.Manager) error {
604647 }
605648 return nil
606649}
650+
651+ // ===================================================================
652+ // == Latency Predictor Plugin and Helpers
653+ // ===================================================================
654+
655+ // predictorRunnable implements controller-runtime's Runnable interface to manage the predictor's lifecycle.
656+ type predictorRunnable struct {
657+ predictor * latencypredictor.Predictor
658+ }
659+
660+ func (p * predictorRunnable ) Start (ctx context.Context ) error {
661+ setupLog .Info ("Starting latency predictor..." )
662+ p .predictor .Start (ctx )
663+ <- ctx .Done ()
664+ setupLog .Info ("Stopping latency predictor..." )
665+ p .predictor .Stop ()
666+ return nil
667+ }
0 commit comments