|
| 1 | +package testcases |
| 2 | + |
| 3 | +import ( |
| 4 | + "context" |
| 5 | + "fmt" |
| 6 | + "strings" |
| 7 | + "sync" |
| 8 | + "time" |
| 9 | + |
| 10 | + pkgtestcases "github.com/vllm-project/semantic-router/e2e/pkg/testcases" |
| 11 | + "k8s.io/client-go/kubernetes" |
| 12 | +) |
| 13 | + |
| 14 | +func init() { |
| 15 | + pkgtestcases.Register("chat-completions-progressive-stress", pkgtestcases.TestCase{ |
| 16 | + Description: "Progressive stress test with 10/20/50/100 QPS and success rate tracking", |
| 17 | + Tags: []string{"llm", "stress", "progressive", "qps"}, |
| 18 | + Fn: testProgressiveStress, |
| 19 | + }) |
| 20 | +} |
| 21 | + |
| 22 | +// ProgressiveStageResult tracks results for a single QPS stage |
| 23 | +type ProgressiveStageResult struct { |
| 24 | + QPS int |
| 25 | + TotalReqs int |
| 26 | + SuccessCount int |
| 27 | + FailureCount int |
| 28 | + SuccessRate float64 |
| 29 | + AvgDuration time.Duration |
| 30 | + MinDuration time.Duration |
| 31 | + MaxDuration time.Duration |
| 32 | + Results []StressTestResult |
| 33 | +} |
| 34 | + |
| 35 | +func testProgressiveStress(ctx context.Context, client *kubernetes.Clientset, opts pkgtestcases.TestCaseOptions) error { |
| 36 | + if opts.Verbose { |
| 37 | + fmt.Println("[Test] Starting progressive stress test: 10/20/50/100 QPS") |
| 38 | + } |
| 39 | + |
| 40 | + // Setup service connection and get local port |
| 41 | + localPort, stopPortForward, err := setupServiceConnection(ctx, client, opts) |
| 42 | + if err != nil { |
| 43 | + return err |
| 44 | + } |
| 45 | + defer stopPortForward() |
| 46 | + |
| 47 | + // Define QPS stages and duration for each stage |
| 48 | + qpsStages := []int{10, 20, 50, 100} |
| 49 | + stageDuration := 30 * time.Second // Run each stage for 30 seconds |
| 50 | + |
| 51 | + var stageResults []ProgressiveStageResult |
| 52 | + |
| 53 | + // Run each QPS stage |
| 54 | + for _, qps := range qpsStages { |
| 55 | + if opts.Verbose { |
| 56 | + fmt.Printf("\n[Test] Starting stage: %d QPS for %v\n", qps, stageDuration) |
| 57 | + } |
| 58 | + |
| 59 | + stageResult := runQPSStage(ctx, qps, stageDuration, localPort) |
| 60 | + stageResults = append(stageResults, stageResult) |
| 61 | + |
| 62 | + if opts.Verbose { |
| 63 | + fmt.Printf("[Test] Stage %d QPS completed: %d/%d successful (%.2f%% success rate)\n", |
| 64 | + qps, stageResult.SuccessCount, stageResult.TotalReqs, stageResult.SuccessRate) |
| 65 | + } |
| 66 | + |
| 67 | + // Brief pause between stages |
| 68 | + time.Sleep(2 * time.Second) |
| 69 | + } |
| 70 | + |
| 71 | + // Print comprehensive summary |
| 72 | + printProgressiveResults(stageResults) |
| 73 | + |
| 74 | + // Set details for reporting |
| 75 | + if opts.SetDetails != nil { |
| 76 | + details := make(map[string]interface{}) |
| 77 | + for _, stage := range stageResults { |
| 78 | + stageKey := fmt.Sprintf("qps_%d", stage.QPS) |
| 79 | + details[stageKey] = map[string]interface{}{ |
| 80 | + "total_requests": stage.TotalReqs, |
| 81 | + "successful": stage.SuccessCount, |
| 82 | + "failed": stage.FailureCount, |
| 83 | + "success_rate": fmt.Sprintf("%.2f%%", stage.SuccessRate), |
| 84 | + "avg_duration": stage.AvgDuration.Milliseconds(), |
| 85 | + "min_duration": stage.MinDuration.Milliseconds(), |
| 86 | + "max_duration": stage.MaxDuration.Milliseconds(), |
| 87 | + } |
| 88 | + } |
| 89 | + opts.SetDetails(details) |
| 90 | + } |
| 91 | + |
| 92 | + return nil |
| 93 | +} |
| 94 | + |
| 95 | +func runQPSStage(ctx context.Context, qps int, duration time.Duration, localPort string) ProgressiveStageResult { |
| 96 | + result := ProgressiveStageResult{ |
| 97 | + QPS: qps, |
| 98 | + MinDuration: time.Hour, // Initialize with large value |
| 99 | + } |
| 100 | + |
| 101 | + var mu sync.Mutex |
| 102 | + var wg sync.WaitGroup |
| 103 | + |
| 104 | + // Calculate interval between requests |
| 105 | + interval := time.Second / time.Duration(qps) |
| 106 | + ticker := time.NewTicker(interval) |
| 107 | + defer ticker.Stop() |
| 108 | + |
| 109 | + // Context with timeout for this stage |
| 110 | + stageCtx, cancel := context.WithTimeout(ctx, duration) |
| 111 | + defer cancel() |
| 112 | + |
| 113 | + requestID := 0 |
| 114 | + |
| 115 | + // Send requests at the specified QPS rate |
| 116 | + for { |
| 117 | + select { |
| 118 | + case <-stageCtx.Done(): |
| 119 | + // Stage duration completed, wait for all in-flight requests |
| 120 | + wg.Wait() |
| 121 | + return calculateStageStats(result) |
| 122 | + |
| 123 | + case <-ticker.C: |
| 124 | + requestID++ |
| 125 | + wg.Add(1) |
| 126 | + |
| 127 | + go func(reqID int) { |
| 128 | + defer wg.Done() |
| 129 | + |
| 130 | + // Send request |
| 131 | + reqResult := sendSingleRequest(ctx, reqID, localPort, false) |
| 132 | + |
| 133 | + // Update results |
| 134 | + mu.Lock() |
| 135 | + result.Results = append(result.Results, reqResult) |
| 136 | + result.TotalReqs++ |
| 137 | + if reqResult.Success { |
| 138 | + result.SuccessCount++ |
| 139 | + } else { |
| 140 | + result.FailureCount++ |
| 141 | + } |
| 142 | + mu.Unlock() |
| 143 | + }(requestID) |
| 144 | + } |
| 145 | + } |
| 146 | +} |
| 147 | + |
| 148 | +func calculateStageStats(result ProgressiveStageResult) ProgressiveStageResult { |
| 149 | + if result.TotalReqs == 0 { |
| 150 | + return result |
| 151 | + } |
| 152 | + |
| 153 | + // Calculate success rate |
| 154 | + result.SuccessRate = float64(result.SuccessCount) / float64(result.TotalReqs) * 100 |
| 155 | + |
| 156 | + // Calculate duration statistics |
| 157 | + var totalDuration time.Duration |
| 158 | + for _, r := range result.Results { |
| 159 | + totalDuration += r.Duration |
| 160 | + if r.Duration < result.MinDuration { |
| 161 | + result.MinDuration = r.Duration |
| 162 | + } |
| 163 | + if r.Duration > result.MaxDuration { |
| 164 | + result.MaxDuration = r.Duration |
| 165 | + } |
| 166 | + } |
| 167 | + |
| 168 | + if len(result.Results) > 0 { |
| 169 | + result.AvgDuration = totalDuration / time.Duration(len(result.Results)) |
| 170 | + } |
| 171 | + |
| 172 | + // Reset MinDuration if it wasn't updated |
| 173 | + if result.MinDuration == time.Hour { |
| 174 | + result.MinDuration = 0 |
| 175 | + } |
| 176 | + |
| 177 | + return result |
| 178 | +} |
| 179 | + |
| 180 | +func printProgressiveResults(stageResults []ProgressiveStageResult) { |
| 181 | + separator := strings.Repeat("=", 100) |
| 182 | + fmt.Println("\n" + separator) |
| 183 | + fmt.Println("Progressive Stress Test Results") |
| 184 | + fmt.Println(separator) |
| 185 | + |
| 186 | + // Print header |
| 187 | + fmt.Printf("%-10s %-15s %-15s %-15s %-15s %-15s %-15s\n", |
| 188 | + "QPS", "Total Reqs", "Successful", "Failed", "Success Rate", "Avg Duration", "Max Duration") |
| 189 | + fmt.Println(strings.Repeat("-", 100)) |
| 190 | + |
| 191 | + // Print each stage |
| 192 | + for _, stage := range stageResults { |
| 193 | + fmt.Printf("%-10d %-15d %-15d %-15d %-15s %-15v %-15v\n", |
| 194 | + stage.QPS, |
| 195 | + stage.TotalReqs, |
| 196 | + stage.SuccessCount, |
| 197 | + stage.FailureCount, |
| 198 | + fmt.Sprintf("%.2f%%", stage.SuccessRate), |
| 199 | + stage.AvgDuration.Round(time.Millisecond), |
| 200 | + stage.MaxDuration.Round(time.Millisecond)) |
| 201 | + } |
| 202 | + |
| 203 | + fmt.Println(separator) |
| 204 | + |
| 205 | + // Print summary statistics |
| 206 | + fmt.Println("\nSummary:") |
| 207 | + totalRequests := 0 |
| 208 | + totalSuccess := 0 |
| 209 | + for _, stage := range stageResults { |
| 210 | + totalRequests += stage.TotalReqs |
| 211 | + totalSuccess += stage.SuccessCount |
| 212 | + } |
| 213 | + overallSuccessRate := float64(totalSuccess) / float64(totalRequests) * 100 |
| 214 | + fmt.Printf(" Overall: %d/%d successful (%.2f%% success rate)\n", |
| 215 | + totalSuccess, totalRequests, overallSuccessRate) |
| 216 | + |
| 217 | + // Show failures for each stage |
| 218 | + fmt.Println("\nFailures by Stage:") |
| 219 | + for _, stage := range stageResults { |
| 220 | + if stage.FailureCount > 0 { |
| 221 | + fmt.Printf(" %d QPS: %d failures\n", stage.QPS, stage.FailureCount) |
| 222 | + // Show first 3 failures for this stage |
| 223 | + failureCount := 0 |
| 224 | + for _, result := range stage.Results { |
| 225 | + if !result.Success && failureCount < 3 { |
| 226 | + failureCount++ |
| 227 | + fmt.Printf(" Request #%d: %s (duration: %v)\n", |
| 228 | + result.RequestID, result.ErrorMessage, result.Duration) |
| 229 | + } |
| 230 | + if failureCount >= 3 { |
| 231 | + break |
| 232 | + } |
| 233 | + } |
| 234 | + } else { |
| 235 | + fmt.Printf(" %d QPS: No failures! 🎉\n", stage.QPS) |
| 236 | + } |
| 237 | + } |
| 238 | + fmt.Println() |
| 239 | +} |
0 commit comments