Skip to content

Commit 1f708f6

Browse files
authored
Merge pull request kubernetes#94112 from damemi/sort-endpoints
Remove canonicalization of endpoints by endpoints controller for better comparison
2 parents 32ab671 + 0004b19 commit 1f708f6

File tree

8 files changed

+280
-5
lines changed

8 files changed

+280
-5
lines changed

pkg/controller/endpoint/endpoints_controller.go

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -472,9 +472,18 @@ func (e *EndpointController) syncService(key string) error {
472472

473473
createEndpoints := len(currentEndpoints.ResourceVersion) == 0
474474

475+
// Compare the sorted subsets and labels
476+
// Remove the HeadlessService label from the endpoints if it exists,
477+
// as this won't be set on the service itself
478+
// and will cause a false negative in this diff check.
479+
// But first check if it has that label to avoid expensive copies.
480+
compareLabels := currentEndpoints.Labels
481+
if _, ok := currentEndpoints.Labels[v1.IsHeadlessService]; ok {
482+
compareLabels = utillabels.CloneAndRemoveLabel(currentEndpoints.Labels, v1.IsHeadlessService)
483+
}
475484
if !createEndpoints &&
476485
apiequality.Semantic.DeepEqual(currentEndpoints.Subsets, subsets) &&
477-
apiequality.Semantic.DeepEqual(currentEndpoints.Labels, service.Labels) {
486+
apiequality.Semantic.DeepEqual(compareLabels, service.Labels) {
478487
klog.V(5).Infof("endpoints are equal for %s/%s, skipping update", service.Namespace, service.Name)
479488
return nil
480489
}

pkg/controller/endpoint/endpoints_controller_test.go

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -383,6 +383,33 @@ func TestSyncEndpointsProtocolTCP(t *testing.T) {
383383
endpointsHandler.ValidateRequest(t, "/api/v1/namespaces/"+ns+"/endpoints/foo", "PUT", &data)
384384
}
385385

386+
func TestSyncEndpointsHeadlessServiceLabel(t *testing.T) {
387+
ns := metav1.NamespaceDefault
388+
testServer, endpointsHandler := makeTestServer(t, ns)
389+
defer testServer.Close()
390+
endpoints := newController(testServer.URL, 0*time.Second)
391+
endpoints.endpointsStore.Add(&v1.Endpoints{
392+
ObjectMeta: metav1.ObjectMeta{
393+
Name: "foo",
394+
Namespace: ns,
395+
ResourceVersion: "1",
396+
Labels: map[string]string{
397+
v1.IsHeadlessService: "",
398+
},
399+
},
400+
Subsets: []v1.EndpointSubset{},
401+
})
402+
endpoints.serviceStore.Add(&v1.Service{
403+
ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
404+
Spec: v1.ServiceSpec{
405+
Selector: map[string]string{"foo": "bar"},
406+
Ports: []v1.ServicePort{{Port: 80}},
407+
},
408+
})
409+
endpoints.syncService(ns + "/foo")
410+
endpointsHandler.ValidateRequestCount(t, 0)
411+
}
412+
386413
func TestSyncEndpointsProtocolUDP(t *testing.T) {
387414
ns := "other"
388415
testServer, endpointsHandler := makeTestServer(t, ns)

pkg/registry/core/endpoint/BUILD

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@ go_library(
1313
],
1414
importpath = "k8s.io/kubernetes/pkg/registry/core/endpoint",
1515
deps = [
16-
"//pkg/api/endpoints:go_default_library",
1716
"//pkg/api/legacyscheme:go_default_library",
1817
"//pkg/apis/core:go_default_library",
1918
"//pkg/apis/core/validation:go_default_library",

pkg/registry/core/endpoint/strategy.go

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@ import (
2222
"k8s.io/apimachinery/pkg/runtime"
2323
"k8s.io/apimachinery/pkg/util/validation/field"
2424
"k8s.io/apiserver/pkg/storage/names"
25-
endptspkg "k8s.io/kubernetes/pkg/api/endpoints"
2625
"k8s.io/kubernetes/pkg/api/legacyscheme"
2726
api "k8s.io/kubernetes/pkg/apis/core"
2827
"k8s.io/kubernetes/pkg/apis/core/validation"
@@ -60,8 +59,6 @@ func (endpointsStrategy) Validate(ctx context.Context, obj runtime.Object) field
6059

6160
// Canonicalize normalizes the object after validation.
6261
func (endpointsStrategy) Canonicalize(obj runtime.Object) {
63-
endpoints := obj.(*api.Endpoints)
64-
endpoints.Subsets = endptspkg.RepackSubsets(endpoints.Subsets)
6562
}
6663

6764
// AllowCreateOnUpdate is true for endpoints.

test/integration/BUILD

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ filegroup(
5050
"//test/integration/deployment:all-srcs",
5151
"//test/integration/disruption:all-srcs",
5252
"//test/integration/dryrun:all-srcs",
53+
"//test/integration/endpoints:all-srcs",
5354
"//test/integration/endpointslice:all-srcs",
5455
"//test/integration/etcd:all-srcs",
5556
"//test/integration/events:all-srcs",

test/integration/endpoints/BUILD

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
load("@io_bazel_rules_go//go:def.bzl", "go_test")
2+
3+
go_test(
4+
name = "go_default_test",
5+
srcs = [
6+
"endpoints_test.go",
7+
"main_test.go",
8+
],
9+
tags = ["integration"],
10+
deps = [
11+
"//pkg/controller/endpoint:go_default_library",
12+
"//staging/src/k8s.io/api/core/v1:go_default_library",
13+
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
14+
"//staging/src/k8s.io/apimachinery/pkg/util/intstr:go_default_library",
15+
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
16+
"//staging/src/k8s.io/client-go/informers:go_default_library",
17+
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
18+
"//staging/src/k8s.io/client-go/rest:go_default_library",
19+
"//test/integration/framework:go_default_library",
20+
],
21+
)
22+
23+
filegroup(
24+
name = "package-srcs",
25+
srcs = glob(["**"]),
26+
tags = ["automanaged"],
27+
visibility = ["//visibility:private"],
28+
)
29+
30+
filegroup(
31+
name = "all-srcs",
32+
srcs = [":package-srcs"],
33+
tags = ["automanaged"],
34+
visibility = ["//visibility:public"],
35+
)
Lines changed: 180 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,180 @@
1+
/*
2+
Copyright 2020 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package endpoints
18+
19+
import (
20+
"context"
21+
"testing"
22+
"time"
23+
24+
v1 "k8s.io/api/core/v1"
25+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
26+
"k8s.io/apimachinery/pkg/util/intstr"
27+
"k8s.io/apimachinery/pkg/util/wait"
28+
"k8s.io/client-go/informers"
29+
clientset "k8s.io/client-go/kubernetes"
30+
restclient "k8s.io/client-go/rest"
31+
"k8s.io/kubernetes/pkg/controller/endpoint"
32+
"k8s.io/kubernetes/test/integration/framework"
33+
)
34+
35+
func TestEndpointUpdates(t *testing.T) {
36+
masterConfig := framework.NewIntegrationTestMasterConfig()
37+
_, server, closeFn := framework.RunAMaster(masterConfig)
38+
defer closeFn()
39+
40+
config := restclient.Config{Host: server.URL}
41+
client, err := clientset.NewForConfig(&config)
42+
if err != nil {
43+
t.Fatalf("Error creating clientset: %v", err)
44+
}
45+
46+
informers := informers.NewSharedInformerFactory(client, 0)
47+
48+
epController := endpoint.NewEndpointController(
49+
informers.Core().V1().Pods(),
50+
informers.Core().V1().Services(),
51+
informers.Core().V1().Endpoints(),
52+
client,
53+
0)
54+
55+
// Start informer and controllers
56+
stopCh := make(chan struct{})
57+
defer close(stopCh)
58+
informers.Start(stopCh)
59+
go epController.Run(1, stopCh)
60+
61+
// Create namespace
62+
ns := framework.CreateTestingNamespace("test-endpoints-updates", server, t)
63+
defer framework.DeleteTestingNamespace(ns, server, t)
64+
65+
// Create a pod with labels
66+
pod := &v1.Pod{
67+
ObjectMeta: metav1.ObjectMeta{
68+
Name: "test-pod",
69+
Namespace: ns.Name,
70+
Labels: labelMap(),
71+
},
72+
Spec: v1.PodSpec{
73+
NodeName: "fakenode",
74+
Containers: []v1.Container{
75+
{
76+
Name: "fake-name",
77+
Image: "fakeimage",
78+
},
79+
},
80+
},
81+
}
82+
83+
createdPod, err := client.CoreV1().Pods(ns.Name).Create(context.TODO(), pod, metav1.CreateOptions{})
84+
if err != nil {
85+
t.Fatalf("Failed to create pod %s: %v", pod.Name, err)
86+
}
87+
88+
// Set pod IPs
89+
createdPod.Status = v1.PodStatus{
90+
Phase: v1.PodRunning,
91+
PodIPs: []v1.PodIP{{IP: "1.1.1.1"}, {IP: "2001:db8::"}},
92+
}
93+
_, err = client.CoreV1().Pods(ns.Name).UpdateStatus(context.TODO(), createdPod, metav1.UpdateOptions{})
94+
if err != nil {
95+
t.Fatalf("Failed to update status of pod %s: %v", pod.Name, err)
96+
}
97+
98+
// Create a service associated to the pod
99+
svc := newService(ns.Name, "foo1")
100+
svc1, err := client.CoreV1().Services(ns.Name).Create(context.TODO(), svc, metav1.CreateOptions{})
101+
if err != nil {
102+
t.Fatalf("Failed to create service %s: %v", svc.Name, err)
103+
}
104+
105+
// Obtain ResourceVersion of the new endpoint created
106+
var resVersion string
107+
if err := wait.PollImmediate(1*time.Second, wait.ForeverTestTimeout, func() (bool, error) {
108+
endpoints, err := client.CoreV1().Endpoints(ns.Name).Get(context.TODO(), svc.Name, metav1.GetOptions{})
109+
if err != nil {
110+
t.Logf("error fetching endpoints: %v", err)
111+
return false, nil
112+
}
113+
resVersion = endpoints.ObjectMeta.ResourceVersion
114+
return true, nil
115+
}); err != nil {
116+
t.Fatalf("endpoints not found: %v", err)
117+
}
118+
119+
// Force recomputation on the endpoint controller
120+
svc1.SetAnnotations(map[string]string{"foo": "bar"})
121+
_, err = client.CoreV1().Services(ns.Name).Update(context.TODO(), svc1, metav1.UpdateOptions{})
122+
if err != nil {
123+
t.Fatalf("Failed to update service %s: %v", svc1.Name, err)
124+
}
125+
126+
// Create a new service and wait until it has been processed,
127+
// this way we can be sure that the endpoint for the original service
128+
// was recomputed before asserting, since we only have 1 worker
129+
// in the endpoint controller
130+
svc2 := newService(ns.Name, "foo2")
131+
_, err = client.CoreV1().Services(ns.Name).Create(context.TODO(), svc2, metav1.CreateOptions{})
132+
if err != nil {
133+
t.Fatalf("Failed to create service %s: %v", svc.Name, err)
134+
}
135+
136+
if err := wait.PollImmediate(1*time.Second, wait.ForeverTestTimeout, func() (bool, error) {
137+
_, err := client.CoreV1().Endpoints(ns.Name).Get(context.TODO(), svc2.Name, metav1.GetOptions{})
138+
if err != nil {
139+
t.Logf("error fetching endpoints: %v", err)
140+
return false, nil
141+
}
142+
return true, nil
143+
}); err != nil {
144+
t.Fatalf("endpoints not found: %v", err)
145+
}
146+
147+
// the endpoint controller should not update the endpoint created for the original
148+
// service since nothing has changed, the resource version has to be the same
149+
endpoints, err := client.CoreV1().Endpoints(ns.Name).Get(context.TODO(), svc.Name, metav1.GetOptions{})
150+
if err != nil {
151+
t.Fatalf("error fetching endpoints: %v", err)
152+
}
153+
if resVersion != endpoints.ObjectMeta.ResourceVersion {
154+
t.Fatalf("endpoints resource version does not match, expected %s received %s", resVersion, endpoints.ObjectMeta.ResourceVersion)
155+
}
156+
157+
}
158+
159+
func labelMap() map[string]string {
160+
return map[string]string{"foo": "bar"}
161+
}
162+
163+
// newService returns a service with selector and exposing ports
164+
func newService(namespace, name string) *v1.Service {
165+
return &v1.Service{
166+
ObjectMeta: metav1.ObjectMeta{
167+
Name: name,
168+
Namespace: namespace,
169+
Labels: labelMap(),
170+
},
171+
Spec: v1.ServiceSpec{
172+
Selector: labelMap(),
173+
Ports: []v1.ServicePort{
174+
{Name: "port-1338", Port: 1338, Protocol: "TCP", TargetPort: intstr.FromInt(1338)},
175+
{Name: "port-1337", Port: 1337, Protocol: "TCP", TargetPort: intstr.FromInt(1337)},
176+
},
177+
},
178+
}
179+
180+
}
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
/*
2+
Copyright 2020 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package endpoints
18+
19+
import (
20+
"testing"
21+
22+
"k8s.io/kubernetes/test/integration/framework"
23+
)
24+
25+
func TestMain(m *testing.M) {
26+
framework.EtcdMain(m.Run)
27+
}

0 commit comments

Comments
 (0)