Skip to content

Commit 2133f9e

Browse files
author
louisgong
committed
add service selector cache in endpoint controller and endpointSlice controller
1 parent 74cbf0d commit 2133f9e

File tree

6 files changed

+309
-41
lines changed

6 files changed

+309
-41
lines changed

pkg/controller/endpoint/endpoints_controller.go

Lines changed: 27 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ import (
2929
"k8s.io/apimachinery/pkg/labels"
3030
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
3131
"k8s.io/apimachinery/pkg/util/wait"
32+
utilfeature "k8s.io/apiserver/pkg/util/feature"
3233
coreinformers "k8s.io/client-go/informers/core/v1"
3334
clientset "k8s.io/client-go/kubernetes"
3435
"k8s.io/client-go/kubernetes/scheme"
@@ -46,8 +47,6 @@ import (
4647
helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
4748
"k8s.io/kubernetes/pkg/controller"
4849
endpointutil "k8s.io/kubernetes/pkg/controller/util/endpoint"
49-
50-
utilfeature "k8s.io/apiserver/pkg/util/feature"
5150
"k8s.io/kubernetes/pkg/features"
5251
utilnet "k8s.io/utils/net"
5352
)
@@ -93,11 +92,11 @@ func NewEndpointController(podInformer coreinformers.PodInformer, serviceInforme
9392
}
9493

9594
serviceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
96-
AddFunc: e.enqueueService,
95+
AddFunc: e.onServiceUpdate,
9796
UpdateFunc: func(old, cur interface{}) {
98-
e.enqueueService(cur)
97+
e.onServiceUpdate(cur)
9998
},
100-
DeleteFunc: e.enqueueService,
99+
DeleteFunc: e.onServiceDelete,
101100
})
102101
e.serviceLister = serviceInformer.Lister()
103102
e.servicesSynced = serviceInformer.Informer().HasSynced
@@ -119,6 +118,8 @@ func NewEndpointController(podInformer coreinformers.PodInformer, serviceInforme
119118

120119
e.endpointUpdatesBatchPeriod = endpointUpdatesBatchPeriod
121120

121+
e.serviceSelectorCache = endpointutil.NewServiceSelectorCache()
122+
122123
return e
123124
}
124125

@@ -164,6 +165,10 @@ type EndpointController struct {
164165
triggerTimeTracker *endpointutil.TriggerTimeTracker
165166

166167
endpointUpdatesBatchPeriod time.Duration
168+
169+
// serviceSelectorCache is a cache of service selectors to avoid high CPU consumption caused by frequent calls
170+
// to AsSelectorPreValidated (see #73527)
171+
serviceSelectorCache *endpointutil.ServiceSelectorCache
167172
}
168173

