diff --git a/config/manifests/inferencepool-resources-lp.yaml b/config/manifests/inferencepool-resources-lp.yaml index 31db1813e..dbaa59d4b 100644 --- a/config/manifests/inferencepool-resources-lp.yaml +++ b/config/manifests/inferencepool-resources-lp.yaml @@ -17,6 +17,7 @@ data: LATENCY_TPOT_SCALER_PATH: "/models/tpot_scaler.joblib" LATENCY_MODEL_TYPE: "xgboost" LATENCY_MAX_TRAINING_DATA_SIZE_PER_BUCKET: "5000" + LATENCY_QUANTILE_ALPHA: "0.9" --- apiVersion: v1 kind: ConfigMap @@ -74,6 +75,34 @@ spec: protocol: TCP port: 8003 targetPort: 8003 + - name: latency-predictor-4 + protocol: TCP + port: 8004 + targetPort: 8004 + - name: latency-predictor-5 + protocol: TCP + port: 8005 + targetPort: 8005 + - name: latency-predictor-6 + protocol: TCP + port: 8006 + targetPort: 8006 + - name: latency-predictor-7 + protocol: TCP + port: 8007 + targetPort: 8007 + - name: latency-predictor-8 + protocol: TCP + port: 8008 + targetPort: 8008 + - name: latency-predictor-9 + protocol: TCP + port: 8009 + targetPort: 8009 + - name: latency-predictor-10 + protocol: TCP + port: 8010 + targetPort: 8010 - name: prometheus protocol: TCP port: 9090 @@ -106,11 +135,10 @@ spec: spec: serviceAccountName: vllm-llama3-8b-instruct-epp # Conservatively, this timeout should mirror the longest grace period of the pods within the pool - terminationGracePeriodSeconds: 130 containers: # EPP Container - name: epp - image: us-central1-docker.pkg.dev/benjaminbraun-gke-dev/slo-routing/slo-routing-epp-exp + image: us-docker.pkg.dev/kaushikmitra-gke-dev/kaushikmitra-docker-repo/epp-wlp-latencypredictor-v2 imagePullPolicy: Always args: - -pool-name @@ -132,15 +160,15 @@ spec: - "-enable-latency-predictor" env: - name: PREDICTION_SERVER_URL - value: "http://localhost:8001,http://localhost:8002,http://localhost:8003" # Multiple prediction servers + value: "http://localhost:8001,http://localhost:8002,http://localhost:8003,http://localhost:8004,http://localhost:8005,http://localhost:8006,http://localhost:8007,http://localhost:8008,http://localhost:8009,http://localhost:8010" # All 10 prediction servers - name: TRAINING_SERVER_URL value: "http://localhost:8000" # Single training server for sending training data - name: LATENCY_MAX_SAMPLE_SIZE value: "10000" # Maximum sample size for latency prediction - - name: NEG_HEADROOM_TPOT_WEIGHT - value: "0.2" # Weight for TPOT in negative headroom calculation - - name: NEG_HEADROOM_TTFT_WEIGHT - value: "0.8" # Weight for TTFT in negative headroom calculation + + + + ports: - containerPort: 9002 - containerPort: 9003 @@ -163,7 +191,7 @@ spec: mountPath: "/config" # Training Server Sidecar Container - name: training-server - image: us-central1-docker.pkg.dev/benjaminbraun-gke-dev/slo-routing/latency_training:latest + image: us-docker.pkg.dev/kaushikmitra-gke-dev/kaushikmitra-docker-repo/latencypredictor-v3-training-server:latest imagePullPolicy: Always ports: - containerPort: 8000 @@ -202,7 +230,7 @@ spec: mountPath: /models # Prediction Server Sidecar Container 1 - name: prediction-server-1 - image: us-central1-docker.pkg.dev/benjaminbraun-gke-dev/slo-routing/latency_prediction:latest + image: us-docker.pkg.dev/kaushikmitra-gke-dev/kaushikmitra-docker-repo/latencypredictor-v3-prediction-server:latest imagePullPolicy: Always command: ["uvicorn"] args: ["prediction_server:app", "--host", "0.0.0.0", "--port", "8001"] @@ -248,7 +276,7 @@ spec: mountPath: /server_models # Prediction Server Sidecar Container 2 - name: prediction-server-2 - image: us-central1-docker.pkg.dev/benjaminbraun-gke-dev/slo-routing/latency_prediction:latest + image: us-docker.pkg.dev/kaushikmitra-gke-dev/kaushikmitra-docker-repo/latencypredictor-v3-prediction-server:latest imagePullPolicy: Always command: ["uvicorn"] args: ["prediction_server:app", "--host", "0.0.0.0", "--port", "8002"] @@ -294,7 +322,7 @@ spec: mountPath: /server_models # Prediction Server Sidecar Container 3 - name: prediction-server-3 - image: us-central1-docker.pkg.dev/benjaminbraun-gke-dev/slo-routing/latency_prediction:latest + image: us-docker.pkg.dev/kaushikmitra-gke-dev/kaushikmitra-docker-repo/latencypredictor-v3-prediction-server:latest imagePullPolicy: Always command: ["uvicorn"] args: ["prediction_server:app", "--host", "0.0.0.0", "--port", "8003"] @@ -338,6 +366,328 @@ spec: volumeMounts: - name: prediction-server-3-storage mountPath: /server_models + # Prediction Server Sidecar Container 4 + - name: prediction-server-4 + image: us-docker.pkg.dev/kaushikmitra-gke-dev/kaushikmitra-docker-repo/latencypredictor-v3-prediction-server:latest + imagePullPolicy: Always + command: ["uvicorn"] + args: ["prediction_server:app", "--host", "0.0.0.0", "--port", "8004"] + ports: + - containerPort: 8004 + name: predict-port-4 + livenessProbe: + httpGet: + path: /healthz + port: 8004 + initialDelaySeconds: 15 + periodSeconds: 15 + readinessProbe: + httpGet: + path: /readyz + port: 8004 + initialDelaySeconds: 10 + periodSeconds: 5 + failureThreshold: 10 + resources: + requests: + cpu: "500m" + memory: "1Gi" + limits: + cpu: "1000m" + memory: "2Gi" + envFrom: + - configMapRef: + name: prediction-server-config + env: + - name: PREDICT_PORT + value: "8004" + - name: POD_NAME + valueFrom: + fieldRef: + fieldPath: metadata.name + - name: SERVER_TYPE + value: "prediction-4" + - name: TRAINING_SERVER_URL + value: "http://localhost:8000" + volumeMounts: + - name: prediction-server-4-storage + mountPath: /server_models + # Prediction Server Sidecar Container 5 + - name: prediction-server-5 + image: us-docker.pkg.dev/kaushikmitra-gke-dev/kaushikmitra-docker-repo/latencypredictor-v3-prediction-server:latest + imagePullPolicy: Always + command: ["uvicorn"] + args: ["prediction_server:app", "--host", "0.0.0.0", "--port", "8005"] + ports: + - containerPort: 8005 + name: predict-port-5 + livenessProbe: + httpGet: + path: /healthz + port: 8005 + initialDelaySeconds: 15 + periodSeconds: 15 + readinessProbe: + httpGet: + path: /readyz + port: 8005 + initialDelaySeconds: 10 + periodSeconds: 5 + failureThreshold: 10 + resources: + requests: + cpu: "500m" + memory: "1Gi" + limits: + cpu: "1000m" + memory: "2Gi" + envFrom: + - configMapRef: + name: prediction-server-config + env: + - name: PREDICT_PORT + value: "8005" + - name: POD_NAME + valueFrom: + fieldRef: + fieldPath: metadata.name + - name: SERVER_TYPE + value: "prediction-5" + - name: TRAINING_SERVER_URL + value: "http://localhost:8000" + volumeMounts: + - name: prediction-server-5-storage + mountPath: /server_models + # Prediction Server Sidecar Container 6 + - name: prediction-server-6 + image: us-docker.pkg.dev/kaushikmitra-gke-dev/kaushikmitra-docker-repo/latencypredictor-v3-prediction-server:latest + imagePullPolicy: Always + command: ["uvicorn"] + args: ["prediction_server:app", "--host", "0.0.0.0", "--port", "8006"] + ports: + - containerPort: 8006 + name: predict-port-6 + livenessProbe: + httpGet: + path: /healthz + port: 8006 + initialDelaySeconds: 15 + periodSeconds: 15 + readinessProbe: + httpGet: + path: /readyz + port: 8006 + initialDelaySeconds: 10 + periodSeconds: 5 + failureThreshold: 10 + resources: + requests: + cpu: "500m" + memory: "1Gi" + limits: + cpu: "1000m" + memory: "2Gi" + envFrom: + - configMapRef: + name: prediction-server-config + env: + - name: PREDICT_PORT + value: "8006" + - name: POD_NAME + valueFrom: + fieldRef: + fieldPath: metadata.name + - name: SERVER_TYPE + value: "prediction-6" + - name: TRAINING_SERVER_URL + value: "http://localhost:8000" + volumeMounts: + - name: prediction-server-6-storage + mountPath: /server_models + # Prediction Server Sidecar Container 7 + - name: prediction-server-7 + image: us-docker.pkg.dev/kaushikmitra-gke-dev/kaushikmitra-docker-repo/latencypredictor-v3-prediction-server:latest + imagePullPolicy: Always + command: ["uvicorn"] + args: ["prediction_server:app", "--host", "0.0.0.0", "--port", "8007"] + ports: + - containerPort: 8007 + name: predict-port-7 + livenessProbe: + httpGet: + path: /healthz + port: 8007 + initialDelaySeconds: 15 + periodSeconds: 15 + readinessProbe: + httpGet: + path: /readyz + port: 8007 + initialDelaySeconds: 10 + periodSeconds: 5 + failureThreshold: 10 + resources: + requests: + cpu: "500m" + memory: "1Gi" + limits: + cpu: "1000m" + memory: "2Gi" + envFrom: + - configMapRef: + name: prediction-server-config + env: + - name: PREDICT_PORT + value: "8007" + - name: POD_NAME + valueFrom: + fieldRef: + fieldPath: metadata.name + - name: SERVER_TYPE + value: "prediction-7" + - name: TRAINING_SERVER_URL + value: "http://localhost:8000" + volumeMounts: + - name: prediction-server-7-storage + mountPath: /server_models + # Prediction Server Sidecar Container 8 + - name: prediction-server-8 + image: us-docker.pkg.dev/kaushikmitra-gke-dev/kaushikmitra-docker-repo/latencypredictor-v3-prediction-server:latest + imagePullPolicy: Always + command: ["uvicorn"] + args: ["prediction_server:app", "--host", "0.0.0.0", "--port", "8008"] + ports: + - containerPort: 8008 + name: predict-port-8 + livenessProbe: + httpGet: + path: /healthz + port: 8008 + initialDelaySeconds: 15 + periodSeconds: 15 + readinessProbe: + httpGet: + path: /readyz + port: 8008 + initialDelaySeconds: 10 + periodSeconds: 5 + failureThreshold: 10 + resources: + requests: + cpu: "500m" + memory: "1Gi" + limits: + cpu: "1000m" + memory: "2Gi" + envFrom: + - configMapRef: + name: prediction-server-config + env: + - name: PREDICT_PORT + value: "8008" + - name: POD_NAME + valueFrom: + fieldRef: + fieldPath: metadata.name + - name: SERVER_TYPE + value: "prediction-8" + - name: TRAINING_SERVER_URL + value: "http://localhost:8000" + volumeMounts: + - name: prediction-server-8-storage + mountPath: /server_models + # Prediction Server Sidecar Container 9 + - name: prediction-server-9 + image: us-docker.pkg.dev/kaushikmitra-gke-dev/kaushikmitra-docker-repo/latencypredictor-v3-prediction-server:latest + imagePullPolicy: Always + command: ["uvicorn"] + args: ["prediction_server:app", "--host", "0.0.0.0", "--port", "8009"] + ports: + - containerPort: 8009 + name: predict-port-9 + livenessProbe: + httpGet: + path: /healthz + port: 8009 + initialDelaySeconds: 15 + periodSeconds: 15 + readinessProbe: + httpGet: + path: /readyz + port: 8009 + initialDelaySeconds: 10 + periodSeconds: 5 + failureThreshold: 10 + resources: + requests: + cpu: "500m" + memory: "1Gi" + limits: + cpu: "1000m" + memory: "2Gi" + envFrom: + - configMapRef: + name: prediction-server-config + env: + - name: PREDICT_PORT + value: "8009" + - name: POD_NAME + valueFrom: + fieldRef: + fieldPath: metadata.name + - name: SERVER_TYPE + value: "prediction-9" + - name: TRAINING_SERVER_URL + value: "http://localhost:8000" + volumeMounts: + - name: prediction-server-9-storage + mountPath: /server_models + # Prediction Server Sidecar Container 10 + - name: prediction-server-10 + image: us-docker.pkg.dev/kaushikmitra-gke-dev/kaushikmitra-docker-repo/latencypredictor-v3-prediction-server:latest + imagePullPolicy: Always + command: ["uvicorn"] + args: ["prediction_server:app", "--host", "0.0.0.0", "--port", "8010"] + ports: + - containerPort: 8010 + name: predict-port-10 + livenessProbe: + httpGet: + path: /healthz + port: 8010 + initialDelaySeconds: 15 + periodSeconds: 15 + readinessProbe: + httpGet: + path: /readyz + port: 8010 + initialDelaySeconds: 10 + periodSeconds: 5 + failureThreshold: 10 + resources: + requests: + cpu: "500m" + memory: "1Gi" + limits: + cpu: "1000m" + memory: "2Gi" + envFrom: + - configMapRef: + name: prediction-server-config + env: + - name: PREDICT_PORT + value: "8010" + - name: POD_NAME + valueFrom: + fieldRef: + fieldPath: metadata.name + - name: SERVER_TYPE + value: "prediction-10" + - name: TRAINING_SERVER_URL + value: "http://localhost:8000" + volumeMounts: + - name: prediction-server-10-storage + mountPath: /server_models volumes: - name: training-server-storage emptyDir: @@ -351,6 +701,27 @@ spec: - name: prediction-server-3-storage emptyDir: sizeLimit: "10Gi" # Dedicated volume for prediction server 3 + - name: prediction-server-4-storage + emptyDir: + sizeLimit: "10Gi" # Dedicated volume for prediction server 4 + - name: prediction-server-5-storage + emptyDir: + sizeLimit: "10Gi" # Dedicated volume for prediction server 5 + - name: prediction-server-6-storage + emptyDir: + sizeLimit: "10Gi" # Dedicated volume for prediction server 6 + - name: prediction-server-7-storage + emptyDir: + sizeLimit: "10Gi" # Dedicated volume for prediction server 7 + - name: prediction-server-8-storage + emptyDir: + sizeLimit: "10Gi" # Dedicated volume for prediction server 8 + - name: prediction-server-9-storage + emptyDir: + sizeLimit: "10Gi" # Dedicated volume for prediction server 9 + - name: prediction-server-10-storage + emptyDir: + sizeLimit: "10Gi" # Dedicated volume for prediction server 10 - name: plugins-config-volume configMap: name: plugins-config @@ -370,6 +741,7 @@ data: - type: slo-aware-routing - type: slo-aware-profile-handler - type: max-score-picker + - type: prefix-cache-scorer schedulingProfiles: - name: default plugins: @@ -377,11 +749,15 @@ data: weight: 0 - pluginRef: queue-scorer - pluginRef: kv-cache-utilization-scorer + - pluginRef: prefix-cache-scorer - pluginRef: max-score-picker - name: slo plugins: + - pluginRef: prefix-cache-scorer + weight: 0 - pluginRef: slo-aware-routing - pluginRef: max-score-picker + --- # --- RBAC --- kind: Role diff --git a/pkg/epp/scheduling/framework/plugins/multi/slo_aware_router/config.go b/pkg/epp/scheduling/framework/plugins/multi/slo_aware_router/config.go index cf9d8ee33..5b408acc7 100644 --- a/pkg/epp/scheduling/framework/plugins/multi/slo_aware_router/config.go +++ b/pkg/epp/scheduling/framework/plugins/multi/slo_aware_router/config.go @@ -19,6 +19,23 @@ import ( "strings" ) +var DefaultSamplingMean = func() float64 { + if value, exists := os.LookupEnv("SAMPLING_MEAN"); exists { + if parsedValue, err := strconv.ParseFloat(value, 64); err == nil && parsedValue > 0 { + return parsedValue + } + } + return 100.0 // default value +}() + +var MaxSampledTokens = func() int { + if value, exists := os.LookupEnv("MAX_SAMPLED_TOKENS"); exists { + if parsedValue, err := strconv.Atoi(value); err == nil && parsedValue > 0 { + return parsedValue + } + } + return 20 // default value +}() var SLOBufferFactor = func() float64 { if value, exists := os.LookupEnv("SLO_BUFFER_FACTOR"); exists { if parsedValue, err := strconv.ParseFloat(value, 64); err == nil { diff --git a/pkg/epp/scheduling/framework/plugins/multi/slo_aware_router/helpers.go b/pkg/epp/scheduling/framework/plugins/multi/slo_aware_router/helpers.go index 3f02d5e52..1d5568243 100644 --- a/pkg/epp/scheduling/framework/plugins/multi/slo_aware_router/helpers.go +++ b/pkg/epp/scheduling/framework/plugins/multi/slo_aware_router/helpers.go @@ -133,7 +133,7 @@ func (s *SLOAwareRouter) buildCompositeChoices( *total += w choices = append(choices, Choice{PodName: p.Pod, Weight: w}) - log.FromContext(ctx).V(logutil.DEBUG).Info("Composite (neg/pos) score", + log.FromContext(ctx).V(logutil.TRACE).Info("Composite (neg/pos) score", "pod", p.Pod.GetPod().String(), "kvUsage", kvUsage, "kvFree", kvFree, "queue", q, "relQueue", relQueue, diff --git a/pkg/epp/scheduling/framework/plugins/multi/slo_aware_router/latencypredictor_helper.go b/pkg/epp/scheduling/framework/plugins/multi/slo_aware_router/latencypredictor_helper.go index 951815b15..6e91a6832 100644 --- a/pkg/epp/scheduling/framework/plugins/multi/slo_aware_router/latencypredictor_helper.go +++ b/pkg/epp/scheduling/framework/plugins/multi/slo_aware_router/latencypredictor_helper.go @@ -28,12 +28,6 @@ import ( requtil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/request" ) -const ( - // Poisson sampling parameters for predictions - defaultSamplingMean = 100 // Mean interval between prediction samples (tokens) - maxSampledTokens = 20 // Maximum number of prediction samples per request -) - // RefreshLastSeenMetrics updates sloCtx.LastSeenMetrics from the latest scheduling result. func RefreshLastSeenMetrics(ctx context.Context, sloCtx *SLORequestContext) { if sr := sloCtx.SchedulingResult; sr != nil { @@ -136,7 +130,7 @@ func ProcessFirstTokenForLatencyPrediction( // Initialize sampler if sloCtx.TokenSampler == nil { requestID := sloCtx.SchedulingRequest.Headers[requtil.RequestIdHeaderKey] - sloCtx.TokenSampler = NewTokenSampler(requestID, defaultSamplingMean, maxSampledTokens) + sloCtx.TokenSampler = NewTokenSampler(requestID, DefaultSamplingMean, MaxSampledTokens) logger.V(logutil.DEBUG).Info("Initialized token sampler for first token", "request_id", requestID, "next_prediction_token", sloCtx.TokenSampler.GetNextSampleToken()) } @@ -214,7 +208,7 @@ func ProcessTokenForLatencyPrediction( // Initialize sampler if not yet if sloCtx.TokenSampler == nil { requestID := sloCtx.SchedulingRequest.Headers[requtil.RequestIdHeaderKey] - sloCtx.TokenSampler = NewTokenSampler(requestID, defaultSamplingMean, maxSampledTokens) + sloCtx.TokenSampler = NewTokenSampler(requestID, DefaultSamplingMean, MaxSampledTokens) logger.V(logutil.DEBUG).Info("Initialized token sampler for subsequent tokens", "request_id", requestID, "next_prediction_token", sloCtx.TokenSampler.GetNextSampleToken()) } diff --git a/pkg/epp/scheduling/framework/plugins/multi/slo_aware_router/prediction.go b/pkg/epp/scheduling/framework/plugins/multi/slo_aware_router/prediction.go index 2645d112a..4c92dbbd1 100644 --- a/pkg/epp/scheduling/framework/plugins/multi/slo_aware_router/prediction.go +++ b/pkg/epp/scheduling/framework/plugins/multi/slo_aware_router/prediction.go @@ -22,8 +22,21 @@ import ( logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging" ) +type PodPredictionResult struct { + Pod schedulingtypes.Pod + TTFT float64 + TPOT float64 + TTFTValid bool + TPOTValid bool + IsValid bool + Error error + Headroom float64 // Headroom for the pod, if applicable + TTFTHeadroom float64 // TTFT headroom for the pod + PrefixCacheScore float64 // Prefix cache score for the pod +} + // generatePredictions creates prediction results for all candidate pods -func (s *SLOAwareRouter) generatePredictions(ctx context.Context, state *schedulingtypes.CycleState, request *schedulingtypes.LLMRequest, sloCtx *SLORequestContext, candidatePods []schedulingtypes.Pod) []PodPredictionResult { +func (s *SLOAwareRouter) generatePredictions(ctx context.Context, state *schedulingtypes.CycleState, request *schedulingtypes.LLMRequest, sloCtx *SLORequestContext, candidatePods []schedulingtypes.Pod) ([]PodPredictionResult, error) { logger := log.FromContext(ctx) predictions := make([]PodPredictionResult, 0, len(candidatePods)) @@ -42,10 +55,9 @@ func (s *SLOAwareRouter) generatePredictions(ctx context.Context, state *schedul // Generate prediction prediction, err := PredictWithMetrics(ctx, s.latencypredictor, pod.GetMetrics(), request.Body.Completions.Prompt, 1, prefixCacheScore) if err != nil { - logger.V(logutil.DEBUG).Info("Skipping pod due to prediction error", "pod", pod.GetPod().String(), "error", err) + logger.V(logutil.DEBUG).Error(err, "Skipping pod due to prediction error", "pod", pod.GetPod().String()) predResult.Error = err - predictions = append(predictions, predResult) - continue + return nil, err } predResult.PrefixCacheScore = prefixCacheScore predResult.TTFT = prediction.TTFT @@ -76,7 +88,7 @@ func (s *SLOAwareRouter) generatePredictions(ctx context.Context, state *schedul predictions = append(predictions, predResult) } - return predictions + return predictions, nil } // updateRequestContextWithPredictions updates the request context with prediction data diff --git a/pkg/epp/scheduling/framework/plugins/multi/slo_aware_router/scorer.go b/pkg/epp/scheduling/framework/plugins/multi/slo_aware_router/scorer.go index 9ecb396af..7a7cac19e 100644 --- a/pkg/epp/scheduling/framework/plugins/multi/slo_aware_router/scorer.go +++ b/pkg/epp/scheduling/framework/plugins/multi/slo_aware_router/scorer.go @@ -35,19 +35,6 @@ import ( logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging" ) -type PodPredictionResult struct { - Pod schedulingtypes.Pod - TTFT float64 - TPOT float64 - TTFTValid bool - TPOTValid bool - IsValid bool - Error error - Headroom float64 // Headroom for the pod, if applicable - TTFTHeadroom float64 // TTFT headroom for the pod - PrefixCacheScore float64 // Prefix cache score for the pod -} - type SLOAwareRouter struct { tn plugins.TypedName latencypredictor latencypredictor.PredictorInterface @@ -126,6 +113,48 @@ func (s *SLOAwareRouter) epsilonGreedyAffinityGate( return eligible, true } +// scoreWithoutPredictions provides fallback scoring based only on prefix cache scores +// when latency predictions are unavailable +func (s *SLOAwareRouter) scoreWithoutPredictions( + ctx context.Context, + state *schedulingtypes.CycleState, + pods []schedulingtypes.Pod, + r *rand.Rand, +) map[schedulingtypes.Pod]float64 { + logger := log.FromContext(ctx) + logger.V(logutil.TRACE).Info("Using composite-only scoring without predictions") + + scores := make(map[schedulingtypes.Pod]float64, len(pods)) + for _, pod := range pods { + scores[pod] = 0 + } + + if len(pods) == 0 { + return scores + } + + // Build prediction results with only prefix cache scores + podResults := make([]PodPredictionResult, 0, len(pods)) + for _, pod := range pods { + prefixScore := s.getPrefixCacheScoreForPod(ctx, state, pod) + podResults = append(podResults, PodPredictionResult{ + Pod: pod, + PrefixCacheScore: prefixScore, + IsValid: true, // All pods are valid when we don't check predictions + }) + } + + // Select based on composite scores (prefix cache + other non-prediction metrics) + selectedPod := s.selectFromCompositeScores(ctx, podResults, r, HeadroomStrategyCompositeOnly) + + if selectedPod != nil { + scores[selectedPod] = 1 + logger.V(logutil.TRACE).Info("Selected pod using composite-only scoring", "pod", selectedPod.GetPod().String()) + } + + return scores +} + func (s *SLOAwareRouter) Score(ctx context.Context, state *schedulingtypes.CycleState, request *schedulingtypes.LLMRequest, pods []schedulingtypes.Pod) map[schedulingtypes.Pod]float64 { logger := log.FromContext(ctx) if s.latencypredictor == nil { @@ -158,11 +187,6 @@ func (s *SLOAwareRouter) Score(ctx context.Context, state *schedulingtypes.Cycle return nil } - predictions := s.generatePredictions(ctx, state, request, sloCtx, pods) - s.updateRequestContextWithPredictions(sloCtx, predictions) - - allPreds := append([]PodPredictionResult(nil), predictions...) - // Initialize scores map with all pods having score 0 scores := make(map[schedulingtypes.Pod]float64, len(pods)) for _, pod := range pods { @@ -171,6 +195,18 @@ func (s *SLOAwareRouter) Score(ctx context.Context, state *schedulingtypes.Cycle source := rand.NewSource(time.Now().UnixNano()) r := rand.New(source) + + predictions, err := s.generatePredictions(ctx, state, request, sloCtx, pods) + if err != nil { + logger.V(logutil.DEBUG).Error(err, "SLOAwareRouter: Error generating predictions, falling back to composite-only scoring") + // Fall back to composite-only scoring using prefix cache scores + s.setSLOContextForRequest(request, sloCtx) + return s.scoreWithoutPredictions(ctx, state, pods, r) + } + s.updateRequestContextWithPredictions(sloCtx, predictions) + + allPreds := append([]PodPredictionResult(nil), predictions...) + allPreds, sticky := s.epsilonGreedyAffinityGate(ctx, allPreds, r, "overall", AffinityGateTauGlobal) // Check if all pods are invalid and all have running requests diff --git a/pkg/epp/scheduling/framework/plugins/multi/slo_aware_router/scorer_test.go b/pkg/epp/scheduling/framework/plugins/multi/slo_aware_router/scorer_test.go new file mode 100644 index 000000000..e71b1a349 --- /dev/null +++ b/pkg/epp/scheduling/framework/plugins/multi/slo_aware_router/scorer_test.go @@ -0,0 +1,535 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package slo_aware_router + +import ( + "context" + "fmt" + "testing" + + "github.com/stretchr/testify/assert" + "k8s.io/apimachinery/pkg/types" + + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend" + backendmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics" + latencypredictor "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/latencypredictorasync" + schedulingtypes "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/types" +) + +// mockPredictor implements PredictorInterface for testing +type mockPredictor struct { + predictions map[string]*latencypredictor.PredictionResponse + err error +} + +func (m *mockPredictor) Predict(ctx context.Context, request latencypredictor.PredictionRequest) (*latencypredictor.PredictionResponse, error) { + if m.err != nil { + return nil, m.err + } + // Generate a key based on KV cache percentage to return different predictions for different pods + key := fmt.Sprintf("%.1f", request.KVCachePercentage) + if pred, ok := m.predictions[key]; ok { + return pred, nil + } + // Default prediction + return &latencypredictor.PredictionResponse{TTFT: 0.5, TPOT: 0.03}, nil +} + +func (m *mockPredictor) PredictBulk(ctx context.Context, requests []latencypredictor.PredictionRequest) (*latencypredictor.BulkPredictionResponse, error) { + if m.err != nil { + return nil, m.err + } + // Generate a key based on KV cache percentage to return different predictions for different pods + responses := make([]latencypredictor.PredictionResponse, 0, len(requests)) + for _, request := range requests { + key := fmt.Sprintf("%.1f", request.KVCachePercentage) + if pred, ok := m.predictions[key]; ok { + responses = append(responses, *pred) + } else { + return nil, fmt.Errorf("no prediction for key %s", key) + } + } + return &latencypredictor.BulkPredictionResponse{Predictions: responses}, nil +} + +func (m *mockPredictor) PredictBulkStrict(ctx context.Context, requests []latencypredictor.PredictionRequest) (*latencypredictor.BulkPredictionResponse, error) { + if m.err != nil { + return nil, m.err + } + // Generate a key based on KV cache percentage to return different predictions for different pods + responses := make([]latencypredictor.PredictionResponse, 0, len(requests)) + for _, request := range requests { + key := fmt.Sprintf("%.1f", request.KVCachePercentage) + if pred, ok := m.predictions[key]; ok { + responses = append(responses, *pred) + } else { + return nil, fmt.Errorf("no prediction for key %s", key) + } + } + return &latencypredictor.BulkPredictionResponse{Predictions: responses}, nil +} + +func (m *mockPredictor) AddTrainingDataBulk(data []latencypredictor.TrainingEntry) error { + return nil +} + +func (m *mockPredictor) AddTrainingData(data latencypredictor.TrainingEntry) error { + return nil +} + +func (m *mockPredictor) HealthCheck() error { + return nil +} + +func (m *mockPredictor) GetServerStatus(ctx context.Context) (*latencypredictor.ServerStatusResponse, error) { + return &latencypredictor.ServerStatusResponse{}, nil +} + +func createTestPod(name string, kvCacheUsage float64, runningQueueSize, waitingQueueSize int) schedulingtypes.Pod { + return &schedulingtypes.PodMetrics{ + Pod: &backend.Pod{ + NamespacedName: types.NamespacedName{ + Name: name, + Namespace: "default", + }, + }, + MetricsState: &backendmetrics.MetricsState{ + KVCacheUsagePercent: kvCacheUsage, + RunningQueueSize: runningQueueSize, + WaitingQueueSize: waitingQueueSize, + }, + } +} + +func createTestLLMRequest(ttftSLO, tpotSLO float64, predictionBased bool) *schedulingtypes.LLMRequest { + headers := make(map[string]string) + if ttftSLO > 0 { + headers["x-ttft-slo"] = fmt.Sprintf("%f", ttftSLO) + } + if tpotSLO > 0 { + headers["x-avg-tpot-slo"] = fmt.Sprintf("%f", tpotSLO) + } + headers["x-prediction-based-scheduling"] = fmt.Sprintf("%t", predictionBased) + + return &schedulingtypes.LLMRequest{ + Headers: headers, + Body: &schedulingtypes.LLMRequestBody{ + Completions: &schedulingtypes.CompletionsRequest{ + Prompt: "test prompt", + }, + }, + } +} + +func TestSLOAwareRouter_Score(t *testing.T) { + tests := []struct { + name string + predictor *mockPredictor + strategy HeadroomStrategy + request *schedulingtypes.LLMRequest + pods []schedulingtypes.Pod + expectedScores map[string]float64 // Map of pod name to expected score + expectNil bool + }{ + { + name: "Prediction-based scheduling disabled", + predictor: &mockPredictor{}, + strategy: HeadroomStrategyLeast, + request: createTestLLMRequest(1.0, 0.05, false), // predictionBased = false + pods: []schedulingtypes.Pod{ + createTestPod("pod1", 0.5, 2, 1), // 50% KV cache, 2 running, 1 waiting + createTestPod("pod2", 0.7, 3, 2), // 70% KV cache, 3 running, 2 waiting + }, + expectNil: true, + }, + { + name: "No predictor configured", + predictor: nil, + strategy: HeadroomStrategyLeast, + request: createTestLLMRequest(1.0, 0.05, true), + pods: []schedulingtypes.Pod{ + createTestPod("pod1", 0.5, 2, 1), + }, + expectNil: true, + }, + { + name: "All pods have positive headroom", + predictor: &mockPredictor{ + predictions: map[string]*latencypredictor.PredictionResponse{ + "0.5": {TTFT: 0.5, TPOT: 0.03}, // 50% KV cache + "0.6": {TTFT: 0.6, TPOT: 0.04}, // 60% KV cache + "0.3": {TTFT: 0.4, TPOT: 0.02}, // 30% KV cache + }, + }, + strategy: HeadroomStrategyLeast, + request: createTestLLMRequest(1.0, 0.05, true), + pods: []schedulingtypes.Pod{ + createTestPod("pod1", 0.5, 2, 1), // 50% KV cache + createTestPod("pod2", 0.6, 3, 2), // 60% KV cache + createTestPod("pod3", 0.3, 1, 0), // 30% KV cache + }, + // One pod should be selected with score 1, others 0 + expectedScores: map[string]float64{ + // We can't predict which one due to randomness, but exactly one should be 1 + }, + }, + { + name: "All pods have negative headroom", + predictor: &mockPredictor{ + predictions: map[string]*latencypredictor.PredictionResponse{ + "0.8": {TTFT: 1.5, TPOT: 0.08}, // 80% KV cache - high load + "0.9": {TTFT: 1.8, TPOT: 0.09}, // 90% KV cache - very high load + }, + }, + strategy: HeadroomStrategyLeast, + request: createTestLLMRequest(1.0, 0.05, true), + pods: []schedulingtypes.Pod{ + createTestPod("pod1", 0.8, 5, 3), // 80% KV cache, high load + createTestPod("pod2", 0.9, 6, 4), // 90% KV cache, very high load + }, + // One pod should still be selected even with negative headroom + expectedScores: map[string]float64{}, + }, + { + name: "Mixed positive and negative headroom", + predictor: &mockPredictor{ + predictions: map[string]*latencypredictor.PredictionResponse{ + "0.3": {TTFT: 0.5, TPOT: 0.03}, // 30% KV cache - Positive headroom + "0.9": {TTFT: 1.5, TPOT: 0.08}, // 90% KV cache - Negative headroom + }, + }, + strategy: HeadroomStrategyLeast, + request: createTestLLMRequest(1.0, 0.05, true), + pods: []schedulingtypes.Pod{ + createTestPod("pod-positive", 0.3, 1, 0), // Low KV cache, positive headroom + createTestPod("pod-negative", 0.9, 6, 4), // High KV cache, negative headroom + }, + // With 99% probability, positive headroom pod should be selected + expectedScores: map[string]float64{}, + }, + { + name: "Prediction errors - fallback to composite scoring", + predictor: &mockPredictor{ + err: fmt.Errorf("prediction failed"), + }, + strategy: HeadroomStrategyLeast, + request: createTestLLMRequest(1.0, 0.05, true), + pods: []schedulingtypes.Pod{ + createTestPod("pod1", 0.5, 2, 1), + createTestPod("pod2", 0.6, 3, 2), + }, + // Should fall back to composite-only scoring and select one pod + expectedScores: map[string]float64{ + // One pod should be selected with score 1, verified in general validation below + }, + }, + { + name: "Empty pod list", + predictor: &mockPredictor{}, + strategy: HeadroomStrategyLeast, + request: createTestLLMRequest(1.0, 0.05, true), + pods: []schedulingtypes.Pod{}, + // Should return empty scores map + expectedScores: map[string]float64{}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + var router *SLOAwareRouter + if tt.predictor == nil { + router = NewSLOAwareRouter(nil, tt.strategy) + } else { + router = NewSLOAwareRouter(tt.predictor, tt.strategy) + } + + scores := router.Score(context.Background(), schedulingtypes.NewCycleState(), tt.request, tt.pods) + + if tt.expectNil { + assert.Nil(t, scores, "Expected nil scores") + return + } + + assert.NotNil(t, scores, "Expected non-nil scores") + + // If we have specific expected scores, verify them + if len(tt.expectedScores) > 0 { + for _, pod := range tt.pods { + podName := pod.GetPod().NamespacedName.Name + if expectedScore, ok := tt.expectedScores[podName]; ok { + assert.InDelta(t, expectedScore, scores[pod], 0.0001, "Pod %s should have score %f", podName, expectedScore) + } + } + } + + // General validation: exactly one pod should have score 1 (selected), others should have score 0 + // This applies even when predictions fail because we fall back to composite scoring + if !tt.expectNil && len(tt.pods) > 0 && tt.predictor != nil { + selectedCount := 0 + for _, score := range scores { + if score == 1.0 { + selectedCount++ + } else { + assert.InDelta(t, 0.0, score, 0.0001, "Non-selected pods should have score 0") + } + } + assert.Equal(t, 1, selectedCount, "Exactly one pod should be selected with score 1") + } + }) + } +} + +func TestSLOAwareRouter_Strategies(t *testing.T) { + tests := []struct { + name string + strategy HeadroomStrategy + }{ + { + name: "HeadroomStrategyLeast", + strategy: HeadroomStrategyLeast, + }, + { + name: "HeadroomStrategyMost", + strategy: HeadroomStrategyMost, + }, + { + name: "HeadroomStrategyCompositeMost", + strategy: HeadroomStrategyCompositeMost, + }, + { + name: "HeadroomStrategyCompositeLeast", + strategy: HeadroomStrategyCompositeLeast, + }, + { + name: "HeadroomStrategyCompositeOnly", + strategy: HeadroomStrategyCompositeOnly, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + predictor := &mockPredictor{ + predictions: map[string]*latencypredictor.PredictionResponse{ + "0.5": {TTFT: 0.5, TPOT: 0.03}, + "0.6": {TTFT: 0.6, TPOT: 0.04}, + "0.3": {TTFT: 0.4, TPOT: 0.02}, + }, + } + router := NewSLOAwareRouter(predictor, tt.strategy) + + request := createTestLLMRequest(1.0, 0.05, true) + pods := []schedulingtypes.Pod{ + createTestPod("pod1", 0.5, 2, 1), + createTestPod("pod2", 0.6, 3, 2), + createTestPod("pod3", 0.3, 1, 0), + } + + scores := router.Score(context.Background(), schedulingtypes.NewCycleState(), request, pods) + + assert.NotNil(t, scores, "Expected non-nil scores for strategy %s", tt.strategy) + + // Verify exactly one pod is selected + selectedCount := 0 + for _, score := range scores { + if score == 1.0 { + selectedCount++ + } + } + assert.Equal(t, 1, selectedCount, "Strategy %s should select exactly one pod", tt.strategy) + }) + } +} + +func TestSLOAwareRouter_SetHeadroomStrategy(t *testing.T) { + predictor := &mockPredictor{} + router := NewSLOAwareRouter(predictor, HeadroomStrategyLeast) + + assert.Equal(t, HeadroomStrategyLeast, router.GetHeadroomStrategy(), "Initial strategy should be Least") + + router.SetHeadroomStrategy(HeadroomStrategyMost) + assert.Equal(t, HeadroomStrategyMost, router.GetHeadroomStrategy(), "Strategy should be updated to Most") + + router.SetHeadroomStrategy(HeadroomStrategyCompositeOnly) + assert.Equal(t, HeadroomStrategyCompositeOnly, router.GetHeadroomStrategy(), "Strategy should be updated to CompositeOnly") +} + +func TestSLOAwareRouter_TypedName(t *testing.T) { + predictor := &mockPredictor{} + router := NewSLOAwareRouter(predictor, HeadroomStrategyLeast) + + tn := router.TypedName() + assert.Equal(t, "slo-aware-routing", tn.Type, "Type should be slo-aware-routing") + assert.Equal(t, "slo-aware-routing", tn.Name, "Default name should be slo-aware-routing") +} + +func TestSLOAwareRouter_WithName(t *testing.T) { + predictor := &mockPredictor{} + router := NewSLOAwareRouter(predictor, HeadroomStrategyLeast) + + customName := "custom-router" + router = router.WithName(customName) + + tn := router.TypedName() + assert.Equal(t, "slo-aware-routing", tn.Type, "Type should remain slo-aware-routing") + assert.Equal(t, customName, tn.Name, "Name should be updated to custom name") +} + +func TestSLOAwareRouter_Dependencies(t *testing.T) { + predictor := &mockPredictor{} + router := NewSLOAwareRouter(predictor, HeadroomStrategyLeast) + + deps := router.Dependencies() + assert.Len(t, deps, 1, "Should have exactly 1 dependency") + assert.Equal(t, "prefix-cache-scorer", deps[0].Type, "Dependency type should be prefix-cache-scorer") + assert.Equal(t, "prefix-cache-scorer", deps[0].Name, "Dependency name should be prefix-cache-scorer") +} + +func TestSLOAwareRouter_GetPodRunningRequestCount(t *testing.T) { + tests := []struct { + name string + setupRequests func(*SLOAwareRouter, schedulingtypes.Pod) + expectedCount int + }{ + { + name: "No running requests", + setupRequests: func(r *SLOAwareRouter, p schedulingtypes.Pod) {}, + expectedCount: 0, + }, + { + name: "One running request", + setupRequests: func(r *SLOAwareRouter, p schedulingtypes.Pod) { + podName := types.NamespacedName{ + Name: p.GetPod().NamespacedName.Name, + Namespace: p.GetPod().NamespacedName.Namespace, + } + r.runningRequestLists[podName] = NewRequestPriorityQueue() + r.runningRequestLists[podName].Add("req1", 0.04) + }, + expectedCount: 1, + }, + { + name: "Multiple running requests", + setupRequests: func(r *SLOAwareRouter, p schedulingtypes.Pod) { + podName := types.NamespacedName{ + Name: p.GetPod().NamespacedName.Name, + Namespace: p.GetPod().NamespacedName.Namespace, + } + r.runningRequestLists[podName] = NewRequestPriorityQueue() + r.runningRequestLists[podName].Add("req1", 0.04) + r.runningRequestLists[podName].Add("req2", 0.03) + r.runningRequestLists[podName].Add("req3", 0.05) + }, + expectedCount: 3, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + predictor := &mockPredictor{} + router := NewSLOAwareRouter(predictor, HeadroomStrategyLeast) + pod := createTestPod("test-pod", 0.5, 2, 1) + + tt.setupRequests(router, pod) + + count := router.getPodRunningRequestCount(pod) + assert.Equal(t, tt.expectedCount, count, "Running request count should match expected") + }) + } +} + +func TestSLOAwareRouter_GetPodMinTPOTSLO(t *testing.T) { + tests := []struct { + name string + setupRequests func(*SLOAwareRouter, schedulingtypes.Pod) + expectedSLO float64 + }{ + { + name: "No running requests", + setupRequests: func(r *SLOAwareRouter, p schedulingtypes.Pod) {}, + expectedSLO: 0.0, + }, + { + name: "One running request", + setupRequests: func(r *SLOAwareRouter, p schedulingtypes.Pod) { + podName := types.NamespacedName{ + Name: p.GetPod().NamespacedName.Name, + Namespace: p.GetPod().NamespacedName.Namespace, + } + r.runningRequestLists[podName] = NewRequestPriorityQueue() + r.runningRequestLists[podName].Add("req1", 0.04) + }, + expectedSLO: 0.04, + }, + { + name: "Multiple running requests - should return minimum", + setupRequests: func(r *SLOAwareRouter, p schedulingtypes.Pod) { + podName := types.NamespacedName{ + Name: p.GetPod().NamespacedName.Name, + Namespace: p.GetPod().NamespacedName.Namespace, + } + r.runningRequestLists[podName] = NewRequestPriorityQueue() + // Add in any order - heap will maintain minimum at top + r.runningRequestLists[podName].Add("req1", 0.05) + r.runningRequestLists[podName].Add("req2", 0.03) // This is the minimum + r.runningRequestLists[podName].Add("req3", 0.04) + }, + expectedSLO: 0.03, // Minimum TPOT (heap guarantees this is at items[0]) + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + predictor := &mockPredictor{} + router := NewSLOAwareRouter(predictor, HeadroomStrategyLeast) + pod := createTestPod("test-pod", 0.5, 2, 1) + + tt.setupRequests(router, pod) + + minSLO := router.getPodMinTPOTSLO(pod) + assert.InDelta(t, tt.expectedSLO, minSLO, 0.0001, "Min TPOT SLO should match expected") + }) + } +} + +func TestSLOAwareRouter_GetPrefixCacheScoreForPod(t *testing.T) { + tests := []struct { + name string + setupState func(*schedulingtypes.CycleState) + expectedScore float64 + }{ + { + name: "No prefix cache state", + setupState: func(s *schedulingtypes.CycleState) {}, + expectedScore: 0.0, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + predictor := &mockPredictor{} + router := NewSLOAwareRouter(predictor, HeadroomStrategyLeast) + + state := schedulingtypes.NewCycleState() + tt.setupState(state) + + pod := createTestPod("test-pod", 0.5, 2, 1) + + score := router.getPrefixCacheScoreForPod(context.Background(), state, pod) + assert.InDelta(t, tt.expectedScore, score, 0.0001, "Prefix cache score should match expected") + }) + } +}