Skip to content

Commit 9de9d4b

Browse files
committed
Conformance: Adds Weight-Based Traffic Splitting Test
Signed-off-by: Daneyon Hansen <[email protected]>
1 parent 534ee87 commit 9de9d4b

File tree

2 files changed

+246
-0
lines changed

2 files changed

+246
-0
lines changed
Lines changed: 216 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,216 @@
1+
/*
2+
Copyright 2025 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package tests
18+
19+
import (
20+
"fmt"
21+
"math"
22+
"net/http"
23+
"slices"
24+
"strings"
25+
"sync/atomic"
26+
"testing"
27+
28+
"github.com/stretchr/testify/require"
29+
"golang.org/x/sync/errgroup"
30+
"k8s.io/apimachinery/pkg/types"
31+
gwhttp "sigs.k8s.io/gateway-api/conformance/utils/http"
32+
"sigs.k8s.io/gateway-api/conformance/utils/suite"
33+
"sigs.k8s.io/gateway-api/pkg/features"
34+
35+
"sigs.k8s.io/gateway-api-inference-extension/conformance/resources"
36+
k8sutils "sigs.k8s.io/gateway-api-inference-extension/conformance/utils/kubernetes"
37+
"sigs.k8s.io/gateway-api-inference-extension/conformance/utils/traffic"
38+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework/plugins/test"
39+
)
40+
41+
func init() {
42+
ConformanceTests = append(ConformanceTests, GatewayWeightedAcrossTwoInferencePools)
43+
}
44+
45+
// GatewayWeightedAcrossTwoInferencePools verifies that Gateway splits traffic across two
46+
// InferencePools according to backendRef weights, and that each request is routed to an
47+
// endpoint of the selected InferencePool.
48+
var GatewayWeightedAcrossTwoInferencePools = suite.ConformanceTest{
49+
ShortName: "GatewayWeightedAcrossTwoInferencePools",
50+
Description: "Gateway should split traffic across two InferencePools based on backendRef weights and route only to endpoints of the selected InferencePool",
51+
Manifests: []string{"tests/gateway_weighted_two_pools.yaml"},
52+
Features: []features.FeatureName{
53+
features.SupportGateway,
54+
features.FeatureName("SupportInferencePool"),
55+
features.SupportGateway,
56+
},
57+
Test: func(t *testing.T, s *suite.ConformanceTestSuite) {
58+
const (
59+
hostname = "primary.example.com"
60+
path = "/weighted-two-pools-test"
61+
62+
// Sample size so the weight signal dominates random noise.
63+
totalRequests = 200
64+
concurrentRequests = 5
65+
66+
// These route weights must match the test manifest.
67+
primaryWeight = 70
68+
secondaryWeight = 30
69+
)
70+
71+
// Objects under test.
72+
httpRouteNN := types.NamespacedName{Name: "httproute-weighted-two-pools", Namespace: resources.AppBackendNamespace}
73+
gatewayNN := resources.PrimaryGatewayNN
74+
primaryPoolNN := resources.PrimaryInferencePoolNN
75+
secondaryPoolNN := types.NamespacedName{Name: "secondary-inference-pool", Namespace: resources.AppBackendNamespace}
76+
77+
// Labels for the two deployments defined in base.yaml.
78+
primaryLabels := map[string]string{"app": "primary-inference-model-server"}
79+
secondaryLabels := map[string]string{"app": "secondary-inference-model-server"}
80+
81+
t.Log("Verifying HTTPRoute and both InferencePools are accepted and the Gateway has an address.")
82+
k8sutils.HTTPRouteMustBeAcceptedAndResolved(t, s.Client, s.TimeoutConfig, httpRouteNN, gatewayNN)
83+
k8sutils.InferencePoolMustBeAcceptedByParent(t, s.Client, primaryPoolNN, gatewayNN)
84+
k8sutils.InferencePoolMustBeAcceptedByParent(t, s.Client, secondaryPoolNN, gatewayNN)
85+
gwAddr := k8sutils.GetGatewayEndpoint(t, s.Client, s.TimeoutConfig, gatewayNN)
86+
87+
// Discover pods for each pool and build quick lookup sets.
88+
t.Logf("Fetching primary backend pods with labels: %v", primaryLabels)
89+
primaryPods, err := k8sutils.GetPodsWithLabel(t, s.Client, resources.AppBackendNamespace, primaryLabels, s.TimeoutConfig)
90+
require.NoError(t, err)
91+
require.Len(t, primaryPods, 3) // base.yaml uses 3 replicas
92+
93+
t.Logf("Fetching secondary backend pods with labels: %v", secondaryLabels)
94+
secondaryPods, err := k8sutils.GetPodsWithLabel(t, s.Client, resources.AppBackendNamespace, secondaryLabels, s.TimeoutConfig)
95+
require.NoError(t, err)
96+
require.Len(t, secondaryPods, 3) // base.yaml uses 3 replicas
97+
98+
primaryPodNames := make([]string, 0, len(primaryPods))
99+
primaryPodIPs := make([]string, 0, len(primaryPods))
100+
for _, p := range primaryPods {
101+
primaryPodNames = append(primaryPodNames, p.Name)
102+
primaryPodIPs = append(primaryPodIPs, p.Status.PodIP)
103+
}
104+
secondaryPodNames := make([]string, 0, len(secondaryPods))
105+
secondaryPodIPs := make([]string, 0, len(secondaryPods))
106+
for _, p := range secondaryPods {
107+
secondaryPodNames = append(secondaryPodNames, p.Name)
108+
secondaryPodIPs = append(secondaryPodIPs, p.Status.PodIP)
109+
}
110+
111+
// Provide a union list of eligible endpoints for the test. Each pool's EPP
112+
// should filter to endpoints that actually belong to its pool.
113+
allIPs := append(append([]string{}, primaryPodIPs...), secondaryPodIPs...)
114+
eppHeaderValue := strings.Join(allIPs, ",")
115+
116+
requestBody := `{
117+
"model": "conformance-fake-model",
118+
"prompt": "Write as if you were a critic: San Francisco"
119+
}`
120+
121+
// Send requests with the union header and verify the split roughly matches the
122+
// weight distribution of the test manifest.
123+
var (
124+
roundTripper = s.RoundTripper
125+
g errgroup.Group
126+
primaryHits int64
127+
secondaryHits int64
128+
headers = map[string]string{
129+
test.HeaderTestEppEndPointSelectionKey: eppHeaderValue,
130+
}
131+
expected = gwhttp.ExpectedResponse{
132+
Request: gwhttp.Request{
133+
Host: hostname,
134+
Path: path,
135+
Method: http.MethodPost,
136+
Headers: headers,
137+
},
138+
Response: gwhttp.Response{
139+
StatusCode: http.StatusOK,
140+
},
141+
// Leave backend empty to avoid enforcing a specific pod prefix in CompareRequest.
142+
Namespace: resources.AppBackendNamespace,
143+
}
144+
)
145+
146+
primarySet := func() map[string]struct{} {
147+
m := make(map[string]struct{}, len(primaryPodNames))
148+
for _, n := range primaryPodNames {
149+
m[n] = struct{}{}
150+
}
151+
return m
152+
}()
153+
secondarySet := func() map[string]struct{} {
154+
m := make(map[string]struct{}, len(secondaryPodNames))
155+
for _, n := range secondaryPodNames {
156+
m[n] = struct{}{}
157+
}
158+
return m
159+
}()
160+
161+
req := gwhttp.MakeRequest(t, &expected, gwAddr, "HTTP", "http")
162+
g.SetLimit(concurrentRequests)
163+
for range totalRequests {
164+
g.Go(func() error {
165+
cReq, cRes, err := traffic.MakeCallRoundTripper(t, roundTripper, &traffic.RequestWithBody{
166+
Request: req,
167+
Body: strings.NewReader(requestBody),
168+
})
169+
if err != nil {
170+
return fmt.Errorf("failed to roundtrip request: %w", err)
171+
}
172+
if err := gwhttp.CompareRequest(t, &req, cReq, cRes, expected); err != nil {
173+
return fmt.Errorf("response expectation failed: %w", err)
174+
}
175+
176+
// Attribute response to pool by the backend pod name.
177+
if _, ok := primarySet[cReq.Pod]; ok {
178+
atomic.AddInt64(&primaryHits, 1)
179+
} else if _, ok := secondarySet[cReq.Pod]; ok {
180+
atomic.AddInt64(&secondaryHits, 1)
181+
} else {
182+
return fmt.Errorf("request was handled by unexpected pod %q (not in either pool)", cReq.Pod)
183+
}
184+
185+
return nil
186+
})
187+
}
188+
require.NoError(t, g.Wait(), "requests failed")
189+
190+
ph := float64(atomic.LoadInt64(&primaryHits))
191+
sh := float64(atomic.LoadInt64(&secondaryHits))
192+
total := ph + sh
193+
require.Greater(t, total, 0.0)
194+
195+
observedPrimary := ph / total
196+
expectedPrimary := float64(primaryWeight) / float64(primaryWeight+secondaryWeight)
197+
198+
// Allow either a 10 percentage-point absolute error, or a 3-sigma
199+
// binomial confidence interval (whichever is larger). This keeps
200+
// flakiness low while still detecting obvious mis-weighting.
201+
sigma := math.Sqrt(expectedPrimary * (1.0 - expectedPrimary) / total)
202+
absTolerance := math.Max(0.10, 3.0*sigma)
203+
204+
diff := math.Abs(observedPrimary - expectedPrimary)
205+
if diff > absTolerance {
206+
t.Fatalf("weighted split out of bounds: observed primary=%.3f (hits=%d/%d), expected=%.3f, tolerance=±%.3f",
207+
observedPrimary, int64(ph), int64(total), expectedPrimary, absTolerance)
208+
}
209+
210+
t.Logf("Weighted split OK: primary=%.3f (hits=%d/%d), expected=%.3f, tolerance=±%.3f; secondary hits=%d",
211+
observedPrimary, int64(ph), int64(total), expectedPrimary, absTolerance, int64(sh))
212+
213+
// Sanity: ensure responses only came from pods we enumerated.
214+
require.True(t, slices.Contains([]int{int(ph + sh)}, int(total)))
215+
},
216+
}
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
apiVersion: gateway.networking.k8s.io/v1
2+
kind: HTTPRoute
3+
metadata:
4+
name: httproute-weighted-two-pools
5+
namespace: gateway-conformance-app-backend
6+
spec:
7+
parentRefs:
8+
- group: gateway.networking.k8s.io
9+
kind: Gateway
10+
name: conformance-primary
11+
namespace: gateway-conformance-infra
12+
sectionName: http
13+
hostnames:
14+
- "primary.example.com"
15+
rules:
16+
- matches:
17+
- path:
18+
type: PathPrefix
19+
value: /weighted-two-pools-test
20+
backendRefs:
21+
# 70% of traffic goes to the primary pool
22+
- group: inference.networking.k8s.io
23+
kind: InferencePool
24+
name: primary-inference-pool
25+
weight: 70
26+
# 30% of traffic goes to the secondary pool
27+
- group: inference.networking.k8s.io
28+
kind: InferencePool
29+
name: secondary-inference-pool
30+
weight: 30

0 commit comments

Comments
 (0)