Skip to content

Commit a1f0704

Browse files
MrHohnjhorwit2
andcommitted
Patch service instead of update in service controller
Co-authored-by: Josh Horwitz <[email protected]>
1 parent aaec77a commit a1f0704

File tree

4 files changed

+179
-43
lines changed

4 files changed

+179
-43
lines changed

pkg/controller/service/BUILD

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ go_library(
1010
name = "go_default_library",
1111
srcs = [
1212
"doc.go",
13+
"patch.go",
1314
"service_controller.go",
1415
],
1516
importpath = "k8s.io/kubernetes/pkg/controller/service",
@@ -20,8 +21,10 @@ go_library(
2021
"//pkg/util/metrics:go_default_library",
2122
"//staging/src/k8s.io/api/core/v1:go_default_library",
2223
"//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
24+
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
2325
"//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
2426
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
27+
"//staging/src/k8s.io/apimachinery/pkg/util/strategicpatch:go_default_library",
2528
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
2629
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
2730
"//staging/src/k8s.io/client-go/informers/core/v1:go_default_library",
@@ -39,7 +42,10 @@ go_library(
3942

4043
go_test(
4144
name = "go_default_test",
42-
srcs = ["service_controller_test.go"],
45+
srcs = [
46+
"patch_test.go",
47+
"service_controller_test.go",
48+
],
4349
embed = [":go_default_library"],
4450
deps = [
4551
"//pkg/api/testapi:go_default_library",

pkg/controller/service/patch.go

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
/*
2+
Copyright 2019 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package service
18+
19+
import (
20+
"encoding/json"
21+
"fmt"
22+
23+
v1 "k8s.io/api/core/v1"
24+
"k8s.io/apimachinery/pkg/types"
25+
"k8s.io/apimachinery/pkg/util/strategicpatch"
26+
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
27+
)
28+
29+
// patch patches service's Status or ObjectMeta given the origin and
30+
// updated ones. Change to spec will be ignored.
31+
func patch(c v1core.CoreV1Interface, oldSvc *v1.Service, newSvc *v1.Service) (*v1.Service, error) {
32+
// Reset spec to make sure only patch for Status or ObjectMeta.
33+
newSvc.Spec = oldSvc.Spec
34+
35+
patchBytes, err := getPatchBytes(oldSvc, newSvc)
36+
if err != nil {
37+
return nil, err
38+
}
39+
40+
updatedSvc, err := c.Services(oldSvc.Namespace).Patch(oldSvc.Name, types.StrategicMergePatchType, patchBytes, "status")
41+
if err != nil {
42+
return nil, fmt.Errorf("failed to patch %q for svc %s/%s: %v", patchBytes, oldSvc.Namespace, oldSvc.Name, err)
43+
}
44+
return updatedSvc, nil
45+
}
46+
47+
func getPatchBytes(oldSvc *v1.Service, newSvc *v1.Service) ([]byte, error) {
48+
oldData, err := json.Marshal(oldSvc)
49+
if err != nil {
50+
return nil, fmt.Errorf("failed to Marshal oldData for svc %s/%s: %v", oldSvc.Namespace, oldSvc.Name, err)
51+
}
52+
53+
newData, err := json.Marshal(newSvc)
54+
if err != nil {
55+
return nil, fmt.Errorf("failed to Marshal newData for svc %s/%s: %v", newSvc.Namespace, newSvc.Name, err)
56+
}
57+
58+
patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, v1.Service{})
59+
if err != nil {
60+
return nil, fmt.Errorf("failed to CreateTwoWayMergePatch for svc %s/%s: %v", oldSvc.Namespace, oldSvc.Name, err)
61+
}
62+
return patchBytes, nil
63+
}
Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
/*
2+
Copyright 2019 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package service
18+
19+
import (
20+
"reflect"
21+
"testing"
22+
23+
v1 "k8s.io/api/core/v1"
24+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
25+
"k8s.io/client-go/kubernetes/fake"
26+
)
27+
28+
func addAnnotations(svc *v1.Service) {
29+
svc.Annotations["foo"] = "bar"
30+
}
31+
32+
func TestPatch(t *testing.T) {
33+
svcOrigin := &v1.Service{
34+
ObjectMeta: metav1.ObjectMeta{
35+
Name: "test-patch",
36+
Annotations: map[string]string{},
37+
},
38+
Spec: v1.ServiceSpec{
39+
ClusterIP: "10.0.0.1",
40+
},
41+
}
42+
fakeCs := fake.NewSimpleClientset(svcOrigin)
43+
44+
// Issue a separate update and verify patch doesn't fail after this.
45+
svcToUpdate := svcOrigin.DeepCopy()
46+
addAnnotations(svcToUpdate)
47+
if _, err := fakeCs.CoreV1().Services(svcOrigin.Namespace).Update(svcToUpdate); err != nil {
48+
t.Fatalf("Failed to update service: %v", err)
49+
}
50+
51+
// Attempt to patch based the original service.
52+
svcToPatch := svcOrigin.DeepCopy()
53+
svcToPatch.Finalizers = []string{"foo"}
54+
svcToPatch.Spec.ClusterIP = "10.0.0.2"
55+
svcToPatch.Status = v1.ServiceStatus{
56+
LoadBalancer: v1.LoadBalancerStatus{
57+
Ingress: []v1.LoadBalancerIngress{
58+
{IP: "8.8.8.8"},
59+
},
60+
},
61+
}
62+
svcPatched, err := patch(fakeCs.CoreV1(), svcOrigin, svcToPatch)
63+
if err != nil {
64+
t.Fatalf("Failed to patch service: %v", err)
65+
}
66+
67+
// Service returned by patch will contain latest content (e.g from
68+
// the separate update).
69+
addAnnotations(svcToPatch)
70+
if !reflect.DeepEqual(svcPatched, svcToPatch) {
71+
t.Errorf("PatchStatus() = %+v, want %+v", svcPatched, svcToPatch)
72+
}
73+
// Explicitly validate if spec is unchanged from origin.
74+
if !reflect.DeepEqual(svcPatched.Spec, svcOrigin.Spec) {
75+
t.Errorf("Got spec = %+v, want %+v", svcPatched.Spec, svcOrigin.Spec)
76+
}
77+
}
78+
79+
func TestGetPatchBytes(t *testing.T) {
80+
origin := &v1.Service{
81+
ObjectMeta: metav1.ObjectMeta{
82+
Name: "test-patch-bytes",
83+
Finalizers: []string{"foo"},
84+
},
85+
}
86+
updated := &v1.Service{
87+
ObjectMeta: metav1.ObjectMeta{
88+
Name: "test-patch-bytes",
89+
Finalizers: []string{"foo", "bar"},
90+
},
91+
}
92+
93+
b, err := getPatchBytes(origin, updated)
94+
if err != nil {
95+
t.Fatal(err)
96+
}
97+
expected := `{"metadata":{"$setElementOrder/finalizers":["foo","bar"],"finalizers":["bar"]}}`
98+
if string(b) != expected {
99+
t.Errorf("getPatchBytes(%+v, %+v) = %s ; want %s", origin, updated, string(b), expected)
100+
}
101+
}

pkg/controller/service/service_controller.go

Lines changed: 8 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ import (
2424

2525
"reflect"
2626

27-
"k8s.io/api/core/v1"
27+
v1 "k8s.io/api/core/v1"
2828
"k8s.io/apimachinery/pkg/api/errors"
2929
"k8s.io/apimachinery/pkg/util/runtime"
3030
"k8s.io/apimachinery/pkg/util/sets"
@@ -309,57 +309,23 @@ func (s *ServiceController) syncLoadBalancerIfNeeded(key string, service *v1.Ser
309309
s.eventRecorder.Event(service, v1.EventTypeNormal, "EnsuredLoadBalancer", "Ensured load balancer")
310310
}
311311

312-
// Write the state if changed
313-
// TODO: Be careful here ... what if there were other changes to the service?
312+
// If there are any changes to the status then patch the service.
314313
if !v1helper.LoadBalancerStatusEqual(previousState, newState) {
315314
// Make a copy so we don't mutate the shared informer cache
316-
service = service.DeepCopy()
315+
updated := service.DeepCopy()
316+
updated.Status.LoadBalancer = *newState
317317

318-
// Update the status on the copy
319-
service.Status.LoadBalancer = *newState
320-
321-
if err := s.persistUpdate(service); err != nil {
322-
// TODO: This logic needs to be revisited. We might want to retry on all the errors, not just conflicts.
323-
if errors.IsConflict(err) {
324-
return fmt.Errorf("not persisting update to service '%s/%s' that has been changed since we received it: %v", service.Namespace, service.Name, err)
325-
}
326-
runtime.HandleError(fmt.Errorf("failed to persist service %q updated status to apiserver, even after retries. Giving up: %v", key, err))
327-
return nil
318+
if _, err := patch(s.kubeClient.CoreV1(), service, updated); err != nil {
319+
return fmt.Errorf("failed to patch status for service %s: %v", key, err)
328320
}
321+
klog.V(4).Infof("Successfully patched status for service %s", key)
329322
} else {
330-
klog.V(2).Infof("Not persisting unchanged LoadBalancerStatus for service %s to registry.", key)
323+
klog.V(4).Infof("Not persisting unchanged LoadBalancerStatus for service %s to registry.", key)
331324
}
332325

333326
return nil
334327
}
335328

336-
func (s *ServiceController) persistUpdate(service *v1.Service) error {
337-
var err error
338-
for i := 0; i < clientRetryCount; i++ {
339-
_, err = s.kubeClient.CoreV1().Services(service.Namespace).UpdateStatus(service)
340-
if err == nil {
341-
return nil
342-
}
343-
// If the object no longer exists, we don't want to recreate it. Just bail
344-
// out so that we can process the delete, which we should soon be receiving
345-
// if we haven't already.
346-
if errors.IsNotFound(err) {
347-
klog.Infof("Not persisting update to service '%s/%s' that no longer exists: %v",
348-
service.Namespace, service.Name, err)
349-
return nil
350-
}
351-
// TODO: Try to resolve the conflict if the change was unrelated to load
352-
// balancer status. For now, just pass it up the stack.
353-
if errors.IsConflict(err) {
354-
return err
355-
}
356-
klog.Warningf("Failed to persist updated LoadBalancerStatus to service '%s/%s' after creating its load balancer: %v",
357-
service.Namespace, service.Name, err)
358-
time.Sleep(clientRetryInterval)
359-
}
360-
return err
361-
}
362-
363329
func (s *ServiceController) ensureLoadBalancer(service *v1.Service) (*v1.LoadBalancerStatus, error) {
364330
nodes, err := s.nodeLister.ListWithPredicate(getNodeConditionPredicate())
365331
if err != nil {

0 commit comments

Comments
 (0)