Skip to content

Commit c75787b

Browse files
committed
Fixing Potential Race Condition in EndpointSlice Controller.
This adds a new EndpointSlice tracker to keep track of the expected resource versions of EndpointSlices associated with each Service managed by the EndpointSlice controller. This should prevent a potential race where a syncService call could happen with an incomplete view of EndpointSlices if additions or deletions hadn't fully propagated to the cache yet. Additionally, this ensures that external changes to EndpointSlices will be handled by the EndpointSlice controller.
1 parent e4ad76e commit c75787b

File tree

8 files changed

+449
-3
lines changed

8 files changed

+449
-3
lines changed

pkg/controller/endpointslice/BUILD

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ go_library(
55
srcs = [
66
"endpointset.go",
77
"endpointslice_controller.go",
8+
"endpointslice_tracker.go",
89
"reconciler.go",
910
"utils.go",
1011
],
@@ -49,6 +50,7 @@ go_test(
4950
name = "go_default_test",
5051
srcs = [
5152
"endpointslice_controller_test.go",
53+
"endpointslice_tracker_test.go",
5254
"reconciler_test.go",
5355
"utils_test.go",
5456
],

pkg/controller/endpointslice/endpointslice_controller.go

Lines changed: 66 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ const (
6363
func NewController(podInformer coreinformers.PodInformer,
6464
serviceInformer coreinformers.ServiceInformer,
6565
nodeInformer coreinformers.NodeInformer,
66-
esInformer discoveryinformers.EndpointSliceInformer,
66+
endpointSliceInformer discoveryinformers.EndpointSliceInformer,
6767
maxEndpointsPerSlice int32,
6868
client clientset.Interface,
6969
) *Controller {
@@ -105,15 +105,23 @@ func NewController(podInformer coreinformers.PodInformer,
105105
c.nodeLister = nodeInformer.Lister()
106106
c.nodesSynced = nodeInformer.Informer().HasSynced
107107

108-
c.endpointSliceLister = esInformer.Lister()
109-
c.endpointSlicesSynced = esInformer.Informer().HasSynced
108+
endpointSliceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
109+
AddFunc: c.onEndpointSliceAdd,
110+
UpdateFunc: c.onEndpointSliceUpdate,
111+
DeleteFunc: c.onEndpointSliceDelete,
112+
})
113+
114+
c.endpointSliceLister = endpointSliceInformer.Lister()
115+
c.endpointSlicesSynced = endpointSliceInformer.Informer().HasSynced
116+
c.endpointSliceTracker = newEndpointSliceTracker()
110117

111118
c.maxEndpointsPerSlice = maxEndpointsPerSlice
112119

113120
c.reconciler = &reconciler{
114121
client: c.client,
115122
nodeLister: c.nodeLister,
116123
maxEndpointsPerSlice: c.maxEndpointsPerSlice,
124+
endpointSliceTracker: c.endpointSliceTracker,
117125
metricsCache: endpointslicemetrics.NewCache(maxEndpointsPerSlice),
118126
}
119127
c.triggerTimeTracker = endpointutil.NewTriggerTimeTracker()
@@ -152,6 +160,10 @@ type Controller struct {
152160
// endpointSlicesSynced returns true if the endpoint slice shared informer has been synced at least once.
153161
// Added as a member to the struct to allow injection for testing.
154162
endpointSlicesSynced cache.InformerSynced
163+
// endpointSliceTracker tracks the list of EndpointSlices and associated
164+
// resource versions expected for each Service. It can help determine if a
165+
// cached EndpointSlice is out of date.
166+
endpointSliceTracker *endpointSliceTracker
155167

156168
// nodeLister is able to list/get nodes and is populated by the
157169
// shared informer passed to NewController
@@ -343,6 +355,57 @@ func (c *Controller) onServiceDelete(obj interface{}) {
343355
c.queue.Add(key)
344356
}
345357

358+
// onEndpointSliceAdd queues a sync for the relevant Service for a sync if the
359+
// EndpointSlice resource version does not match the expected version in the
360+
// endpointSliceTracker.
361+
func (c *Controller) onEndpointSliceAdd(obj interface{}) {
362+
endpointSlice := obj.(*discovery.EndpointSlice)
363+
if endpointSlice == nil {
364+
utilruntime.HandleError(fmt.Errorf("Invalid EndpointSlice provided to onEndpointSliceAdd()"))
365+
return
366+
}
367+
if managedByController(endpointSlice) && c.endpointSliceTracker.Stale(endpointSlice) {
368+
c.queueServiceForEndpointSlice(endpointSlice)
369+
}
370+
}
371+
372+
// onEndpointSliceUpdate queues a sync for the relevant Service for a sync if
373+
// the EndpointSlice resource version does not match the expected version in the
374+
// endpointSliceTracker or the managed-by value of the EndpointSlice has changed
375+
// from or to this controller.
376+
func (c *Controller) onEndpointSliceUpdate(prevObj, obj interface{}) {
377+
prevEndpointSlice := obj.(*discovery.EndpointSlice)
378+
endpointSlice := obj.(*discovery.EndpointSlice)
379+
if endpointSlice == nil || prevEndpointSlice == nil {
380+
utilruntime.HandleError(fmt.Errorf("Invalid EndpointSlice provided to onEndpointSliceUpdate()"))
381+
return
382+
}
383+
if managedByChanged(prevEndpointSlice, endpointSlice) || (managedByController(endpointSlice) && c.endpointSliceTracker.Stale(endpointSlice)) {
384+
c.queueServiceForEndpointSlice(endpointSlice)
385+
}
386+
}
387+
388+
// onEndpointSliceDelete queues a sync for the relevant Service for a sync if the
389+
// EndpointSlice resource version does not match the expected version in the
390+
// endpointSliceTracker.
391+
func (c *Controller) onEndpointSliceDelete(obj interface{}) {
392+
endpointSlice := getEndpointSliceFromDeleteAction(obj)
393+
if endpointSlice != nil && managedByController(endpointSlice) && c.endpointSliceTracker.Has(endpointSlice) {
394+
c.queueServiceForEndpointSlice(endpointSlice)
395+
}
396+
}
397+
398+
// queueServiceForEndpointSlice attempts to queue the corresponding Service for
399+
// the provided EndpointSlice.
400+
func (c *Controller) queueServiceForEndpointSlice(endpointSlice *discovery.EndpointSlice) {
401+
key, err := serviceControllerKey(endpointSlice)
402+
if err != nil {
403+
utilruntime.HandleError(fmt.Errorf("Couldn't get key for EndpointSlice %+v: %v", endpointSlice, err))
404+
return
405+
}
406+
c.queue.Add(key)
407+
}
408+
346409
func (c *Controller) addPod(obj interface{}) {
347410
pod := obj.(*v1.Pod)
348411
services, err := c.serviceSelectorCache.GetPodServiceMemberships(c.serviceLister, pod)
pkg/controller/endpointslice/endpointslice_tracker.go

Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,123 @@
1+
/*
2+
Copyright 2019 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package endpointslice
18+
19+
import (
20+
"sync"
21+
22+
discovery "k8s.io/api/discovery/v1beta1"
23+
"k8s.io/apimachinery/pkg/types"
24+
)
25+
26+
// endpointSliceResourceVersions tracks expected EndpointSlice resource versions
27+
// by EndpointSlice name.
28+
type endpointSliceResourceVersions map[string]string
29+
30+
// endpointSliceTracker tracks EndpointSlices and their associated resource
31+
// versions to help determine if a change to an EndpointSlice has been processed
32+
// by the EndpointSlice controller.
33+
type endpointSliceTracker struct {
34+
// lock protects resourceVersionsByService.
35+
lock sync.Mutex
36+
// resourceVersionsByService tracks the list of EndpointSlices and
37+
// associated resource versions expected for a given Service.
38+
resourceVersionsByService map[types.NamespacedName]endpointSliceResourceVersions
39+
}
40+
41+
// newEndpointSliceTracker creates and initializes a new endpointSliceTracker.
42+
func newEndpointSliceTracker() *endpointSliceTracker {
43+
return &endpointSliceTracker{
44+
resourceVersionsByService: map[types.NamespacedName]endpointSliceResourceVersions{},
45+
}
46+
}
47+
48+
// Has returns true if the endpointSliceTracker has a resource version for the
49+
// provided EndpointSlice.
50+
func (est *endpointSliceTracker) Has(endpointSlice *discovery.EndpointSlice) bool {
51+
est.lock.Lock()
52+
defer est.lock.Unlock()
53+
54+
rrv := est.relatedResourceVersions(endpointSlice)
55+
_, ok := rrv[endpointSlice.Name]
56+
return ok
57+
}
58+
59+
// Stale returns true if this endpointSliceTracker does not have a resource
60+
// version for the provided EndpointSlice or it does not match the resource
61+
// version of the provided EndpointSlice.
62+
func (est *endpointSliceTracker) Stale(endpointSlice *discovery.EndpointSlice) bool {
63+
est.lock.Lock()
64+
defer est.lock.Unlock()
65+
66+
rrv := est.relatedResourceVersions(endpointSlice)
67+
return rrv[endpointSlice.Name] != endpointSlice.ResourceVersion
68+
}
69+
70+
// Update adds or updates the resource version in this endpointSliceTracker for
71+
// the provided EndpointSlice.
72+
func (est *endpointSliceTracker) Update(endpointSlice *discovery.EndpointSlice) {
73+
est.lock.Lock()
74+
defer est.lock.Unlock()
75+
76+
rrv := est.relatedResourceVersions(endpointSlice)
77+
rrv[endpointSlice.Name] = endpointSlice.ResourceVersion
78+
}
79+
80+
// Delete removes the resource version in this endpointSliceTracker for the
81+
// provided EndpointSlice.
82+
func (est *endpointSliceTracker) Delete(endpointSlice *discovery.EndpointSlice) {
83+
est.lock.Lock()
84+
defer est.lock.Unlock()
85+
86+
rrv := est.relatedResourceVersions(endpointSlice)
87+
delete(rrv, endpointSlice.Name)
88+
}
89+
90+
// relatedResourceVersions returns the set of resource versions tracked for the
91+
// Service corresponding to the provided EndpointSlice. If no resource versions
92+
// are currently tracked for this service, an empty set is initialized.
93+
func (est *endpointSliceTracker) relatedResourceVersions(endpointSlice *discovery.EndpointSlice) endpointSliceResourceVersions {
94+
serviceNN := getServiceNN(endpointSlice)
95+
vers, ok := est.resourceVersionsByService[serviceNN]
96+
97+
if !ok {
98+
vers = endpointSliceResourceVersions{}
99+
est.resourceVersionsByService[serviceNN] = vers
100+
}
101+
102+
return vers
103+
}
104+
105+
// getServiceNN returns a namespaced name for the Service corresponding to the
106+
// provided EndpointSlice.
107+
func getServiceNN(endpointSlice *discovery.EndpointSlice) types.NamespacedName {
108+
serviceName, _ := endpointSlice.Labels[discovery.LabelServiceName]
109+
return types.NamespacedName{Name: serviceName, Namespace: endpointSlice.Namespace}
110+
}
111+
112+
// managedByChanged returns true if one of the provided EndpointSlices is
113+
// managed by the EndpointSlice controller while the other is not.
114+
func managedByChanged(endpointSlice1, endpointSlice2 *discovery.EndpointSlice) bool {
115+
return managedByController(endpointSlice1) != managedByController(endpointSlice2)
116+
}
117+
118+
// managedByController returns true if the controller of the provided
119+
// EndpointSlices is the EndpointSlice controller.
120+
func managedByController(endpointSlice *discovery.EndpointSlice) bool {
121+
managedBy, _ := endpointSlice.Labels[discovery.LabelManagedBy]
122+
return managedBy == controllerName
123+
}

0 commit comments

Comments
 (0)