Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion bin/update_crds.sh
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,8 @@ fi
rm -f "${ROOTDIR}/tests/integration/pilot/testdata/gateway-api-crd.yaml"
cp "${API_TMP}/gateway-api-crd.yaml" "${ROOTDIR}/tests/integration/pilot/testdata/gateway-api-crd.yaml"

GATEWAY_API_INFERENCE_VERSION=$(grep "gateway-api-inference-extension" go.mod | awk '{ print $2 }')
#GATEWAY_API_INFERENCE_VERSION=$(grep "gateway-api-inference-extension" go.mod | awk '{ print $2 }')
GATEWAY_API_INFERENCE_VERSION="v0.0.0-20250926182816-0a3bb2010751"
if [[ ${GATEWAY_API_INFERENCE_VERSION} == *"-"* && ! ${GATEWAY_API_INFERENCE_VERSION} =~ -rc.?[0-9]$ ]]; then
# not an official release or release candidate, so get the commit sha
SHORT_SHA=$(echo "${GATEWAY_API_INFERENCE_VERSION}" | awk -F '-' '{ print $NF }')
Expand Down
2 changes: 1 addition & 1 deletion pilot/pkg/config/kube/gateway/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,7 @@ func NewController(
// Create a queue for handling service updates.
// We create the queue even if the env var is off just to prevent nil pointer issues.
c.shadowServiceReconciler = controllers.NewQueue("inference pool shadow service reconciler",
controllers.WithReconciler(c.reconcileShadowService(svcClient, InferencePools, inputs.Services)),
controllers.WithReconciler(c.reconcileShadowService(kc, InferencePools, inputs.Services)),
controllers.WithMaxAttempts(5))

if features.EnableGatewayAPIInferenceExtension {
Expand Down
11 changes: 10 additions & 1 deletion pilot/pkg/config/kube/gateway/conversion.go
Original file line number Diff line number Diff line change
Expand Up @@ -1114,9 +1114,18 @@ func buildDestination(ctx RouteContext, to k8s.BackendRef, ns string,
if ipCfg.endpointPickerDst == "" || ipCfg.endpointPickerPort == "" || ipCfg.endpointPickerFailureMode == "" {
invalidBackendErr = &ConfigError{Reason: InvalidDestination, Message: "InferencePool service invalid, extensionRef labels not found"}
}

// For InferencePool, always use the first service port (54321).
// The cluster for that service port will include all endpoints for all
// target ports, allowing the EPP to load-balance across them.
var destPort uint32
if len(svc.Ports) > 0 {
destPort = uint32(svc.Ports[0].Port)
}

return &istio.Destination{
Host: hostname,
// Port: &istio.PortSelector{Number: uint32(*to.Port)},
Port: &istio.PortSelector{Number: destPort},
}, ipCfg, invalidBackendErr
default:
return &istio.Destination{}, nil, &ConfigError{
Expand Down
12 changes: 10 additions & 2 deletions pilot/pkg/config/kube/gateway/conversion_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,14 @@ var ports = []*model.Port{
},
}

var inferencePoolPorts = []*model.Port{
{
Name: "http",
Port: 54321,
Protocol: "HTTP",
},
}

var services = []*model.Service{
{
Attributes: model.ServiceAttributes{
Expand Down Expand Up @@ -135,7 +143,7 @@ var services = []*model.Service{
InferencePoolExtensionRefFailureMode: "FailClose",
},
},
Ports: ports,
Ports: inferencePoolPorts,
Hostname: host.Name(fmt.Sprintf("%s.default.svc.domain.suffix", firstValue(InferencePoolServiceName("infpool-gen")))),
},
{
Expand All @@ -147,7 +155,7 @@ var services = []*model.Service{
InferencePoolExtensionRefFailureMode: "FailClose",
},
},
Ports: ports,
Ports: inferencePoolPorts,
Hostname: host.Name(fmt.Sprintf("%s.default.svc.domain.suffix", firstValue(InferencePoolServiceName("infpool-gen2")))),
},

Expand Down
70 changes: 41 additions & 29 deletions pilot/pkg/config/kube/gateway/inferencepool_collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package gateway

import (
"context"
"crypto/sha256"
"fmt"
"strconv"
Expand All @@ -23,15 +24,15 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/util/json"
inferencev1 "sigs.k8s.io/gateway-api-inference-extension/api/v1"
gatewayv1 "sigs.k8s.io/gateway-api/apis/v1"
gateway "sigs.k8s.io/gateway-api/apis/v1beta1"

"istio.io/istio/pkg/config/constants"
"istio.io/istio/pkg/config/schema/gvk"
"istio.io/istio/pkg/kube/kclient"
"istio.io/istio/pkg/kube"
"istio.io/istio/pkg/kube/krt"
"istio.io/istio/pkg/maps"
"istio.io/istio/pkg/ptr"
"istio.io/istio/pkg/slices"
"istio.io/istio/pkg/util/sets"
Expand All @@ -44,6 +45,7 @@ const (
InferencePoolExtensionRefSvc = "istio.io/inferencepool-extension-service"
InferencePoolExtensionRefPort = "istio.io/inferencepool-extension-port"
InferencePoolExtensionRefFailureMode = "istio.io/inferencepool-extension-failure-mode"
InferencePoolFieldManager = "istio.io/inference-pool-controller"
)

// // ManagedLabel is the label used to identify resources managed by this controller
Expand Down Expand Up @@ -503,31 +505,39 @@ func InferencePoolServiceName(poolName string) (string, error) {
return svcName, nil
}

func translateShadowServiceToService(existingLabels map[string]string, shadow shadowServiceInfo, extRef extRefInfo) *corev1.Service {
// Create the ports used by the shadow service
func translateShadowServiceToService(shadow shadowServiceInfo, extRef extRefInfo) *corev1.Service {
// Create multiple ports for the shadow service - one for each InferencePool targetPort.
// This allows Istio to discover endpoints for all targetPorts.
// We use dummy service ports (54321, 54322, etc.) that map to the actual targetPorts.
baseDummyPort := int32(54321)
ports := make([]corev1.ServicePort, 0, len(shadow.targetPorts))
dummyPort := int32(54321) // Dummy port, not used for anything
for i, port := range shadow.targetPorts {

for i, tp := range shadow.targetPorts {
portName := fmt.Sprintf("http-%d", i)
ports = append(ports, corev1.ServicePort{
Name: "port" + strconv.Itoa(i),
Name: portName,
Protocol: corev1.ProtocolTCP,
Port: dummyPort + int32(i),
TargetPort: intstr.FromInt(int(port.port)),
Port: baseDummyPort + int32(i),
TargetPort: intstr.FromInt(int(tp.port)),
})
}

// Create a new service object based on the shadow service info
svc := &corev1.Service{
TypeMeta: metav1.TypeMeta{
APIVersion: "v1",
Kind: "Service",
},
ObjectMeta: metav1.ObjectMeta{
Name: shadow.key.Name,
Namespace: shadow.key.Namespace,
Labels: maps.MergeCopy(map[string]string{
Labels: map[string]string{
InferencePoolRefLabel: shadow.poolName,
InferencePoolExtensionRefSvc: extRef.name,
InferencePoolExtensionRefPort: strconv.Itoa(int(extRef.port)),
InferencePoolExtensionRefFailureMode: extRef.failureMode,
constants.InternalServiceSemantics: constants.ServiceSemanticsInferencePool,
}, existingLabels),
},
},
Spec: corev1.ServiceSpec{
Selector: shadow.selector,
Expand All @@ -550,7 +560,7 @@ func translateShadowServiceToService(existingLabels map[string]string, shadow sh
}

func (c *Controller) reconcileShadowService(
svcClient kclient.Client[*corev1.Service],
kubeClient kube.Client,
inferencePools krt.Collection[InferencePool],
servicesCollection krt.Collection[*corev1.Service],
) func(key types.NamespacedName) error {
Expand All @@ -568,33 +578,35 @@ func (c *Controller) reconcileShadowService(
existingService := ptr.Flatten(servicesCollection.GetKey(pool.shadowService.key.String()))

// Check if we can manage this service
var existingLabels map[string]string
if existingService != nil {
existingLabels = existingService.GetLabels()
canManage, _ := c.canManageShadowServiceForInference(existingService)
canManage, reason := c.canManageShadowServiceForInference(existingService)
if !canManage {
log.Debugf("skipping service %s/%s, already managed by another controller", key.Namespace, key.Name)
log.Debugf("skipping service %s/%s, already managed by another controller: %s", key.Namespace, key.Name, reason)
return nil
}
}

service := translateShadowServiceToService(existingLabels, pool.shadowService, pool.extRef)

var err error
if existingService == nil {
// Create the service if it doesn't exist
_, err = svcClient.Create(service)
} else {
// TODO: Don't overwrite resources: https://github.com/istio/istio/issues/56667
service.ResourceVersion = existingService.ResourceVersion
_, err = svcClient.Update(service)
}
service := translateShadowServiceToService(pool.shadowService, pool.extRef)
return c.applyShadowService(kubeClient, service)
}
}

return err
// applyShadowService uses Server-Side Apply to create or update shadow services
func (c *Controller) applyShadowService(kubeClient kube.Client, service *corev1.Service) error {
data, err := json.Marshal(service)
if err != nil {
return fmt.Errorf("failed to marshal service for SSA: %v", err)
}

ctx := context.Background()
_, err = kubeClient.Kube().CoreV1().Services(service.Namespace).Patch(
ctx, service.Name, types.ApplyPatchType, data, metav1.PatchOptions{
FieldManager: InferencePoolFieldManager,
Force: ptr.Of(true),
})
return err
}

// canManage checks if a service should be managed by this controller
func (c *Controller) canManageShadowServiceForInference(obj *corev1.Service) (bool, string) {
if obj == nil {
// No object exists, we can manage it
Expand Down
Loading