Skip to content

Commit 0004b19

Browse files
Antonio Ojeadamemi
authored andcommitted
do not mutate endpoints in the apiserver
the endpoints API handler was using the Canonicalize() method to reorder the endpoints, however, due to differences with the endpoint controller RepackSubsets(), the controller was considering the endpoints different despite they were not, generating unnecessary updates evert resync period.
1 parent 44d1976 commit 0004b19

File tree

6 files changed

+243
-4
lines changed

6 files changed

+243
-4
lines changed

pkg/registry/core/endpoint/BUILD

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@ go_library(
1313
],
1414
importpath = "k8s.io/kubernetes/pkg/registry/core/endpoint",
1515
deps = [
16-
"//pkg/api/endpoints:go_default_library",
1716
"//pkg/api/legacyscheme:go_default_library",
1817
"//pkg/apis/core:go_default_library",
1918
"//pkg/apis/core/validation:go_default_library",

pkg/registry/core/endpoint/strategy.go

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@ import (
2222
"k8s.io/apimachinery/pkg/runtime"
2323
"k8s.io/apimachinery/pkg/util/validation/field"
2424
"k8s.io/apiserver/pkg/storage/names"
25-
endptspkg "k8s.io/kubernetes/pkg/api/endpoints"
2625
"k8s.io/kubernetes/pkg/api/legacyscheme"
2726
api "k8s.io/kubernetes/pkg/apis/core"
2827
"k8s.io/kubernetes/pkg/apis/core/validation"
@@ -60,8 +59,6 @@ func (endpointsStrategy) Validate(ctx context.Context, obj runtime.Object) field
6059

6160
// Canonicalize normalizes the object after validation.
6261
func (endpointsStrategy) Canonicalize(obj runtime.Object) {
63-
endpoints := obj.(*api.Endpoints)
64-
endpoints.Subsets = endptspkg.RepackSubsets(endpoints.Subsets)
6562
}
6663

6764
// AllowCreateOnUpdate is true for endpoints.

test/integration/BUILD

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ filegroup(
5050
"//test/integration/deployment:all-srcs",
5151
"//test/integration/disruption:all-srcs",
5252
"//test/integration/dryrun:all-srcs",
53+
"//test/integration/endpoints:all-srcs",
5354
"//test/integration/endpointslice:all-srcs",
5455
"//test/integration/etcd:all-srcs",
5556
"//test/integration/events:all-srcs",

test/integration/endpoints/BUILD

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
load("@io_bazel_rules_go//go:def.bzl", "go_test")
2+
3+
go_test(
4+
name = "go_default_test",
5+
srcs = [
6+
"endpoints_test.go",
7+
"main_test.go",
8+
],
9+
tags = ["integration"],
10+
deps = [
11+
"//pkg/controller/endpoint:go_default_library",
12+
"//staging/src/k8s.io/api/core/v1:go_default_library",
13+
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
14+
"//staging/src/k8s.io/apimachinery/pkg/util/intstr:go_default_library",
15+
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
16+
"//staging/src/k8s.io/client-go/informers:go_default_library",
17+
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
18+
"//staging/src/k8s.io/client-go/rest:go_default_library",
19+
"//test/integration/framework:go_default_library",
20+
],
21+
)
22+
23+
filegroup(
24+
name = "package-srcs",
25+
srcs = glob(["**"]),
26+
tags = ["automanaged"],
27+
visibility = ["//visibility:private"],
28+
)
29+
30+
filegroup(
31+
name = "all-srcs",
32+
srcs = [":package-srcs"],
33+
tags = ["automanaged"],
34+
visibility = ["//visibility:public"],
35+
)
Lines changed: 180 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,180 @@
1+
/*
2+
Copyright 2020 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package endpoints
18+
19+
import (
20+
"context"
21+
"testing"
22+
"time"
23+
24+
v1 "k8s.io/api/core/v1"
25+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
26+
"k8s.io/apimachinery/pkg/util/intstr"
27+
"k8s.io/apimachinery/pkg/util/wait"
28+
"k8s.io/client-go/informers"
29+
clientset "k8s.io/client-go/kubernetes"
30+
restclient "k8s.io/client-go/rest"
31+
"k8s.io/kubernetes/pkg/controller/endpoint"
32+
"k8s.io/kubernetes/test/integration/framework"
33+
)
34+
35+
func TestEndpointUpdates(t *testing.T) {
36+
masterConfig := framework.NewIntegrationTestMasterConfig()
37+
_, server, closeFn := framework.RunAMaster(masterConfig)
38+
defer closeFn()
39+
40+
config := restclient.Config{Host: server.URL}
41+
client, err := clientset.NewForConfig(&config)
42+
if err != nil {
43+
t.Fatalf("Error creating clientset: %v", err)
44+
}
45+
46+
informers := informers.NewSharedInformerFactory(client, 0)
47+
48+
epController := endpoint.NewEndpointController(
49+
informers.Core().V1().Pods(),
50+
informers.Core().V1().Services(),
51+
informers.Core().V1().Endpoints(),
52+
client,
53+
0)
54+
55+
// Start informer and controllers
56+
stopCh := make(chan struct{})
57+
defer close(stopCh)
58+
informers.Start(stopCh)
59+
go epController.Run(1, stopCh)
60+
61+
// Create namespace
62+
ns := framework.CreateTestingNamespace("test-endpoints-updates", server, t)
63+
defer framework.DeleteTestingNamespace(ns, server, t)
64+
65+
// Create a pod with labels
66+
pod := &v1.Pod{
67+
ObjectMeta: metav1.ObjectMeta{
68+
Name: "test-pod",
69+
Namespace: ns.Name,
70+
Labels: labelMap(),
71+
},
72+
Spec: v1.PodSpec{
73+
NodeName: "fakenode",
74+
Containers: []v1.Container{
75+
{
76+
Name: "fake-name",
77+
Image: "fakeimage",
78+
},
79+
},
80+
},
81+
}
82+
83+
createdPod, err := client.CoreV1().Pods(ns.Name).Create(context.TODO(), pod, metav1.CreateOptions{})
84+
if err != nil {
85+
t.Fatalf("Failed to create pod %s: %v", pod.Name, err)
86+
}
87+
88+
// Set pod IPs
89+
createdPod.Status = v1.PodStatus{
90+
Phase: v1.PodRunning,
91+
PodIPs: []v1.PodIP{{IP: "1.1.1.1"}, {IP: "2001:db8::"}},
92+
}
93+
_, err = client.CoreV1().Pods(ns.Name).UpdateStatus(context.TODO(), createdPod, metav1.UpdateOptions{})
94+
if err != nil {
95+
t.Fatalf("Failed to update status of pod %s: %v", pod.Name, err)
96+
}
97+
98+
// Create a service associated to the pod
99+
svc := newService(ns.Name, "foo1")
100+
svc1, err := client.CoreV1().Services(ns.Name).Create(context.TODO(), svc, metav1.CreateOptions{})
101+
if err != nil {
102+
t.Fatalf("Failed to create service %s: %v", svc.Name, err)
103+
}
104+
105+
// Obtain ResourceVersion of the new endpoint created
106+
var resVersion string
107+
if err := wait.PollImmediate(1*time.Second, wait.ForeverTestTimeout, func() (bool, error) {
108+
endpoints, err := client.CoreV1().Endpoints(ns.Name).Get(context.TODO(), svc.Name, metav1.GetOptions{})
109+
if err != nil {
110+
t.Logf("error fetching endpoints: %v", err)
111+
return false, nil
112+
}
113+
resVersion = endpoints.ObjectMeta.ResourceVersion
114+
return true, nil
115+
}); err != nil {
116+
t.Fatalf("endpoints not found: %v", err)
117+
}
118+
119+
// Force recomputation on the endpoint controller
120+
svc1.SetAnnotations(map[string]string{"foo": "bar"})
121+
_, err = client.CoreV1().Services(ns.Name).Update(context.TODO(), svc1, metav1.UpdateOptions{})
122+
if err != nil {
123+
t.Fatalf("Failed to update service %s: %v", svc1.Name, err)
124+
}
125+
126+
// Create a new service and wait until it has been processed,
127+
// this way we can be sure that the endpoint for the original service
128+
// was recomputed before asserting, since we only have 1 worker
129+
// in the endpoint controller
130+
svc2 := newService(ns.Name, "foo2")
131+
_, err = client.CoreV1().Services(ns.Name).Create(context.TODO(), svc2, metav1.CreateOptions{})
132+
if err != nil {
133+
t.Fatalf("Failed to create service %s: %v", svc.Name, err)
134+
}
135+
136+
if err := wait.PollImmediate(1*time.Second, wait.ForeverTestTimeout, func() (bool, error) {
137+
_, err := client.CoreV1().Endpoints(ns.Name).Get(context.TODO(), svc2.Name, metav1.GetOptions{})
138+
if err != nil {
139+
t.Logf("error fetching endpoints: %v", err)
140+
return false, nil
141+
}
142+
return true, nil
143+
}); err != nil {
144+
t.Fatalf("endpoints not found: %v", err)
145+
}
146+
147+
// the endpoint controller should not update the endpoint created for the original
148+
// service since nothing has changed, the resource version has to be the same
149+
endpoints, err := client.CoreV1().Endpoints(ns.Name).Get(context.TODO(), svc.Name, metav1.GetOptions{})
150+
if err != nil {
151+
t.Fatalf("error fetching endpoints: %v", err)
152+
}
153+
if resVersion != endpoints.ObjectMeta.ResourceVersion {
154+
t.Fatalf("endpoints resource version does not match, expected %s received %s", resVersion, endpoints.ObjectMeta.ResourceVersion)
155+
}
156+
157+
}
158+
159+
func labelMap() map[string]string {
160+
return map[string]string{"foo": "bar"}
161+
}
162+
163+
// newService returns a service with selector and exposing ports
164+
func newService(namespace, name string) *v1.Service {
165+
return &v1.Service{
166+
ObjectMeta: metav1.ObjectMeta{
167+
Name: name,
168+
Namespace: namespace,
169+
Labels: labelMap(),
170+
},
171+
Spec: v1.ServiceSpec{
172+
Selector: labelMap(),
173+
Ports: []v1.ServicePort{
174+
{Name: "port-1338", Port: 1338, Protocol: "TCP", TargetPort: intstr.FromInt(1338)},
175+
{Name: "port-1337", Port: 1337, Protocol: "TCP", TargetPort: intstr.FromInt(1337)},
176+
},
177+
},
178+
}
179+
180+
}
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
/*
2+
Copyright 2020 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package endpoints
18+
19+
import (
20+
"testing"
21+
22+
"k8s.io/kubernetes/test/integration/framework"
23+
)
24+
25+
func TestMain(m *testing.M) {
26+
framework.EtcdMain(m.Run)
27+
}

0 commit comments

Comments
 (0)