Skip to content

Commit c4b7e46

Browse files
committed
update
Signed-off-by: bitliu <[email protected]>
1 parent a4b6683 commit c4b7e46

File tree

10 files changed

+1531
-7
lines changed

10 files changed

+1531
-7
lines changed

e2e/pkg/helpers/kubernetes.go

Lines changed: 68 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"strings"
1010
"time"
1111

12+
corev1 "k8s.io/api/core/v1"
1213
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
1314
"k8s.io/client-go/kubernetes"
1415
"k8s.io/client-go/rest"
@@ -117,9 +118,10 @@ func VerifyServicePodsRunning(ctx context.Context, client *kubernetes.Clientset,
117118
return nil
118119
}
119120

120-
// StartPortForward starts port forwarding to a service
121+
// StartPortForward starts port forwarding to a service by finding a pod behind it
121122
// The ports parameter should be in format "localPort:servicePort" (e.g., "8080:80")
122-
// This function forwards directly to the service, letting Kubernetes handle the routing
123+
// Note: Kubernetes API doesn't support port-forward directly to services, only to pods.
124+
// This function mimics kubectl's behavior by finding a pod behind the service.
123125
func StartPortForward(ctx context.Context, client *kubernetes.Clientset, restConfig *rest.Config, namespace, service, ports string, verbose bool) error {
124126
// Parse ports (e.g., "8080:80" -> local=8080, service=80)
125127
portParts := strings.Split(ports, ":")
@@ -133,17 +135,76 @@ func StartPortForward(ctx context.Context, client *kubernetes.Clientset, restCon
133135
fmt.Printf("[Helper] Starting port-forward to service %s/%s (%s:%s)\n", namespace, service, localPort, servicePort)
134136
}
135137

138+
// Get the service to find its selector
139+
svc, err := client.CoreV1().Services(namespace).Get(ctx, service, metav1.GetOptions{})
140+
if err != nil {
141+
return fmt.Errorf("failed to get service: %w", err)
142+
}
143+
144+
// Build label selector from service selector
145+
var selectorParts []string
146+
for key, value := range svc.Spec.Selector {
147+
selectorParts = append(selectorParts, fmt.Sprintf("%s=%s", key, value))
148+
}
149+
labelSelector := strings.Join(selectorParts, ",")
150+
151+
// Find pods matching the service selector
152+
pods, err := client.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{
153+
LabelSelector: labelSelector,
154+
})
155+
if err != nil {
156+
return fmt.Errorf("failed to list pods for service: %w", err)
157+
}
158+
159+
if len(pods.Items) == 0 {
160+
return fmt.Errorf("no pods found for service %s/%s", namespace, service)
161+
}
162+
163+
// Use the first running pod
164+
var targetPod *corev1.Pod
165+
for i := range pods.Items {
166+
pod := &pods.Items[i]
167+
if pod.Status.Phase == corev1.PodRunning {
168+
targetPod = pod
169+
break
170+
}
171+
}
172+
173+
if targetPod == nil {
174+
return fmt.Errorf("no running pods found for service %s/%s", namespace, service)
175+
}
176+
177+
if verbose {
178+
fmt.Printf("[Helper] Found running pod: %s\n", targetPod.Name)
179+
}
180+
181+
// Map service port to container port
182+
var containerPort string
183+
for _, port := range svc.Spec.Ports {
184+
if fmt.Sprintf("%d", port.Port) == servicePort {
185+
containerPort = fmt.Sprintf("%d", port.TargetPort.IntVal)
186+
if port.TargetPort.IntVal == 0 {
187+
// TargetPort is a named port, use the port number directly
188+
containerPort = servicePort
189+
}
190+
break
191+
}
192+
}
193+
if containerPort == "" {
194+
containerPort = servicePort // fallback to service port
195+
}
196+
136197
// Create SPDY transport
137198
transport, upgrader, err := spdy.RoundTripperFor(restConfig)
138199
if err != nil {
139200
return fmt.Errorf("failed to create SPDY transport: %w", err)
140201
}
141202

142-
// Build the URL for port forwarding to service
203+
// Build the URL for port forwarding to pod
143204
url := client.CoreV1().RESTClient().Post().
144-
Resource("services").
205+
Resource("pods").
145206
Namespace(namespace).
146-
Name(service).
207+
Name(targetPod.Name).
147208
SubResource("portforward").
148209
URL()
149210

@@ -160,8 +221,8 @@ func StartPortForward(ctx context.Context, client *kubernetes.Clientset, restCon
160221
errOut = os.Stderr
161222
}
162223

163-
// Create port forwarder
164-
forwarder, err := portforward.New(dialer, []string{fmt.Sprintf("%s:%s", localPort, servicePort)}, stopChan, readyChan, out, errOut)
224+
// Create port forwarder (forward localPort to containerPort on the pod)
225+
forwarder, err := portforward.New(dialer, []string{fmt.Sprintf("%s:%s", localPort, containerPort)}, stopChan, readyChan, out, errOut)
165226
if err != nil {
166227
return fmt.Errorf("failed to create port forwarder: %w", err)
167228
}

