diff --git a/bin/update_crds.sh b/bin/update_crds.sh index 917195bee5..ca9d9786e2 100755 --- a/bin/update_crds.sh +++ b/bin/update_crds.sh @@ -77,7 +77,8 @@ fi rm -f "${ROOTDIR}/tests/integration/pilot/testdata/gateway-api-crd.yaml" cp "${API_TMP}/gateway-api-crd.yaml" "${ROOTDIR}/tests/integration/pilot/testdata/gateway-api-crd.yaml" -GATEWAY_API_INFERENCE_VERSION=$(grep "gateway-api-inference-extension" go.mod | awk '{ print $2 }') +#GATEWAY_API_INFERENCE_VERSION=$(grep "gateway-api-inference-extension" go.mod | awk '{ print $2 }') +GATEWAY_API_INFERENCE_VERSION="v0.0.0-20250926182816-0a3bb2010751" if [[ ${GATEWAY_API_INFERENCE_VERSION} == *"-"* && ! ${GATEWAY_API_INFERENCE_VERSION} =~ -rc.?[0-9]$ ]]; then # not an official release or release candidate, so get the commit sha SHORT_SHA=$(echo "${GATEWAY_API_INFERENCE_VERSION}" | awk -F '-' '{ print $NF }') diff --git a/pilot/pkg/config/kube/gateway/controller.go b/pilot/pkg/config/kube/gateway/controller.go index 2283f5171d..84dfeaa6a3 100644 --- a/pilot/pkg/config/kube/gateway/controller.go +++ b/pilot/pkg/config/kube/gateway/controller.go @@ -293,7 +293,7 @@ func NewController( // Create a queue for handling service updates. // We create the queue even if the env var is off just to prevent nil pointer issues. 
c.shadowServiceReconciler = controllers.NewQueue("inference pool shadow service reconciler", - controllers.WithReconciler(c.reconcileShadowService(svcClient, InferencePools, inputs.Services)), + controllers.WithReconciler(c.reconcileShadowService(kc, InferencePools, inputs.Services)), controllers.WithMaxAttempts(5)) if features.EnableGatewayAPIInferenceExtension { diff --git a/pilot/pkg/config/kube/gateway/conversion.go b/pilot/pkg/config/kube/gateway/conversion.go index 1345d2d2a9..60a22b6b6e 100644 --- a/pilot/pkg/config/kube/gateway/conversion.go +++ b/pilot/pkg/config/kube/gateway/conversion.go @@ -1114,9 +1114,18 @@ func buildDestination(ctx RouteContext, to k8s.BackendRef, ns string, if ipCfg.endpointPickerDst == "" || ipCfg.endpointPickerPort == "" || ipCfg.endpointPickerFailureMode == "" { invalidBackendErr = &ConfigError{Reason: InvalidDestination, Message: "InferencePool service invalid, extensionRef labels not found"} } + + // For InferencePool, always use the first service port (54321). + // The cluster for that service port will include all endpoints for all + // target ports, allowing the EPP to load-balance across them. 
+ var destPort uint32 + if len(svc.Ports) > 0 { + destPort = uint32(svc.Ports[0].Port) + } + return &istio.Destination{ Host: hostname, - // Port: &istio.PortSelector{Number: uint32(*to.Port)}, + Port: &istio.PortSelector{Number: destPort}, }, ipCfg, invalidBackendErr default: return &istio.Destination{}, nil, &ConfigError{ diff --git a/pilot/pkg/config/kube/gateway/conversion_test.go b/pilot/pkg/config/kube/gateway/conversion_test.go index f711681aa8..5c1e9f5b3a 100644 --- a/pilot/pkg/config/kube/gateway/conversion_test.go +++ b/pilot/pkg/config/kube/gateway/conversion_test.go @@ -77,6 +77,14 @@ var ports = []*model.Port{ }, } +var inferencePoolPorts = []*model.Port{ + { + Name: "http", + Port: 54321, + Protocol: "HTTP", + }, +} + var services = []*model.Service{ { Attributes: model.ServiceAttributes{ @@ -135,7 +143,7 @@ var services = []*model.Service{ InferencePoolExtensionRefFailureMode: "FailClose", }, }, - Ports: ports, + Ports: inferencePoolPorts, Hostname: host.Name(fmt.Sprintf("%s.default.svc.domain.suffix", firstValue(InferencePoolServiceName("infpool-gen")))), }, { @@ -147,7 +155,7 @@ var services = []*model.Service{ InferencePoolExtensionRefFailureMode: "FailClose", }, }, - Ports: ports, + Ports: inferencePoolPorts, Hostname: host.Name(fmt.Sprintf("%s.default.svc.domain.suffix", firstValue(InferencePoolServiceName("infpool-gen2")))), }, diff --git a/pilot/pkg/config/kube/gateway/inferencepool_collection.go b/pilot/pkg/config/kube/gateway/inferencepool_collection.go index d8a2c2cfbc..572ddd7eae 100644 --- a/pilot/pkg/config/kube/gateway/inferencepool_collection.go +++ b/pilot/pkg/config/kube/gateway/inferencepool_collection.go @@ -15,6 +15,7 @@ package gateway import ( + "context" "crypto/sha256" "fmt" "strconv" @@ -23,15 +24,15 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/intstr" + "k8s.io/apimachinery/pkg/util/json" inferencev1 
"sigs.k8s.io/gateway-api-inference-extension/api/v1" gatewayv1 "sigs.k8s.io/gateway-api/apis/v1" gateway "sigs.k8s.io/gateway-api/apis/v1beta1" "istio.io/istio/pkg/config/constants" "istio.io/istio/pkg/config/schema/gvk" - "istio.io/istio/pkg/kube/kclient" + "istio.io/istio/pkg/kube" "istio.io/istio/pkg/kube/krt" - "istio.io/istio/pkg/maps" "istio.io/istio/pkg/ptr" "istio.io/istio/pkg/slices" "istio.io/istio/pkg/util/sets" @@ -44,6 +45,7 @@ const ( InferencePoolExtensionRefSvc = "istio.io/inferencepool-extension-service" InferencePoolExtensionRefPort = "istio.io/inferencepool-extension-port" InferencePoolExtensionRefFailureMode = "istio.io/inferencepool-extension-failure-mode" + InferencePoolFieldManager = "istio.io/inference-pool-controller" ) // // ManagedLabel is the label used to identify resources managed by this controller @@ -503,31 +505,39 @@ func InferencePoolServiceName(poolName string) (string, error) { return svcName, nil } -func translateShadowServiceToService(existingLabels map[string]string, shadow shadowServiceInfo, extRef extRefInfo) *corev1.Service { - // Create the ports used by the shadow service +func translateShadowServiceToService(shadow shadowServiceInfo, extRef extRefInfo) *corev1.Service { + // Create multiple ports for the shadow service - one for each InferencePool targetPort. + // This allows Istio to discover endpoints for all targetPorts. + // We use dummy service ports (54321, 54322, etc.) that map to the actual targetPorts. 
+ baseDummyPort := int32(54321) ports := make([]corev1.ServicePort, 0, len(shadow.targetPorts)) - dummyPort := int32(54321) // Dummy port, not used for anything - for i, port := range shadow.targetPorts { + + for i, tp := range shadow.targetPorts { + portName := fmt.Sprintf("http-%d", i) ports = append(ports, corev1.ServicePort{ - Name: "port" + strconv.Itoa(i), + Name: portName, Protocol: corev1.ProtocolTCP, - Port: dummyPort + int32(i), - TargetPort: intstr.FromInt(int(port.port)), + Port: baseDummyPort + int32(i), + TargetPort: intstr.FromInt(int(tp.port)), }) } // Create a new service object based on the shadow service info svc := &corev1.Service{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "v1", + Kind: "Service", + }, ObjectMeta: metav1.ObjectMeta{ Name: shadow.key.Name, Namespace: shadow.key.Namespace, - Labels: maps.MergeCopy(map[string]string{ + Labels: map[string]string{ InferencePoolRefLabel: shadow.poolName, InferencePoolExtensionRefSvc: extRef.name, InferencePoolExtensionRefPort: strconv.Itoa(int(extRef.port)), InferencePoolExtensionRefFailureMode: extRef.failureMode, constants.InternalServiceSemantics: constants.ServiceSemanticsInferencePool, - }, existingLabels), + }, }, Spec: corev1.ServiceSpec{ Selector: shadow.selector, @@ -550,7 +560,7 @@ func translateShadowServiceToService(existingLabels map[string]string, shadow sh } func (c *Controller) reconcileShadowService( - svcClient kclient.Client[*corev1.Service], + kubeClient kube.Client, inferencePools krt.Collection[InferencePool], servicesCollection krt.Collection[*corev1.Service], ) func(key types.NamespacedName) error { @@ -568,33 +578,35 @@ func (c *Controller) reconcileShadowService( existingService := ptr.Flatten(servicesCollection.GetKey(pool.shadowService.key.String())) // Check if we can manage this service - var existingLabels map[string]string if existingService != nil { - existingLabels = existingService.GetLabels() - canManage, _ := c.canManageShadowServiceForInference(existingService) + 
canManage, reason := c.canManageShadowServiceForInference(existingService) if !canManage { - log.Debugf("skipping service %s/%s, already managed by another controller", key.Namespace, key.Name) + log.Debugf("skipping service %s/%s, already managed by another controller: %s", key.Namespace, key.Name, reason) return nil } } - service := translateShadowServiceToService(existingLabels, pool.shadowService, pool.extRef) - - var err error - if existingService == nil { - // Create the service if it doesn't exist - _, err = svcClient.Create(service) - } else { - // TODO: Don't overwrite resources: https://github.com/istio/istio/issues/56667 - service.ResourceVersion = existingService.ResourceVersion - _, err = svcClient.Update(service) - } + service := translateShadowServiceToService(pool.shadowService, pool.extRef) + return c.applyShadowService(kubeClient, service) + } +} - return err +// applyShadowService uses Server-Side Apply to create or update shadow services +func (c *Controller) applyShadowService(kubeClient kube.Client, service *corev1.Service) error { + data, err := json.Marshal(service) + if err != nil { + return fmt.Errorf("failed to marshal service for SSA: %v", err) } + + ctx := context.Background() + _, err = kubeClient.Kube().CoreV1().Services(service.Namespace).Patch( + ctx, service.Name, types.ApplyPatchType, data, metav1.PatchOptions{ + FieldManager: InferencePoolFieldManager, + Force: ptr.Of(true), + }) + return err } -// canManage checks if a service should be managed by this controller func (c *Controller) canManageShadowServiceForInference(obj *corev1.Service) (bool, string) { if obj == nil { // No object exists, we can manage it diff --git a/pilot/pkg/config/kube/gateway/inferencepool_test.go b/pilot/pkg/config/kube/gateway/inferencepool_test.go index db80303253..58c80b32a5 100644 --- a/pilot/pkg/config/kube/gateway/inferencepool_test.go +++ b/pilot/pkg/config/kube/gateway/inferencepool_test.go @@ -15,73 +15,256 @@ package gateway import ( + "fmt" 
"testing" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/util/intstr" inferencev1 "sigs.k8s.io/gateway-api-inference-extension/api/v1" "istio.io/istio/pilot/pkg/features" "istio.io/istio/pkg/config/constants" - "istio.io/istio/pkg/kube/krt" "istio.io/istio/pkg/test" "istio.io/istio/pkg/test/util/assert" ) func TestReconcileInferencePool(t *testing.T) { test.SetForTest(t, &features.EnableGatewayAPIInferenceExtension, true) - pool := &inferencev1.InferencePool{ - ObjectMeta: metav1.ObjectMeta{ - Name: "test-pool", - Namespace: "default", + + testCases := []struct { + name string + inferencePool *inferencev1.InferencePool + shadowService *corev1.Service // name is optional, if not provided, it will be generated + expectedAnnotations map[string]string + expectedLabels map[string]string + expectedServiceName string + expectedTargetPorts []int32 + }{ + { + name: "basic shadow service creation", + inferencePool: &inferencev1.InferencePool{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-pool", + Namespace: "default", + }, + Spec: inferencev1.InferencePoolSpec{ + TargetPorts: []inferencev1.Port{ + { + Number: inferencev1.PortNumber(8080), + }, + }, + Selector: inferencev1.LabelSelector{ + MatchLabels: map[inferencev1.LabelKey]inferencev1.LabelValue{ + "app": "test", + }, + }, + EndpointPickerRef: inferencev1.EndpointPickerRef{ + Name: "dummy", + Port: &inferencev1.Port{ + Number: inferencev1.PortNumber(5421), + }, + }, + }, + }, + expectedLabels: map[string]string{ + constants.InternalServiceSemantics: constants.ServiceSemanticsInferencePool, + InferencePoolRefLabel: "test-pool", + }, + expectedTargetPorts: []int32{8080}, }, - Spec: inferencev1.InferencePoolSpec{ - TargetPorts: []inferencev1.Port{ - { - Number: inferencev1.PortNumber(8080), + { + name: "user label and annotation preservation", + inferencePool: &inferencev1.InferencePool{ + ObjectMeta: metav1.ObjectMeta{ + Name: 
"preserve-pool", + Namespace: "default", }, + Spec: inferencev1.InferencePoolSpec{ + TargetPorts: []inferencev1.Port{ + { + Number: inferencev1.PortNumber(8080), + }, + }, + Selector: inferencev1.LabelSelector{ + MatchLabels: map[inferencev1.LabelKey]inferencev1.LabelValue{ + "app": "test", + }, + }, + EndpointPickerRef: inferencev1.EndpointPickerRef{ + Name: "dummy", + Port: &inferencev1.Port{ + Number: inferencev1.PortNumber(5421), + }, + }, + }, + }, + shadowService: &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Labels: map[string]string{ + InferencePoolRefLabel: "preserve-pool", + "user.example.com/my-label": "user-value", + "another.domain.com/label": "another-value", + }, + Annotations: map[string]string{ + "user.example.com/my-annotation": "user-annotation-value", + "monitoring.example.com/scrape": "true", + }, + }, + Spec: corev1.ServiceSpec{ + Selector: map[string]string{"app": "test"}, + Type: corev1.ServiceTypeClusterIP, + ClusterIP: corev1.ClusterIPNone, + Ports: []corev1.ServicePort{ + { + Protocol: "TCP", + Port: 54321, + TargetPort: intstr.FromInt(8080), + }, + }, + }, + }, + expectedAnnotations: map[string]string{ + "user.example.com/my-annotation": "user-annotation-value", + "monitoring.example.com/scrape": "true", }, - Selector: inferencev1.LabelSelector{ - MatchLabels: map[inferencev1.LabelKey]inferencev1.LabelValue{ - "app": "test", + expectedLabels: map[string]string{ + constants.InternalServiceSemantics: constants.ServiceSemanticsInferencePool, + InferencePoolRefLabel: "preserve-pool", + "user.example.com/my-label": "user-value", + "another.domain.com/label": "another-value", + }, + expectedTargetPorts: []int32{8080}, + }, + { + name: "very long inferencepool name", + inferencePool: &inferencev1.InferencePool{ + ObjectMeta: metav1.ObjectMeta{ + Name: "very-long-inference-pool-name-that-should-be-truncated-properly", + Namespace: "default", + }, + Spec: inferencev1.InferencePoolSpec{ + TargetPorts: 
[]inferencev1.Port{ + { + Number: inferencev1.PortNumber(9090), + }, + }, + Selector: inferencev1.LabelSelector{ + MatchLabels: map[inferencev1.LabelKey]inferencev1.LabelValue{ + "app": "longname", + }, + }, + EndpointPickerRef: inferencev1.EndpointPickerRef{ + Name: "dummy", + Port: &inferencev1.Port{ + Number: inferencev1.PortNumber(5421), + }, + }, }, }, - EndpointPickerRef: inferencev1.EndpointPickerRef{ - Name: "dummy", - Port: &inferencev1.Port{ - Number: inferencev1.PortNumber(5421), + expectedLabels: map[string]string{ + constants.InternalServiceSemantics: constants.ServiceSemanticsInferencePool, + InferencePoolRefLabel: "very-long-inference-pool-name-that-should-be-truncated-properly", + }, + expectedServiceName: "very-long-inference-pool-name-that-should-be-trunca-ip-6d24df6a", + expectedTargetPorts: []int32{9090}, + }, + { + name: "multiple target ports creates single service port", + inferencePool: &inferencev1.InferencePool{ + ObjectMeta: metav1.ObjectMeta{ + Name: "multi-port-pool", + Namespace: "default", }, + Spec: inferencev1.InferencePoolSpec{ + TargetPorts: []inferencev1.Port{ + { + Number: inferencev1.PortNumber(8000), + }, + { + Number: inferencev1.PortNumber(8001), + }, + { + Number: inferencev1.PortNumber(8002), + }, + }, + Selector: inferencev1.LabelSelector{ + MatchLabels: map[inferencev1.LabelKey]inferencev1.LabelValue{ + "app": "multiport", + }, + }, + EndpointPickerRef: inferencev1.EndpointPickerRef{ + Name: "dummy", + Port: &inferencev1.Port{ + Number: inferencev1.PortNumber(5421), + }, + }, + }, + }, + expectedLabels: map[string]string{ + constants.InternalServiceSemantics: constants.ServiceSemanticsInferencePool, + InferencePoolRefLabel: "multi-port-pool", }, + expectedTargetPorts: []int32{8000, 8001, 8002}, }, } - controller := setupController(t, - &corev1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: "default"}}, - NewGateway("test-gw", InNamespace(DefaultTestNS), WithGatewayClass("istio")), - NewHTTPRoute("test-route", 
InNamespace(DefaultTestNS), - WithParentRefAndStatus("test-gw", DefaultTestNS, IstioController), - WithBackendRef("test-pool", DefaultTestNS), - ), - pool, - ) - dumpOnFailure(t, krt.GlobalDebugHandler) + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + objects := []runtime.Object{ + &corev1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: "default"}}, + NewGateway(tc.name+"-gw", InNamespace("default"), WithGatewayClass("istio")), + NewHTTPRoute(tc.name+"-route", InNamespace("default"), + WithParentRefAndStatus(tc.name+"-gw", "default", "istio.io/gateway-controller"), + WithBackendRef(tc.inferencePool.Name, "default"), + ), + tc.inferencePool, + } + if tc.shadowService != nil { + // Generate the service name if not provided + if tc.shadowService.Name == "" { + generatedName, err := InferencePoolServiceName(tc.inferencePool.Name) + assert.NoError(t, err) + tc.shadowService.Name = generatedName + } + objects = append(objects, tc.shadowService) + } + controller := setupController(t, objects...) 
+ + var service *corev1.Service + expectedSvcName, err := InferencePoolServiceName(tc.inferencePool.Name) + if tc.expectedServiceName != "" { + assert.Equal(t, expectedSvcName, tc.expectedServiceName, fmt.Sprintf("Service name should be '%s'", tc.expectedServiceName)) + } + assert.NoError(t, err) + assert.EventuallyEqual(t, func() bool { + var err error + service, err = controller.client.Kube().CoreV1().Services("default").Get(t.Context(), expectedSvcName, metav1.GetOptions{}) + if err != nil { + t.Logf("Service %s not found yet: %v", expectedSvcName, err) + return false + } + return service != nil && service.Labels[constants.InternalServiceSemantics] == constants.ServiceSemanticsInferencePool + }, true) - // Verify the service was created - var service *corev1.Service - var err error - assert.EventuallyEqual(t, func() bool { - svcName := "test-pool-ip-" + generateHash("test-pool", hashSize) - service, err = controller.client.Kube().CoreV1().Services("default").Get(t.Context(), svcName, metav1.GetOptions{}) - if err != nil { - t.Logf("Service %s not found yet: %v", svcName, err) - return false - } - return service != nil - }, true) + for key, expectedValue := range tc.expectedLabels { + assert.Equal(t, service.Labels[key], expectedValue, fmt.Sprintf("Label '%s' should have value '%s'", key, expectedValue)) + } + for key, expectedValue := range tc.expectedAnnotations { + assert.Equal(t, service.Annotations[key], expectedValue, fmt.Sprintf("Annotation '%s' should have value '%s'", key, expectedValue)) + } + expectedPortCount := len(tc.inferencePool.Spec.TargetPorts) + assert.Equal(t, len(service.Spec.Ports), expectedPortCount, fmt.Sprintf("Shadow service should have %d ports", expectedPortCount)) - assert.Equal(t, service.ObjectMeta.Labels[constants.InternalServiceSemantics], constants.ServiceSemanticsInferencePool) - assert.Equal(t, service.ObjectMeta.Labels[InferencePoolRefLabel], pool.Name) - assert.Equal(t, service.OwnerReferences[0].Name, pool.Name) - 
assert.Equal(t, service.Spec.Ports[0].TargetPort.IntVal, int32(8080)) - assert.Equal(t, service.Spec.Ports[0].Port, int32(54321)) // dummyPort + i + for i := 0; i < len(service.Spec.Ports); i++ { + assert.Equal(t, service.Spec.Ports[i].Port, int32(54321+i)) + assert.Equal(t, service.Spec.Ports[i].TargetPort.IntVal, tc.expectedTargetPorts[i]) + assert.Equal(t, service.Spec.Ports[i].Name, fmt.Sprintf("http-%d", i)) + } + + assert.Equal(t, service.OwnerReferences[0].Name, tc.inferencePool.Name) + }) + } } diff --git a/pilot/pkg/config/kube/gateway/testdata/http.yaml b/pilot/pkg/config/kube/gateway/testdata/http.yaml index 6831abd783..57d4008a97 100644 --- a/pilot/pkg/config/kube/gateway/testdata/http.yaml +++ b/pilot/pkg/config/kube/gateway/testdata/http.yaml @@ -398,6 +398,8 @@ metadata: spec: targetPorts: - number: 8000 + - number: 8001 + - number: 8002 selector: matchLabels: app: vllm-llama3-8b-instruct diff --git a/pilot/pkg/config/kube/gateway/testdata/http.yaml.golden b/pilot/pkg/config/kube/gateway/testdata/http.yaml.golden index 7f63124e43..e56ff9a929 100644 --- a/pilot/pkg/config/kube/gateway/testdata/http.yaml.golden +++ b/pilot/pkg/config/kube/gateway/testdata/http.yaml.golden @@ -235,6 +235,8 @@ spec: route: - destination: host: infpool-gen-ip-6580eb2c.default.svc.domain.suffix + port: + number: 54321 - match: - headers: my-header: @@ -245,6 +247,8 @@ spec: route: - destination: host: infpool-gen2-ip-97b729d1.default.svc.domain.suffix + port: + number: 54321 --- apiVersion: networking.istio.io/v1 kind: VirtualService diff --git a/pilot/pkg/model/push_context.go b/pilot/pkg/model/push_context.go index e39e38f752..9acd7a4c17 100644 --- a/pilot/pkg/model/push_context.go +++ b/pilot/pkg/model/push_context.go @@ -2526,6 +2526,25 @@ func (ps *PushContext) BestEffortInferServiceMTLSMode(tp *networking.TrafficPoli // ServiceEndpointsByPort returns the cached instances by port if it exists.
func (ps *PushContext) ServiceEndpointsByPort(svc *Service, port int, labels labels.Instance) []*IstioEndpoint { var out []*IstioEndpoint + + // For InferencePool services, return ALL endpoints regardless of port + // because they may have different target ports but belong to the same cluster + if svc.UseInferenceSemantics() { + allPorts := ps.ServiceIndex.instancesByPort[svc.Key()] + for _, instances := range allPorts { + if len(labels) == 0 { + out = append(out, instances...) + continue + } + for _, instance := range instances { + if labels.SubsetOf(instance.Labels) { + out = append(out, instance) + } + } + } + return out + } + if instances, exists := ps.ServiceIndex.instancesByPort[svc.Key()][port]; exists { // Use cached version of instances by port when labels are empty. if len(labels) == 0 { diff --git a/pilot/pkg/networking/core/cluster.go b/pilot/pkg/networking/core/cluster.go index f7f963131c..2e339f6afb 100644 --- a/pilot/pkg/networking/core/cluster.go +++ b/pilot/pkg/networking/core/cluster.go @@ -315,10 +315,15 @@ func (configgen *ConfigGeneratorImpl) buildOutboundClusters(cb *ClusterBuilder, if service.Resolution == model.Alias { continue } - for _, port := range service.Ports { + for i, port := range service.Ports { if port.Protocol == protocol.UDP { continue } + // For InferencePool services, only build cluster for the first port + // All endpoints from all ports are merged into this single cluster + if service.UseInferenceSemantics() && i > 0 { + continue + } clusterKey := buildClusterKey(service, port, cb, proxy, efKeys) cached, allFound := cb.getAllCachedSubsetClusters(clusterKey) if allFound && !features.EnableUnsafeAssertions { diff --git a/pilot/pkg/networking/core/cluster_test.go b/pilot/pkg/networking/core/cluster_test.go index 55408a44f3..33e6ac904f 100644 --- a/pilot/pkg/networking/core/cluster_test.go +++ b/pilot/pkg/networking/core/cluster_test.go @@ -329,6 +329,18 @@ func TestBuildClustersForInferencePoolServices(t *testing.T) { 
proxyType: model.SidecarProxy, InferencePoolService: true, }, + { + testName: "InferencePool service creates single cluster for multiple ports", + clusterName: "outbound|8080||*.example.org", + proxyType: model.Router, + InferencePoolService: true, + }, + { + testName: "Regular service creates one cluster per port", + clusterName: "outbound|8080||*.example.org", + proxyType: model.Router, + InferencePoolService: false, + }, } for _, tc := range cases { t.Run(tc.testName, func(t *testing.T) { @@ -362,6 +374,16 @@ func TestBuildClustersForInferencePoolServices(t *testing.T) { g.Expect(overrideHostPolicy.GetOverrideHostSources()).NotTo(BeEmpty()) g.Expect(overrideHostPolicy.GetOverrideHostSources()[0].GetMetadata().GetKey()).To(Equal("envoy.lb")) g.Expect(overrideHostPolicy.GetOverrideHostSources()[0].GetMetadata().GetPath()[0].GetKey()).To(Equal("x-gateway-destination-endpoint")) + var serviceClusters []string + for _, c := range clusters { + if strings.Contains(c.Name, "*.example.org") && strings.HasPrefix(c.Name, "outbound|") { + serviceClusters = append(serviceClusters, c.Name) + } + } + g.Expect(len(serviceClusters)).To(Equal(1), + "expected single cluster but got %d: %v", len(serviceClusters), serviceClusters) + g.Expect(serviceClusters[0]).To(ContainSubstring("|8080|"), + "expected cluster to use first port 8080 but got %s", serviceClusters[0]) } }) } diff --git a/pilot/pkg/xds/endpoints/endpoint_builder.go b/pilot/pkg/xds/endpoints/endpoint_builder.go index 5442df3aab..a9ba2c91bb 100644 --- a/pilot/pkg/xds/endpoints/endpoint_builder.go +++ b/pilot/pkg/xds/endpoints/endpoint_builder.go @@ -362,9 +362,14 @@ func (b *EndpointBuilder) BuildClusterLoadAssignment(endpointIndex *model.Endpoi svcEps := b.snapshotShards(endpointIndex) svcEps = slices.FilterInPlace(svcEps, func(ep *model.IstioEndpoint) bool { - // filter out endpoints that don't match the service port - if svcPort.Name != ep.ServicePortName { - return false + // For InferencePool services, include 
endpoints from all service ports + // They use multiple service ports (54321+i) mapped to different targetPorts + // but we want all endpoints in a single cluster so the EPP can load-balance across them + if !b.service.UseInferenceSemantics() { + // filter out endpoints that don't match the service port + if svcPort.Name != ep.ServicePortName { + return false + } } // filter out endpoint that has invalid ip address, mostly domain name. Because this is generated from ServiceEntry. // There are other two cases that should not be filtered out: diff --git a/pilot/pkg/xds/endpoints/endpoint_builder_test.go b/pilot/pkg/xds/endpoints/endpoint_builder_test.go index a12aac25a6..fa5ff4d57a 100644 --- a/pilot/pkg/xds/endpoints/endpoint_builder_test.go +++ b/pilot/pkg/xds/endpoints/endpoint_builder_test.go @@ -26,6 +26,7 @@ import ( "istio.io/istio/pilot/pkg/serviceregistry/memory" "istio.io/istio/pilot/pkg/serviceregistry/util/xdsfake" "istio.io/istio/pkg/config" + "istio.io/istio/pkg/config/constants" "istio.io/istio/pkg/config/host" "istio.io/istio/pkg/config/labels" "istio.io/istio/pkg/config/mesh/meshwatcher" @@ -427,3 +428,118 @@ func TestFilterIstioEndpoint(t *testing.T) { }) } } + +func TestBuildClusterLoadAssignment_InferenceServicePortFiltering(t *testing.T) { + tests := []struct { + name string + InferencePoolService bool + expectedEndpoints int + }{ + { + name: "inference service includes endpoints from all ports", + InferencePoolService: true, + expectedEndpoints: 3, + }, + { + name: "regular service filters endpoints by port name", + InferencePoolService: false, + expectedEndpoints: 1, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + svcLabels := make(map[string]string) + if tt.InferencePoolService { + svcLabels[constants.InternalServiceSemantics] = constants.ServiceSemanticsInferencePool + } + + svc := &model.Service{ + Hostname: "example.ns.svc.cluster.local", + Attributes: model.ServiceAttributes{ + Name: "example", + Namespace: 
"ns", + Labels: svcLabels, + }, + Ports: model.PortList{ + {Port: 80, Protocol: protocol.HTTP, Name: "http-80"}, + {Port: 8000, Protocol: protocol.HTTP, Name: "http-8000"}, + {Port: 8001, Protocol: protocol.HTTP, Name: "http-8001"}, + }, + } + + proxy := &model.Proxy{ + Type: model.SidecarProxy, + IPAddresses: []string{"127.0.0.1"}, + Metadata: &model.NodeMetadata{ + Namespace: "ns", + NodeName: "example", + }, + ConfigNamespace: "ns", + } + + endpointIndex := model.NewEndpointIndex(model.NewXdsCache()) + shards, _ := endpointIndex.GetOrCreateEndpointShard("example.ns.svc.cluster.local", "ns") + shards.Lock() + shards.Shards[model.ShardKey{Cluster: "cluster1"}] = []*model.IstioEndpoint{ + { + Addresses: []string{"10.0.0.1"}, + ServicePortName: "http-80", + EndpointPort: 80, + HostName: "example.ns.svc.cluster.local", + Namespace: "ns", + }, + { + Addresses: []string{"10.0.0.2"}, + ServicePortName: "http-8000", + EndpointPort: 8000, + HostName: "example.ns.svc.cluster.local", + Namespace: "ns", + }, + { + Addresses: []string{"10.0.0.3"}, + ServicePortName: "http-8001", + EndpointPort: 8001, + HostName: "example.ns.svc.cluster.local", + Namespace: "ns", + }, + } + shards.Unlock() + + env := model.NewEnvironment() + env.ConfigStore = model.NewFakeStore() + env.Watcher = meshwatcher.NewTestWatcher(&meshconfig.MeshConfig{RootNamespace: "istio-system"}) + meshNetworks := meshwatcher.NewFixedNetworksWatcher(nil) + env.NetworksWatcher = meshNetworks + env.ServiceDiscovery = &localServiceDiscovery{ + services: []*model.Service{svc}, + } + xdsUpdater := xdsfake.NewFakeXDS() + if err := env.InitNetworksManager(xdsUpdater); err != nil { + t.Fatal(err) + } + env.Init() + + push := model.NewPushContext() + push.InitContext(env, nil, nil) + env.SetPushContext(push) + + builder := NewCDSEndpointBuilder( + proxy, push, + "outbound|80||example.ns.svc.cluster.local", + model.TrafficDirectionOutbound, "", "example.ns.svc.cluster.local", 80, + svc, nil) + + cla := 
builder.BuildClusterLoadAssignment(endpointIndex) + + var totalEndpoints int + for _, localityLbEndpoints := range cla.Endpoints { + totalEndpoints += len(localityLbEndpoints.LbEndpoints) + } + + if totalEndpoints != tt.expectedEndpoints { + t.Errorf("expected %d endpoints, got %d", tt.expectedEndpoints, totalEndpoints) + } + }) + } +} diff --git a/pkg/test/echo/cmd/server/main.go b/pkg/test/echo/cmd/server/main.go index f8b0e791d0..e156e58b1e 100644 --- a/pkg/test/echo/cmd/server/main.go +++ b/pkg/test/echo/cmd/server/main.go @@ -33,28 +33,29 @@ import ( ) var ( - httpPorts []int - grpcPorts []int - tcpPorts []int - udpPorts []int - tlsPorts []int - mtlsPorts []int - hbonePorts []int - doubleHbonePorts []int - instanceIPPorts []int - localhostIPPorts []int - serverFirstPorts []int - proxyProtocolPorts []int - xdsGRPCServers []int - metricsPort int - uds string - version string - cluster string - crt string - key string - ca string - istioVersion string - disableALPN bool + httpPorts []int + grpcPorts []int + tcpPorts []int + udpPorts []int + tlsPorts []int + mtlsPorts []int + hbonePorts []int + doubleHbonePorts []int + instanceIPPorts []int + localhostIPPorts []int + serverFirstPorts []int + proxyProtocolPorts []int + xdsGRPCServers []int + endpointPickerPorts []int + metricsPort int + uds string + version string + cluster string + crt string + key string + ca string + istioVersion string + disableALPN bool loggingOptions = log.DefaultOptions() @@ -66,7 +67,7 @@ var ( PersistentPreRunE: configureLogging, Run: func(cmd *cobra.Command, args []string) { shutdown := NewShutdown() - ports := make(common.PortList, len(httpPorts)+len(grpcPorts)+len(tcpPorts)+len(udpPorts)+len(hbonePorts)+len(doubleHbonePorts)) + ports := make(common.PortList, len(httpPorts)+len(grpcPorts)+len(tcpPorts)+len(udpPorts)+len(hbonePorts)+len(doubleHbonePorts)+len(endpointPickerPorts)) tlsByPort := map[int]bool{} mtlsByPort := map[int]bool{} for _, p := range tlsPorts { @@ -89,6 +90,10 @@ var ( 
for _, p := range xdsGRPCServers { xdsGRPCByPort[p] = true } + endpointPickerByPort := map[int]bool{} + for _, p := range endpointPickerPorts { + endpointPickerByPort[p] = true + } portIndex := 0 for i, p := range httpPorts { ports[portIndex] = &common.Port{ @@ -151,6 +156,18 @@ var ( } portIndex++ } + for i, p := range endpointPickerPorts { + ports[portIndex] = &common.Port{ + Name: "endpoint-picker-" + strconv.Itoa(i), + Protocol: protocol.GRPC, + Port: p, + TLS: tlsByPort[p], + ServerFirst: serverFirstByPort[p], + ProxyProtocol: proxyProtocolByPort[p], + EndpointPicker: true, + } + portIndex++ + } instanceIPByPort := map[int]struct{}{} for _, p := range instanceIPPorts { @@ -250,6 +267,8 @@ func init() { rootCmd.PersistentFlags().IntSliceVar(&serverFirstPorts, "server-first", []int{}, "Ports that are server first. These must be defined as tcp.") rootCmd.PersistentFlags().IntSliceVar(&proxyProtocolPorts, "proxy-protocol", []int{}, "Ports that are wrapped in HA-PROXY protocol.") rootCmd.PersistentFlags().IntSliceVar(&xdsGRPCServers, "xds-grpc-server", []int{}, "Ports that should rely on XDS configuration to serve.") + rootCmd.PersistentFlags().IntSliceVar(&endpointPickerPorts, "endpoint-picker", []int{}, + "Endpoint picker (ext_proc) ports. These are GRPC ports that implement the Envoy external processor protocol.") rootCmd.PersistentFlags().IntVar(&metricsPort, "metrics", 0, "Metrics port") rootCmd.PersistentFlags().StringVar(&uds, "uds", "", "HTTP server on unix domain socket") rootCmd.PersistentFlags().StringVar(&version, "version", "", "Version string") diff --git a/pkg/test/echo/common/model.go b/pkg/test/echo/common/model.go index 7a432af0b1..bc7bb7183e 100644 --- a/pkg/test/echo/common/model.go +++ b/pkg/test/echo/common/model.go @@ -57,6 +57,10 @@ type Port struct { // RequireClientCert determines if the port will be mTLS. RequireClientCert bool + // EndpointPicker indicates this port should serve as an endpoint picker (ext_proc gRPC service). 
+ // Only valid when Protocol is GRPC. + EndpointPicker bool + // ServerFirst if a port will be server first ServerFirst bool diff --git a/pkg/test/echo/server/endpoint/endpointpicker.go b/pkg/test/echo/server/endpoint/endpointpicker.go new file mode 100644 index 0000000000..0120775a7b --- /dev/null +++ b/pkg/test/echo/server/endpoint/endpointpicker.go @@ -0,0 +1,251 @@ +// Copyright Istio Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package endpoint + +import ( + "fmt" + "io" + "net" + + extprocv3 "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3" + "google.golang.org/grpc" + "google.golang.org/protobuf/types/known/structpb" + + "istio.io/istio/pkg/log" +) + +var eppLog = log.RegisterScope("epp", "endpoint picker") + +type endpointPickerServer struct { + extprocv3.UnimplementedExternalProcessorServer +} + +func (s *endpointPickerServer) Process(stream extprocv3.ExternalProcessor_ProcessServer) error { + eppLog.Info("EPP: New stream connection established") + requestCount := 0 + for { + requestCount++ + eppLog.Debugf("EPP: Waiting to receive request #%d...", requestCount) + req, err := stream.Recv() + if err == io.EOF { + eppLog.Debug("EPP: Stream closed by client (EOF)") + return nil + } + if err != nil { + eppLog.Errorf("EPP: Error receiving request: %v", err) + return err + } + + eppLog.Debugf("EPP: Received request #%d, type: %T", requestCount, req.Request) + var resp *extprocv3.ProcessingResponse + var dynamicMetadata 
*structpb.Struct + eppLog.Debugf("EPP: Request: %s", req.String()) + switch r := req.Request.(type) { + case *extprocv3.ProcessingRequest_RequestHeaders: + var selectedEndpoint string + + headers := r.RequestHeaders.Headers + if headers != nil { + eppLog.Debugf("EPP: Received %d headers", len(headers.Headers)) + // Check for x-endpoint header (client request for specific endpoint) + for _, h := range headers.Headers { + if h.Key == "x-endpoint" { + selectedEndpoint = string(h.RawValue) + eppLog.Infof("EPP: Received x-endpoint header: %s", selectedEndpoint) + } + // Log x-envoy headers which may contain endpoint info + if len(h.Key) > 7 && h.Key[:7] == "x-envoy" { + eppLog.Debugf("EPP: Header %s: %s", h.Key, string(h.RawValue)) + } + } + } + + if selectedEndpoint == "" { + // Default to a placeholder - in production this would query available endpoints + selectedEndpoint = "10.0.0.1:8000" + eppLog.Debugf("Using default endpoint: %s", selectedEndpoint) + } + + // Build response according to EPP specification: + // Set x-gateway-destination-endpoint in dynamic metadata (namespace: envoy.lb) + // to communicate selected endpoint to data plane + lbMetadata, err := structpb.NewStruct(map[string]interface{}{ + "x-gateway-destination-endpoint": selectedEndpoint, + }) + if err != nil { + eppLog.Errorf("Failed to create metadata: %v", err) + resp = &extprocv3.ProcessingResponse{} + } else { + // Dynamic metadata must be set with namespace as top-level key + var err error + dynamicMetadata, err = structpb.NewStruct(map[string]interface{}{ + "envoy.lb": lbMetadata.AsMap(), + }) + if err != nil { + eppLog.Errorf("Failed to create dynamic metadata: %v", err) + resp = &extprocv3.ProcessingResponse{} + } else { + resp = &extprocv3.ProcessingResponse{ + Response: &extprocv3.ProcessingResponse_RequestHeaders{ + RequestHeaders: &extprocv3.HeadersResponse{ + Response: &extprocv3.CommonResponse{ + HeaderMutation: &extprocv3.HeaderMutation{ + RemoveHeaders: []string{"x-endpoint"}, + }, 
+ }, + }, + }, + DynamicMetadata: dynamicMetadata, + } + eppLog.Infof("EPP response: set dynamic_metadata[envoy.lb][x-gateway-destination-endpoint]=%s and removed x-endpoint header", selectedEndpoint) + } + } + + case *extprocv3.ProcessingRequest_RequestBody: + eppLog.Debug("EPP: Processing RequestBody (streaming mode - no mutation)") + // In FULL_DUPLEX_STREAMED mode, send BodyMutation with StreamedBodyResponse + resp = &extprocv3.ProcessingResponse{ + Response: &extprocv3.ProcessingResponse_RequestBody{ + RequestBody: &extprocv3.BodyResponse{ + Response: &extprocv3.CommonResponse{ + BodyMutation: &extprocv3.BodyMutation{ + Mutation: &extprocv3.BodyMutation_StreamedResponse{ + StreamedResponse: &extprocv3.StreamedBodyResponse{ + Body: r.RequestBody.Body, + EndOfStream: r.RequestBody.EndOfStream, + }, + }, + }, + }, + }, + }, + } + + case *extprocv3.ProcessingRequest_RequestTrailers: + eppLog.Debug("EPP: Processing RequestTrailers (no mutation)") + resp = &extprocv3.ProcessingResponse{ + Response: &extprocv3.ProcessingResponse_RequestTrailers{ + RequestTrailers: &extprocv3.TrailersResponse{ + HeaderMutation: &extprocv3.HeaderMutation{}, + }, + }, + } + + case *extprocv3.ProcessingRequest_ResponseHeaders: + eppLog.Debug("EPP: Processing ResponseHeaders (no mutation)") + resp = &extprocv3.ProcessingResponse{ + Response: &extprocv3.ProcessingResponse_ResponseHeaders{ + ResponseHeaders: &extprocv3.HeadersResponse{ + Response: &extprocv3.CommonResponse{}, + }, + }, + } + + case *extprocv3.ProcessingRequest_ResponseBody: + eppLog.Debug("EPP: Processing ResponseBody (streaming mode - no mutation)") + // In FULL_DUPLEX_STREAMED mode, send BodyMutation with StreamedBodyResponse + resp = &extprocv3.ProcessingResponse{ + Response: &extprocv3.ProcessingResponse_ResponseBody{ + ResponseBody: &extprocv3.BodyResponse{ + Response: &extprocv3.CommonResponse{ + BodyMutation: &extprocv3.BodyMutation{ + Mutation: &extprocv3.BodyMutation_StreamedResponse{ + StreamedResponse: 
&extprocv3.StreamedBodyResponse{ + Body: r.ResponseBody.Body, + EndOfStream: r.ResponseBody.EndOfStream, + }, + }, + }, + }, + }, + }, + } + + case *extprocv3.ProcessingRequest_ResponseTrailers: + eppLog.Debug("EPP: Processing ResponseTrailers (no mutation)") + resp = &extprocv3.ProcessingResponse{ + Response: &extprocv3.ProcessingResponse_ResponseTrailers{ + ResponseTrailers: &extprocv3.TrailersResponse{ + HeaderMutation: &extprocv3.HeaderMutation{}, + }, + }, + } + + default: + // For other request types, skip processing + eppLog.Warnf("EPP: Received unknown request type: %T, skipping", req.Request) + resp = &extprocv3.ProcessingResponse{} + } + + eppLog.Debugf("EPP: Sending response for request #%d", requestCount) + if err := stream.Send(resp); err != nil { + eppLog.Errorf("EPP: Error sending response: %v", err) + return err + } + eppLog.Debugf("EPP: Successfully sent response #%d", requestCount) + } +} + +// endpointPickerInstance implements the Instance interface for endpoint picker +type endpointPickerInstance struct { + Config + server *grpc.Server + listener net.Listener +} + +// newEndpointPicker creates a new endpoint picker endpoint instance. 
+func newEndpointPicker(config Config) *endpointPickerInstance { + return &endpointPickerInstance{ + Config: config, + } +} + +func (e *endpointPickerInstance) Start(onReady OnReadyFunc) error { + addr := fmt.Sprintf("%s:%d", e.ListenerIP, e.Port.Port) + lis, err := net.Listen("tcp", addr) + if err != nil { + return fmt.Errorf("failed to listen for endpoint picker: %v", err) + } + e.listener = lis + + e.server = grpc.NewServer() + extprocv3.RegisterExternalProcessorServer(e.server, &endpointPickerServer{}) + + go func() { + eppLog.Infof("Endpoint Picker gRPC server READY and listening on %s", addr) + eppLog.Infof("Endpoint Picker is registered and waiting for ext_proc connections from Envoy") + if err := e.server.Serve(lis); err != nil { + eppLog.Errorf("Endpoint picker server failed: %v", err) + } + eppLog.Warnf("Endpoint Picker gRPC server stopped") + }() + + onReady() + return nil +} + +func (e *endpointPickerInstance) Close() error { + if e.server != nil { + e.server.GracefulStop() + } + if e.listener != nil { + return e.listener.Close() + } + return nil +} + +func (e *endpointPickerInstance) GetConfig() Config { + return e.Config +} diff --git a/pkg/test/echo/server/endpoint/instance.go b/pkg/test/echo/server/endpoint/instance.go index 72a15c4cff..63c45736c7 100644 --- a/pkg/test/echo/server/endpoint/instance.go +++ b/pkg/test/echo/server/endpoint/instance.go @@ -30,20 +30,21 @@ type OnReadyFunc func() // Config for a single endpoint Instance. 
type Config struct { - IsServerReady IsServerReadyFunc - Version string - Cluster string - TLSCert string - TLSKey string - TLSCACert string - UDSServer string - Dialer common.Dialer - Port *common.Port - ListenerIP string - IstioVersion string - Namespace string - DisableALPN bool - ReportRequest func() + IsServerReady IsServerReadyFunc + Version string + Cluster string + TLSCert string + TLSKey string + TLSCACert string + UDSServer string + Dialer common.Dialer + Port *common.Port + ListenerIP string + IstioVersion string + Namespace string + DisableALPN bool + ReportRequest func() + EndpointPicker bool } // Instance of an endpoint that serves the Echo application on a single port/protocol. @@ -64,6 +65,9 @@ func New(cfg Config) (Instance, error) { case protocol.HTTP, protocol.HTTPS: return newHTTP(cfg), nil case protocol.HTTP2, protocol.GRPC: + if cfg.EndpointPicker { + return newEndpointPicker(cfg), nil + } return newGRPC(cfg), nil case protocol.TCP: return newTCP(cfg), nil diff --git a/pkg/test/echo/server/instance.go b/pkg/test/echo/server/instance.go index 8bd72b3e63..0909c427dd 100644 --- a/pkg/test/echo/server/instance.go +++ b/pkg/test/echo/server/instance.go @@ -233,7 +233,7 @@ func (s *Instance) getListenerIPs(port *common.Port) ([]string, error) { } func (s *Instance) newEndpoint(port *common.Port, listenerIP string, udsServer string) (endpoint.Instance, error) { - return endpoint.New(endpoint.Config{ + epConfig := endpoint.Config{ Port: port, UDSServer: udsServer, IsServerReady: s.isReady, @@ -247,7 +247,11 @@ func (s *Instance) newEndpoint(port *common.Port, listenerIP string, udsServer s ListenerIP: listenerIP, DisableALPN: s.DisableALPN, IstioVersion: s.IstioVersion, - }) + } + if port != nil && port.EndpointPicker { + epConfig.EndpointPicker = true + } + return endpoint.New(epConfig) } func (s *Instance) isReady() bool { diff --git a/pkg/test/framework/components/crd/gateway.go b/pkg/test/framework/components/crd/gateway.go index 
637f29dc1a..915eb5b14b 100644 --- a/pkg/test/framework/components/crd/gateway.go +++ b/pkg/test/framework/components/crd/gateway.go @@ -96,3 +96,52 @@ func DeployGatewayAPI(ctx resource.Context) error { return nil }) } + +func DeployGatewayAPIInferenceExtensionOrSkip(ctx framework.TestContext) { + res := DeployGatewayAPIInferenceExtension(ctx) + if res == errSkip { + ctx.Skip(errSkip.Error()) + } + if res != nil { + ctx.Fatal(res) + } +} + +func DeployGatewayAPIInferenceExtension(ctx resource.Context) error { + cfg, _ := istio.DefaultConfig(ctx) + if !cfg.DeployGatewayAPI { + return nil + } + if !SupportsGatewayAPI(ctx) { + return errSkip + } + if err := ctx.ConfigIstio(). + File("", filepath.Join(env.IstioSrc, "tests/integration/pilot/testdata/gateway-api-inference-extension-crd.yaml")). + Apply(apply.NoCleanup); err != nil { + return err + } + // Wait until the InferencePool CRD is ready + return retry.UntilSuccess(func() error { + for _, c := range ctx.Clusters().Configs() { + crdl, err := c.Ext().ApiextensionsV1().CustomResourceDefinitions().List(context.Background(), metav1.ListOptions{}) + if err != nil { + return err + } + for _, crd := range crdl.Items { + if !strings.HasSuffix(crd.Name, "inference.networking.k8s.io") { + continue + } + found := false + for _, c := range crd.Status.Conditions { + if c.Type == apiextensions.Established && c.Status == apiextensions.ConditionTrue { + found = true + } + } + if !found { + return fmt.Errorf("crd %v not ready: %+v", crd.Name, crd.Status) + } + } + } + return nil + }) +} diff --git a/pkg/test/framework/components/echo/kube/deployment.go b/pkg/test/framework/components/echo/kube/deployment.go index 682707e5b8..a96b80c248 100644 --- a/pkg/test/framework/components/echo/kube/deployment.go +++ b/pkg/test/framework/components/echo/kube/deployment.go @@ -261,7 +261,50 @@ func GenerateDeployment(ctx resource.Context, cfg echo.Config, settings *resourc deploy = getTemplate(vmDeploymentTemplateFile) } - return 
tmpl.Execute(deploy, params) + deploymentYAML, err := tmpl.Execute(deploy, params) + if err != nil { + return "", err + } + + // Check if any ports are configured as endpoint pickers + var eppPorts []int + for _, port := range cfg.Ports { + if port.EndpointPicker { + eppPorts = append(eppPorts, port.ServicePort) + } + } + + // If there are endpoint picker ports, add a DestinationRule to disable mTLS for those ports + if len(eppPorts) > 0 { + drYAML := generateDestinationRuleForEPP(cfg.Service, cfg.Namespace.Name(), eppPorts) + deploymentYAML += "\n---\n" + drYAML + } + + return deploymentYAML, nil +} + +func generateDestinationRuleForEPP(service, namespace string, eppPorts []int) string { + var portSettings strings.Builder + for i, port := range eppPorts { + if i > 0 { + portSettings.WriteString("\n") + } + portSettings.WriteString(fmt.Sprintf(` - port: + number: %d + tls: + mode: DISABLE`, port)) + } + + return fmt.Sprintf(`apiVersion: networking.istio.io/v1 +kind: DestinationRule +metadata: + name: %s-epp-notls + namespace: %s +spec: + host: %s.%s.svc.cluster.local + trafficPolicy: + portLevelSettings: +%s`, service, namespace, service, namespace, portSettings.String()) } func GenerateService(cfg echo.Config, isOpenShift bool) (string, error) { @@ -675,6 +718,7 @@ func getContainerPorts(cfg echo.Config) echoCommon.PortList { InstanceIP: p.InstanceIP, LocalhostIP: p.LocalhostIP, ProxyProtocol: p.ProxyProtocol, + EndpointPicker: p.EndpointPicker, } containerPorts = append(containerPorts, cport) diff --git a/pkg/test/framework/components/echo/kube/templates/deployment.yaml b/pkg/test/framework/components/echo/kube/templates/deployment.yaml index 246c79e491..91a8338c0e 100644 --- a/pkg/test/framework/components/echo/kube/templates/deployment.yaml +++ b/pkg/test/framework/components/echo/kube/templates/deployment.yaml @@ -109,6 +109,8 @@ spec: {{- range $i, $p := $appContainer.ContainerPorts }} {{- if and $p.XDSServer (eq .Protocol "GRPC") }} - --xds-grpc-server={{ 
$p.Port }} +{{- else if $p.EndpointPicker }} + - --endpoint-picker={{ $p.Port }} {{- else if eq .Protocol "GRPC" }} - --grpc={{ $p.Port }} {{- else if eq .Protocol "TCP" }} diff --git a/pkg/test/framework/components/echo/port.go b/pkg/test/framework/components/echo/port.go index 13eede92ae..e35166d3f9 100644 --- a/pkg/test/framework/components/echo/port.go +++ b/pkg/test/framework/components/echo/port.go @@ -60,6 +60,10 @@ type Port struct { // ProxyProtocol determines if echo should accept PROXY protocol. ProxyProtocol bool + + // EndpointPicker indicates this port should serve as an endpoint picker (ext_proc gRPC service). + // Only valid when Protocol is GRPC. + EndpointPicker bool } // IsWorkloadOnly returns true if there is no service port specified for this Port. diff --git a/releasenotes/notes/57638.yaml b/releasenotes/notes/57638.yaml new file mode 100644 index 0000000000..3ee1f1d75a --- /dev/null +++ b/releasenotes/notes/57638.yaml @@ -0,0 +1,7 @@ +apiVersion: release-notes/v2 +kind: feature +area: traffic-management +issue: [57638] +releaseNotes: +- | + **Added** support for multiple targetPorts in an InferencePool. The possibility to have >1 targetPort was added as part of GIE v1.1.0. diff --git a/tests/integration/pilot/gie/inferencepool_test.go b/tests/integration/pilot/gie/inferencepool_test.go new file mode 100644 index 0000000000..b187ff2c1e --- /dev/null +++ b/tests/integration/pilot/gie/inferencepool_test.go @@ -0,0 +1,359 @@ +//go:build integ + +// Copyright Istio Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and +// limitations under the License. + +package gie + +import ( + "context" + "fmt" + "strconv" + "testing" + "time" + + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime/schema" + + "istio.io/istio/pkg/config/constants" + "istio.io/istio/pkg/config/protocol" + "istio.io/istio/pkg/http/headers" + "istio.io/istio/pkg/test/echo/common/scheme" + "istio.io/istio/pkg/test/framework" + "istio.io/istio/pkg/test/framework/components/crd" + "istio.io/istio/pkg/test/framework/components/echo" + "istio.io/istio/pkg/test/framework/components/echo/check" + "istio.io/istio/pkg/test/framework/components/echo/deployment" + "istio.io/istio/pkg/test/framework/components/namespace" + "istio.io/istio/pkg/test/util/retry" +) + +// TestInferencePoolMultipleTargetPorts verifies that InferencePools with multiple targetPorts +// create a single shadow service with port 54321, and that Envoy generates the correct cluster. +func TestInferencePoolMultipleTargetPorts(t *testing.T) { + framework. + NewTest(t). 
+ Run(func(ctx framework.TestContext) { + crd.DeployGatewayAPIOrSkip(ctx) + crd.DeployGatewayAPIInferenceExtensionOrSkip(ctx) + + ns := namespace.NewOrFail(ctx, namespace.Config{ + Prefix: "inferencepool", + Inject: true, + }) + + // Deploy a workload that listens on multiple ports (8000, 8001, 8002) + // to simulate an inference workload + var workload echo.Instance + echoConfig := echo.Config{ + Service: "inference-workload", + Namespace: ns, + Ports: echo.Ports{ + { + Name: "http-8000", + Protocol: "HTTP", + ServicePort: 8000, + WorkloadPort: 8000, + }, + { + Name: "http-8001", + Protocol: "HTTP", + ServicePort: 8001, + WorkloadPort: 8001, + }, + { + Name: "http-8002", + Protocol: "HTTP", + ServicePort: 8002, + WorkloadPort: 8002, + }, + }, + } + // Deploy a client instance to make calls through the gateway + var client echo.Instance + clientConfig := echo.Config{ + Service: "client", + Namespace: ns, + } + + // Deploy the endpoint picker (EPP) service as an echo instance + // This is an external processor that selects endpoints based on request headers + var epp echo.Instance + eppConfig := echo.Config{ + Service: "mock-epp", + Namespace: ns, + Ports: echo.Ports{ + { + Name: "grpc", + Protocol: protocol.GRPC, + ServicePort: 9002, + WorkloadPort: 9002, + EndpointPicker: true, + }, + }, + Subsets: []echo.SubsetConfig{ + { + Version: "v1", + Annotations: map[string]string{ + // Exclude the endpoint picker port from Istio's traffic interception + // to allow direct ext_proc gRPC connections from the gateway + "traffic.sidecar.istio.io/excludeInboundPorts": "9002", + }, + }, + }, + } + + deployment.New(ctx). + With(&workload, echoConfig). + With(&client, clientConfig). + With(&epp, eppConfig). 
+ BuildOrFail(ctx) + + // Deploy InferencePool with multiple targetPorts + inferencePoolManifest := fmt.Sprintf(` +apiVersion: inference.networking.k8s.io/v1 +kind: InferencePool +metadata: + name: test-pool + namespace: %s +spec: + targetPorts: + - number: 8000 + - number: 8001 + - number: 8002 + selector: + matchLabels: + app: inference-workload + endpointPickerRef: + name: %s + port: + number: %d +`, ns.Name(), eppConfig.Service, eppConfig.Ports[0].ServicePort) + ctx.ConfigIstio().YAML(ns.Name(), inferencePoolManifest).ApplyOrFail(ctx) + + // Enable access logging for all proxies in the namespace BEFORE creating gateway + // This ensures the gateway pods start with logging enabled + telemetryManifest := ` +apiVersion: telemetry.istio.io/v1 +kind: Telemetry +metadata: + name: access-logging +spec: + accessLogging: + - providers: + - name: envoy +` + ctx.ConfigIstio().YAML(ns.Name(), telemetryManifest).ApplyOrFail(ctx) + + // Deploy Gateway and HTTPRoute + gatewayManifest := fmt.Sprintf(` +apiVersion: gateway.networking.k8s.io/v1 +kind: Gateway +metadata: + name: inference-gateway + namespace: %s + annotations: + sidecar.istio.io/componentLogLevel: "ext_proc:debug,connection:debug,filter:debug,router:debug" +spec: + gatewayClassName: istio + listeners: + - name: http + port: 80 + protocol: HTTP + allowedRoutes: + namespaces: + from: Same +--- +apiVersion: gateway.networking.k8s.io/v1 +kind: HTTPRoute +metadata: + name: inference-route + namespace: %s +spec: + parentRefs: + - name: inference-gateway + hostnames: + - "inference.example.com" + rules: + - backendRefs: + - group: inference.networking.k8s.io + kind: InferencePool + name: test-pool + port: 80 +`, ns.Name(), ns.Name()) + ctx.ConfigIstio().YAML(ns.Name(), gatewayManifest).ApplyOrFail(ctx) + + // Verify shadow service was created with correct properties + retry.UntilSuccessOrFail(ctx, func() error { + client := ctx.Clusters().Default().Kube() + services, err := 
client.CoreV1().Services(ns.Name()).List(context.TODO(), metav1.ListOptions{ + LabelSelector: "istio.io/inferencepool-name=test-pool", + }) + if err != nil || len(services.Items) == 0 { + return fmt.Errorf("failed to list services: %v", err) + } + + svc := services.Items[0] + + // Verify it's marked as an InferencePool service + if svc.Labels[constants.InternalServiceSemantics] != constants.ServiceSemanticsInferencePool { + return fmt.Errorf("shadow service missing InferencePool label") + } + + // Verify it's a headless service + if svc.Spec.ClusterIP != corev1.ClusterIPNone { + return fmt.Errorf("shadow service is not headless, got ClusterIP: %s", svc.Spec.ClusterIP) + } + + // Verify EPP extension ref labels are present + if svc.Labels["istio.io/inferencepool-extension-service"] != eppConfig.Service { + return fmt.Errorf("missing or incorrect EPP service label, expected %s, got %s", + eppConfig.Service, svc.Labels["istio.io/inferencepool-extension-service"]) + } + + expectedPort := fmt.Sprintf("%d", eppConfig.Ports[0].ServicePort) + if svc.Labels["istio.io/inferencepool-extension-port"] != expectedPort { + return fmt.Errorf("missing or incorrect EPP port label, expected %s, got %s", + expectedPort, svc.Labels["istio.io/inferencepool-extension-port"]) + } + + shadowServiceName := svc.Name + ctx.Logf("Shadow service verified successfully: %s", shadowServiceName) + return nil + }) + + // Verify traffic routing through EPP + // Get the workload pod IP to use in x-endpoint header + var workloadPodIP string + retry.UntilSuccessOrFail(ctx, func() error { + client := ctx.Clusters().Default().Kube() + pods, err := client.CoreV1().Pods(ns.Name()).List(context.TODO(), metav1.ListOptions{ + LabelSelector: "app=inference-workload", + }) + if err != nil { + return fmt.Errorf("failed to list workload pods: %v", err) + } + if len(pods.Items) == 0 { + return fmt.Errorf("no workload pods found") + } + for _, pod := range pods.Items { + if pod.Status.Phase == corev1.PodRunning && 
pod.Status.PodIP != "" { + workloadPodIP = pod.Status.PodIP + ctx.Logf("Found workload pod IP: %s", workloadPodIP) + return nil + } + } + return fmt.Errorf("no running workload pods with IP found") + }) + + // Wait for Gateway resource to be ready + retry.UntilSuccessOrFail(ctx, func() error { + // Get the Gateway resource and check its status + gw, err := ctx.Clusters().Default().Dynamic().Resource(schema.GroupVersionResource{ + Group: "gateway.networking.k8s.io", + Version: "v1", + Resource: "gateways", + }).Namespace(ns.Name()).Get(context.TODO(), "inference-gateway", metav1.GetOptions{}) + if err != nil { + return fmt.Errorf("gateway resource not found: %v", err) + } + + // Check Gateway status conditions for Accepted and Programmed + status, found, err := unstructured.NestedSlice(gw.Object, "status", "conditions") + if err != nil || !found { + return fmt.Errorf("gateway status conditions not found") + } + + accepted := false + programmed := false + for _, cond := range status { + condition := cond.(map[string]interface{}) + condType := condition["type"].(string) + condStatus := condition["status"].(string) + + if condType == "Accepted" && condStatus == "True" { + accepted = true + } + if condType == "Programmed" && condStatus == "True" { + programmed = true + } + } + + if !accepted { + return fmt.Errorf("gateway not accepted yet") + } + if !programmed { + return fmt.Errorf("gateway not programmed yet") + } + + ctx.Logf("Gateway is ready (Accepted and Programmed)") + return nil + }, retry.Timeout(60*time.Second)) + + // Send request through the gateway with x-endpoint header + // This tests the EPP protocol end-to-end + gatewayAddr := fmt.Sprintf("inference-gateway-istio.%s.svc.cluster.local", ns.Name()) + ctx.Logf("Sending request to gateway: %s", gatewayAddr) + + // Test routing to each of the three ports to verify EPP can select different endpoints + testPorts := []int{8000, 8001, 8002} + for _, targetPort := range testPorts { + targetEndpoint := 
fmt.Sprintf("%s:%d", workloadPodIP, targetPort) + ctx.Logf("Testing EPP routing to port %d (endpoint: %s)", targetPort, targetEndpoint) + + retry.UntilSuccessOrFail(ctx, func() error { + result, err := client.Call(echo.CallOptions{ + Port: echo.Port{ + Name: "http", + Protocol: protocol.HTTP, + ServicePort: 80, + }, + Scheme: scheme.HTTP, + Address: gatewayAddr, + HTTP: echo.HTTP{ + Headers: headers.New(). + WithHost("inference.example.com"). + With("x-endpoint", targetEndpoint). + Build(), + }, + Check: check.OK(), + }) + if err != nil { + ctx.Logf("Gateway call failed (will retry): %v", err) + return fmt.Errorf("failed to call gateway: %v", err) + } + + // Verify the request was successful + if len(result.Responses) == 0 { + return fmt.Errorf("no response received") + } + + // Parse the ServicePort field from the response + resultPort, err := strconv.Atoi(result.Responses[0].Port) + if err != nil { + return fmt.Errorf("failed to parse port: %v", err) + } + if resultPort != targetPort { + ctx.Logf("\nResponse:\n%s", result.Responses[0].RawContent) + return fmt.Errorf("port %s did not match expected %d", result.Responses[0].Port, targetPort) + } + + ctx.Logf("Successfully verified EPP routing to endpoint %s (ServicePort=%d confirmed)", targetEndpoint, targetPort) + return nil + }) + } + }) +} diff --git a/tests/integration/pilot/gie/main_test.go b/tests/integration/pilot/gie/main_test.go new file mode 100644 index 0000000000..77f4eb3dee --- /dev/null +++ b/tests/integration/pilot/gie/main_test.go @@ -0,0 +1,41 @@ +//go:build integ + +// Copyright Istio Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package gie + +import ( + "testing" + + "istio.io/istio/pkg/test/framework" + "istio.io/istio/pkg/test/framework/components/istio" + "istio.io/istio/pkg/test/framework/resource" +) + +var i istio.Instance + +func TestMain(m *testing.M) { + framework. + NewSuite(m). + Setup(istio.Setup(&i, func(ctx resource.Context, cfg *istio.Config) { + cfg.ControlPlaneValues = ` +values: + pilot: + env: + ENABLE_GATEWAY_API_INFERENCE_EXTENSION: "true" +` + })). + Run() +} diff --git a/tests/integration/pilot/testdata/gateway-api-inference-extension-crd.yaml b/tests/integration/pilot/testdata/gateway-api-inference-extension-crd.yaml index 9ee60eb2a0..2344ab3735 100644 --- a/tests/integration/pilot/testdata/gateway-api-inference-extension-crd.yaml +++ b/tests/integration/pilot/testdata/gateway-api-inference-extension-crd.yaml @@ -1,10 +1,9 @@ -# Generated with `kubectl kustomize "https://github.com/kubernetes-sigs/gateway-api-inference-extension/config/crd/?ref=v1.0.0"` +# Generated with `kubectl kustomize "https://github.com/kubernetes-sigs/gateway-api-inference-extension/config/crd/?ref=0a3bb20107519af18c2fecb6706c3ff59148a645"` apiVersion: apiextensions.k8s.io/v1 kind: CustomResourceDefinition metadata: annotations: - inference.networking.k8s.io/bundle-version: v1.0.0 - creationTimestamp: null + inference.networking.k8s.io/bundle-version: main-dev name: inferenceobjectives.inference.networking.x-k8s.io spec: group: inference.networking.x-k8s.io @@ -196,8 +195,7 @@ kind: CustomResourceDefinition metadata: annotations: api-approved.kubernetes.io: 
https://github.com/kubernetes-sigs/gateway-api-inference-extension/pull/1173 - inference.networking.k8s.io/bundle-version: v1.0.0 - creationTimestamp: null + inference.networking.k8s.io/bundle-version: main-dev name: inferencepools.inference.networking.k8s.io spec: group: inference.networking.k8s.io @@ -348,7 +346,8 @@ spec: targetPorts: description: |- TargetPorts defines a list of ports that are exposed by this InferencePool. - Currently, the list may only include a single port definition. + Every port will be treated as a distinctive endpoint by EPP, + addressable as a 'podIP:portNumber' combination. items: description: Port defines the network port that will be exposed by this InferencePool. @@ -364,10 +363,13 @@ spec: required: - number type: object - maxItems: 1 + maxItems: 8 minItems: 1 type: array x-kubernetes-list-type: atomic + x-kubernetes-validations: + - message: port number must be unique + rule: self.all(p1, self.exists_one(p2, p1.number==p2.number)) required: - endpointPickerRef - selector @@ -532,8 +534,7 @@ kind: CustomResourceDefinition metadata: annotations: api-approved.kubernetes.io: unapproved, experimental-only - inference.networking.k8s.io/bundle-version: v1.0.0 - creationTimestamp: null + inference.networking.k8s.io/bundle-version: main-dev name: inferencepools.inference.networking.x-k8s.io spec: group: inference.networking.x-k8s.io