diff --git a/Makefile b/Makefile index 023ed0e5f..8113c33b2 100644 --- a/Makefile +++ b/Makefile @@ -373,7 +373,7 @@ GCI = $(LOCALBIN)/gci ## Tool Versions KUSTOMIZE_VERSION ?= v5.4.3 -CONTROLLER_TOOLS_VERSION ?= v0.16.1 +CONTROLLER_TOOLS_VERSION ?= v0.17.0 ENVTEST_VERSION ?= release-0.19 CRD_REF_DOCS_VERSION ?= v0.2.0 GOLANGCI_LINT_VERSION ?= v2.3.0 diff --git a/go.mod b/go.mod index bf30e0484..28dbb0837 100644 --- a/go.mod +++ b/go.mod @@ -92,7 +92,6 @@ require ( github.com/spf13/cobra v1.9.1 // indirect github.com/spf13/pflag v1.0.6 // indirect github.com/stoewer/go-strcase v1.3.0 // indirect - github.com/stretchr/objx v0.5.2 // indirect github.com/x448/float16 v0.8.4 // indirect go.opentelemetry.io/auto/sdk v1.1.0 // indirect go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.61.0 // indirect diff --git a/latencypredictor-v1/test_dual_server_client.py b/latencypredictor-v1/test_dual_server_client.py index 86e277153..01711ac52 100644 --- a/latencypredictor-v1/test_dual_server_client.py +++ b/latencypredictor-v1/test_dual_server_client.py @@ -13,8 +13,9 @@ import tempfile # Base URLs for the dual-server architecture -PREDICTION_URL = os.getenv("PREDICTION_SERVER_URL", "http://34.158.41.245:80") # Update this -TRAINING_URL = os.getenv("TRAINING_SERVER_URL", "http://34.143.208.0:8080") # Update this + +PREDICTION_URL = os.getenv("PREDICTION_SERVER_URL", "http://:80") # Update this +TRAINING_URL = os.getenv("TRAINING_SERVER_URL", "http://:8080") # Update this TARGET_QPS = float(os.getenv("TARGET_QPS", 1000)) # Update this TARGET_QPS_LARGE_BATCH = float(os.getenv("TARGET_QPS_LARGE_BATCH", 100)) # Update this @@ -1133,6 +1134,204 @@ def test_server_configuration(): print(f"Training server: {train_root_data.get('message')}") print(f" Model type: {train_root_data.get('model_type')}") +def test_training_server_flush_api(): + """Test the training server flush API and data status endpoint.""" + print("Testing training server flush API...") + + # 1. Check initial data status + print("Step 1: Checking initial data status...") + initial_status_r = requests.get(f"{TRAINING_URL}/data/status") + assert initial_status_r.status_code == 200 + initial_status = initial_status_r.json() + + print(f" Initial training samples: TTFT={initial_status['training_data']['ttft_samples']}, " + f"TPOT={initial_status['training_data']['tpot_samples']}") + print(f" Initial test samples: TTFT={initial_status['test_data']['ttft_samples']}, " + f"TPOT={initial_status['test_data']['tpot_samples']}") + + # 2. Add training data + print("Step 2: Adding training data...") + training_entries = [generate_random_training_payload() for _ in range(100)] + training_payload = {"entries": training_entries} + + add_r = requests.post(f"{TRAINING_URL}/add_training_data_bulk", json=training_payload) + assert add_r.status_code == 202 + print(f" Added 100 training samples") + + # Wait a bit for data to be processed + time.sleep(2) + + # 3. 
Verify data was added + print("Step 3: Verifying data was added...") + after_add_status_r = requests.get(f"{TRAINING_URL}/data/status") + assert after_add_status_r.status_code == 200 + after_add_status = after_add_status_r.json() + + total_samples_after = after_add_status['training_data']['total_samples'] + after_add_status['test_data']['total_samples'] + print(f" After adding - Training: {after_add_status['training_data']['total_samples']}, " + f"Test: {after_add_status['test_data']['total_samples']}, Total: {total_samples_after}") + + # Should have more data now (some goes to training, some to test based on TEST_TRAIN_RATIO) + assert total_samples_after > 0, "No samples were added" + + # 4. Test flush with only training data + print("Step 4: Testing flush with only training data...") + flush_training_only = { + "flush_training_data": True, + "flush_test_data": False, + "flush_metrics": False, + "reason": "Test flush training data only" + } + + flush_r = requests.post(f"{TRAINING_URL}/flush", json=flush_training_only) + assert flush_r.status_code == 200 + flush_response = flush_r.json() + + assert flush_response["success"] == True + assert flush_response["metrics_cleared"] == False + assert flush_response["reason"] == "Test flush training data only" + + print(f" Flushed {flush_response['ttft_training_samples_flushed']} TTFT training samples") + print(f" Flushed {flush_response['tpot_training_samples_flushed']} TPOT training samples") + print(f" Test samples flushed: {flush_response['ttft_test_samples_flushed']} TTFT, " + f"{flush_response['tpot_test_samples_flushed']} TPOT (should be 0)") + + # Verify training data was flushed but test data remains + after_flush_training_r = requests.get(f"{TRAINING_URL}/data/status") + after_flush_training = after_flush_training_r.json() + + assert after_flush_training['training_data']['total_samples'] == 0, "Training data should be empty" + # Test data should still exist if any was added + print(f" After training flush - Training: {after_flush_training['training_data']['total_samples']}, " + f"Test: {after_flush_training['test_data']['total_samples']}") + + # 5. Add more data + print("Step 5: Adding more training data...") + more_entries = [generate_random_training_payload() for _ in range(50)] + requests.post(f"{TRAINING_URL}/add_training_data_bulk", json={"entries": more_entries}) + time.sleep(2) + + # 6. Test flush everything + print("Step 6: Testing flush everything...") + flush_all = { + "flush_training_data": True, + "flush_test_data": True, + "flush_metrics": True, + "reason": "Complete flush test" + } + + flush_all_r = requests.post(f"{TRAINING_URL}/flush", json=flush_all) + assert flush_all_r.status_code == 200 + flush_all_response = flush_all_r.json() + + assert flush_all_response["success"] == True + assert flush_all_response["metrics_cleared"] == True + assert "Successfully flushed" in flush_all_response["message"] + + print(f" Complete flush message: {flush_all_response['message']}") + + # Verify everything was flushed + after_flush_all_r = requests.get(f"{TRAINING_URL}/data/status") + after_flush_all = after_flush_all_r.json() + + assert after_flush_all['training_data']['total_samples'] == 0, "Training data should be empty" + assert after_flush_all['test_data']['total_samples'] == 0, "Test data should be empty" + + print(f" After complete flush - Training: {after_flush_all['training_data']['total_samples']}, " + f"Test: {after_flush_all['test_data']['total_samples']}") + + # 7. 
Test flush with default parameters (should flush everything) + print("Step 7: Testing default flush (no body)...") + + # Add some data first + requests.post(f"{TRAINING_URL}/add_training_data_bulk", + json={"entries": [generate_random_training_payload() for _ in range(20)]}) + time.sleep(1) + + # Flush with empty body (uses defaults) + default_flush_r = requests.post(f"{TRAINING_URL}/flush") + assert default_flush_r.status_code == 200 + default_flush_response = default_flush_r.json() + + assert default_flush_response["success"] == True + print(f" Default flush result: {default_flush_response['message']}") + + # 8. Test flush with only test data + print("Step 8: Testing flush with only test data...") + + # Add data + requests.post(f"{TRAINING_URL}/add_training_data_bulk", + json={"entries": [generate_random_training_payload() for _ in range(50)]}) + time.sleep(2) + + # Get status before + before_test_flush_r = requests.get(f"{TRAINING_URL}/data/status") + before_test_flush = before_test_flush_r.json() + + # Flush only test data + flush_test_only = { + "flush_training_data": False, + "flush_test_data": True, + "flush_metrics": False, + "reason": "Test flush test data only" + } + + flush_test_r = requests.post(f"{TRAINING_URL}/flush", json=flush_test_only) + assert flush_test_r.status_code == 200 + flush_test_response = flush_test_r.json() + + print(f" Test data flush: {flush_test_response['ttft_test_samples_flushed']} TTFT, " + f"{flush_test_response['tpot_test_samples_flushed']} TPOT") + + # Verify only test data was flushed + after_test_flush_r = requests.get(f"{TRAINING_URL}/data/status") + after_test_flush = after_test_flush_r.json() + + assert after_test_flush['test_data']['total_samples'] == 0, "Test data should be empty" + # Training data should still exist + print(f" After test flush - Training: {after_test_flush['training_data']['total_samples']}, " + f"Test: {after_test_flush['test_data']['total_samples']}") + + # 9. 
Test bucket distribution in status + print("Step 9: Testing bucket distribution in status...") + if "bucket_distribution" in after_flush_all: + print(f" Bucket distribution available: {len(after_flush_all.get('bucket_distribution', {}))} buckets with data") + + print("✓ Flush API tests passed!") + + +def test_training_server_flush_error_handling(): + """Test error handling in flush API.""" + print("Testing flush API error handling...") + + # Test with invalid JSON + invalid_json = '{"flush_training_data": "not_a_boolean"}' + headers = {'Content-Type': 'application/json'} + + try: + r = requests.post(f"{TRAINING_URL}/flush", data=invalid_json, headers=headers) + # Should get validation error + assert r.status_code in [400, 422], f"Expected 400 or 422, got {r.status_code}" + print("✓ Invalid JSON handled correctly") + except Exception as e: + print(f"⚠️ Error handling test skipped: {e}") + + # Test with valid parameters + valid_flush = { + "flush_training_data": False, + "flush_test_data": False, + "flush_metrics": True, + "reason": "Metrics only flush" + } + + r = requests.post(f"{TRAINING_URL}/flush", json=valid_flush) + assert r.status_code == 200 + response = r.json() + assert response["metrics_cleared"] == True + assert response["ttft_training_samples_flushed"] == 0 + assert response["tpot_training_samples_flushed"] == 0 + + print("✓ Flush error handling tests passed!") if __name__ == "__main__": print("Running dual-server architecture tests with prefix cache score support...") @@ -1168,6 +1367,8 @@ def test_server_configuration(): ("Training Metrics", test_training_server_metrics), ("Model Consistency", test_model_consistency_between_servers), ("XGBoost Trees", test_model_specific_endpoints_on_training_server), + ("Flush API", test_training_server_flush_api), + ("Flush Error Handling", test_training_server_flush_error_handling), ("Dual Server Model Learns Equation", test_dual_server_quantile_regression_learns_distribution), ("End-to-End Workflow", test_end_to_end_workflow), diff --git a/latencypredictor-v1/training_server.py b/latencypredictor-v1/training_server.py index 559917a6d..91ddab74b 100644 --- a/latencypredictor-v1/training_server.py +++ b/latencypredictor-v1/training_server.py @@ -110,7 +110,24 @@ class ModelInfoResponse(BaseModel): last_retrain_time: Optional[datetime] = Field(default=None, description="Last retraining timestamp") min_samples_for_retrain: int = Field(default=0, description="Minimum samples required for retraining") retraining_interval_sec: int = Field(default=0, description="Retraining interval in seconds") + +class FlushRequest(BaseModel): + flush_training_data: bool = Field(default=True, description="Flush training data buckets") + flush_test_data: bool = Field(default=True, description="Flush test data") + flush_metrics: bool = Field(default=True, description="Flush quantile metric scores") + reason: Optional[str] = Field(default=None, description="Optional reason for flushing") + +class FlushResponse(BaseModel): + success: bool + flushed_at: datetime + reason: Optional[str] = None + ttft_training_samples_flushed: int + tpot_training_samples_flushed: int + ttft_test_samples_flushed: int + tpot_test_samples_flushed: int + metrics_cleared: bool + message: str def quantile_loss(y_true, y_pred, quantile): """ @@ -784,7 +801,79 @@ def _save_models_unlocked(self): except Exception as e: logging.error(f"Error saving models: {e}", exc_info=True) - + + def flush_training_data(self, flush_training: bool = True, flush_test: bool = True, + flush_metrics: bool = True, 
reason: str = None) -> dict: + """ + Manually flush training data, test data, and/or metrics. + Returns statistics about what was flushed. + + Args: + flush_training: Whether to flush training data buckets + flush_test: Whether to flush test data + flush_metrics: Whether to flush quantile metric scores + reason: Optional reason for flushing (for logging) + + Returns: + Dictionary with flush statistics + """ + try: + with self.lock: + # Count samples before flushing + ttft_training_count = sum(len(bucket) for bucket in self.ttft_data_buckets.values()) + tpot_training_count = sum(len(bucket) for bucket in self.tpot_data_buckets.values()) + ttft_test_count = len(self.ttft_test_data) + tpot_test_count = len(self.tpot_test_data) + + reason_str = f" Reason: {reason}" if reason else "" + logging.info( + f"Manual flush requested.{reason_str} " + f"Training: {flush_training}, Test: {flush_test}, Metrics: {flush_metrics}" + ) + + # Flush training data + if flush_training: + for bucket_key in self.ttft_data_buckets: + self.ttft_data_buckets[bucket_key].clear() + for bucket_key in self.tpot_data_buckets: + self.tpot_data_buckets[bucket_key].clear() + logging.info( + f"Flushed {ttft_training_count} TTFT and {tpot_training_count} TPOT training samples" + ) + + # Flush test data + if flush_test: + self.ttft_test_data.clear() + self.tpot_test_data.clear() + logging.info( + f"Flushed {ttft_test_count} TTFT and {tpot_test_count} TPOT test samples" + ) + + # Clear metrics + metrics_cleared = False + if flush_metrics: + self.ttft_quantile_loss_scores.clear() + self.tpot_quantile_loss_scores.clear() + self.ttft_coverage_scores.clear() + self.tpot_coverage_scores.clear() + self.ttft_violation_rates.clear() + self.tpot_violation_rates.clear() + metrics_cleared = True + logging.info("Cleared all quantile metric scores") + + return { + "success": True, + "ttft_training_samples_flushed": ttft_training_count if flush_training else 0, + "tpot_training_samples_flushed": tpot_training_count if flush_training else 0, + "ttft_test_samples_flushed": ttft_test_count if flush_test else 0, + "tpot_test_samples_flushed": tpot_test_count if flush_test else 0, + "metrics_cleared": metrics_cleared + } + + except Exception as e: + logging.error(f"Error flushing data: {e}", exc_info=True) + raise + def load_models(self): try: with self.lock: @@ -1065,7 +1154,110 @@ async def root(): "quantile": predictor.quantile, "description": f"Predicting {predictor.quantile:.0%} quantile for TTFT and TPOT latencies" } - + +@app.post("/flush", response_model=FlushResponse, status_code=status.HTTP_200_OK) +async def flush_data(request: FlushRequest = FlushRequest()): + """ + Manually flush training data, test data, and/or metrics. 
+ + Useful when: + - Server workload has changed significantly + - You want to start fresh with new data + - Testing or debugging model behavior + - Forcing a clean state after deployment + + Example requests: + - Flush everything: POST /flush with empty body + - Flush only training: POST /flush with {"flush_test_data": false, "flush_metrics": false} + - Flush with reason: POST /flush with {"reason": "New deployment"} + """ + try: + result = predictor.flush_training_data( + flush_training=request.flush_training_data, + flush_test=request.flush_test_data, + flush_metrics=request.flush_metrics, + reason=request.reason + ) + + total_flushed = ( + result["ttft_training_samples_flushed"] + + result["tpot_training_samples_flushed"] + + result["ttft_test_samples_flushed"] + + result["tpot_test_samples_flushed"] + ) + + message_parts = [] + if request.flush_training_data: + message_parts.append( + f"{result['ttft_training_samples_flushed']} TTFT and " + f"{result['tpot_training_samples_flushed']} TPOT training samples" + ) + if request.flush_test_data: + message_parts.append( + f"{result['ttft_test_samples_flushed']} TTFT and " + f"{result['tpot_test_samples_flushed']} TPOT test samples" + ) + if request.flush_metrics: + message_parts.append("all metric scores") + + message = f"Successfully flushed: {', '.join(message_parts)}" if message_parts else "No data flushed" + + return FlushResponse( + success=True, + flushed_at=datetime.now(timezone.utc), + reason=request.reason, + ttft_training_samples_flushed=result["ttft_training_samples_flushed"], + tpot_training_samples_flushed=result["tpot_training_samples_flushed"], + ttft_test_samples_flushed=result["ttft_test_samples_flushed"], + tpot_test_samples_flushed=result["tpot_test_samples_flushed"], + metrics_cleared=result["metrics_cleared"], + message=message + ) + + except Exception as e: + logging.error(f"Error in flush endpoint: {e}", exc_info=True) + raise HTTPException( + status_code=500, + detail=f"Failed to flush data: {str(e)}" + ) + + +@app.get("/data/status", status_code=status.HTTP_200_OK) +async def get_data_status(): + """ + Get current status of training data. + Useful for monitoring and deciding whether to flush. 
+ """ + ttft_training_count = sum(len(bucket) for bucket in predictor.ttft_data_buckets.values()) + tpot_training_count = sum(len(bucket) for bucket in predictor.tpot_data_buckets.values()) + + # Get bucket distribution + bucket_distribution = {} + for (q, c), bucket in predictor.ttft_data_buckets.items(): + if len(bucket) > 0: + key = f"queue_{q}_cache_{c}" + bucket_distribution[key] = len(bucket) + + return { + "training_data": { + "ttft_samples": ttft_training_count, + "tpot_samples": tpot_training_count, + "total_samples": ttft_training_count + tpot_training_count + }, + "test_data": { + "ttft_samples": len(predictor.ttft_test_data), + "tpot_samples": len(predictor.tpot_test_data), + "total_samples": len(predictor.ttft_test_data) + len(predictor.tpot_test_data) + }, + "metrics": { + "ttft_scores_count": len(predictor.ttft_quantile_loss_scores), + "tpot_scores_count": len(predictor.tpot_quantile_loss_scores) + }, + "bucket_distribution": bucket_distribution, + "model_ready": predictor.is_ready, + "last_retrain": predictor.last_retrain_time.isoformat() if predictor.last_retrain_time else None + } + @app.get("/model/download/info") async def model_download_info(): """ diff --git a/pkg/epp/scheduling/framework/plugins/scorer/slo_scorer.go b/pkg/epp/scheduling/framework/plugins/scorer/slo_scorer.go index c2f2daa3b..a2411c31c 100644 --- a/pkg/epp/scheduling/framework/plugins/scorer/slo_scorer.go +++ b/pkg/epp/scheduling/framework/plugins/scorer/slo_scorer.go @@ -117,16 +117,37 @@ var HeadroomSelectionStrategy = func() HeadroomStrategy { return HeadroomStrategyLeast // default to least (better packing) }() +// Only consider pods with prefix score ≥ this threshold if any exist (aka "perfect stickiness") +var PrefixStickyThreshold = func() float64 { + if v, ok := os.LookupEnv("PREFIX_STICKY_THRESHOLD"); ok { + if f, err := strconv.ParseFloat(v, 64); err == nil && f >= 0 && f <= 1 { + return f + } + } + return 0.80 // default +}() + +// With this probability, ignore stickiness and explore the full set +var StickyExplorationProb = func() float64 { + if v, ok := os.LookupEnv("STICKY_EXPLORATION_PROB"); ok { + if f, err := strconv.ParseFloat(v, 64); err == nil && f >= 0 && f <= 1 { + return f + } + } + return 0.01 // default 1% exploration +}() + type PodPredictionResult struct { - Pod schedulingtypes.Pod - TTFT float64 - TPOT float64 - TTFTValid bool - TPOTValid bool - IsValid bool - Error error - Headroom float64 // Headroom for the pod, if applicable - TTFTHeadroom float64 // TTFT headroom for the pod + Pod schedulingtypes.Pod + TTFT float64 + TPOT float64 + TTFTValid bool + TPOTValid bool + IsValid bool + Error error + Headroom float64 // Headroom for the pod, if applicable + TTFTHeadroom float64 // TTFT headroom for the pod + PrefixCacheScore float64 // Prefix cache score for the pod } type SLOScorer struct { @@ -172,6 +193,41 @@ func (s *SLOScorer) GetHeadroomStrategy() HeadroomStrategy { return s.headroomStrategy } +// maybeApplyPerfectStickiness returns a possibly filtered candidate list and a flag indicating if filtering was applied. +// If any pods have PrefixCacheScore >= PrefixStickyThreshold and we don't explore (rand >= StickyExplorationProb), +// restrict to that subset. Otherwise, return the original list. +func (s *SLOScorer) maybeApplyPerfectStickiness( + ctx context.Context, + candidates []PodPredictionResult, + r *rand.Rand, + label string, // e.g. 
"positive" or "negative" +) ([]PodPredictionResult, bool) { + logger := log.FromContext(ctx) + + eligible := make([]PodPredictionResult, 0, len(candidates)) + for _, p := range candidates { + if p.PrefixCacheScore >= PrefixStickyThreshold { + eligible = append(eligible, p) + } + } + + // No eligible sticky pods? Do nothing. + if len(eligible) == 0 { + return candidates, false + } + + // Exploration branch? + if r.Float64() < StickyExplorationProb { + logger.V(logutil.DEBUG).Info("Exploring (ignoring perfect stickiness for this attempt)", + "path", label, "exploreProb", StickyExplorationProb, "eligibleCount", len(eligible)) + return candidates, false + } + + logger.V(logutil.DEBUG).Info("Applying perfect stickiness subset", + "path", label, "threshold", PrefixStickyThreshold, "eligibleCount", len(eligible), "total", len(candidates)) + return eligible, true +} + func (s *SLOScorer) Score(ctx context.Context, state *schedulingtypes.CycleState, request *schedulingtypes.LLMRequest, pods []schedulingtypes.Pod) map[schedulingtypes.Pod]float64 { logger := log.FromContext(ctx) if s.predictor == nil { @@ -284,6 +340,13 @@ func (s *SLOScorer) selectFromPositiveHeadroomPods(ctx context.Context, posHeadr return posHeadroomPods[0].Pod } + // Apply perfect stickiness (with exploration) + candidates, sticky := s.maybeApplyPerfectStickiness(ctx, posHeadroomPods, r, "positive") + + // If perfect stickiness collapsed us to a single pod, short-circuit + if sticky && len(candidates) == 1 { + return candidates[0].Pod + } const Wmax = 100 const minWeight = 1 const eps = 1e-9 @@ -292,7 +355,7 @@ func (s *SLOScorer) selectFromPositiveHeadroomPods(ctx context.Context, posHeadr minTPOTH, maxTPOTH := math.MaxFloat64, -math.MaxFloat64 minTTFTH, maxTTFTH := math.MaxFloat64, -math.MaxFloat64 - for _, p := range posHeadroomPods { + for _, p := range candidates { if p.Headroom < minTPOTH { minTPOTH = p.Headroom } @@ -327,10 +390,10 @@ func (s *SLOScorer) selectFromPositiveHeadroomPods(ctx context.Context, posHeadr "alphaTTFT", alpha, "betaTPOT", beta, "strategy", s.headroomStrategy) // Calculate weights for weighted random selection - weightedChoices := make([]Choice, 0, len(posHeadroomPods)) + weightedChoices := make([]Choice, 0, len(candidates)) total := 0 - for _, p := range posHeadroomPods { + for _, p := range candidates { // Normalize to [0,1] within the cohort nTPOTH := 0.5 if tpotRange > eps { @@ -382,7 +445,7 @@ func (s *SLOScorer) selectFromPositiveHeadroomPods(ctx context.Context, posHeadr // If no pod was selected (shouldn't happen), fallback to first pod if selectedPod == nil { - selectedPod = posHeadroomPods[0].Pod + selectedPod = candidates[0].Pod } return selectedPod @@ -430,13 +493,21 @@ func (s *SLOScorer) selectFromNegativeHeadroomPodsInternal(ctx context.Context, return negHeadroomPods[0].Pod } + // Apply perfect stickiness (with exploration) + candidates, sticky := s.maybeApplyPerfectStickiness(ctx, negHeadroomPods, r, "negative") + + // If perfect stickiness collapsed us to a single pod, short-circuit + if sticky && len(candidates) == 1 { + return candidates[0].Pod + } + const minWeightForNegative = 1 // Build weighted choices for selection - weightedChoices := make([]Choice, 0, len(negHeadroomPods)) + weightedChoices := make([]Choice, 0, len(candidates)) total := 0 - s.handleNegativeHeadroomPodsHierarchical(ctx, negHeadroomPods, &weightedChoices, &total, minWeightForNegative) + s.handleNegativeHeadroomPodsHierarchical(ctx, candidates, &weightedChoices, &total, minWeightForNegative) // Perform 
weighted random selection idx := r.Intn(total) @@ -452,7 +523,7 @@ func (s *SLOScorer) selectFromNegativeHeadroomPodsInternal(ctx context.Context, // If no pod was selected (shouldn't happen), fallback to first pod if selectedPod == nil { - selectedPod = negHeadroomPods[0].Pod + selectedPod = candidates[0].Pod } return selectedPod @@ -639,7 +710,7 @@ func (s *SLOScorer) generatePredictions(ctx context.Context, state *schedulingty predictions = append(predictions, predResult) continue } - + predResult.PrefixCacheScore = prefixCacheScore predResult.TTFT = prediction.TTFT predResult.TPOT = prediction.TPOT podMinTPOTSLO := 0.0 @@ -652,6 +723,7 @@ func (s *SLOScorer) generatePredictions(ctx context.Context, state *schedulingty logger.V(logutil.DEBUG).Info("Prediction for scheduling", "pod", pod.GetPod().String(), + "prefixCacheScore", prefixCacheScore, "TTFT", prediction.TTFT, "TPOT", prediction.TPOT, "buffer", SLOBufferFactor,
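
The short sketch below is not part of the patch; it is an illustrative client script showing one way the new /data/status and /flush endpoints introduced above might be exercised. It reuses the TRAINING_SERVER_URL convention from test_dual_server_client.py; the localhost default URL is an assumption, and the request/response field names are taken from the FlushRequest/FlushResponse models and the /data/status handler in training_server.py.

# Illustrative sketch only (assumes a training server reachable via TRAINING_SERVER_URL).
import os
import requests

TRAINING_URL = os.getenv("TRAINING_SERVER_URL", "http://localhost:8080")  # assumed default

# Inspect current sample counts before deciding whether to flush.
status = requests.get(f"{TRAINING_URL}/data/status").json()
print("training samples:", status["training_data"]["total_samples"])
print("test samples:", status["test_data"]["total_samples"])

# Flush only the training buckets, keeping test data and metric scores.
resp = requests.post(
    f"{TRAINING_URL}/flush",
    json={
        "flush_training_data": True,
        "flush_test_data": False,
        "flush_metrics": False,
        "reason": "workload changed",
    },
)
resp.raise_for_status()
print(resp.json()["message"])

Posting /flush with no body relies on the FlushRequest defaults (flush everything), which is the same behavior the "default flush" step of the new test exercises.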