@@ -20,6 +20,7 @@ import (
2020 "encoding/json"
2121 "errors"
2222 "fmt"
23+ "maps"
2324 "strconv"
2425 "strings"
2526 "time"
@@ -31,20 +32,68 @@ import (
3132 appsv1 "k8s.io/api/apps/v1"
3233 corev1 "k8s.io/api/core/v1"
3334 k8serrors "k8s.io/apimachinery/pkg/api/errors"
35+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
3436 "k8s.io/apimachinery/pkg/types"
3537 "sigs.k8s.io/controller-runtime/pkg/client"
3638
39+ v1 "sigs.k8s.io/gateway-api-inference-extension/api/v1"
3740 "sigs.k8s.io/gateway-api-inference-extension/apix/v1alpha2"
3841 "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metadata"
3942 testutils "sigs.k8s.io/gateway-api-inference-extension/test/utils"
4043)
4144
// Port range used by the multi-port test setup: the model-server deployment is
// patched to expose these container ports and the InferencePool is patched to
// target them.
const (
	// numPorts is how many consecutive ports are configured.
	numPorts = 8
	// firstPort is the lowest port number in the range.
	firstPort = 8000
)
49+
4250var _ = ginkgo .Describe ("InferencePool" , func () {
4351 var infObjective * v1alpha2.InferenceObjective
4452 ginkgo .BeforeEach (func () {
4553 ginkgo .By ("Waiting for the namespace to exist." )
4654 namespaceExists (testConfig )
4755
56+ ginkgo .By ("Modifying deployment using local image for testing (temporary)." )
57+ deploy := & appsv1.Deployment {}
58+ key := types.NamespacedName {Name : "vllm-llama3-8b-instruct" , Namespace : testConfig .NsName }
59+
60+ gomega .Eventually (func () error {
61+ err := testConfig .K8sClient .Get (testConfig .Context , key , deploy )
62+ if err != nil {
63+ return err
64+ }
65+
66+ deploy .Spec .Template .Spec .Containers [0 ].Image = "vllm-dynamic-backend:local" // TODO(ryanrosario): Change back to official image after testing
67+ deploy .Spec .Template .Spec .Containers [0 ].ImagePullPolicy = corev1 .PullNever
68+ deploy .Spec .Template .Spec .Containers [0 ].Args = []string {strconv .Itoa (firstPort ), strconv .Itoa (numPorts )}
69+ deploy .Spec .Template .Spec .Containers [0 ].Ports = buildContainerPorts (firstPort , numPorts )
70+ return testConfig .K8sClient .Update (testConfig .Context , deploy )
71+ }, testConfig .ExistsTimeout , testConfig .Interval ).Should (gomega .Succeed ())
72+
73+ waitForDeploymentRollout (testConfig , deploy )
74+
75+ pool := & v1.InferencePool {}
76+ gomega .Eventually (func () error {
77+ err := testConfig .K8sClient .Get (testConfig .Context , key , pool )
78+ if err != nil {
79+ return err
80+ }
81+
82+ pool .Spec .TargetPorts = buildTargetPorts (firstPort , numPorts )
83+
84+ return testConfig .K8sClient .Update (testConfig .Context , pool )
85+ }, testConfig .ExistsTimeout , testConfig .Interval ).Should (gomega .Succeed ())
86+
87+ ginkgo .By ("Restarting EPP to force configuration reload" )
88+ // We delete the EPP *POD*, not the deployment. The Deployment will recreate it immediately.
89+ // This forces the new EPP process to read the Multi-Port InferencePool from scratch.
90+ eppLabels := client.MatchingLabels {"app" : inferExtName }
91+ gomega .Expect (testConfig .K8sClient .DeleteAllOf (testConfig .Context , & corev1.Pod {}, client .InNamespace (testConfig .NsName ), eppLabels )).To (gomega .Succeed ())
92+
93+ // Wait for the new EPP to be ready
94+ eppDeploy := & appsv1.Deployment {ObjectMeta : metav1.ObjectMeta {Name : inferExtName , Namespace : testConfig .NsName }}
95+ waitForDeploymentReady (testConfig , eppDeploy )
96+
4897 ginkgo .By ("Creating an InferenceObjective resource" )
4998 infObjective = newInferenceObjective (testConfig .NsName )
5099 gomega .Expect (testConfig .K8sClient .Create (testConfig .Context , infObjective )).To (gomega .Succeed ())
@@ -204,33 +253,80 @@ func verifyTrafficRouting() {
204253 } {
205254 ginkgo .By (fmt .Sprintf ("Verifying connectivity through the inference extension with %s api and prompt/messages: %v" , t .api , t .promptOrMessages ))
206255
207- // Ensure the expected responses include the InferenceObjective target model names.
208- var expected []string
209- expected = append (expected , targetModelName )
210- curlCmd := getCurlCommand (envoyName , testConfig .NsName , envoyPort , modelName , curlTimeout , t .api , t .promptOrMessages , false )
211-
212- actual := make (map [string ]int )
256+ // Expected ports and InferenceObjective target models
257+ expectedPort := generateSequence (firstPort , numPorts )
258+ expectedModel := []string {targetModelName }
259+
260+ // Observed ports and InferenceObjective target models
261+ actualModel := make (map [string ]int )
262+ actualPort := make (map [int ]int )
263+ // Probability: need to compute estimate of number of batches to send to have high confidence of hitting all ports.
264+ // Using the Coupon Collector's Problem formula: n * H_n, where H_n is the nth harmonic number.
265+ // This gives us an expected number of trials to collect all coupons (ports).
266+ batches := numPorts * harmonicNumber (numPorts )
267+ // Send curl requests to verify routing to all target ports in the InferencePool.
213268 gomega .Eventually (func () error {
214- resp , err := testutils .ExecCommandInPod (testConfig , "curl" , "curl" , curlCmd )
215- if err != nil {
216- return err
269+ // Run a small batch per retry (e.g., 5) to keep the test active
270+ for i := range batches {
271+ uniqueID := time .Now ().UnixNano ()
272+ dynamicHashValue := fmt .Sprintf ("Nonce-%d" , uniqueID )
273+ currentPromptOrMessages := t .promptOrMessages // Start with the original
274+
275+ // Check if the payload is a slice of maps (e.g., for /chat/completions)
276+ if originalMessages , ok := currentPromptOrMessages .([]map [string ]any ); ok {
277+ messagesCopy := make ([]map [string ]any , len (originalMessages ))
278+ for idx , msg := range originalMessages {
279+ msgCopy := make (map [string ]any , len (msg ))
280+ maps .Copy (msgCopy , msg )
281+ // Inject a unique nonce into the content of *EACH* message
282+ if content , ok := msgCopy ["content" ].(string ); ok {
283+ msgCopy ["content" ] = fmt .Sprintf ("(TestNonce: %s-%d-msg%d) %s" , dynamicHashValue , i , idx , content )
284+ }
285+ messagesCopy [idx ] = msgCopy
286+ }
287+ currentPromptOrMessages = messagesCopy // Use the modified messages for getCurlCommand
288+ }
289+
290+ curlCmd := getCurlCommand (envoyName , testConfig .NsName , envoyPort , modelName , curlTimeout , t .api , currentPromptOrMessages , false )
291+
292+ resp , err := testutils .ExecCommandInPod (testConfig , "curl" , "curl" , curlCmd )
293+ if err != nil {
294+ return err
295+ }
296+
297+ if ! strings .Contains (resp , "200 OK" ) {
298+ return fmt .Errorf ("did not get 200 OK: %s" , resp )
299+ }
300+
301+ for _ , m := range expectedModel {
302+ if strings .Contains (resp , m ) {
303+ actualModel [m ] = 0
304+ }
305+ }
306+ for _ , p := range expectedPort {
307+ if strings .Contains (resp , fmt .Sprintf ("x-backend-port: %d" , p )) {
308+ fmt .Printf ("Port: %d\n " , p )
309+ actualPort [p ] = 0
310+ }
311+ }
217312 }
218- if ! strings .Contains (resp , "200 OK" ) {
219- return fmt .Errorf ("did not get 200 OK: %s" , resp )
313+
314+ var gotModel []string
315+ for m := range actualModel {
316+ gotModel = append (gotModel , m )
220317 }
221- for _ , m := range expected {
222- if strings .Contains (resp , m ) {
223- actual [m ] = 0
224- }
318+ var gotPort []int
319+ for p := range actualPort {
320+ gotPort = append (gotPort , p )
225321 }
226- var got [] string
227- for m := range actual {
228- got = append ( got , m )
322+
323+ if ! cmp . Equal ( gotModel , expectedModel , cmpopts . SortSlices ( func ( a , b string ) bool { return a < b })) {
324+ return fmt . Errorf ( "collecting models... have %v, want %v" , gotModel , expectedModel )
229325 }
230- // Compare ignoring order
231- if ! cmp .Equal (got , expected , cmpopts .SortSlices (func (a , b string ) bool { return a < b })) {
232- return fmt .Errorf ("actual (%v) != expected (%v); resp=%q" , got , expected , resp )
326+ if ! cmp .Equal (gotPort , expectedPort , cmpopts .SortSlices (func (a , b int ) bool { return a < b })) {
327+ return fmt .Errorf ("collecting ports... have %v, want %v" , gotPort , expectedPort )
233328 }
329+
234330 return nil
235331 }, testConfig .ReadyTimeout , curlInterval ).Should (gomega .Succeed ())
236332 }
@@ -257,28 +353,28 @@ func verifyMetrics() {
257353 "inference_extension_info" ,
258354 }
259355
260- // Generate traffic by sending requests through the inference extension
356+ // Generate traffic by sending requests through the inference extension.
261357 ginkgo .By ("Generating traffic through the inference extension" )
262358 curlCmd := getCurlCommand (envoyName , testConfig .NsName , envoyPort , modelName , curlTimeout , "/completions" , "Write as if you were a critic: San Francisco" , true )
263359
264- // Run the curl command multiple times to generate some metrics data
265- for i := 0 ; i < 5 ; i ++ {
360+ // Run the curl command multiple times to generate some metrics data.
361+ for range 5 {
266362 _ , err := testutils .ExecCommandInPod (testConfig , "curl" , "curl" , curlCmd )
267363 gomega .Expect (err ).NotTo (gomega .HaveOccurred ())
268364 }
269365
270- // modify the curl command to generate some error metrics
366+ // Modify the curl command to generate some error metrics.
271367 curlCmd [len (curlCmd )- 1 ] = "invalid input"
272- for i := 0 ; i < 5 ; i ++ {
368+ for range 5 {
273369 _ , err := testutils .ExecCommandInPod (testConfig , "curl" , "curl" , curlCmd )
274370 gomega .Expect (err ).NotTo (gomega .HaveOccurred ())
275371 }
276372
277- // Now scrape metrics from the EPP endpoint via the curl pod
373+ // Now scrape metrics from the EPP endpoint via the curl pod.
278374 ginkgo .By ("Scraping metrics from the EPP endpoint" )
279375 podIP := findReadyPod ().Status .PodIP
280376
281- // Get the authorization token for reading metrics
377+ // Get the authorization token for reading metrics.
282378 token := ""
283379 gomega .Eventually (func (g gomega.Gomega ) {
284380 t , err := getMetricsReaderToken (testConfig .K8sClient )
@@ -287,21 +383,22 @@ func verifyMetrics() {
287383 token = t
288384 }, testConfig .ExistsTimeout , testConfig .Interval ).Should (gomega .Succeed ())
289385
290- // Construct the metric scraping curl command using Pod IP
386+ // Construct the metric scraping curl command using Pod IP.
291387 metricScrapeCmd := getMetricsScrapeCommand (podIP , token )
292388
293389 ginkgo .By ("Verifying that all expected metrics are present." )
294390 gomega .Eventually (func () error {
295- // Execute the metrics scrape command inside the curl pod
391+ // Execute the metrics scrape command inside the curl pod.
392+ fmt .Printf ("pod IP: %s" , podIP )
296393 resp , err := testutils .ExecCommandInPod (testConfig , "curl" , "curl" , metricScrapeCmd )
297394 if err != nil {
298395 return err
299396 }
300- // Verify that we got a 200 OK responsecurl
397+ // Verify that we got a 200 OK response.
301398 if ! strings .Contains (resp , "200 OK" ) {
302399 return fmt .Errorf ("did not get 200 OK: %s" , resp )
303400 }
304- // Check if all expected metrics are present in the metrics output
401+ // Check if all expected metrics are present in the metrics output.
305402 for _ , metric := range expectedMetrics {
306403 if ! strings .Contains (resp , metric ) {
307404 return fmt .Errorf ("expected metric %s not found in metrics output" , metric )
@@ -389,10 +486,120 @@ func getCurlCommand(name, ns, port, model string, timeout time.Duration, api str
389486 "-H" ,
390487 "Content-Type: application/json" ,
391488 "-H" ,
489+ "Cache-Control: no-cache" ,
490+ "-H" ,
392491 fmt .Sprintf ("%v: inferenceobjective-sample" , metadata .ObjectiveKey ),
393492 "-H" ,
394493 fmt .Sprintf ("%v: %s" , metadata .ModelNameRewriteKey , targetModelName ),
494+ "-H" ,
495+ "Connection: close" ,
395496 "-d" ,
396497 string (b ),
397498 }
398499}
500+
501+ // buildContainerPorts constructs a slice of corev1.ContainerPort starting from 'start' with 'count' ports.
502+ func buildContainerPorts (start int , count int ) []corev1.ContainerPort {
503+ ports := make ([]corev1.ContainerPort , count )
504+ for i := range count {
505+ portNum := int32 (start + i )
506+ ports [i ] = corev1.ContainerPort {
507+ Name : fmt .Sprintf ("http-%d" , portNum ),
508+ ContainerPort : portNum ,
509+ Protocol : corev1 .ProtocolTCP ,
510+ }
511+ }
512+ return ports
513+ }
514+
515+ // buildTargetPorts constructs a slice of v1.Port starting from 'start' with 'count' ports.
516+ func buildTargetPorts (start int , count int ) []v1.Port {
517+ ports := make ([]v1.Port , count )
518+ for i := range count {
519+ // v1.PortNumber is usually a typedef for int32 in these APIs.
520+ ports [i ] = v1.Port {
521+ Number : v1 .PortNumber (start + i ),
522+ }
523+ }
524+ return ports
525+ }
526+
527+ // waitForDeploymentRollout waits until the Deployment has completed its update.
528+ // It ensures that the new version is fully rolled out and available.
529+ func waitForDeploymentRollout (tc * testutils.TestConfig , deploy * appsv1.Deployment ) {
530+ ginkgo .By (fmt .Sprintf ("Waiting for Deployment %s/%s to complete rollout" , deploy .Namespace , deploy .Name ))
531+
532+ key := types.NamespacedName {Name : deploy .Name , Namespace : deploy .Namespace }
533+
534+ gomega .Eventually (func () error {
535+ currentDeploy := & appsv1.Deployment {}
536+ if err := tc .K8sClient .Get (tc .Context , key , currentDeploy ); err != nil {
537+ return err
538+ }
539+
540+ if currentDeploy .Generation > currentDeploy .Status .ObservedGeneration {
541+ return fmt .Errorf ("deployment generation not observed yet" )
542+ }
543+
544+ desiredReplicas := * currentDeploy .Spec .Replicas
545+
546+ if currentDeploy .Status .UpdatedReplicas < desiredReplicas {
547+ return fmt .Errorf ("waiting for updated replicas: %d/%d" , currentDeploy .Status .UpdatedReplicas , desiredReplicas )
548+ }
549+
550+ if currentDeploy .Status .AvailableReplicas < desiredReplicas {
551+ return fmt .Errorf ("waiting for available replicas: %d/%d" , currentDeploy .Status .AvailableReplicas , desiredReplicas )
552+ }
553+
554+ if currentDeploy .Status .Replicas > desiredReplicas {
555+ return fmt .Errorf ("waiting for old replicas to terminate: %d > %d" , currentDeploy .Status .Replicas , desiredReplicas )
556+ }
557+
558+ return nil
559+ }, testConfig .ReadyTimeout , testConfig .Interval ).Should (gomega .Succeed (), "Deployment failed to roll out within timeout" )
560+
561+ ginkgo .By ("Deployment rollout complete" )
562+ }
563+
564+ // waitForDeploymentReady waits for the Deployment to have all replicas ready.
565+ func waitForDeploymentReady (tc * testutils.TestConfig , deploy * appsv1.Deployment ) {
566+ ginkgo .By (fmt .Sprintf ("waiting for Deployment %s/%s to be ready" , deploy .Namespace , deploy .Name ))
567+
568+ key := types.NamespacedName {Name : deploy .Name , Namespace : deploy .Namespace }
569+
570+ gomega .Eventually (func () error {
571+ current := & appsv1.Deployment {}
572+ if err := tc .K8sClient .Get (tc .Context , key , current ); err != nil {
573+ return err
574+ }
575+
576+ if current .Status .Replicas != current .Status .ReadyReplicas {
577+ return fmt .Errorf ("replicas mismatch: expected %d, got %d ready" ,
578+ current .Status .Replicas , current .Status .ReadyReplicas )
579+ }
580+
581+ if current .Status .ReadyReplicas == 0 {
582+ return fmt .Errorf ("no replicas are ready yet" )
583+ }
584+
585+ return nil
586+ }, testConfig .ReadyTimeout , testConfig .Interval ).Should (gomega .Succeed ())
587+ }
588+
// generateSequence returns the count consecutive integers
// start, start+1, ..., start+count-1.
func generateSequence(start int, count int) []int {
	seq := make([]int, 0, count)
	for next := start; len(seq) < count; next++ {
		seq = append(seq, next)
	}
	return seq
}
597+
// harmonicNumber returns the nth harmonic number
// H_n = 1 + 1/2 + ... + 1/n, rounded up to the nearest integer.
// It returns 0 when n <= 0.
//
// The terms are summed in floating point because integer division would
// truncate every term after the first to zero. The result is rounded up
// rather than down so that callers using it as a multiplier for a trial count
// do not under-estimate.
func harmonicNumber(n int) int {
	sum := 0.0
	for i := 1; i <= n; i++ {
		sum += 1 / float64(i)
	}

	// Manual ceiling: truncate, then bump if anything was cut off.
	rounded := int(sum)
	if float64(rounded) < sum {
		rounded++
	}
	return rounded
}
0 commit comments