e2e/profiles/ai-gateway/cache.go

Lines changed: 277 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,277 @@
1+
package aigateway
2+
3+
import (
4+
"bytes"
5+
"context"
6+
"encoding/json"
7+
"fmt"
8+
"io"
9+
"net/http"
10+
"os"
11+
"time"
12+
13+
"k8s.io/client-go/kubernetes"
14+
15+
"github.com/vllm-project/semantic-router/e2e/pkg/helpers"
16+
"github.com/vllm-project/semantic-router/e2e/pkg/testcases"
17+
)
18+
19+
func init() {
20+
testcases.Register("cache", testcases.TestCase{
21+
Description: "Test semantic cache hit rate with similar questions",
22+
Tags: []string{"ai-gateway", "cache", "performance"},
23+
Fn: testCache,
24+
})
25+
}
26+
27+
// CacheTestCase represents a test case for cache testing
28+
type CacheTestCase struct {
29+
Description string `json:"description"`
30+
Category string `json:"category"`
31+
OriginalQuestion string `json:"original_question"`
32+
SimilarQuestions []string `json:"similar_questions"`
33+
}
34+
35+
// CacheResult tracks the result of a cache test
36+
type CacheResult struct {
37+
Description string
38+
Category string
39+
OriginalQuestion string
40+
SimilarQuestion string
41+
CacheHit bool
42+
Error string
43+
}
44+
45+
func testCache(ctx context.Context, client *kubernetes.Clientset, opts testcases.TestCaseOptions) error {
46+
if opts.Verbose {
47+
fmt.Println("[Test] Testing semantic cache functionality")
48+
}
49+
50+
// Get the Envoy service name (should already be ready from profile setup)
51+
labelSelector := "gateway.envoyproxy.io/owning-gateway-namespace=default,gateway.envoyproxy.io/owning-gateway-name=semantic-router"
52+
envoyService, err := helpers.GetEnvoyServiceName(ctx, client, labelSelector, opts.Verbose)
53+
if err != nil {
54+
return fmt.Errorf("failed to get Envoy service: %w", err)
55+
}
56+
57+
if opts.Verbose {
58+
fmt.Printf("[Test] Using Envoy service: %s\n", envoyService)
59+
}
60+
61+
// Start port forwarding to service (8080:80)
62+
if err := helpers.StartPortForward(ctx, client, opts.RestConfig, "envoy-gateway-system", envoyService, "8080:80", opts.Verbose); err != nil {
63+
return fmt.Errorf("failed to start port forwarding: %w", err)
64+
}
65+
66+
// Wait a bit for port forwarding to stabilize
67+
time.Sleep(2 * time.Second)
68+
69+
// Load test cases from JSON file
70+
testCases, err := loadCacheCases("e2e/profiles/ai-gateway/testdata/cache_cases.json")
71+
if err != nil {
72+
return fmt.Errorf("failed to load test cases: %w", err)
73+
}
74+
75+
// Run cache tests
76+
var results []CacheResult
77+
totalRequests := 0
78+
cacheHits := 0
79+
80+
for _, testCase := range testCases {
81+
// Send original question first (should not hit cache)
82+
if opts.Verbose {
83+
fmt.Printf("[Test] Sending original question: %s\n", testCase.OriginalQuestion)
84+
}
85+
_, err := sendChatRequest(ctx, testCase.OriginalQuestion, opts.Verbose)
86+
if err != nil {
87+
if opts.Verbose {
88+
fmt.Printf("[Test] Error sending original question: %v\n", err)
89+
}
90+
continue
91+
}
92+
93+
// Wait a bit to ensure cache is populated
94+
time.Sleep(1 * time.Second)
95+
96+
// Send similar questions (should hit cache)
97+
for _, similarQuestion := range testCase.SimilarQuestions {
98+
totalRequests++
99+
result := testSingleCacheRequest(ctx, testCase, similarQuestion, opts.Verbose)
100+
results = append(results, result)
101+
if result.CacheHit {
102+
cacheHits++
103+
}
104+
}
105+
}
106+
107+
// Calculate hit rate
108+
hitRate := float64(0)
109+
if totalRequests > 0 {
110+
hitRate = float64(cacheHits) / float64(totalRequests) * 100
111+
}
112+
113+
// Print results
114+
printCacheResults(results, totalRequests, cacheHits, hitRate)
115+
116+
if opts.Verbose {
117+
fmt.Printf("[Test] Cache test completed: %d/%d cache hits (%.2f%% hit rate)\n",
118+
cacheHits, totalRequests, hitRate)
119+
}
120+
121+
return nil
122+
}
123+
124+
func loadCacheCases(filepath string) ([]CacheTestCase, error) {
125+
data, err := os.ReadFile(filepath)
126+
if err != nil {
127+
return nil, fmt.Errorf("failed to read test cases file: %w", err)
128+
}
129+
130+
var cases []CacheTestCase
131+
if err := json.Unmarshal(data, &cases); err != nil {
132+
return nil, fmt.Errorf("failed to parse test cases: %w", err)
133+
}
134+
135+
return cases, nil
136+
}
137+
138+
func testSingleCacheRequest(ctx context.Context, testCase CacheTestCase, question string, verbose bool) CacheResult {
139+
result := CacheResult{
140+
Description: testCase.Description,
141+
Category: testCase.Category,
142+
OriginalQuestion: testCase.OriginalQuestion,
143+
SimilarQuestion: question,
144+
}
145+
146+
resp, err := sendChatRequest(ctx, question, verbose)
147+
if err != nil {
148+
result.Error = fmt.Sprintf("failed to send request: %v", err)
149+
return result
150+
}
151+
defer resp.Body.Close()
152+
153+
// Check for cache hit header
154+
cacheHitHeader := resp.Header.Get("x-vsr-cache-hit")
155+
result.CacheHit = (cacheHitHeader == "true")
156+
157+
if verbose {
158+
if result.CacheHit {
159+
fmt.Printf("[Test] ✓ Cache HIT for: %s\n", question)
160+
} else {
161+
fmt.Printf("[Test] ✗ Cache MISS for: %s\n", question)
162+
}
163+
}
164+
165+
return result
166+
}
167+
168+
func sendChatRequest(ctx context.Context, question string, verbose bool) (*http.Response, error) {
169+
// Create chat completion request
170+
requestBody := map[string]interface{}{
171+
"model": "base-model",
172+
"messages": []map[string]string{
173+
{"role": "user", "content": question},
174+
},
175+
"max_tokens": 100,
176+
"stream": false,
177+
}
178+
179+
jsonData, err := json.Marshal(requestBody)
180+
if err != nil {
181+
return nil, fmt.Errorf("failed to marshal request: %w", err)
182+
}
183+
184+
// Send request
185+
req, err := http.NewRequestWithContext(ctx, "POST", "http://localhost:8080/v1/chat/completions", bytes.NewBuffer(jsonData))
186+
if err != nil {
187+
return nil, fmt.Errorf("failed to create request: %w", err)
188+
}
189+
req.Header.Set("Content-Type", "application/json")
190+
191+
httpClient := &http.Client{Timeout: 30 * time.Second}
192+
resp, err := httpClient.Do(req)
193+
if err != nil {
194+
return nil, fmt.Errorf("failed to send request: %w", err)
195+
}
196+
197+
// Check response status
198+
if resp.StatusCode != http.StatusOK {
199+
bodyBytes, _ := io.ReadAll(resp.Body)
200+
resp.Body.Close()
201+
return nil, fmt.Errorf("unexpected status code: %d, body: %s", resp.StatusCode, string(bodyBytes))
202+
}
203+
204+
return resp, nil
205+
}
206+
207+
func printCacheResults(results []CacheResult, totalRequests, cacheHits int, hitRate float64) {
208+
separator := "================================================================================"
209+
fmt.Println("\n" + separator)
210+
fmt.Println("CACHE TEST RESULTS")
211+
fmt.Println(separator)
212+
fmt.Printf("Total Requests: %d\n", totalRequests)
213+
fmt.Printf("Cache Hits: %d\n", cacheHits)
214+
fmt.Printf("Hit Rate: %.2f%%\n", hitRate)
215+
fmt.Println(separator)
216+
217+
// Group results by category
218+
categoryStats := make(map[string]struct {
219+
total int
220+
hits int
221+
})
222+
223+
for _, result := range results {
224+
stats := categoryStats[result.Category]
225+
stats.total++
226+
if result.CacheHit {
227+
stats.hits++
228+
}
229+
categoryStats[result.Category] = stats
230+
}
231+
232+
// Print per-category results
233+
fmt.Println("\nPer-Category Results:")
234+
for category, stats := range categoryStats {
235+
categoryHitRate := float64(stats.hits) / float64(stats.total) * 100
236+
fmt.Printf(" - %-20s: %d/%d (%.2f%%)\n", category, stats.hits, stats.total, categoryHitRate)
237+
}
238+
239+
// Print cache misses
240+
missCount := 0
241+
for _, result := range results {
242+
if !result.CacheHit && result.Error == "" {
243+
missCount++
244+
}
245+
}
246+
247+
if missCount > 0 {
248+
fmt.Println("\nCache Misses:")
249+
for _, result := range results {
250+
if !result.CacheHit && result.Error == "" {
251+
fmt.Printf(" - Original: %s\n", result.OriginalQuestion)
252+
fmt.Printf(" Similar: %s\n", result.SimilarQuestion)
253+
fmt.Printf(" Category: %s\n", result.Category)
254+
}
255+
}
256+
}
257+
258+
// Print errors
259+
errorCount := 0
260+
for _, result := range results {
261+
if result.Error != "" {
262+
errorCount++
263+
}
264+
}
265+
266+
if errorCount > 0 {
267+
fmt.Println("\nErrors:")
268+
for _, result := range results {
269+
if result.Error != "" {
270+
fmt.Printf(" - Question: %s\n", result.SimilarQuestion)
271+
fmt.Printf(" Error: %s\n", result.Error)
272+
}
273+
}
274+
}
275+
276+
fmt.Println(separator + "\n")
277+
}

0 commit comments

Comments
 (0)