Skip to content

Commit 02957f6

Browse files
dgnclaude
authored andcommitted
Support multiple targetPorts on an InferencePool (#58238)
* Update GIE CRDs to include support for >1 targetPorts I planned on updating to v1.1.0 but ran into dependency issues. Now pointing to the commit that loosened restrictions on number of targetPorts. * Add support for multiple targetPorts in InferencePool This adds support for multiple targetPorts in an InferencePool by adding all targetPorts to the shadow service, and then making sure that only a single cluster is created for the dummy port (54321), allowing the EPP to loadbalance across all endpoints. * Add release note * Add integration test and EPP mock Co-Authored-By: Claude <[email protected]> --------- Co-authored-by: Claude <[email protected]>
1 parent 2345dc2 commit 02957f6

File tree

22 files changed

+958
-61
lines changed

22 files changed

+958
-61
lines changed

pilot/pkg/config/kube/gateway/conversion.go

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1114,9 +1114,18 @@ func buildDestination(ctx RouteContext, to k8s.BackendRef, ns string,
11141114
if ipCfg.endpointPickerDst == "" || ipCfg.endpointPickerPort == "" || ipCfg.endpointPickerFailureMode == "" {
11151115
invalidBackendErr = &ConfigError{Reason: InvalidDestination, Message: "InferencePool service invalid, extensionRef labels not found"}
11161116
}
1117+
1118+
// For InferencePool, always use the first service port (54321).
1119+
// The cluster for that service port will include all endpoints for all
1120+
// target ports, allowing the EPP to load-balance across them.
1121+
var destPort uint32
1122+
if len(svc.Ports) > 0 {
1123+
destPort = uint32(svc.Ports[0].Port)
1124+
}
1125+
11171126
return &istio.Destination{
11181127
Host: hostname,
1119-
// Port: &istio.PortSelector{Number: uint32(*to.Port)},
1128+
Port: &istio.PortSelector{Number: destPort},
11201129
}, ipCfg, invalidBackendErr
11211130
default:
11221131
return &istio.Destination{}, nil, &ConfigError{

pilot/pkg/config/kube/gateway/conversion_test.go

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,14 @@ var ports = []*model.Port{
7777
},
7878
}
7979

80+
var inferencePoolPorts = []*model.Port{
81+
{
82+
Name: "http",
83+
Port: 54321,
84+
Protocol: "HTTP",
85+
},
86+
}
87+
8088
var services = []*model.Service{
8189
{
8290
Attributes: model.ServiceAttributes{
@@ -135,7 +143,7 @@ var services = []*model.Service{
135143
InferencePoolExtensionRefFailureMode: "FailClose",
136144
},
137145
},
138-
Ports: ports,
146+
Ports: inferencePoolPorts,
139147
Hostname: host.Name(fmt.Sprintf("%s.default.svc.domain.suffix", firstValue(InferencePoolServiceName("infpool-gen")))),
140148
},
141149
{
@@ -147,7 +155,7 @@ var services = []*model.Service{
147155
InferencePoolExtensionRefFailureMode: "FailClose",
148156
},
149157
},
150-
Ports: ports,
158+
Ports: inferencePoolPorts,
151159
Hostname: host.Name(fmt.Sprintf("%s.default.svc.domain.suffix", firstValue(InferencePoolServiceName("infpool-gen2")))),
152160
},
153161

pilot/pkg/config/kube/gateway/inferencepool_collection.go

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -506,15 +506,19 @@ func InferencePoolServiceName(poolName string) (string, error) {
506506
}
507507

508508
func translateShadowServiceToService(shadow shadowServiceInfo, extRef extRefInfo) *corev1.Service {
509-
// Create the ports used by the shadow service
509+
// Create multiple ports for the shadow service - one for each InferencePool targetPort.
510+
// This allows Istio to discover endpoints for all targetPorts.
511+
// We use dummy service ports (54321, 54322, etc.) that map to the actual targetPorts.
512+
baseDummyPort := int32(54321)
510513
ports := make([]corev1.ServicePort, 0, len(shadow.targetPorts))
511-
dummyPort := int32(54321) // Dummy port, not used for anything
512-
for i, port := range shadow.targetPorts {
514+
515+
for i, tp := range shadow.targetPorts {
516+
portName := fmt.Sprintf("http-%d", i)
513517
ports = append(ports, corev1.ServicePort{
514-
Name: "port" + strconv.Itoa(i),
518+
Name: portName,
515519
Protocol: corev1.ProtocolTCP,
516-
Port: dummyPort + int32(i),
517-
TargetPort: intstr.FromInt(int(port.port)),
520+
Port: baseDummyPort + int32(i),
521+
TargetPort: intstr.FromInt(int(tp.port)),
518522
})
519523
}
520524

pilot/pkg/config/kube/gateway/inferencepool_test.go

Lines changed: 51 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ func TestReconcileInferencePool(t *testing.T) {
4040
expectedAnnotations map[string]string
4141
expectedLabels map[string]string
4242
expectedServiceName string
43-
expectedTargetPort int32
43+
expectedTargetPorts []int32
4444
}{
4545
{
4646
name: "basic shadow service creation",
@@ -72,7 +72,7 @@ func TestReconcileInferencePool(t *testing.T) {
7272
constants.InternalServiceSemantics: constants.ServiceSemanticsInferencePool,
7373
InferencePoolRefLabel: "test-pool",
7474
},
75-
expectedTargetPort: 8080,
75+
expectedTargetPorts: []int32{8080},
7676
},
7777
{
7878
name: "user label and annotation preservation",
@@ -136,7 +136,7 @@ func TestReconcileInferencePool(t *testing.T) {
136136
"user.example.com/my-label": "user-value",
137137
"another.domain.com/label": "another-value",
138138
},
139-
expectedTargetPort: 8080,
139+
expectedTargetPorts: []int32{8080},
140140
},
141141
{
142142
name: "very long inferencepool name",
@@ -169,7 +169,45 @@ func TestReconcileInferencePool(t *testing.T) {
169169
InferencePoolRefLabel: "very-long-inference-pool-name-that-should-be-truncated-properly",
170170
},
171171
expectedServiceName: "very-long-inference-pool-name-that-should-be-trunca-ip-6d24df6a",
172-
expectedTargetPort: 9090,
172+
expectedTargetPorts: []int32{9090},
173+
},
174+
{
175+
name: "multiple target ports creates single service port",
176+
inferencePool: &inferencev1.InferencePool{
177+
ObjectMeta: metav1.ObjectMeta{
178+
Name: "multi-port-pool",
179+
Namespace: "default",
180+
},
181+
Spec: inferencev1.InferencePoolSpec{
182+
TargetPorts: []inferencev1.Port{
183+
{
184+
Number: inferencev1.PortNumber(8000),
185+
},
186+
{
187+
Number: inferencev1.PortNumber(8001),
188+
},
189+
{
190+
Number: inferencev1.PortNumber(8002),
191+
},
192+
},
193+
Selector: inferencev1.LabelSelector{
194+
MatchLabels: map[inferencev1.LabelKey]inferencev1.LabelValue{
195+
"app": "multiport",
196+
},
197+
},
198+
EndpointPickerRef: inferencev1.EndpointPickerRef{
199+
Name: "dummy",
200+
Port: &inferencev1.Port{
201+
Number: inferencev1.PortNumber(5421),
202+
},
203+
},
204+
},
205+
},
206+
expectedLabels: map[string]string{
207+
constants.InternalServiceSemantics: constants.ServiceSemanticsInferencePool,
208+
InferencePoolRefLabel: "multi-port-pool",
209+
},
210+
expectedTargetPorts: []int32{8000, 8001, 8002},
173211
},
174212
}
175213

@@ -217,8 +255,15 @@ func TestReconcileInferencePool(t *testing.T) {
217255
for key, expectedValue := range tc.expectedAnnotations {
218256
assert.Equal(t, service.Annotations[key], expectedValue, fmt.Sprintf("Annotation '%s' should have value '%s'", key, expectedValue))
219257
}
220-
assert.Equal(t, service.Spec.Ports[0].Port, int32(54321)) // dummyPort + i
221-
assert.Equal(t, service.Spec.Ports[0].TargetPort.IntVal, tc.expectedTargetPort)
258+
expectedPortCount := len(tc.inferencePool.Spec.TargetPorts)
259+
assert.Equal(t, len(service.Spec.Ports), expectedPortCount, fmt.Sprintf("Shadow service should have %d ports", expectedPortCount))
260+
261+
for i := 1; i < len(service.Spec.Ports); i++ {
262+
assert.Equal(t, service.Spec.Ports[i].Port, int32(54321+i))
263+
assert.Equal(t, service.Spec.Ports[i].TargetPort.IntVal, tc.expectedTargetPorts[i])
264+
assert.Equal(t, service.Spec.Ports[i].Name, fmt.Sprintf("http-%d", i))
265+
}
266+
222267
assert.Equal(t, service.OwnerReferences[0].Name, tc.inferencePool.Name)
223268
})
224269
}

pilot/pkg/config/kube/gateway/testdata/http.yaml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -398,6 +398,8 @@ metadata:
398398
spec:
399399
targetPorts:
400400
- number: 8000
401+
- number: 8001
402+
- number: 8002
401403
selector:
402404
matchLabels:
403405
app: vllm-llama3-8b-instruct

pilot/pkg/config/kube/gateway/testdata/http.yaml.golden

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -235,6 +235,8 @@ spec:
235235
route:
236236
- destination:
237237
host: infpool-gen-ip-6580eb2c.default.svc.domain.suffix
238+
port:
239+
number: 54321
238240
- match:
239241
- headers:
240242
my-header:
@@ -245,6 +247,8 @@ spec:
245247
route:
246248
- destination:
247249
host: infpool-gen2-ip-97b729d1.default.svc.domain.suffix
250+
port:
251+
number: 54321
248252
---
249253
apiVersion: networking.istio.io/v1
250254
kind: VirtualService

pilot/pkg/model/push_context.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2526,6 +2526,25 @@ func (ps *PushContext) BestEffortInferServiceMTLSMode(tp *networking.TrafficPoli
25262526
// ServiceEndpointsByPort returns the cached instances by port if it exists.
25272527
func (ps *PushContext) ServiceEndpointsByPort(svc *Service, port int, labels labels.Instance) []*IstioEndpoint {
25282528
var out []*IstioEndpoint
2529+
2530+
// For InferencePool services, return ALL endpoints regardless of port
2531+
// because they may have different target ports but belong to the same cluster
2532+
if svc.UseInferenceSemantics() {
2533+
allPorts := ps.ServiceIndex.instancesByPort[svc.Key()]
2534+
for _, instances := range allPorts {
2535+
if len(labels) == 0 {
2536+
out = append(out, instances...)
2537+
continue
2538+
}
2539+
for _, instance := range instances {
2540+
if labels.SubsetOf(instance.Labels) {
2541+
out = append(out, instance)
2542+
}
2543+
}
2544+
}
2545+
return out
2546+
}
2547+
25292548
if instances, exists := ps.ServiceIndex.instancesByPort[svc.Key()][port]; exists {
25302549
// Use cached version of instances by port when labels are empty.
25312550
if len(labels) == 0 {

pilot/pkg/networking/core/cluster.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -315,10 +315,15 @@ func (configgen *ConfigGeneratorImpl) buildOutboundClusters(cb *ClusterBuilder,
315315
if service.Resolution == model.Alias {
316316
continue
317317
}
318-
for _, port := range service.Ports {
318+
for i, port := range service.Ports {
319319
if port.Protocol == protocol.UDP {
320320
continue
321321
}
322+
// For InferencePool services, only build cluster for the first port
323+
// All endpoints from all ports are merged into this single cluster
324+
if service.UseInferenceSemantics() && i > 0 {
325+
continue
326+
}
322327
clusterKey := buildClusterKey(service, port, cb, proxy, efKeys)
323328
cached, allFound := cb.getAllCachedSubsetClusters(clusterKey)
324329
if allFound && !features.EnableUnsafeAssertions {

pilot/pkg/xds/endpoints/endpoint_builder.go

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -362,9 +362,14 @@ func (b *EndpointBuilder) BuildClusterLoadAssignment(endpointIndex *model.Endpoi
362362

363363
svcEps := b.snapshotShards(endpointIndex)
364364
svcEps = slices.FilterInPlace(svcEps, func(ep *model.IstioEndpoint) bool {
365-
// filter out endpoints that don't match the service port
366-
if svcPort.Name != ep.ServicePortName {
367-
return false
365+
// For InferencePool services, include endpoints from all service ports
366+
// They use multiple service ports (54321+i) mapped to different targetPorts
367+
// but we want all endpoints in a single cluster so the EPP can load-balance across them
368+
if !b.service.UseInferenceSemantics() {
369+
// filter out endpoints that don't match the service port
370+
if svcPort.Name != ep.ServicePortName {
371+
return false
372+
}
368373
}
369374
// filter out endpoint that has invalid ip address, mostly domain name. Because this is generated from ServiceEntry.
370375
// There are other two cases that should not be filtered out:

pkg/test/echo/cmd/server/main.go

Lines changed: 42 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -33,28 +33,29 @@ import (
3333
)
3434

3535
var (
36-
httpPorts []int
37-
grpcPorts []int
38-
tcpPorts []int
39-
udpPorts []int
40-
tlsPorts []int
41-
mtlsPorts []int
42-
hbonePorts []int
43-
doubleHbonePorts []int
44-
instanceIPPorts []int
45-
localhostIPPorts []int
46-
serverFirstPorts []int
47-
proxyProtocolPorts []int
48-
xdsGRPCServers []int
49-
metricsPort int
50-
uds string
51-
version string
52-
cluster string
53-
crt string
54-
key string
55-
ca string
56-
istioVersion string
57-
disableALPN bool
36+
httpPorts []int
37+
grpcPorts []int
38+
tcpPorts []int
39+
udpPorts []int
40+
tlsPorts []int
41+
mtlsPorts []int
42+
hbonePorts []int
43+
doubleHbonePorts []int
44+
instanceIPPorts []int
45+
localhostIPPorts []int
46+
serverFirstPorts []int
47+
proxyProtocolPorts []int
48+
xdsGRPCServers []int
49+
endpointPickerPorts []int
50+
metricsPort int
51+
uds string
52+
version string
53+
cluster string
54+
crt string
55+
key string
56+
ca string
57+
istioVersion string
58+
disableALPN bool
5859

5960
loggingOptions = log.DefaultOptions()
6061

@@ -66,7 +67,7 @@ var (
6667
PersistentPreRunE: configureLogging,
6768
Run: func(cmd *cobra.Command, args []string) {
6869
shutdown := NewShutdown()
69-
ports := make(common.PortList, len(httpPorts)+len(grpcPorts)+len(tcpPorts)+len(udpPorts)+len(hbonePorts)+len(doubleHbonePorts))
70+
ports := make(common.PortList, len(httpPorts)+len(grpcPorts)+len(tcpPorts)+len(udpPorts)+len(hbonePorts)+len(doubleHbonePorts)+len(endpointPickerPorts))
7071
tlsByPort := map[int]bool{}
7172
mtlsByPort := map[int]bool{}
7273
for _, p := range tlsPorts {
@@ -89,6 +90,10 @@ var (
8990
for _, p := range xdsGRPCServers {
9091
xdsGRPCByPort[p] = true
9192
}
93+
endpointPickerByPort := map[int]bool{}
94+
for _, p := range endpointPickerPorts {
95+
endpointPickerByPort[p] = true
96+
}
9297
portIndex := 0
9398
for i, p := range httpPorts {
9499
ports[portIndex] = &common.Port{
@@ -151,6 +156,18 @@ var (
151156
}
152157
portIndex++
153158
}
159+
for i, p := range endpointPickerPorts {
160+
ports[portIndex] = &common.Port{
161+
Name: "endpoint-picker-" + strconv.Itoa(i),
162+
Protocol: protocol.GRPC,
163+
Port: p,
164+
TLS: tlsByPort[p],
165+
ServerFirst: serverFirstByPort[p],
166+
ProxyProtocol: proxyProtocolByPort[p],
167+
EndpointPicker: true,
168+
}
169+
portIndex++
170+
}
154171

155172
instanceIPByPort := map[int]struct{}{}
156173
for _, p := range instanceIPPorts {
@@ -250,6 +267,8 @@ func init() {
250267
rootCmd.PersistentFlags().IntSliceVar(&serverFirstPorts, "server-first", []int{}, "Ports that are server first. These must be defined as tcp.")
251268
rootCmd.PersistentFlags().IntSliceVar(&proxyProtocolPorts, "proxy-protocol", []int{}, "Ports that are wrapped in HA-PROXY protocol.")
252269
rootCmd.PersistentFlags().IntSliceVar(&xdsGRPCServers, "xds-grpc-server", []int{}, "Ports that should rely on XDS configuration to serve.")
270+
rootCmd.PersistentFlags().IntSliceVar(&endpointPickerPorts, "endpoint-picker", []int{},
271+
"Endpoint picker (ext_proc) ports. These are GRPC ports that implement the Envoy external processor protocol.")
253272
rootCmd.PersistentFlags().IntVar(&metricsPort, "metrics", 0, "Metrics port")
254273
rootCmd.PersistentFlags().StringVar(&uds, "uds", "", "HTTP server on unix domain socket")
255274
rootCmd.PersistentFlags().StringVar(&version, "version", "", "Version string")

0 commit comments

Comments
 (0)