Skip to content

Commit 5b4d97d

Browse files
authored
Merge pull request kubernetes#122541 from aojea/headless_selector
Implement a field selector for ClusterIP on Services
2 parents 3686ceb + 5122fe0 commit 5b4d97d

File tree

6 files changed

+295
-2
lines changed

6 files changed

+295
-2
lines changed

pkg/apis/core/v1/conversion.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,9 @@ func addConversionFuncs(scheme *runtime.Scheme) error {
9797
if err := AddFieldLabelConversionsForSecret(scheme); err != nil {
9898
return err
9999
}
100+
if err := AddFieldLabelConversionsForService(scheme); err != nil {
101+
return err
102+
}
100103
return nil
101104
}
102105

@@ -488,6 +491,20 @@ func AddFieldLabelConversionsForSecret(scheme *runtime.Scheme) error {
488491
})
489492
}
490493

494+
func AddFieldLabelConversionsForService(scheme *runtime.Scheme) error {
495+
return scheme.AddFieldLabelConversionFunc(SchemeGroupVersion.WithKind("Service"),
496+
func(label, value string) (string, string, error) {
497+
switch label {
498+
case "metadata.namespace",
499+
"metadata.name",
500+
"spec.clusterIP":
501+
return label, value, nil
502+
default:
503+
return "", "", fmt.Errorf("field label not supported: %s", label)
504+
}
505+
})
506+
}
507+
491508
var initContainerAnnotations = map[string]bool{
492509
"pod.beta.kubernetes.io/init-containers": true,
493510
"pod.alpha.kubernetes.io/init-containers": true,

pkg/kubelet/kubelet.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -456,7 +456,11 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
456456
var serviceLister corelisters.ServiceLister
457457
var serviceHasSynced cache.InformerSynced
458458
if kubeDeps.KubeClient != nil {
459-
kubeInformers := informers.NewSharedInformerFactoryWithOptions(kubeDeps.KubeClient, 0)
459+
// don't watch headless services, they are not needed since this informer is only used to create the environment variables for pods.
460+
// See https://issues.k8s.io/122394
461+
kubeInformers := informers.NewSharedInformerFactoryWithOptions(kubeDeps.KubeClient, 0, informers.WithTweakListOptions(func(options *metav1.ListOptions) {
462+
options.FieldSelector = fields.OneTermNotEqualSelector("spec.clusterIP", v1.ClusterIPNone).String()
463+
}))
460464
serviceLister = kubeInformers.Core().V1().Services().Lister()
461465
serviceHasSynced = kubeInformers.Core().V1().Services().Informer().HasSynced
462466
kubeInformers.Start(wait.NeverStop)

pkg/registry/core/service/storage/storage.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,7 @@ func NewREST(
8888
store := &genericregistry.Store{
8989
NewFunc: func() runtime.Object { return &api.Service{} },
9090
NewListFunc: func() runtime.Object { return &api.ServiceList{} },
91+
PredicateFunc: svcreg.Matcher,
9192
DefaultQualifiedResource: api.Resource("services"),
9293
SingularQualifiedResource: api.Resource("service"),
9394
ReturnDeletedObject: true,
@@ -99,7 +100,10 @@ func NewREST(
99100

100101
TableConvertor: printerstorage.TableConvertor{TableGenerator: printers.NewTableGenerator().With(printersinternal.AddHandlers)},
101102
}
102-
options := &generic.StoreOptions{RESTOptions: optsGetter}
103+
options := &generic.StoreOptions{
104+
RESTOptions: optsGetter,
105+
AttrFunc: svcreg.GetAttrs,
106+
}
103107
if err := store.CompleteWithOptions(options); err != nil {
104108
return nil, nil, nil, err
105109
}

pkg/registry/core/service/strategy.go

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,16 @@ package service
1818

1919
import (
2020
"context"
21+
"fmt"
2122
"reflect"
2223

24+
"k8s.io/apimachinery/pkg/fields"
25+
"k8s.io/apimachinery/pkg/labels"
2326
"k8s.io/apimachinery/pkg/runtime"
2427
"k8s.io/apimachinery/pkg/util/sets"
2528
"k8s.io/apimachinery/pkg/util/validation/field"
29+
"k8s.io/apiserver/pkg/registry/generic"
30+
pkgstorage "k8s.io/apiserver/pkg/storage"
2631
"k8s.io/apiserver/pkg/storage/names"
2732
utilfeature "k8s.io/apiserver/pkg/util/feature"
2833
"k8s.io/kubernetes/pkg/api/legacyscheme"
@@ -166,6 +171,33 @@ func (serviceStatusStrategy) WarningsOnUpdate(ctx context.Context, obj, old runt
166171
return nil
167172
}
168173

174+
// GetAttrs returns labels and fields of a given object for filtering purposes.
175+
func GetAttrs(obj runtime.Object) (labels.Set, fields.Set, error) {
176+
service, ok := obj.(*api.Service)
177+
if !ok {
178+
return nil, nil, fmt.Errorf("not a service")
179+
}
180+
return service.Labels, SelectableFields(service), nil
181+
}
182+
183+
// Matcher returns a selection predicate for a given label and field selector.
184+
func Matcher(label labels.Selector, field fields.Selector) pkgstorage.SelectionPredicate {
185+
return pkgstorage.SelectionPredicate{
186+
Label: label,
187+
Field: field,
188+
GetAttrs: GetAttrs,
189+
}
190+
}
191+
192+
// SelectableFields returns a field set that can be used for filter selection
193+
func SelectableFields(service *api.Service) fields.Set {
194+
objectMetaFieldsSet := generic.ObjectMetaFieldsSet(&service.ObjectMeta, false)
195+
serviceSpecificFieldsSet := fields.Set{
196+
"spec.clusterIP": service.Spec.ClusterIP,
197+
}
198+
return generic.MergeFieldsSets(objectMetaFieldsSet, serviceSpecificFieldsSet)
199+
}
200+
169201
// dropServiceStatusDisabledFields drops fields that are not used if their associated feature gates
170202
// are not enabled. The typical pattern is:
171203
//

pkg/registry/core/service/strategy_test.go

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@ import (
2323
"github.com/google/go-cmp/cmp"
2424
"k8s.io/apimachinery/pkg/api/errors"
2525
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
26+
"k8s.io/apimachinery/pkg/fields"
27+
"k8s.io/apimachinery/pkg/labels"
2628
"k8s.io/apimachinery/pkg/util/intstr"
2729
genericapirequest "k8s.io/apiserver/pkg/endpoints/request"
2830
"k8s.io/apiserver/pkg/registry/rest"
@@ -786,3 +788,95 @@ func TestDropTypeDependentFields(t *testing.T) {
786788
})
787789
}
788790
}
791+
792+
func TestMatchService(t *testing.T) {
793+
testCases := []struct {
794+
name string
795+
in *api.Service
796+
fieldSelector fields.Selector
797+
expectMatch bool
798+
}{
799+
{
800+
name: "match on headless service",
801+
in: &api.Service{
802+
Spec: api.ServiceSpec{ClusterIP: api.ClusterIPNone},
803+
},
804+
fieldSelector: fields.ParseSelectorOrDie("spec.clusterIP=None"),
805+
expectMatch: true,
806+
},
807+
{
808+
name: "no match on clusterIP service",
809+
in: &api.Service{
810+
Spec: api.ServiceSpec{ClusterIP: "192.168.1.1"},
811+
},
812+
fieldSelector: fields.ParseSelectorOrDie("spec.clusterIP=None"),
813+
expectMatch: false,
814+
},
815+
{
816+
name: "match on clusterIP service",
817+
in: &api.Service{
818+
Spec: api.ServiceSpec{ClusterIP: "192.168.1.1"},
819+
},
820+
fieldSelector: fields.ParseSelectorOrDie("spec.clusterIP=192.168.1.1"),
821+
expectMatch: true,
822+
},
823+
{
824+
name: "match on non-headless service",
825+
in: &api.Service{
826+
Spec: api.ServiceSpec{ClusterIP: "192.168.1.1"},
827+
},
828+
fieldSelector: fields.ParseSelectorOrDie("spec.clusterIP!=None"),
829+
expectMatch: true,
830+
},
831+
{
832+
name: "match on any ClusterIP set service",
833+
in: &api.Service{
834+
Spec: api.ServiceSpec{ClusterIP: "192.168.1.1"},
835+
},
836+
fieldSelector: fields.ParseSelectorOrDie("spec.clusterIP!=\"\""),
837+
expectMatch: true,
838+
},
839+
{
840+
name: "match on clusterIP IPv6 service",
841+
in: &api.Service{
842+
Spec: api.ServiceSpec{ClusterIP: "2001:db2::1"},
843+
},
844+
fieldSelector: fields.ParseSelectorOrDie("spec.clusterIP=2001:db2::1"),
845+
expectMatch: true,
846+
},
847+
{
848+
name: "no match on headless service",
849+
in: &api.Service{
850+
Spec: api.ServiceSpec{ClusterIP: api.ClusterIPNone},
851+
},
852+
fieldSelector: fields.ParseSelectorOrDie("spec.clusterIP=192.168.1.1"),
853+
expectMatch: false,
854+
},
855+
{
856+
name: "no match on headless service",
857+
in: &api.Service{
858+
Spec: api.ServiceSpec{ClusterIP: api.ClusterIPNone},
859+
},
860+
fieldSelector: fields.ParseSelectorOrDie("spec.clusterIP=2001:db2::1"),
861+
expectMatch: false,
862+
},
863+
{
864+
name: "no match on empty service",
865+
in: &api.Service{},
866+
fieldSelector: fields.ParseSelectorOrDie("spec.clusterIP=None"),
867+
expectMatch: false,
868+
},
869+
}
870+
for _, testCase := range testCases {
871+
t.Run(testCase.name, func(t *testing.T) {
872+
m := Matcher(labels.Everything(), testCase.fieldSelector)
873+
result, err := m.Matches(testCase.in)
874+
if err != nil {
875+
t.Errorf("Unexpected error %v", err)
876+
}
877+
if result != testCase.expectMatch {
878+
t.Errorf("Result %v, Expected %v, Selector: %v, Service: %v", result, testCase.expectMatch, testCase.fieldSelector.String(), testCase.in)
879+
}
880+
})
881+
}
882+
}

test/integration/service/service_test.go

Lines changed: 142 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,16 @@ package service
1919
import (
2020
"context"
2121
"testing"
22+
"time"
2223

2324
corev1 "k8s.io/api/core/v1"
2425
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
26+
"k8s.io/apimachinery/pkg/fields"
27+
"k8s.io/apimachinery/pkg/labels"
28+
"k8s.io/apimachinery/pkg/util/wait"
29+
"k8s.io/client-go/informers"
2530
clientset "k8s.io/client-go/kubernetes"
31+
"k8s.io/client-go/tools/cache"
2632
kubeapiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing"
2733
"k8s.io/kubernetes/test/integration/framework"
2834
)
@@ -264,3 +270,139 @@ func Test_RemovingExternalIPsFromClusterIPServiceDropsExternalTrafficPolicy(t *t
264270
t.Error("service externalTrafficPolicy was not set for clusterIP Service with externalIPs")
265271
}
266272
}
273+
274+
func Test_ServiceClusterIPSelector(t *testing.T) {
275+
server := kubeapiservertesting.StartTestServerOrDie(t, nil, nil, framework.SharedEtcd())
276+
defer server.TearDownFn()
277+
278+
ctx, cancel := context.WithCancel(context.Background())
279+
defer cancel()
280+
281+
client, err := clientset.NewForConfig(server.ClientConfig)
282+
if err != nil {
283+
t.Fatalf("Error creating clientset: %v", err)
284+
}
285+
286+
ns := framework.CreateNamespaceOrDie(client, "test-external-name-drops-internal-traffic-policy", t)
287+
defer framework.DeleteNamespaceOrDie(client, ns, t)
288+
289+
// create headless service
290+
service := &corev1.Service{
291+
ObjectMeta: metav1.ObjectMeta{
292+
Name: "test-headless",
293+
Namespace: ns.Name,
294+
},
295+
Spec: corev1.ServiceSpec{
296+
ClusterIP: corev1.ClusterIPNone,
297+
Type: corev1.ServiceTypeClusterIP,
298+
Ports: []corev1.ServicePort{{
299+
Port: int32(80),
300+
}},
301+
Selector: map[string]string{
302+
"foo": "bar",
303+
},
304+
},
305+
}
306+
307+
_, err = client.CoreV1().Services(ns.Name).Create(ctx, service, metav1.CreateOptions{})
308+
if err != nil {
309+
t.Fatalf("Error creating test service: %v", err)
310+
}
311+
312+
// informer to watch only non-headless services
313+
kubeInformers := informers.NewSharedInformerFactoryWithOptions(client, 0, informers.WithTweakListOptions(func(options *metav1.ListOptions) {
314+
options.FieldSelector = fields.OneTermNotEqualSelector("spec.clusterIP", corev1.ClusterIPNone).String()
315+
}))
316+
317+
serviceInformer := kubeInformers.Core().V1().Services().Informer()
318+
serviceLister := kubeInformers.Core().V1().Services().Lister()
319+
serviceHasSynced := serviceInformer.HasSynced
320+
if _, err = serviceInformer.AddEventHandler(
321+
cache.ResourceEventHandlerFuncs{
322+
AddFunc: func(obj interface{}) {
323+
svc := obj.(*corev1.Service)
324+
t.Logf("Added Service %#v", svc)
325+
},
326+
UpdateFunc: func(oldObj, newObj interface{}) {
327+
oldSvc := oldObj.(*corev1.Service)
328+
newSvc := newObj.(*corev1.Service)
329+
t.Logf("Updated Service %#v to %#v", oldSvc, newSvc)
330+
},
331+
DeleteFunc: func(obj interface{}) {
332+
svc := obj.(*corev1.Service)
333+
t.Logf("Deleted Service %#v", svc)
334+
},
335+
},
336+
); err != nil {
337+
t.Fatalf("Error adding service informer handler: %v", err)
338+
}
339+
kubeInformers.Start(ctx.Done())
340+
cache.WaitForCacheSync(ctx.Done(), serviceHasSynced)
341+
svcs, err := serviceLister.List(labels.Everything())
342+
if err != nil {
343+
t.Fatalf("Error listing services: %v", err)
344+
}
345+
// only the kubernetes.default service expected
346+
if len(svcs) != 1 || svcs[0].Name != "kubernetes" {
347+
t.Fatalf("expected 1 services, got %d", len(svcs))
348+
}
349+
350+
// create a new service with ClusterIP
351+
service2 := service.DeepCopy()
352+
service2.Spec.ClusterIP = ""
353+
service2.Name = "test-clusterip"
354+
_, err = client.CoreV1().Services(ns.Name).Create(ctx, service2, metav1.CreateOptions{})
355+
if err != nil {
356+
t.Fatalf("Error creating test service: %v", err)
357+
}
358+
359+
err = wait.PollUntilContextTimeout(ctx, 1*time.Second, 10*time.Second, true, func(ctx context.Context) (done bool, err error) {
360+
svc, err := serviceLister.Services(service2.Namespace).Get(service2.Name)
361+
if svc == nil || err != nil {
362+
return false, nil
363+
}
364+
return true, nil
365+
})
366+
if err != nil {
367+
t.Fatalf("Error waiting for test service test-clusterip: %v", err)
368+
}
369+
370+
// mutate the Service to drop the ClusterIP, theoretically ClusterIP is inmutable but ...
371+
service.Spec.ExternalName = "test"
372+
service.Spec.Type = corev1.ServiceTypeExternalName
373+
_, err = client.CoreV1().Services(ns.Name).Update(ctx, service, metav1.UpdateOptions{})
374+
if err != nil {
375+
t.Fatalf("Error creating test service: %v", err)
376+
}
377+
378+
err = wait.PollUntilContextTimeout(ctx, 1*time.Second, 10*time.Second, true, func(ctx context.Context) (done bool, err error) {
379+
svc, err := serviceLister.Services(service.Namespace).Get(service.Name)
380+
if svc == nil || err != nil {
381+
return false, nil
382+
}
383+
return true, nil
384+
})
385+
if err != nil {
386+
t.Fatalf("Error waiting for test service without ClusterIP: %v", err)
387+
}
388+
389+
// mutate the Service to get the ClusterIP again
390+
service.Spec.ExternalName = ""
391+
service.Spec.ClusterIP = ""
392+
service.Spec.Type = corev1.ServiceTypeClusterIP
393+
_, err = client.CoreV1().Services(ns.Name).Update(ctx, service, metav1.UpdateOptions{})
394+
if err != nil {
395+
t.Fatalf("Error creating test service: %v", err)
396+
}
397+
398+
err = wait.PollUntilContextTimeout(ctx, 1*time.Second, 10*time.Second, true, func(ctx context.Context) (done bool, err error) {
399+
svc, err := serviceLister.Services(service.Namespace).Get(service.Name)
400+
if svc == nil || err != nil {
401+
return false, nil
402+
}
403+
return true, nil
404+
})
405+
if err != nil {
406+
t.Fatalf("Error waiting for test service with ClusterIP: %v", err)
407+
}
408+
}

0 commit comments

Comments
 (0)