Skip to content

Commit e389237

Browse files
author
Ricardo Pchevuzinske Katz
committed
Remove watching Endpoints of Headless Services
Signed-off-by: Ricardo Pchevuzinske Katz <[email protected]>
1 parent 5df8781 commit e389237

File tree

7 files changed

+90
-4
lines changed

7 files changed

+90
-4
lines changed

cmd/kube-proxy/app/BUILD

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,9 @@ go_library(
4545
"//pkg/version/verflag:go_default_library",
4646
"//staging/src/k8s.io/api/core/v1:go_default_library",
4747
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
48+
"//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library",
4849
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
50+
"//staging/src/k8s.io/apimachinery/pkg/selection:go_default_library",
4951
"//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
5052
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
5153
"//staging/src/k8s.io/apiserver/pkg/server/healthz:go_default_library",

cmd/kube-proxy/app/server.go

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,9 @@ import (
3030

3131
v1 "k8s.io/api/core/v1"
3232
v1meta "k8s.io/apimachinery/pkg/apis/meta/v1"
33+
"k8s.io/apimachinery/pkg/labels"
3334
"k8s.io/apimachinery/pkg/runtime"
35+
"k8s.io/apimachinery/pkg/selection"
3436
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
3537
"k8s.io/apimachinery/pkg/util/wait"
3638
"k8s.io/apiserver/pkg/server/healthz"
@@ -603,9 +605,22 @@ func (s *ProxyServer) Run() error {
603605
}
604606
}
605607

608+
noProxyName, err := labels.NewRequirement(apis.LabelServiceProxyName, selection.DoesNotExist, nil)
609+
if err != nil {
610+
return err
611+
}
612+
613+
noHeadlessEndpoints, err := labels.NewRequirement(v1.IsHeadlessService, selection.DoesNotExist, nil)
614+
if err != nil {
615+
return err
616+
}
617+
618+
labelSelector := labels.NewSelector()
619+
labelSelector = labelSelector.Add(*noProxyName, *noHeadlessEndpoints)
620+
606621
informerFactory := informers.NewSharedInformerFactoryWithOptions(s.Client, s.ConfigSyncPeriod,
607622
informers.WithTweakListOptions(func(options *v1meta.ListOptions) {
608-
options.LabelSelector = "!" + apis.LabelServiceProxyName
623+
options.LabelSelector = labelSelector.String()
609624
}))
610625

611626
// Create configs (i.e. Watches for Services and Endpoints)

pkg/controller/endpoint/BUILD

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ go_library(
1818
"//pkg/api/v1/endpoints:go_default_library",
1919
"//pkg/api/v1/pod:go_default_library",
2020
"//pkg/apis/core:go_default_library",
21+
"//pkg/apis/core/v1/helper:go_default_library",
2122
"//pkg/controller:go_default_library",
2223
"//pkg/util/metrics:go_default_library",
2324
"//staging/src/k8s.io/api/core/v1:go_default_library",

pkg/controller/endpoint/endpoints_controller.go

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ import (
4343
"k8s.io/kubernetes/pkg/api/v1/endpoints"
4444
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
4545
api "k8s.io/kubernetes/pkg/apis/core"
46+
helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
4647
"k8s.io/kubernetes/pkg/controller"
4748
"k8s.io/kubernetes/pkg/util/metrics"
4849
)
@@ -55,8 +56,8 @@ const (
5556
// 5ms, 10ms, 20ms, 40ms, 80ms, 160ms, 320ms, 640ms, 1.3s, 2.6s, 5.1s, 10.2s, 20.4s, 41s, 82s
5657
maxRetries = 15
5758

58-
// An annotation on the Service denoting if the endpoints controller should
59-
// go ahead and create endpoints for unready pods. This annotation is
59+
// TolerateUnreadyEndpointsAnnotation is an annotation on the Service denoting if the endpoints
60+
// controller should go ahead and create endpoints for unready pods. This annotation is
6061
// currently only used by StatefulSets, where we need the pod to be DNS
6162
// resolvable during initialization and termination. In this situation we
6263
// create a headless Service just for the StatefulSet, and clients shouldn't
@@ -545,6 +546,16 @@ func (e *EndpointController) syncService(key string) error {
545546
delete(newEndpoints.Annotations, v1.EndpointsLastChangeTriggerTime)
546547
}
547548

549+
if newEndpoints.Labels == nil {
550+
newEndpoints.Labels = make(map[string]string)
551+
}
552+
553+
if !helper.IsServiceIPSet(service) {
554+
newEndpoints.Labels[v1.IsHeadlessService] = ""
555+
} else {
556+
delete(newEndpoints.Labels, v1.IsHeadlessService)
557+
}
558+
548559
klog.V(4).Infof("Update endpoints for %v/%v, ready: %d not ready: %d", service.Namespace, service.Name, totalReadyEps, totalNotReadyEps)
549560
if createEndpoints {
550561
// No previous endpoints, create them

pkg/controller/endpoint/endpoints_controller_test.go

Lines changed: 51 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -305,6 +305,9 @@ func TestSyncEndpointsProtocolTCP(t *testing.T) {
305305
Name: "foo",
306306
Namespace: ns,
307307
ResourceVersion: "1",
308+
Labels: map[string]string{
309+
v1.IsHeadlessService: "",
310+
},
308311
},
309312
Subsets: []v1.EndpointSubset{{
310313
Addresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}},
@@ -346,6 +349,9 @@ func TestSyncEndpointsProtocolUDP(t *testing.T) {
346349
Name: "foo",
347350
Namespace: ns,
348351
ResourceVersion: "1",
352+
Labels: map[string]string{
353+
v1.IsHeadlessService: "",
354+
},
349355
},
350356
Subsets: []v1.EndpointSubset{{
351357
Addresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}},
@@ -387,6 +393,9 @@ func TestSyncEndpointsProtocolSCTP(t *testing.T) {
387393
Name: "foo",
388394
Namespace: ns,
389395
ResourceVersion: "1",
396+
Labels: map[string]string{
397+
v1.IsHeadlessService: "",
398+
},
390399
},
391400
Subsets: []v1.EndpointSubset{{
392401
Addresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}},
@@ -424,6 +433,9 @@ func TestSyncEndpointsItemsEmptySelectorSelectsAll(t *testing.T) {
424433
Name: "foo",
425434
Namespace: ns,
426435
ResourceVersion: "1",
436+
Labels: map[string]string{
437+
v1.IsHeadlessService: "",
438+
},
427439
},
428440
Subsets: []v1.EndpointSubset{{
429441
Addresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}},
@@ -461,6 +473,9 @@ func TestSyncEndpointsItemsEmptySelectorSelectsAllNotReady(t *testing.T) {
461473
Name: "foo",
462474
Namespace: ns,
463475
ResourceVersion: "1",
476+
Labels: map[string]string{
477+
v1.IsHeadlessService: "",
478+
},
464479
},
465480
Subsets: []v1.EndpointSubset{{
466481
NotReadyAddresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}},
@@ -498,6 +513,9 @@ func TestSyncEndpointsItemsEmptySelectorSelectsAllMixed(t *testing.T) {
498513
Name: "foo",
499514
Namespace: ns,
500515
ResourceVersion: "1",
516+
Labels: map[string]string{
517+
v1.IsHeadlessService: "",
518+
},
501519
},
502520
Subsets: []v1.EndpointSubset{{
503521
Addresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}},
@@ -539,6 +557,9 @@ func TestSyncEndpointsItemsPreexisting(t *testing.T) {
539557
Name: "foo",
540558
Namespace: ns,
541559
ResourceVersion: "1",
560+
Labels: map[string]string{
561+
v1.IsHeadlessService: "",
562+
},
542563
},
543564
Subsets: []v1.EndpointSubset{{
544565
Addresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}},
@@ -610,6 +631,9 @@ func TestSyncEndpointsItems(t *testing.T) {
610631
ObjectMeta: metav1.ObjectMeta{
611632
ResourceVersion: "",
612633
Name: "foo",
634+
Labels: map[string]string{
635+
v1.IsHeadlessService: "",
636+
},
613637
},
614638
Subsets: endptspkg.SortSubsets(expectedSubsets),
615639
})
@@ -651,6 +675,8 @@ func TestSyncEndpointsItemsWithLabels(t *testing.T) {
651675
{Name: "port1", Port: 8088, Protocol: "TCP"},
652676
},
653677
}}
678+
679+
serviceLabels[v1.IsHeadlessService] = ""
654680
data := runtime.EncodeOrDie(testapi.Default.Codec(), &v1.Endpoints{
655681
ObjectMeta: metav1.ObjectMeta{
656682
ResourceVersion: "",
@@ -697,6 +723,7 @@ func TestSyncEndpointsItemsPreexistingLabelsChange(t *testing.T) {
697723
})
698724
endpoints.syncService(ns + "/foo")
699725

726+
serviceLabels[v1.IsHeadlessService] = ""
700727
data := runtime.EncodeOrDie(testapi.Default.Codec(), &v1.Endpoints{
701728
ObjectMeta: metav1.ObjectMeta{
702729
Name: "foo",
@@ -797,6 +824,9 @@ func TestSyncEndpointsHeadlessService(t *testing.T) {
797824
Name: "foo",
798825
Namespace: ns,
799826
ResourceVersion: "1",
827+
Labels: map[string]string{
828+
v1.IsHeadlessService: "",
829+
},
800830
},
801831
Subsets: []v1.EndpointSubset{{
802832
Addresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}},
@@ -837,6 +867,9 @@ func TestSyncEndpointsItemsExcludeNotReadyPodsWithRestartPolicyNeverAndPhaseFail
837867
Name: "foo",
838868
Namespace: ns,
839869
ResourceVersion: "1",
870+
Labels: map[string]string{
871+
v1.IsHeadlessService: "",
872+
},
840873
},
841874
Subsets: []v1.EndpointSubset{},
842875
})
@@ -873,6 +906,9 @@ func TestSyncEndpointsItemsExcludeNotReadyPodsWithRestartPolicyNeverAndPhaseSucc
873906
Name: "foo",
874907
Namespace: ns,
875908
ResourceVersion: "1",
909+
Labels: map[string]string{
910+
v1.IsHeadlessService: "",
911+
},
876912
},
877913
Subsets: []v1.EndpointSubset{},
878914
})
@@ -909,6 +945,9 @@ func TestSyncEndpointsItemsExcludeNotReadyPodsWithRestartPolicyOnFailureAndPhase
909945
Name: "foo",
910946
Namespace: ns,
911947
ResourceVersion: "1",
948+
Labels: map[string]string{
949+
v1.IsHeadlessService: "",
950+
},
912951
},
913952
Subsets: []v1.EndpointSubset{},
914953
})
@@ -934,6 +973,9 @@ func TestSyncEndpointsHeadlessWithoutPort(t *testing.T) {
934973
data := runtime.EncodeOrDie(testapi.Default.Codec(), &v1.Endpoints{
935974
ObjectMeta: metav1.ObjectMeta{
936975
Name: "foo",
976+
Labels: map[string]string{
977+
v1.IsHeadlessService: "",
978+
},
937979
},
938980
Subsets: []v1.EndpointSubset{{
939981
Addresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}},
@@ -1222,6 +1264,9 @@ func TestLastTriggerChangeTimeAnnotation(t *testing.T) {
12221264
Annotations: map[string]string{
12231265
v1.EndpointsLastChangeTriggerTime: triggerTimeString,
12241266
},
1267+
Labels: map[string]string{
1268+
v1.IsHeadlessService: "",
1269+
},
12251270
},
12261271
Subsets: []v1.EndpointSubset{{
12271272
Addresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}},
@@ -1269,6 +1314,9 @@ func TestLastTriggerChangeTimeAnnotation_AnnotationOverridden(t *testing.T) {
12691314
Annotations: map[string]string{
12701315
v1.EndpointsLastChangeTriggerTime: triggerTimeString,
12711316
},
1317+
Labels: map[string]string{
1318+
v1.IsHeadlessService: "",
1319+
},
12721320
},
12731321
Subsets: []v1.EndpointSubset{{
12741322
Addresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}},
@@ -1314,7 +1362,9 @@ func TestLastTriggerChangeTimeAnnotation_AnnotationCleared(t *testing.T) {
13141362
Name: "foo",
13151363
Namespace: ns,
13161364
ResourceVersion: "1",
1317-
Annotations: map[string]string{}, // Annotation not set anymore.
1365+
Labels: map[string]string{
1366+
v1.IsHeadlessService: "",
1367+
}, // Annotation not set anymore.
13181368
},
13191369
Subsets: []v1.EndpointSubset{{
13201370
Addresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}},

pkg/proxy/endpoints.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import (
2626
"k8s.io/klog"
2727

2828
v1 "k8s.io/api/core/v1"
29+
2930
"k8s.io/apimachinery/pkg/types"
3031
"k8s.io/apimachinery/pkg/util/sets"
3132
"k8s.io/client-go/tools/record"

staging/src/k8s.io/api/core/v1/well_known_labels.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,4 +33,10 @@ const (
3333

3434
// LabelNamespaceNodeRestriction is a forbidden label namespace that kubelets may not self-set when the NodeRestriction admission plugin is enabled
3535
LabelNamespaceNodeRestriction = "node-restriction.kubernetes.io"
36+
37+
// IsHeadlessService is added by Controller to an Endpoint denoting if its parent
38+
// Service is Headless. The existence of this label can be used further by other
39+
// controllers and kube-proxy to check if the Endpoint objects should be replicated when
40+
// using Headless Services
41+
IsHeadlessService = "service.kubernetes.io/headless"
3642
)

0 commit comments

Comments
 (0)