Skip to content

Commit d4ba10e

Browse files
authored
Merge pull request kubernetes#85703 from robscott/endpointslice-controller-race-fix
Fixing Potential Race Condition in EndpointSlice Controller.
2 parents 4158e7c + c75787b commit d4ba10e

File tree

8 files changed

+449
-3
lines changed

8 files changed

+449
-3
lines changed

pkg/controller/endpointslice/BUILD

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ go_library(
55
srcs = [
66
"endpointset.go",
77
"endpointslice_controller.go",
8+
"endpointslice_tracker.go",
89
"reconciler.go",
910
"utils.go",
1011
],
@@ -49,6 +50,7 @@ go_test(
4950
name = "go_default_test",
5051
srcs = [
5152
"endpointslice_controller_test.go",
53+
"endpointslice_tracker_test.go",
5254
"reconciler_test.go",
5355
"utils_test.go",
5456
],

pkg/controller/endpointslice/endpointslice_controller.go

Lines changed: 66 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ const (
6363
func NewController(podInformer coreinformers.PodInformer,
6464
serviceInformer coreinformers.ServiceInformer,
6565
nodeInformer coreinformers.NodeInformer,
66-
esInformer discoveryinformers.EndpointSliceInformer,
66+
endpointSliceInformer discoveryinformers.EndpointSliceInformer,
6767
maxEndpointsPerSlice int32,
6868
client clientset.Interface,
6969
) *Controller {
@@ -105,15 +105,23 @@ func NewController(podInformer coreinformers.PodInformer,
105105
c.nodeLister = nodeInformer.Lister()
106106
c.nodesSynced = nodeInformer.Informer().HasSynced
107107

108-
c.endpointSliceLister = esInformer.Lister()
109-
c.endpointSlicesSynced = esInformer.Informer().HasSynced
108+
endpointSliceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
109+
AddFunc: c.onEndpointSliceAdd,
110+
UpdateFunc: c.onEndpointSliceUpdate,
111+
DeleteFunc: c.onEndpointSliceDelete,
112+
})
113+
114+
c.endpointSliceLister = endpointSliceInformer.Lister()
115+
c.endpointSlicesSynced = endpointSliceInformer.Informer().HasSynced
116+
c.endpointSliceTracker = newEndpointSliceTracker()
110117

111118
c.maxEndpointsPerSlice = maxEndpointsPerSlice
112119

113120
c.reconciler = &reconciler{
114121
client: c.client,
115122
nodeLister: c.nodeLister,
116123
maxEndpointsPerSlice: c.maxEndpointsPerSlice,
124+
endpointSliceTracker: c.endpointSliceTracker,
117125
metricsCache: endpointslicemetrics.NewCache(maxEndpointsPerSlice),
118126
}
119127
c.triggerTimeTracker = endpointutil.NewTriggerTimeTracker()
@@ -152,6 +160,10 @@ type Controller struct {
152160
// endpointSlicesSynced returns true if the endpoint slice shared informer has been synced at least once.
153161
// Added as a member to the struct to allow injection for testing.
154162
endpointSlicesSynced cache.InformerSynced
163+
// endpointSliceTracker tracks the list of EndpointSlices and associated
164+
// resource versions expected for each Service. It can help determine if a
165+
// cached EndpointSlice is out of date.
166+
endpointSliceTracker *endpointSliceTracker
155167

156168
// nodeLister is able to list/get nodes and is populated by the
157169
// shared informer passed to NewController
@@ -343,6 +355,57 @@ func (c *Controller) onServiceDelete(obj interface{}) {
343355
c.queue.Add(key)
344356
}
345357

358+
// onEndpointSliceAdd queues a sync for the relevant Service for a sync if the
359+
// EndpointSlice resource version does not match the expected version in the
360+
// endpointSliceTracker.
361+
func (c *Controller) onEndpointSliceAdd(obj interface{}) {
362+
endpointSlice := obj.(*discovery.EndpointSlice)
363+
if endpointSlice == nil {
364+
utilruntime.HandleError(fmt.Errorf("Invalid EndpointSlice provided to onEndpointSliceAdd()"))
365+
return
366+
}
367+
if managedByController(endpointSlice) && c.endpointSliceTracker.Stale(endpointSlice) {
368+
c.queueServiceForEndpointSlice(endpointSlice)
369+
}
370+
}
371+
372+
// onEndpointSliceUpdate queues a sync for the relevant Service for a sync if
373+
// the EndpointSlice resource version does not match the expected version in the
374+
// endpointSliceTracker or the managed-by value of the EndpointSlice has changed
375+
// from or to this controller.
376+
func (c *Controller) onEndpointSliceUpdate(prevObj, obj interface{}) {
377+
prevEndpointSlice := obj.(*discovery.EndpointSlice)
378+
endpointSlice := obj.(*discovery.EndpointSlice)
379+
if endpointSlice == nil || prevEndpointSlice == nil {
380+
utilruntime.HandleError(fmt.Errorf("Invalid EndpointSlice provided to onEndpointSliceUpdate()"))
381+
return
382+
}
383+
if managedByChanged(prevEndpointSlice, endpointSlice) || (managedByController(endpointSlice) && c.endpointSliceTracker.Stale(endpointSlice)) {
384+
c.queueServiceForEndpointSlice(endpointSlice)
385+
}
386+
}
387+
388+
// onEndpointSliceDelete queues a sync for the relevant Service for a sync if the
389+
// EndpointSlice resource version does not match the expected version in the
390+
// endpointSliceTracker.
391+
func (c *Controller) onEndpointSliceDelete(obj interface{}) {
392+
endpointSlice := getEndpointSliceFromDeleteAction(obj)
393+
if endpointSlice != nil && managedByController(endpointSlice) && c.endpointSliceTracker.Has(endpointSlice) {
394+
c.queueServiceForEndpointSlice(endpointSlice)
395+
}
396+
}
397+
398+
// queueServiceForEndpointSlice attempts to queue the corresponding Service for
399+
// the provided EndpointSlice.
400+
func (c *Controller) queueServiceForEndpointSlice(endpointSlice *discovery.EndpointSlice) {
401+
key, err := serviceControllerKey(endpointSlice)
402+
if err != nil {
403+
utilruntime.HandleError(fmt.Errorf("Couldn't get key for EndpointSlice %+v: %v", endpointSlice, err))
404+
return
405+
}
406+
c.queue.Add(key)
407+
}
408+
346409
func (c *Controller) addPod(obj interface{}) {
347410
pod := obj.(*v1.Pod)
348411
services, err := c.serviceSelectorCache.GetPodServiceMemberships(c.serviceLister, pod)
Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,123 @@
1+
/*
2+
Copyright 2019 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package endpointslice
18+
19+
import (
20+
"sync"
21+
22+
discovery "k8s.io/api/discovery/v1beta1"
23+
"k8s.io/apimachinery/pkg/types"
24+
)
25+
26+
// endpointSliceResourceVersions tracks expected EndpointSlice resource versions
27+
// by EndpointSlice name.
28+
type endpointSliceResourceVersions map[string]string
29+
30+
// endpointSliceTracker tracks EndpointSlices and their associated resource
31+
// versions to help determine if a change to an EndpointSlice has been processed
32+
// by the EndpointSlice controller.
33+
type endpointSliceTracker struct {
34+
// lock protects resourceVersionsByService.
35+
lock sync.Mutex
36+
// resourceVersionsByService tracks the list of EndpointSlices and
37+
// associated resource versions expected for a given Service.
38+
resourceVersionsByService map[types.NamespacedName]endpointSliceResourceVersions
39+
}
40+
41+
// newEndpointSliceTracker creates and initializes a new endpointSliceTracker.
42+
func newEndpointSliceTracker() *endpointSliceTracker {
43+
return &endpointSliceTracker{
44+
resourceVersionsByService: map[types.NamespacedName]endpointSliceResourceVersions{},
45+
}
46+
}
47+
48+
// Has returns true if the endpointSliceTracker has a resource version for the
49+
// provided EndpointSlice.
50+
func (est *endpointSliceTracker) Has(endpointSlice *discovery.EndpointSlice) bool {
51+
est.lock.Lock()
52+
defer est.lock.Unlock()
53+
54+
rrv := est.relatedResourceVersions(endpointSlice)
55+
_, ok := rrv[endpointSlice.Name]
56+
return ok
57+
}
58+
59+
// Stale returns true if this endpointSliceTracker does not have a resource
60+
// version for the provided EndpointSlice or it does not match the resource
61+
// version of the provided EndpointSlice.
62+
func (est *endpointSliceTracker) Stale(endpointSlice *discovery.EndpointSlice) bool {
63+
est.lock.Lock()
64+
defer est.lock.Unlock()
65+
66+
rrv := est.relatedResourceVersions(endpointSlice)
67+
return rrv[endpointSlice.Name] != endpointSlice.ResourceVersion
68+
}
69+
70+
// Update adds or updates the resource version in this endpointSliceTracker for
71+
// the provided EndpointSlice.
72+
func (est *endpointSliceTracker) Update(endpointSlice *discovery.EndpointSlice) {
73+
est.lock.Lock()
74+
defer est.lock.Unlock()
75+
76+
rrv := est.relatedResourceVersions(endpointSlice)
77+
rrv[endpointSlice.Name] = endpointSlice.ResourceVersion
78+
}
79+
80+
// Delete removes the resource version in this endpointSliceTracker for the
81+
// provided EndpointSlice.
82+
func (est *endpointSliceTracker) Delete(endpointSlice *discovery.EndpointSlice) {
83+
est.lock.Lock()
84+
defer est.lock.Unlock()
85+
86+
rrv := est.relatedResourceVersions(endpointSlice)
87+
delete(rrv, endpointSlice.Name)
88+
}
89+
90+
// relatedResourceVersions returns the set of resource versions tracked for the
91+
// Service corresponding to the provided EndpointSlice. If no resource versions
92+
// are currently tracked for this service, an empty set is initialized.
93+
func (est *endpointSliceTracker) relatedResourceVersions(endpointSlice *discovery.EndpointSlice) endpointSliceResourceVersions {
94+
serviceNN := getServiceNN(endpointSlice)
95+
vers, ok := est.resourceVersionsByService[serviceNN]
96+
97+
if !ok {
98+
vers = endpointSliceResourceVersions{}
99+
est.resourceVersionsByService[serviceNN] = vers
100+
}
101+
102+
return vers
103+
}
104+
105+
// getServiceNN returns a namespaced name for the Service corresponding to the
106+
// provided EndpointSlice.
107+
func getServiceNN(endpointSlice *discovery.EndpointSlice) types.NamespacedName {
108+
serviceName, _ := endpointSlice.Labels[discovery.LabelServiceName]
109+
return types.NamespacedName{Name: serviceName, Namespace: endpointSlice.Namespace}
110+
}
111+
112+
// managedByChanged returns true if one of the provided EndpointSlices is
113+
// managed by the EndpointSlice controller while the other is not.
114+
func managedByChanged(endpointSlice1, endpointSlice2 *discovery.EndpointSlice) bool {
115+
return managedByController(endpointSlice1) != managedByController(endpointSlice2)
116+
}
117+
118+
// managedByController returns true if the controller of the provided
119+
// EndpointSlices is the EndpointSlice controller.
120+
func managedByController(endpointSlice *discovery.EndpointSlice) bool {
121+
managedBy, _ := endpointSlice.Labels[discovery.LabelManagedBy]
122+
return managedBy == controllerName
123+
}

0 commit comments

Comments
 (0)