Skip to content

Commit db08577

Browse files
Experimental SLO-Aware Routing and Latency Prediction (#1568)
* add latency predictor * add cv in model and update epp deployment * bug fix * track mape for predictions * add running queue size to metrics * add xgboost regressor and update tpot sampling logic * emit predicted and actual ttft tpot in body * separate servers for training and prediction * add latency predictor put the predictor functions in director in a helper function add scores to reqcxt record prediction duration metrics add prefix cache score to model input slo based routing changes retrieve request priority queue from the datastore update scoring logic * better initial implementation Add scheduling profile, working state remove latencypredictor from director Move all latency prediction logic out of director and into scheduling profile. Make all Request/Response plugins take in RequestContext * progress towards fixing up merge conflicts from latency predictor merge * More refactor progress, fixing and adding tests * working state, latency prediction * Clean up changes, remove unneeded files, working functionality without latency flag and scheduling plugins * Rebase cleanup, remove duplicate lines * Integrate new alpha-beta slo scoring into scoring plugin * Fix prefix cache scoring for slo-aware routing * Add pycache for latency predictor to gitignore * Rebase with main * Fix prefix cache scoring being piped to latencyprediction_helper * add dependencies in scorer * change to single profile * change to single profile * restore two profiles * restore two profiles * restore two profiles * update admit request to shed based on predictions * add TODOs for future changes * Change artifact registry references to personal compiled images * Fix existing non-slo aware routing unit tests * update latency predictor with better eval metrics * Fix saturation detector unit test * Change naming of SLO headers and prediction based routing header * Remove port 9002 service on InferencePool causing make test to fail * Fix epp hermetic integration test to expect ProcessingMode Send 
in response header --------- Co-authored-by: kaushikmitr <[email protected]>
1 parent 8b154ba commit db08577

File tree

70 files changed

+13567
-257
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

70 files changed

+13567
-257
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
bin/*
88
Dockerfile.cross
99
artifacts
10+
latencypredictor-v1/__pycache__
1011

1112
# Test binary, built with `go test -c`
1213
*.test

cmd/epp/runner/runner.go

Lines changed: 70 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package runner
1919
import (
2020
"context"
2121
"crypto/tls"
22+
"encoding/json"
2223
"errors"
2324
"flag"
2425
"fmt"
@@ -50,10 +51,12 @@ import (
5051
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datalayer"
5152
dlmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datalayer/metrics"
5253
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datastore"
54+
latencypredictor "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/latencypredictorasync"
5355
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics"
5456
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics/collectors"
5557
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/plugins"
5658
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/requestcontrol"
59+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/requestcontrol/plugins/slorequest"
5760
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/saturationdetector"
5861
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling"
5962
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework/plugins/multi/prefix"
@@ -89,6 +92,7 @@ var (
8992
"then a self-signed certificate is used.")
9093
// metric flags
9194
totalQueuedRequestsMetric = flag.String("total-queued-requests-metric", runserver.DefaultTotalQueuedRequestsMetric, "Prometheus metric for the number of queued requests.")
95+
totalRunningRequestsMetric = flag.String("total-running-requests-metric", runserver.DefaultTotalRunningRequestsMetric, "Prometheus metric for the number of running requests.")
9296
kvCacheUsagePercentageMetric = flag.String("kv-cache-usage-percentage-metric", runserver.DefaultKvCacheUsagePercentageMetric, "Prometheus metric for the fraction of KV-cache blocks currently in use (from 0 to 1).")
9397
// LoRA metrics
9498
loraInfoMetric = flag.String("lora-info-metric", runserver.DefaultLoraInfoMetric, "Prometheus metric for the LoRA info metrics (must be in vLLM label format).")
@@ -107,6 +111,9 @@ var (
107111
modelServerMetricsHttpsInsecureSkipVerify = flag.Bool("model-server-metrics-https-insecure-skip-verify", true, "When using 'https' scheme for 'model-server-metrics-scheme', configure 'InsecureSkipVerify' (default to true)")
108112
haEnableLeaderElection = flag.Bool("ha-enable-leader-election", false, "Enables leader election for high availability. When enabled, readiness probes will only pass on the leader.")
109113

114+
// Latency Predictor Flag
115+
enableLatencyPredictor = flag.Bool("enable-latency-predictor", false, "Enable the regression-based latency predictor and scheduler scorer.")
116+
110117
setupLog = ctrl.Log.WithName("setup")
111118
)
112119

@@ -233,9 +240,29 @@ func (r *Runner) Run(ctx context.Context) error {
233240
runtime.SetBlockProfileRate(1)
234241
}
235242

236-
err = r.parsePluginsConfiguration(ctx)
243+
// ===================================================================
244+
// == Latency Predictor Integration
245+
// ===================================================================
246+
var predictor latencypredictor.PredictorInterface // Use the interface type
247+
if *enableLatencyPredictor {
248+
setupLog.Info("Latency predictor is enabled. Initializing...")
249+
predictor = latencypredictor.New(latencypredictor.ConfigFromEnv(), ctrl.Log.WithName("latency-predictor"))
250+
251+
// For the runnable, you'll need to type assert back to the concrete type
252+
concretePredictor := predictor.(*latencypredictor.Predictor)
253+
if err := mgr.Add(runnable.NoLeaderElection(&predictorRunnable{predictor: concretePredictor})); err != nil {
254+
setupLog.Error(err, "Failed to register latency predictor runnable")
255+
return err
256+
}
257+
} else {
258+
setupLog.Info("Latency predictor is disabled.")
259+
predictor = nil // This will be a true nil interface
260+
}
261+
// ===================================================================
262+
263+
err = r.parsePluginsConfiguration(ctx, predictor, datastore)
237264
if err != nil {
238-
setupLog.Error(err, "Failed to parse plugins configuration")
265+
setupLog.Error(err, "Failed to parse the configuration")
239266
return err
240267
}
241268

@@ -268,6 +295,7 @@ func (r *Runner) Run(ctx context.Context) error {
268295
Director: director,
269296
SaturationDetector: saturationDetector,
270297
UseExperimentalDatalayerV2: useDatalayerV2, // pluggable data layer feature flag
298+
LatencyPredictor: predictor,
271299
}
272300
if err := serverRunner.SetupWithManager(ctx, mgr); err != nil {
273301
setupLog.Error(err, "Failed to setup EPP controllers")
@@ -310,7 +338,20 @@ func (r *Runner) registerInTreePlugins() {
310338
plugins.Register(testfilter.HeaderBasedTestingFilterType, testfilter.HeaderBasedTestingFilterFactory)
311339
}
312340

313-
func (r *Runner) parsePluginsConfiguration(ctx context.Context) error {
341+
func (r *Runner) registerLatencyPredictorPlugins(predictor latencypredictor.PredictorInterface, datastore datastore.Datastore) {
342+
// Register the SLO request tracker and scorer plugin, these plugins need access to the predictor and datastore.
343+
// We have to specify a custom factory function to create the plugins with the correct dependencies.
344+
plugins.Register(slorequest.SLORequestTrackerPluginType, func(name string, _ json.RawMessage, _ plugins.Handle) (plugins.Plugin, error) {
345+
return slorequest.New(predictor, datastore).WithName(name), nil
346+
})
347+
plugins.Register(scorer.SLOScorerPluginType, func(name string, _ json.RawMessage, _ plugins.Handle) (plugins.Plugin, error) {
348+
return scorer.NewSLOScorer(predictor, datastore, scorer.HeadroomSelectionStrategy).WithName(name), nil
349+
})
350+
plugins.Register(profile.SLOAwareProfileHandlerType, profile.SLOAwareProfileHandlerFactory)
351+
plugins.Register(picker.WeightedRandomPickerType, picker.WeightedRandomPickerFactory)
352+
}
353+
354+
func (r *Runner) parsePluginsConfiguration(ctx context.Context, predictor latencypredictor.PredictorInterface, datastore datastore.Datastore) error {
314355
if *configText == "" && *configFile == "" {
315356
return nil // configuring through code, not through file
316357
}
@@ -329,6 +370,12 @@ func (r *Runner) parsePluginsConfiguration(ctx context.Context) error {
329370
}
330371

331372
r.registerInTreePlugins()
373+
// If we have a latency predictor enabled and predictor and datastore are not nil,
374+
// register the latency predictor plugins (currently just the SLO scorer).
375+
if *enableLatencyPredictor && predictor != nil && datastore != nil {
376+
setupLog.Info("Registering latency predictor plugins")
377+
r.registerLatencyPredictorPlugins(predictor, datastore)
378+
}
332379
handle := plugins.NewEppHandle(ctx)
333380
config, err := loader.LoadConfig(configBytes, handle, logger)
334381
if err != nil {
@@ -358,6 +405,7 @@ func (r *Runner) setupMetricsCollection(setupLog logr.Logger, useExperimentalDat
358405
func setupMetricsV1(setupLog logr.Logger) (datalayer.EndpointFactory, error) {
359406
mapping, err := backendmetrics.NewMetricMapping(
360407
*totalQueuedRequestsMetric,
408+
*totalRunningRequestsMetric,
361409
*kvCacheUsagePercentageMetric,
362410
*loraInfoMetric,
363411
)
@@ -402,6 +450,7 @@ func setupDatalayer() (datalayer.EndpointFactory, error) {
402450
*modelServerMetricsHttpsInsecureSkipVerify,
403451
nil)
404452
extractor, err := dlmetrics.NewExtractor(*totalQueuedRequestsMetric,
453+
*totalRunningRequestsMetric,
405454
*kvCacheUsagePercentageMetric,
406455
*loraInfoMetric)
407456

@@ -510,3 +559,21 @@ func setupPprofHandlers(mgr ctrl.Manager) error {
510559
}
511560
return nil
512561
}
562+
563+
// ===================================================================
564+
// == Latency Predictor Plugin and Helpers
565+
// ===================================================================
566+
567+
// predictorRunnable implements controller-runtime's Runnable interface to manage the predictor's lifecycle.
568+
type predictorRunnable struct {
569+
predictor *latencypredictor.Predictor
570+
}
571+
572+
func (p *predictorRunnable) Start(ctx context.Context) error {
573+
setupLog.Info("Starting latency predictor...")
574+
p.predictor.Start(ctx)
575+
<-ctx.Done()
576+
setupLog.Info("Stopping latency predictor...")
577+
p.predictor.Stop()
578+
return nil
579+
}

config/manifests/gateway/gke/gcp-backend-policy.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ metadata:
44
name: inferencepool-backend-policy
55
spec:
66
targetRef:
7-
group: "inference.networking.k8s.io"
7+
group: "inference.networking.x-k8s.io"
88
kind: InferencePool
99
name: vllm-llama3-8b-instruct
1010
default:

config/manifests/gateway/gke/healthcheck.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ metadata:
55
namespace: default
66
spec:
77
targetRef:
8-
group: "inference.networking.k8s.io"
8+
group: "inference.networking.x-k8s.io"
99
kind: InferencePool
1010
name: vllm-llama3-8b-instruct
1111
default:

config/manifests/gateway/gke/httproute.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ spec:
99
name: inference-gateway
1010
rules:
1111
- backendRefs:
12-
- group: inference.networking.k8s.io
12+
- group: inference.networking.x-k8s.io
1313
kind: InferencePool
1414
name: vllm-llama3-8b-instruct
1515
matches:

0 commit comments

Comments
 (0)