Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
9c4b4eb
add latency predictor
kaushikmitr Jun 25, 2025
7b26d9b
add cv in model and update epp deployment
kaushikmitr Jun 26, 2025
6d7f90a
bug fix
kaushikmitr Jun 27, 2025
5b20959
track mape for predictions
kaushikmitr Jun 27, 2025
dc418d7
add running queue size to metrics
kaushikmitr Jun 28, 2025
c3e1c01
add xgboost regressor and update tpot sampling logic
kaushikmitr Jul 1, 2025
f32d873
emit predicted and actual ttft tpot in body
kaushikmitr Jul 4, 2025
ddbd7db
separate servers for training and prediction
kaushikmitr Jul 10, 2025
86d9eb6
add latency predictor
kaushikmitr Jul 10, 2025
6432af7
better initial implementation
BenjaminBraunDev Aug 5, 2025
bcb83be
progress towards fixing up merge conflicts from latency predictor merge
BenjaminBraunDev Aug 18, 2025
bdb1d57
More refactor progress, fixing and adding tests
BenjaminBraunDev Aug 19, 2025
4f1f4ae
working state, latency prediction
BenjaminBraunDev Aug 21, 2025
2c099f6
Clean up changes, remove unneeded files, working functionality withou…
BenjaminBraunDev Aug 22, 2025
2852eb5
Rebase cleanup, remove duplicate lines
BenjaminBraunDev Aug 22, 2025
fe82a14
Integrate new alpha-beta slo scoring into scoring plugin
BenjaminBraunDev Aug 26, 2025
d91834c
Fix prefix cache scoring for slo-aware routing
BenjaminBraunDev Aug 27, 2025
47c86b0
Add pycache for latency predictor to gitignore
BenjaminBraunDev Aug 27, 2025
0cb3466
Rebase with main
BenjaminBraunDev Aug 30, 2025
6e2b6e1
Fix prefix cache scoring being piped to latencyprediction_helper
BenjaminBraunDev Sep 4, 2025
8a8521c
add dependencies in scorer
kaushikmitr Sep 5, 2025
62d1479
change to single profile
kaushikmitr Sep 5, 2025
38ba84d
change to single profile
kaushikmitr Sep 5, 2025
e13b53b
restore two profiles
kaushikmitr Sep 5, 2025
f65ed44
restore two profiles
kaushikmitr Sep 5, 2025
40e3b79
restore two profiles
kaushikmitr Sep 5, 2025
c712de9
update admit request to shed based on predictions
kaushikmitr Sep 8, 2025
772b6a0
add TODOs for future changes
BenjaminBraunDev Sep 9, 2025
b0e1f1d
Change artifact registry references to personal compiled images
BenjaminBraunDev Sep 9, 2025
ef504d9
Fix existing non-slo aware routing unit tests
BenjaminBraunDev Sep 11, 2025
82754ff
update latency predictor with better eval metrics
kaushikmitr Sep 11, 2025
a7fd852
Fix saturation detector unit test
BenjaminBraunDev Sep 12, 2025
efaced4
Change naming of SLO headers and prediction based routing header
BenjaminBraunDev Sep 12, 2025
ed28448
Remove port 9002 service on InferencePool causing make test to fail
BenjaminBraunDev Sep 12, 2025
6ed7590
Fix epp hermetic integration test to expect ProcessingMode Send in re…
BenjaminBraunDev Sep 13, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
bin/*
Dockerfile.cross
artifacts
latencypredictor-v1/__pycache__

# Test binary, built with `go test -c`
*.test
Expand Down
73 changes: 70 additions & 3 deletions cmd/epp/runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package runner
import (
"context"
"crypto/tls"
"encoding/json"
"errors"
"flag"
"fmt"
Expand Down Expand Up @@ -50,10 +51,12 @@ import (
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datalayer"
dlmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datalayer/metrics"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datastore"
latencypredictor "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/latencypredictorasync"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics/collectors"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/plugins"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/requestcontrol"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/requestcontrol/plugins/slorequest"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/saturationdetector"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework/plugins/multi/prefix"
Expand Down Expand Up @@ -89,6 +92,7 @@ var (
"then a self-signed certificate is used.")
// metric flags
totalQueuedRequestsMetric = flag.String("total-queued-requests-metric", runserver.DefaultTotalQueuedRequestsMetric, "Prometheus metric for the number of queued requests.")
totalRunningRequestsMetric = flag.String("total-running-requests-metric", runserver.DefaultTotalRunningRequestsMetric, "Prometheus metric for the number of running requests.")
kvCacheUsagePercentageMetric = flag.String("kv-cache-usage-percentage-metric", runserver.DefaultKvCacheUsagePercentageMetric, "Prometheus metric for the fraction of KV-cache blocks currently in use (from 0 to 1).")
// LoRA metrics
loraInfoMetric = flag.String("lora-info-metric", runserver.DefaultLoraInfoMetric, "Prometheus metric for the LoRA info metrics (must be in vLLM label format).")
Expand All @@ -107,6 +111,9 @@ var (
modelServerMetricsHttpsInsecureSkipVerify = flag.Bool("model-server-metrics-https-insecure-skip-verify", true, "When using 'https' scheme for 'model-server-metrics-scheme', configure 'InsecureSkipVerify' (default to true)")
haEnableLeaderElection = flag.Bool("ha-enable-leader-election", false, "Enables leader election for high availability. When enabled, readiness probes will only pass on the leader.")

// Latency Predictor Flag
enableLatencyPredictor = flag.Bool("enable-latency-predictor", false, "Enable the regression-based latency predictor and scheduler scorer.")

setupLog = ctrl.Log.WithName("setup")
)

Expand Down Expand Up @@ -233,9 +240,29 @@ func (r *Runner) Run(ctx context.Context) error {
runtime.SetBlockProfileRate(1)
}

err = r.parsePluginsConfiguration(ctx)
// ===================================================================
// == Latency Predictor Integration
// ===================================================================
var predictor latencypredictor.PredictorInterface // Use the interface type
if *enableLatencyPredictor {
setupLog.Info("Latency predictor is enabled. Initializing...")
predictor = latencypredictor.New(latencypredictor.ConfigFromEnv(), ctrl.Log.WithName("latency-predictor"))

// For the runnable, you'll need to type assert back to the concrete type
concretePredictor := predictor.(*latencypredictor.Predictor)
if err := mgr.Add(runnable.NoLeaderElection(&predictorRunnable{predictor: concretePredictor})); err != nil {
setupLog.Error(err, "Failed to register latency predictor runnable")
return err
}
} else {
setupLog.Info("Latency predictor is disabled.")
predictor = nil // This will be a true nil interface
}
// ===================================================================

err = r.parsePluginsConfiguration(ctx, predictor, datastore)
if err != nil {
setupLog.Error(err, "Failed to parse plugins configuration")
setupLog.Error(err, "Failed to parse the configuration")
return err
}

Expand Down Expand Up @@ -268,6 +295,7 @@ func (r *Runner) Run(ctx context.Context) error {
Director: director,
SaturationDetector: saturationDetector,
UseExperimentalDatalayerV2: useDatalayerV2, // pluggable data layer feature flag
LatencyPredictor: predictor,
}
if err := serverRunner.SetupWithManager(ctx, mgr); err != nil {
setupLog.Error(err, "Failed to setup EPP controllers")
Expand Down Expand Up @@ -310,7 +338,20 @@ func (r *Runner) registerInTreePlugins() {
plugins.Register(testfilter.HeaderBasedTestingFilterType, testfilter.HeaderBasedTestingFilterFactory)
}

func (r *Runner) parsePluginsConfiguration(ctx context.Context) error {
// registerLatencyPredictorPlugins registers the plugins that power SLO-aware,
// prediction-based routing. The SLO request tracker and SLO scorer both need
// access to the predictor and the datastore, so they are registered through
// factory closures that capture those dependencies; the profile handler and
// weighted-random picker need no extra wiring and use their stock factories.
func (r *Runner) registerLatencyPredictorPlugins(predictor latencypredictor.PredictorInterface, datastore datastore.Datastore) {
	// Factories that inject the predictor/datastore into plugins at creation time.
	trackerFactory := func(name string, _ json.RawMessage, _ plugins.Handle) (plugins.Plugin, error) {
		return slorequest.New(predictor, datastore).WithName(name), nil
	}
	scorerFactory := func(name string, _ json.RawMessage, _ plugins.Handle) (plugins.Plugin, error) {
		return scorer.NewSLOScorer(predictor, datastore, scorer.HeadroomSelectionStrategy).WithName(name), nil
	}

	plugins.Register(slorequest.SLORequestTrackerPluginType, trackerFactory)
	plugins.Register(scorer.SLOScorerPluginType, scorerFactory)
	plugins.Register(profile.SLOAwareProfileHandlerType, profile.SLOAwareProfileHandlerFactory)
	plugins.Register(picker.WeightedRandomPickerType, picker.WeightedRandomPickerFactory)
}

func (r *Runner) parsePluginsConfiguration(ctx context.Context, predictor latencypredictor.PredictorInterface, datastore datastore.Datastore) error {
if *configText == "" && *configFile == "" {
return nil // configuring through code, not through file
}
Expand All @@ -329,6 +370,12 @@ func (r *Runner) parsePluginsConfiguration(ctx context.Context) error {
}

r.registerInTreePlugins()
// If we have a latency predictor enabled and predictor and datastore are not nil,
// register the latency predictor plugins (currently just the SLO scorer).
if *enableLatencyPredictor && predictor != nil && datastore != nil {
setupLog.Info("Registering latency predictor plugins")
r.registerLatencyPredictorPlugins(predictor, datastore)
}
handle := plugins.NewEppHandle(ctx)
config, err := loader.LoadConfig(configBytes, handle, logger)
if err != nil {
Expand Down Expand Up @@ -358,6 +405,7 @@ func (r *Runner) setupMetricsCollection(setupLog logr.Logger, useExperimentalDat
func setupMetricsV1(setupLog logr.Logger) (datalayer.EndpointFactory, error) {
mapping, err := backendmetrics.NewMetricMapping(
*totalQueuedRequestsMetric,
*totalRunningRequestsMetric,
*kvCacheUsagePercentageMetric,
*loraInfoMetric,
)
Expand Down Expand Up @@ -402,6 +450,7 @@ func setupDatalayer() (datalayer.EndpointFactory, error) {
*modelServerMetricsHttpsInsecureSkipVerify,
nil)
extractor, err := dlmetrics.NewExtractor(*totalQueuedRequestsMetric,
*totalRunningRequestsMetric,
*kvCacheUsagePercentageMetric,
*loraInfoMetric)

Expand Down Expand Up @@ -510,3 +559,21 @@ func setupPprofHandlers(mgr ctrl.Manager) error {
}
return nil
}

// ===================================================================
// == Latency Predictor Plugin and Helpers
// ===================================================================

// predictorRunnable implements controller-runtime's Runnable interface to manage the predictor's lifecycle.
// It is registered with the manager (without leader election) so the predictor
// starts and stops together with the EPP process.
type predictorRunnable struct {
	// predictor is the concrete latency predictor whose background
	// lifecycle (Start/Stop) this runnable manages.
	predictor *latencypredictor.Predictor
}

// Start launches the latency predictor, then blocks until the manager's
// context is cancelled, at which point it stops the predictor and returns
// nil so the manager can shut down cleanly.
func (p *predictorRunnable) Start(ctx context.Context) error {
	setupLog.Info("Starting latency predictor...")
	p.predictor.Start(ctx)
	// Block for the lifetime of the manager; Runnable.Start is expected
	// to run until the context is done.
	<-ctx.Done()
	setupLog.Info("Stopping latency predictor...")
	p.predictor.Stop()
	return nil
}
2 changes: 1 addition & 1 deletion config/manifests/gateway/gke/gcp-backend-policy.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ metadata:
name: inferencepool-backend-policy
spec:
targetRef:
group: "inference.networking.k8s.io"
group: "inference.networking.x-k8s.io"
kind: InferencePool
name: vllm-llama3-8b-instruct
default:
Expand Down
2 changes: 1 addition & 1 deletion config/manifests/gateway/gke/healthcheck.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ metadata:
namespace: default
spec:
targetRef:
group: "inference.networking.k8s.io"
group: "inference.networking.x-k8s.io"
kind: InferencePool
name: vllm-llama3-8b-instruct
default:
Expand Down
2 changes: 1 addition & 1 deletion config/manifests/gateway/gke/httproute.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ spec:
name: inference-gateway
rules:
- backendRefs:
- group: inference.networking.k8s.io
- group: inference.networking.x-k8s.io
kind: InferencePool
name: vllm-llama3-8b-instruct
matches:
Expand Down
Loading