169174
// Run will not return until stopCh is closed. workers determines how many
@@ -195,7 +200,7 @@ func (e *EndpointController) Run(workers int, stopCh <-chan struct{}) {
195200
// enqueue them. obj must have *v1.Pod type.
196201
func (e *EndpointController) addPod(obj interface{}) {
197202
pod := obj.(*v1.Pod)
198-
services, err := endpointutil.GetPodServiceMemberships(e.serviceLister, pod)
203+
services, err := e.serviceSelectorCache.GetPodServiceMemberships(e.serviceLister, pod)
199204
if err != nil {
200205
utilruntime.HandleError(fmt.Errorf("Unable to get pod %s/%s's service memberships: %v", pod.Namespace, pod.Name, err))
201206
return
@@ -263,7 +268,7 @@ func endpointChanged(pod1, pod2 *v1.Pod) bool {
263268
// and what services it will be a member of, and enqueue the union of these.
264269
// old and cur must be *v1.Pod types.
265270
func (e *EndpointController) updatePod(old, cur interface{}) {
266-
services := endpointutil.GetServicesToUpdateOnPodChange(e.serviceLister, old, cur, endpointChanged)
271+
services := endpointutil.GetServicesToUpdateOnPodChange(e.serviceLister, e.serviceSelectorCache, old, cur, endpointChanged)
267272
for key := range services {
268273
e.queue.AddAfter(key, e.endpointUpdatesBatchPeriod)
269274
}
@@ -278,14 +283,27 @@ func (e *EndpointController) deletePod(obj interface{}) {
278283
}
279284
}
280285

281-
// obj could be an *v1.Service, or a DeletionFinalStateUnknown marker item.
282-
func (e *EndpointController) enqueueService(obj interface{}) {
286+
// onServiceUpdate updates the Service Selector in the cache and queues the Service for processing.
287+
func (e *EndpointController) onServiceUpdate(obj interface{}) {
288+
key, err := controller.KeyFunc(obj)
289+
if err != nil {
290+
utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %+v: %v", obj, err))
291+
return
292+
}
293+
294+
_ = e.serviceSelectorCache.Update(key, obj.(*v1.Service).Spec.Selector)
295+
e.queue.Add(key)
296+
}
297+
298+
// onServiceDelete removes the Service Selector from the cache and queues the Service for processing.
299+
func (e *EndpointController) onServiceDelete(obj interface{}) {
283300
key, err := controller.KeyFunc(obj)
284301
if err != nil {
285302
utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %+v: %v", obj, err))
286303
return
287304
}
288305

306+
e.serviceSelectorCache.Delete(key)
289307
e.queue.Add(key)
290308
}
291309

pkg/controller/endpoint/endpoints_controller_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -782,7 +782,7 @@ func TestWaitsForAllInformersToBeSynced2(t *testing.T) {
782782
},
783783
}
784784
endpoints.serviceStore.Add(service)
785-
endpoints.enqueueService(service)
785+
endpoints.onServiceUpdate(service)
786786
endpoints.podsSynced = test.podsSynced
787787
endpoints.servicesSynced = test.servicesSynced
788788
endpoints.endpointsSynced = test.endpointsSynced

pkg/controller/endpointslice/endpointslice_controller.go

Lines changed: 27 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -82,11 +82,11 @@ func NewController(podInformer coreinformers.PodInformer,
8282
}
8383

8484
serviceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
85-
AddFunc: c.enqueueService,
85+
AddFunc: c.onServiceUpdate,
8686
UpdateFunc: func(old, cur interface{}) {
87-
c.enqueueService(cur)
87+
c.onServiceUpdate(cur)
8888
},
89-
DeleteFunc: c.enqueueService,
89+
DeleteFunc: c.onServiceDelete,
9090
})
9191
c.serviceLister = serviceInformer.Lister()
9292
c.servicesSynced = serviceInformer.Informer().HasSynced
@@ -118,6 +118,8 @@ func NewController(podInformer coreinformers.PodInformer,
118118
c.eventBroadcaster = broadcaster
119119
c.eventRecorder = recorder
120120

121+
c.serviceSelectorCache = endpointutil.NewServiceSelectorCache()
122+
121123
return c
122124
}
123125

@@ -176,6 +178,10 @@ type Controller struct {
176178
// workerLoopPeriod is the time between worker runs. The workers
177179
// process the queue of service and pod changes
178180
workerLoopPeriod time.Duration
181+
182+
// serviceSelectorCache is a cache of service selectors to avoid high CPU consumption caused by frequent calls
183+
// to AsSelectorPreValidated (see #73527)
184+
serviceSelectorCache *endpointutil.ServiceSelectorCache
179185
}
180186

181187
// Run will not return until stopCh is closed.
@@ -307,20 +313,33 @@ func (c *Controller) syncService(key string) error {
307313
return nil
308314
}
309315

310-
// obj could be a *v1.Service or a DeletionalFinalStateUnknown marker item
311-
func (c *Controller) enqueueService(obj interface{}) {
316+
// onServiceUpdate updates the Service Selector in the cache and queues the Service for processing.
317+
func (c *Controller) onServiceUpdate(obj interface{}) {
318+
key, err := controller.KeyFunc(obj)
319+
if err != nil {
320+
utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %+v: %v", obj, err))
321+
return
322+
}
323+
324+
_ = c.serviceSelectorCache.Update(key, obj.(*v1.Service).Spec.Selector)
325+
c.queue.Add(key)
326+
}
327+
328+
// onServiceDelete removes the Service Selector from the cache and queues the Service for processing.
329+
func (c *Controller) onServiceDelete(obj interface{}) {
312330
key, err := controller.KeyFunc(obj)
313331
if err != nil {
314-
utilruntime.HandleError(fmt.Errorf("Couldn't get key for object"))
332+
utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %+v: %v", obj, err))
315333
return
316334
}
317335

336+
c.serviceSelectorCache.Delete(key)
318337
c.queue.Add(key)
319338
}
320339

321340
func (c *Controller) addPod(obj interface{}) {
322341
pod := obj.(*v1.Pod)
323-
services, err := endpointutil.GetPodServiceMemberships(c.serviceLister, pod)
342+
services, err := c.serviceSelectorCache.GetPodServiceMemberships(c.serviceLister, pod)
324343
if err != nil {
325344
utilruntime.HandleError(fmt.Errorf("Unable to get pod %s/%s's service memberships: %v", pod.Namespace, pod.Name, err))
326345
return
@@ -331,7 +350,7 @@ func (c *Controller) addPod(obj interface{}) {
331350
}
332351

333352
func (c *Controller) updatePod(old, cur interface{}) {
334-
services := endpointutil.GetServicesToUpdateOnPodChange(c.serviceLister, old, cur, podEndpointChanged)
353+
services := endpointutil.GetServicesToUpdateOnPodChange(c.serviceLister, c.serviceSelectorCache, old, cur, podEndpointChanged)
335354
for key := range services {
336355
c.queue.Add(key)
337356
}

pkg/controller/util/endpoint/BUILD

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ go_library(
1414
"//pkg/util/hash:go_default_library",
1515
"//staging/src/k8s.io/api/core/v1:go_default_library",
1616
"//staging/src/k8s.io/api/discovery/v1alpha1:go_default_library",
17+
"//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library",
1718
"//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
1819
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
1920
"//staging/src/k8s.io/client-go/listers/core/v1:go_default_library",
@@ -31,8 +32,11 @@ go_test(
3132
deps = [
3233
"//staging/src/k8s.io/api/core/v1:go_default_library",
3334
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
35+
"//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library",
3436
"//staging/src/k8s.io/apimachinery/pkg/util/intstr:go_default_library",
3537
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
38+
"//staging/src/k8s.io/client-go/informers:go_default_library",
39+
"//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library",
3640
],
3741
)
3842

pkg/controller/util/endpoint/controller_utils.go

Lines changed: 75 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,11 @@ import (
2222
"fmt"
2323
"reflect"
2424
"sort"
25+
"sync"
2526

2627
v1 "k8s.io/api/core/v1"
2728
discovery "k8s.io/api/discovery/v1alpha1"
29+
"k8s.io/apimachinery/pkg/labels"
2830
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
2931
"k8s.io/apimachinery/pkg/util/sets"
3032
v1listers "k8s.io/client-go/listers/core/v1"
@@ -34,6 +36,76 @@ import (
3436
"k8s.io/kubernetes/pkg/util/hash"
3537
)
3638

39+
// ServiceSelectorCache is a cache of service selectors to avoid high CPU consumption caused by frequent calls to AsSelectorPreValidated (see #73527)
40+
type ServiceSelectorCache struct {
41+
lock sync.RWMutex
42+
cache map[string]labels.Selector
43+
}
44+
45+
// NewServiceSelectorCache init ServiceSelectorCache for both endpoint controller and endpointSlice controller.
46+
func NewServiceSelectorCache() *ServiceSelectorCache {
47+
return &ServiceSelectorCache{
48+
cache: map[string]labels.Selector{},
49+
}
50+
}
51+
52+
// Get return selector and existence in ServiceSelectorCache by key.
53+
func (sc *ServiceSelectorCache) Get(key string) (labels.Selector, bool) {
54+
sc.lock.RLock()
55+
selector, ok := sc.cache[key]
56+
// fine-grained lock improves GetPodServiceMemberships performance(16.5%) than defer measured by BenchmarkGetPodServiceMemberships
57+
sc.lock.RUnlock()
58+
return selector, ok
59+
}
60+
61+
// Update can update or add a selector in ServiceSelectorCache while service's selector changed.
62+
func (sc *ServiceSelectorCache) Update(key string, rawSelector map[string]string) labels.Selector {
63+
sc.lock.Lock()
64+
defer sc.lock.Unlock()
65+
selector := labels.Set(rawSelector).AsSelectorPreValidated()
66+
sc.cache[key] = selector
67+
return selector
68+
}
69+
70+
// Delete can delete selector which exist in ServiceSelectorCache.
71+
func (sc *ServiceSelectorCache) Delete(key string) {
72+
sc.lock.Lock()
73+
defer sc.lock.Unlock()
74+
delete(sc.cache, key)
75+
}
76+
77+
// GetPodServiceMemberships returns a set of Service keys for Services that have
78+
// a selector matching the given pod.
79+
func (sc *ServiceSelectorCache) GetPodServiceMemberships(serviceLister v1listers.ServiceLister, pod *v1.Pod) (sets.String, error) {
80+
set := sets.String{}
81+
services, err := serviceLister.Services(pod.Namespace).List(labels.Everything())
82+
if err != nil {
83+
return set, err
84+
}
85+
86+
var selector labels.Selector
87+
for _, service := range services {
88+
if service.Spec.Selector == nil {
89+
// if the service has a nil selector this means selectors match nothing, not everything.
90+
continue
91+
}
92+
key, err := controller.KeyFunc(service)
93+
if err != nil {
94+
return nil, err
95+
}
96+
if v, ok := sc.Get(key); ok {
97+
selector = v
98+
} else {
99+
selector = sc.Update(key, service.Spec.Selector)
100+
}
101+
102+
if selector.Matches(labels.Set(pod.Labels)) {
103+
set.Insert(key)
104+
}
105+
}
106+
return set, nil
107+
}
108+
37109
// EndpointsMatch is a type of function that returns true if pod endpoints match.
38110
type EndpointsMatch func(*v1.Pod, *v1.Pod) bool
39111

@@ -100,29 +172,9 @@ func PodChanged(oldPod, newPod *v1.Pod, endpointChanged EndpointsMatch) (bool, b
100172
return endpointChanged(newPod, oldPod), labelsChanged
101173
}
102174

103-
// GetPodServiceMemberships returns a set of Service keys for Services that have
104-
// a selector matching the given pod.
105-
func GetPodServiceMemberships(serviceLister v1listers.ServiceLister, pod *v1.Pod) (sets.String, error) {
106-
set := sets.String{}
107-
services, err := serviceLister.GetPodServices(pod)
108-
if err != nil {
109-
// don't log this error because this function makes pointless
110-
// errors when no services match
111-
return set, nil
112-
}
113-
for i := range services {
114-
key, err := controller.KeyFunc(services[i])
115-
if err != nil {
116-
return nil, err
117-
}
118-
set.Insert(key)
119-
}
120-
return set, nil
121-
}
122-
123175
// GetServicesToUpdateOnPodChange returns a set of Service keys for Services
124176
// that have potentially been affected by a change to this pod.
125-
func GetServicesToUpdateOnPodChange(serviceLister v1listers.ServiceLister, old, cur interface{}, endpointChanged EndpointsMatch) sets.String {
177+
func GetServicesToUpdateOnPodChange(serviceLister v1listers.ServiceLister, selectorCache *ServiceSelectorCache, old, cur interface{}, endpointChanged EndpointsMatch) sets.String {
126178
newPod := cur.(*v1.Pod)
127179
oldPod := old.(*v1.Pod)
128180
if newPod.ResourceVersion == oldPod.ResourceVersion {
@@ -138,14 +190,14 @@ func GetServicesToUpdateOnPodChange(serviceLister v1listers.ServiceLister, old,
138190
return sets.String{}
139191
}
140192

141-
services, err := GetPodServiceMemberships(serviceLister, newPod)
193+
services, err := selectorCache.GetPodServiceMemberships(serviceLister, newPod)
142194
if err != nil {
143195
utilruntime.HandleError(fmt.Errorf("Unable to get pod %s/%s's service memberships: %v", newPod.Namespace, newPod.Name, err))
144196
return sets.String{}
145197
}
146198

147199
if labelsChanged {
148-
oldServices, err := GetPodServiceMemberships(serviceLister, oldPod)
200+
oldServices, err := selectorCache.GetPodServiceMemberships(serviceLister, oldPod)
149201
if err != nil {
150202
utilruntime.HandleError(fmt.Errorf("Unable to get pod %s/%s's service memberships: %v", newPod.Namespace, newPod.Name, err))
151203
}

0 commit comments

Comments
 (0)