Skip to content

Commit 2345dc2

Browse files
dgnaslakknutsen
authored and committed
inferencepool controller: use SSA for shadow svc updates (#57091)
Fixes #56667
1 parent 97722ea commit 2345dc2

File tree

3 files changed

+213
-67
lines changed

3 files changed

+213
-67
lines changed

pilot/pkg/config/kube/gateway/controller.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -293,7 +293,7 @@ func NewController(
293293
// Create a queue for handling service updates.
294294
// We create the queue even if the env var is off just to prevent nil pointer issues.
295295
c.shadowServiceReconciler = controllers.NewQueue("inference pool shadow service reconciler",
296-
controllers.WithReconciler(c.reconcileShadowService(svcClient, InferencePools, inputs.Services)),
296+
controllers.WithReconciler(c.reconcileShadowService(kc, InferencePools, inputs.Services)),
297297
controllers.WithMaxAttempts(5))
298298

299299
if features.EnableGatewayAPIInferenceExtension {

pilot/pkg/config/kube/gateway/inferencepool_collection.go

Lines changed: 31 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
package gateway
1616

1717
import (
18+
"context"
1819
"crypto/sha256"
1920
"fmt"
2021
"strconv"
@@ -23,15 +24,15 @@ import (
2324
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2425
"k8s.io/apimachinery/pkg/types"
2526
"k8s.io/apimachinery/pkg/util/intstr"
27+
"k8s.io/apimachinery/pkg/util/json"
2628
inferencev1 "sigs.k8s.io/gateway-api-inference-extension/api/v1"
2729
gatewayv1 "sigs.k8s.io/gateway-api/apis/v1"
2830
gateway "sigs.k8s.io/gateway-api/apis/v1beta1"
2931

3032
"istio.io/istio/pkg/config/constants"
3133
"istio.io/istio/pkg/config/schema/gvk"
32-
"istio.io/istio/pkg/kube/kclient"
34+
"istio.io/istio/pkg/kube"
3335
"istio.io/istio/pkg/kube/krt"
34-
"istio.io/istio/pkg/maps"
3536
"istio.io/istio/pkg/ptr"
3637
"istio.io/istio/pkg/slices"
3738
"istio.io/istio/pkg/util/sets"
@@ -44,6 +45,7 @@ const (
4445
InferencePoolExtensionRefSvc = "istio.io/inferencepool-extension-service"
4546
InferencePoolExtensionRefPort = "istio.io/inferencepool-extension-port"
4647
InferencePoolExtensionRefFailureMode = "istio.io/inferencepool-extension-failure-mode"
48+
InferencePoolFieldManager = "istio.io/inference-pool-controller"
4749
)
4850

4951
// // ManagedLabel is the label used to identify resources managed by this controller
@@ -503,7 +505,7 @@ func InferencePoolServiceName(poolName string) (string, error) {
503505
return svcName, nil
504506
}
505507

506-
func translateShadowServiceToService(existingLabels map[string]string, shadow shadowServiceInfo, extRef extRefInfo) *corev1.Service {
508+
func translateShadowServiceToService(shadow shadowServiceInfo, extRef extRefInfo) *corev1.Service {
507509
// Create the ports used by the shadow service
508510
ports := make([]corev1.ServicePort, 0, len(shadow.targetPorts))
509511
dummyPort := int32(54321) // Dummy port, not used for anything
@@ -518,16 +520,20 @@ func translateShadowServiceToService(existingLabels map[string]string, shadow sh
518520

519521
// Create a new service object based on the shadow service info
520522
svc := &corev1.Service{
523+
TypeMeta: metav1.TypeMeta{
524+
APIVersion: "v1",
525+
Kind: "Service",
526+
},
521527
ObjectMeta: metav1.ObjectMeta{
522528
Name: shadow.key.Name,
523529
Namespace: shadow.key.Namespace,
524-
Labels: maps.MergeCopy(map[string]string{
530+
Labels: map[string]string{
525531
InferencePoolRefLabel: shadow.poolName,
526532
InferencePoolExtensionRefSvc: extRef.name,
527533
InferencePoolExtensionRefPort: strconv.Itoa(int(extRef.port)),
528534
InferencePoolExtensionRefFailureMode: extRef.failureMode,
529535
constants.InternalServiceSemantics: constants.ServiceSemanticsInferencePool,
530-
}, existingLabels),
536+
},
531537
},
532538
Spec: corev1.ServiceSpec{
533539
Selector: shadow.selector,
@@ -550,7 +556,7 @@ func translateShadowServiceToService(existingLabels map[string]string, shadow sh
550556
}
551557

552558
func (c *Controller) reconcileShadowService(
553-
svcClient kclient.Client[*corev1.Service],
559+
kubeClient kube.Client,
554560
inferencePools krt.Collection[InferencePool],
555561
servicesCollection krt.Collection[*corev1.Service],
556562
) func(key types.NamespacedName) error {
@@ -568,33 +574,35 @@ func (c *Controller) reconcileShadowService(
568574
existingService := ptr.Flatten(servicesCollection.GetKey(pool.shadowService.key.String()))
569575

570576
// Check if we can manage this service
571-
var existingLabels map[string]string
572577
if existingService != nil {
573-
existingLabels = existingService.GetLabels()
574-
canManage, _ := c.canManageShadowServiceForInference(existingService)
578+
canManage, reason := c.canManageShadowServiceForInference(existingService)
575579
if !canManage {
576-
log.Debugf("skipping service %s/%s, already managed by another controller", key.Namespace, key.Name)
580+
log.Debugf("skipping service %s/%s, already managed by another controller: %s", key.Namespace, key.Name, reason)
577581
return nil
578582
}
579583
}
580584

581-
service := translateShadowServiceToService(existingLabels, pool.shadowService, pool.extRef)
582-
583-
var err error
584-
if existingService == nil {
585-
// Create the service if it doesn't exist
586-
_, err = svcClient.Create(service)
587-
} else {
588-
// TODO: Don't overwrite resources: https://github.com/istio/istio/issues/56667
589-
service.ResourceVersion = existingService.ResourceVersion
590-
_, err = svcClient.Update(service)
591-
}
585+
service := translateShadowServiceToService(pool.shadowService, pool.extRef)
586+
return c.applyShadowService(kubeClient, service)
587+
}
588+
}
592589

593-
return err
590+
// applyShadowService uses Server-Side Apply to create or update shadow services
591+
func (c *Controller) applyShadowService(kubeClient kube.Client, service *corev1.Service) error {
592+
data, err := json.Marshal(service)
593+
if err != nil {
594+
return fmt.Errorf("failed to marshal service for SSA: %v", err)
594595
}
596+
597+
ctx := context.Background()
598+
_, err = kubeClient.Kube().CoreV1().Services(service.Namespace).Patch(
599+
ctx, service.Name, types.ApplyPatchType, data, metav1.PatchOptions{
600+
FieldManager: InferencePoolFieldManager,
601+
Force: ptr.Of(true),
602+
})
603+
return err
595604
}
596605

597-
// canManage checks if a service should be managed by this controller
598606
func (c *Controller) canManageShadowServiceForInference(obj *corev1.Service) (bool, string) {
599607
if obj == nil {
600608
// No object exists, we can manage it

pilot/pkg/config/kube/gateway/inferencepool_test.go

Lines changed: 181 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -15,73 +15,211 @@
1515
package gateway
1616

1717
import (
18+
"fmt"
1819
"testing"
1920

2021
corev1 "k8s.io/api/core/v1"
2122
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
23+
"k8s.io/apimachinery/pkg/runtime"
24+
"k8s.io/apimachinery/pkg/util/intstr"
2225
inferencev1 "sigs.k8s.io/gateway-api-inference-extension/api/v1"
2326

2427
"istio.io/istio/pilot/pkg/features"
2528
"istio.io/istio/pkg/config/constants"
26-
"istio.io/istio/pkg/kube/krt"
2729
"istio.io/istio/pkg/test"
2830
"istio.io/istio/pkg/test/util/assert"
2931
)
3032

3133
func TestReconcileInferencePool(t *testing.T) {
3234
test.SetForTest(t, &features.EnableGatewayAPIInferenceExtension, true)
33-
pool := &inferencev1.InferencePool{
34-
ObjectMeta: metav1.ObjectMeta{
35-
Name: "test-pool",
36-
Namespace: "default",
35+
36+
testCases := []struct {
37+
name string
38+
inferencePool *inferencev1.InferencePool
39+
shadowService *corev1.Service // name is optional, if not provided, it will be generated
40+
expectedAnnotations map[string]string
41+
expectedLabels map[string]string
42+
expectedServiceName string
43+
expectedTargetPort int32
44+
}{
45+
{
46+
name: "basic shadow service creation",
47+
inferencePool: &inferencev1.InferencePool{
48+
ObjectMeta: metav1.ObjectMeta{
49+
Name: "test-pool",
50+
Namespace: "default",
51+
},
52+
Spec: inferencev1.InferencePoolSpec{
53+
TargetPorts: []inferencev1.Port{
54+
{
55+
Number: inferencev1.PortNumber(8080),
56+
},
57+
},
58+
Selector: inferencev1.LabelSelector{
59+
MatchLabels: map[inferencev1.LabelKey]inferencev1.LabelValue{
60+
"app": "test",
61+
},
62+
},
63+
EndpointPickerRef: inferencev1.EndpointPickerRef{
64+
Name: "dummy",
65+
Port: &inferencev1.Port{
66+
Number: inferencev1.PortNumber(5421),
67+
},
68+
},
69+
},
70+
},
71+
expectedLabels: map[string]string{
72+
constants.InternalServiceSemantics: constants.ServiceSemanticsInferencePool,
73+
InferencePoolRefLabel: "test-pool",
74+
},
75+
expectedTargetPort: 8080,
3776
},
38-
Spec: inferencev1.InferencePoolSpec{
39-
TargetPorts: []inferencev1.Port{
40-
{
41-
Number: inferencev1.PortNumber(8080),
77+
{
78+
name: "user label and annotation preservation",
79+
inferencePool: &inferencev1.InferencePool{
80+
ObjectMeta: metav1.ObjectMeta{
81+
Name: "preserve-pool",
82+
Namespace: "default",
83+
},
84+
Spec: inferencev1.InferencePoolSpec{
85+
TargetPorts: []inferencev1.Port{
86+
{
87+
Number: inferencev1.PortNumber(8080),
88+
},
89+
},
90+
Selector: inferencev1.LabelSelector{
91+
MatchLabels: map[inferencev1.LabelKey]inferencev1.LabelValue{
92+
"app": "test",
93+
},
94+
},
95+
EndpointPickerRef: inferencev1.EndpointPickerRef{
96+
Name: "dummy",
97+
Port: &inferencev1.Port{
98+
Number: inferencev1.PortNumber(5421),
99+
},
100+
},
42101
},
43102
},
44-
Selector: inferencev1.LabelSelector{
45-
MatchLabels: map[inferencev1.LabelKey]inferencev1.LabelValue{
46-
"app": "test",
103+
shadowService: &corev1.Service{
104+
ObjectMeta: metav1.ObjectMeta{
105+
Namespace: "default",
106+
Labels: map[string]string{
107+
InferencePoolRefLabel: "preserve-pool",
108+
"user.example.com/my-label": "user-value",
109+
"another.domain.com/label": "another-value",
110+
},
111+
Annotations: map[string]string{
112+
"user.example.com/my-annotation": "user-annotation-value",
113+
"monitoring.example.com/scrape": "true",
114+
},
115+
},
116+
Spec: corev1.ServiceSpec{
117+
Selector: map[string]string{"app": "test"},
118+
Type: corev1.ServiceTypeClusterIP,
119+
ClusterIP: corev1.ClusterIPNone,
120+
Ports: []corev1.ServicePort{
121+
{
122+
Protocol: "TCP",
123+
Port: 54321,
124+
TargetPort: intstr.FromInt(8080),
125+
},
126+
},
47127
},
48128
},
49-
EndpointPickerRef: inferencev1.EndpointPickerRef{
50-
Name: "dummy",
51-
Port: &inferencev1.Port{
52-
Number: inferencev1.PortNumber(5421),
129+
expectedAnnotations: map[string]string{
130+
"user.example.com/my-annotation": "user-annotation-value",
131+
"monitoring.example.com/scrape": "true",
132+
},
133+
expectedLabels: map[string]string{
134+
constants.InternalServiceSemantics: constants.ServiceSemanticsInferencePool,
135+
InferencePoolRefLabel: "preserve-pool",
136+
"user.example.com/my-label": "user-value",
137+
"another.domain.com/label": "another-value",
138+
},
139+
expectedTargetPort: 8080,
140+
},
141+
{
142+
name: "very long inferencepool name",
143+
inferencePool: &inferencev1.InferencePool{
144+
ObjectMeta: metav1.ObjectMeta{
145+
Name: "very-long-inference-pool-name-that-should-be-truncated-properly",
146+
Namespace: "default",
53147
},
148+
Spec: inferencev1.InferencePoolSpec{
149+
TargetPorts: []inferencev1.Port{
150+
{
151+
Number: inferencev1.PortNumber(9090),
152+
},
153+
},
154+
Selector: inferencev1.LabelSelector{
155+
MatchLabels: map[inferencev1.LabelKey]inferencev1.LabelValue{
156+
"app": "longname",
157+
},
158+
},
159+
EndpointPickerRef: inferencev1.EndpointPickerRef{
160+
Name: "dummy",
161+
Port: &inferencev1.Port{
162+
Number: inferencev1.PortNumber(5421),
163+
},
164+
},
165+
},
166+
},
167+
expectedLabels: map[string]string{
168+
constants.InternalServiceSemantics: constants.ServiceSemanticsInferencePool,
169+
InferencePoolRefLabel: "very-long-inference-pool-name-that-should-be-truncated-properly",
54170
},
171+
expectedServiceName: "very-long-inference-pool-name-that-should-be-trunca-ip-6d24df6a",
172+
expectedTargetPort: 9090,
55173
},
56174
}
57-
controller := setupController(t,
58-
&corev1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: "default"}},
59-
NewGateway("test-gw", InNamespace(DefaultTestNS), WithGatewayClass("istio")),
60-
NewHTTPRoute("test-route", InNamespace(DefaultTestNS),
61-
WithParentRefAndStatus("test-gw", DefaultTestNS, IstioController),
62-
WithBackendRef("test-pool", DefaultTestNS),
63-
),
64-
pool,
65-
)
66175

67-
dumpOnFailure(t, krt.GlobalDebugHandler)
176+
for _, tc := range testCases {
177+
t.Run(tc.name, func(t *testing.T) {
178+
objects := []runtime.Object{
179+
&corev1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: "default"}},
180+
NewGateway(tc.name+"-gw", InNamespace("default"), WithGatewayClass("istio")),
181+
NewHTTPRoute(tc.name+"-route", InNamespace("default"),
182+
WithParentRefAndStatus(tc.name+"-gw", "default", "istio.io/gateway-controller"),
183+
WithBackendRef(tc.inferencePool.Name, "default"),
184+
),
185+
tc.inferencePool,
186+
}
187+
if tc.shadowService != nil {
188+
// Generate the service name if not provided
189+
if tc.shadowService.Name == "" {
190+
generatedName, err := InferencePoolServiceName(tc.inferencePool.Name)
191+
assert.NoError(t, err)
192+
tc.shadowService.Name = generatedName
193+
}
194+
objects = append(objects, tc.shadowService)
195+
}
196+
controller := setupController(t, objects...)
68197

69-
// Verify the service was created
70-
var service *corev1.Service
71-
var err error
72-
assert.EventuallyEqual(t, func() bool {
73-
svcName := "test-pool-ip-" + generateHash("test-pool", hashSize)
74-
service, err = controller.client.Kube().CoreV1().Services("default").Get(t.Context(), svcName, metav1.GetOptions{})
75-
if err != nil {
76-
t.Logf("Service %s not found yet: %v", svcName, err)
77-
return false
78-
}
79-
return service != nil
80-
}, true)
198+
var service *corev1.Service
199+
expectedSvcName, err := InferencePoolServiceName(tc.inferencePool.Name)
200+
if tc.expectedServiceName != "" {
201+
assert.Equal(t, expectedSvcName, tc.expectedServiceName, fmt.Sprintf("Service name should be '%s'", tc.expectedServiceName))
202+
}
203+
assert.NoError(t, err)
204+
assert.EventuallyEqual(t, func() bool {
205+
var err error
206+
service, err = controller.client.Kube().CoreV1().Services("default").Get(t.Context(), expectedSvcName, metav1.GetOptions{})
207+
if err != nil {
208+
t.Logf("Service %s not found yet: %v", expectedSvcName, err)
209+
return false
210+
}
211+
return service != nil && service.Labels[constants.InternalServiceSemantics] == constants.ServiceSemanticsInferencePool
212+
}, true)
81213

82-
assert.Equal(t, service.ObjectMeta.Labels[constants.InternalServiceSemantics], constants.ServiceSemanticsInferencePool)
83-
assert.Equal(t, service.ObjectMeta.Labels[InferencePoolRefLabel], pool.Name)
84-
assert.Equal(t, service.OwnerReferences[0].Name, pool.Name)
85-
assert.Equal(t, service.Spec.Ports[0].TargetPort.IntVal, int32(8080))
86-
assert.Equal(t, service.Spec.Ports[0].Port, int32(54321)) // dummyPort + i
214+
for key, expectedValue := range tc.expectedLabels {
215+
assert.Equal(t, service.Labels[key], expectedValue, fmt.Sprintf("Label '%s' should have value '%s'", key, expectedValue))
216+
}
217+
for key, expectedValue := range tc.expectedAnnotations {
218+
assert.Equal(t, service.Annotations[key], expectedValue, fmt.Sprintf("Annotation '%s' should have value '%s'", key, expectedValue))
219+
}
220+
assert.Equal(t, service.Spec.Ports[0].Port, int32(54321)) // dummyPort + i
221+
assert.Equal(t, service.Spec.Ports[0].TargetPort.IntVal, tc.expectedTargetPort)
222+
assert.Equal(t, service.OwnerReferences[0].Name, tc.inferencePool.Name)
223+
})
224+
}
87225
}

0 commit comments

Comments
 (0)