
Commit 61b0996

Add latency predictor plugins, deployment, and runner.go integration
1 parent 5b22378 commit 61b0996

File tree: 27 files changed (+3949 / -27 lines)

cmd/epp/runner/runner.go

Lines changed: 65 additions & 4 deletions
@@ -19,6 +19,7 @@ package runner
 import (
     "context"
     "crypto/tls"
+    "encoding/json"
     "errors"
     "flag"
     "fmt"
@@ -61,13 +62,15 @@ import (
     "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/saturationdetector"
     "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling"
     "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework/plugins/multi/prefix"
+    "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework/plugins/multi/slo_aware_router"
     "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework/plugins/picker"
     "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework/plugins/profile"
     "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework/plugins/scorer"
     testfilter "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework/plugins/test/filter"
     runserver "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/server"
     "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/env"
     "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging"
+    latencypredictor "sigs.k8s.io/gateway-api-inference-extension/sidecars/latencypredictorasync"
     "sigs.k8s.io/gateway-api-inference-extension/version"
 )
 
@@ -108,6 +111,7 @@ var (
         "then a self-signed certificate is used.")
     // metric flags
     totalQueuedRequestsMetric = flag.String("total-queued-requests-metric", runserver.DefaultTotalQueuedRequestsMetric, "Prometheus metric for the number of queued requests.")
+    totalRunningRequestsMetric = flag.String("total-running-requests-metric", runserver.DefaultTotalRunningRequestsMetric, "Prometheus metric for the number of running requests.")
     kvCacheUsagePercentageMetric = flag.String("kv-cache-usage-percentage-metric", runserver.DefaultKvCacheUsagePercentageMetric, "Prometheus metric for the fraction of KV-cache blocks currently in use (from 0 to 1).")
     // LoRA metrics
     loraInfoMetric = flag.String("lora-info-metric", runserver.DefaultLoraInfoMetric, "Prometheus metric for the LoRA info metrics (must be in vLLM label format).")
@@ -127,7 +131,10 @@ var (
     modelServerMetricsScheme = flag.String("model-server-metrics-scheme", "http", "Scheme to scrape metrics from pods")
     modelServerMetricsHttpsInsecureSkipVerify = flag.Bool("model-server-metrics-https-insecure-skip-verify", true, "When using 'https' scheme for 'model-server-metrics-scheme', configure 'InsecureSkipVerify' (default to true)")
     haEnableLeaderElection = flag.Bool("ha-enable-leader-election", false, "Enables leader election for high availability. When enabled, readiness probes will only pass on the leader.")
-    tracing = flag.Bool("tracing", true, "Enables emitting traces")
+
+    // Latency Predictor Flag
+    enableLatencyPredictor = flag.Bool("enable-latency-predictor", false, "Enable the regression-based latency predictor and scheduler scorer.")
+    tracing = flag.Bool("tracing", true, "Enables emitting traces")
 
     setupLog = ctrl.Log.WithName("setup")
 )
@@ -297,9 +304,29 @@ func (r *Runner) Run(ctx context.Context) error {
         runtime.SetBlockProfileRate(1)
     }
 
-    err = r.parsePluginsConfiguration(ctx, datastore)
+    // ===================================================================
+    // == Latency Predictor Integration
+    // ===================================================================
+    var predictor latencypredictor.PredictorInterface // Use the interface type
+    if *enableLatencyPredictor {
+        setupLog.Info("Latency predictor is enabled. Initializing...")
+        predictor = latencypredictor.New(latencypredictor.ConfigFromEnv(), ctrl.Log.WithName("latency-predictor"))
+
+        // For the runnable, you'll need to type assert back to the concrete type
+        concretePredictor := predictor.(*latencypredictor.Predictor)
+        if err := mgr.Add(runnable.NoLeaderElection(&predictorRunnable{predictor: concretePredictor})); err != nil {
+            setupLog.Error(err, "Failed to register latency predictor runnable")
+            return err
+        }
+    } else {
+        setupLog.Info("Latency predictor is disabled.")
+        predictor = nil // This will be a true nil interface
+    }
+    // ===================================================================
+
+    err = r.parsePluginsConfiguration(ctx, predictor, datastore)
     if err != nil {
-        setupLog.Error(err, "Failed to parse plugins configuration")
+        setupLog.Error(err, "Failed to parse the configuration")
         return err
     }
 
@@ -368,6 +395,7 @@ func (r *Runner) Run(ctx context.Context) error {
         Director: director,
         SaturationDetector: saturationDetector,
         UseExperimentalDatalayerV2: useDatalayerV2, // pluggable data layer feature flag
+        LatencyPredictor: predictor,
     }
     if err := serverRunner.SetupWithManager(ctx, mgr); err != nil {
         setupLog.Error(err, "Failed to setup EPP controllers")
@@ -410,7 +438,14 @@ func (r *Runner) registerInTreePlugins() {
     plugins.Register(testfilter.HeaderBasedTestingFilterType, testfilter.HeaderBasedTestingFilterFactory)
 }
 
-func (r *Runner) parsePluginsConfiguration(ctx context.Context, ds datastore.Datastore) error {
+func (r *Runner) registerLatencyPredictorPlugins(predictor latencypredictor.PredictorInterface) {
+    plugins.Register(slo_aware_router.SLOAwareRouterPluginType, func(name string, _ json.RawMessage, _ plugins.Handle) (plugins.Plugin, error) {
+        return slo_aware_router.NewSLOAwareRouter(predictor, slo_aware_router.HeadroomSelectionStrategy).WithName(name), nil
+    })
+    plugins.Register(profile.SLOAwareProfileHandlerType, profile.SLOAwareProfileHandlerFactory)
+}
+
+func (r *Runner) parsePluginsConfiguration(ctx context.Context, predictor latencypredictor.PredictorInterface, ds datastore.Datastore) error {
     if *configText == "" && *configFile == "" {
         return nil // configuring through code, not through file
     }
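For context on why this registration has to happen before the configuration is loaded (wired up in the next hunk): plugins.Register associates a type name with a factory closure that captures the predictor, and the loader later instantiates whichever type names the config references. The toy registry below is only an illustrative sketch, not the repo's actual plugins or loader packages; it shows the register-then-load pattern and why a config naming the SLO-aware router can only load when the predictor flag registered it first:

package main

import (
	"encoding/json"
	"fmt"
)

// Toy plugin registry (hypothetical) mirroring the register-then-load flow.
type Plugin interface{ Name() string }

type factory func(name string, params json.RawMessage) (Plugin, error)

var registry = map[string]factory{}

func register(pluginType string, f factory) { registry[pluginType] = f }

// loadConfig resolves each configured type name against the registry; an
// unregistered type name makes the whole load fail.
func loadConfig(types []string) error {
	for _, t := range types {
		f, ok := registry[t]
		if !ok {
			return fmt.Errorf("plugin type %q referenced in config but not registered", t)
		}
		if _, err := f(t, nil); err != nil {
			return err
		}
	}
	return nil
}

type namedPlugin struct{ name string }

func (p namedPlugin) Name() string { return p.name }

func main() {
	// Registration happens first, as registerLatencyPredictorPlugins does above.
	register("slo-aware-router", func(name string, _ json.RawMessage) (Plugin, error) {
		return namedPlugin{name: name}, nil
	})
	fmt.Println(loadConfig([]string{"slo-aware-router"}))  // <nil>
	fmt.Println(loadConfig([]string{"unregistered-type"})) // error: not registered
}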
@@ -429,6 +464,12 @@ func (r *Runner) parsePluginsConfiguration(ctx context.Context, ds datastore.Dat
     }
 
     r.registerInTreePlugins()
+    // If we have a latency predictor enabled and predictor and datastore are not nil,
+    // register the latency predictor plugins (currently just the SLO scorer).
+    if *enableLatencyPredictor && predictor != nil {
+        setupLog.Info("Registering latency predictor plugins")
+        r.registerLatencyPredictorPlugins(predictor)
+    }
     handle := plugins.NewEppHandle(ctx, ds.PodList)
     config, err := loader.LoadConfig(configBytes, handle, logger)
 
@@ -459,6 +500,7 @@ func (r *Runner) setupMetricsCollection(setupLog logr.Logger, useExperimentalDat
 func setupMetricsV1(setupLog logr.Logger) (datalayer.EndpointFactory, error) {
     mapping, err := backendmetrics.NewMetricMapping(
         *totalQueuedRequestsMetric,
+        *totalRunningRequestsMetric,
         *kvCacheUsagePercentageMetric,
         *loraInfoMetric,
         *cacheInfoMetric,
@@ -502,6 +544,7 @@ func setupDatalayer() (datalayer.EndpointFactory, error) {
         *modelServerMetricsHttpsInsecureSkipVerify,
         nil)
     extractor, err := dlmetrics.NewExtractor(*totalQueuedRequestsMetric,
+        *totalRunningRequestsMetric,
         *kvCacheUsagePercentageMetric,
         *loraInfoMetric, *cacheInfoMetric)
 
@@ -613,3 +656,21 @@ func setupPprofHandlers(mgr ctrl.Manager) error {
     }
     return nil
 }
+
+// ===================================================================
+// == Latency Predictor Plugin and Helpers
+// ===================================================================
+
+// predictorRunnable implements controller-runtime's Runnable interface to manage the predictor's lifecycle.
+type predictorRunnable struct {
+    predictor *latencypredictor.Predictor
+}
+
+func (p *predictorRunnable) Start(ctx context.Context) error {
+    setupLog.Info("Starting latency predictor...")
+    p.predictor.Start(ctx)
+    <-ctx.Done()
+    setupLog.Info("Stopping latency predictor...")
+    p.predictor.Stop()
+    return nil
+}
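predictorRunnable follows controller-runtime's manager.Runnable contract, in which Start is expected to block until the supplied context is cancelled; the manager cancels that context on shutdown, which is why the runnable waits on <-ctx.Done() before stopping the predictor. Below is a self-contained sketch of the same lifecycle, using hypothetical stand-in types in place of *latencypredictor.Predictor:

package main

import (
	"context"
	"fmt"
	"time"
)

// Runnable mirrors controller-runtime's manager.Runnable contract:
// Start blocks until the context is cancelled, then returns.
type Runnable interface {
	Start(ctx context.Context) error
}

// fakePredictor is a hypothetical stand-in for *latencypredictor.Predictor,
// exposing the Start/Stop lifecycle the commit relies on.
type fakePredictor struct{}

func (p *fakePredictor) Start(_ context.Context) { fmt.Println("predictor started") }
func (p *fakePredictor) Stop()                   { fmt.Println("predictor stopped") }

// sketchRunnable has the same shape as predictorRunnable in the diff above:
// start the predictor, block until shutdown, then stop it.
type sketchRunnable struct{ predictor *fakePredictor }

func (r *sketchRunnable) Start(ctx context.Context) error {
	r.predictor.Start(ctx)
	<-ctx.Done() // the manager cancels ctx on shutdown
	r.predictor.Stop()
	return nil
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()
	var r Runnable = &sketchRunnable{predictor: &fakePredictor{}}
	_ = r.Start(ctx) // returns once ctx expires, as it would on manager shutdown
}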
