Skip to content

Commit 3c80450

Browse files
committed
Updating EndpointSliceMirroring controller to listen for Service changes
This fixes a bug that could occur if a custom Endpoints resource was created before a Service was created.
1 parent 1c548c3 commit 3c80450

File tree

4 files changed

+166
-97
lines changed

4 files changed

+166
-97
lines changed

pkg/controller/endpointslicemirroring/endpointslicemirroring_controller.go

Lines changed: 84 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@ import (
2525
v1 "k8s.io/api/core/v1"
2626
discovery "k8s.io/api/discovery/v1beta1"
2727
apierrors "k8s.io/apimachinery/pkg/api/errors"
28-
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2928
"k8s.io/apimachinery/pkg/labels"
3029
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
3130
"k8s.io/apimachinery/pkg/util/wait"
@@ -121,6 +120,11 @@ func NewController(endpointsInformer coreinformers.EndpointsInformer,
121120

122121
c.serviceLister = serviceInformer.Lister()
123122
c.servicesSynced = serviceInformer.Informer().HasSynced
123+
serviceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
124+
AddFunc: c.onServiceAdd,
125+
UpdateFunc: c.onServiceUpdate,
126+
DeleteFunc: c.onServiceDelete,
127+
})
124128

125129
c.maxEndpointsPerSubset = maxEndpointsPerSubset
126130

@@ -273,28 +277,47 @@ func (c *Controller) syncEndpoints(key string) error {
273277
return err
274278
}
275279

276-
endpointSlices, err := endpointSlicesMirroredForService(c.endpointSliceLister, namespace, name)
277-
280+
endpoints, err := c.endpointsLister.Endpoints(namespace).Get(name)
278281
if err != nil {
279-
ep := &v1.Endpoints{ObjectMeta: metav1.ObjectMeta{Name: name, Namespace: namespace}}
280-
c.eventRecorder.Eventf(ep, FailedToListEndpointSlices,
281-
"Error listing EndpointSlices for Endpoints %s/%s: %v", ep.Namespace, ep.Name, err)
282+
if apierrors.IsNotFound(err) {
283+
klog.V(4).Infof("%s/%s Endpoints not found, cleaning up any mirrored EndpointSlices", namespace, name)
284+
c.endpointSliceTracker.DeleteService(namespace, name)
285+
return c.deleteMirroredSlices(namespace, name)
286+
}
282287
return err
283288
}
284289

285-
endpoints, err := c.endpointsLister.Endpoints(namespace).Get(name)
286-
if err != nil || !c.shouldMirror(endpoints) {
287-
if apierrors.IsNotFound(err) || !c.shouldMirror(endpoints) {
290+
if !c.shouldMirror(endpoints) {
291+
klog.V(4).Infof("%s/%s Endpoints should not be mirrored, cleaning up any mirrored EndpointSlices", namespace, name)
292+
c.endpointSliceTracker.DeleteService(namespace, name)
293+
return c.deleteMirroredSlices(namespace, name)
294+
}
295+
296+
svc, err := c.serviceLister.Services(namespace).Get(name)
297+
if err != nil {
298+
if apierrors.IsNotFound(err) {
299+
klog.V(4).Infof("%s/%s Service not found, cleaning up any mirrored EndpointSlices", namespace, name)
288300
c.endpointSliceTracker.DeleteService(namespace, name)
289-
return c.reconciler.deleteEndpoints(namespace, name, endpointSlices)
301+
return c.deleteMirroredSlices(namespace, name)
290302
}
291303
return err
292304
}
293305

306+
// This means that if a Service transitions away from a nil selector, any
307+
// mirrored EndpointSlices will not be cleaned up. #91072 tracks this issue
308+
// for this controller along with the Endpoints and EndpointSlice
309+
// controllers.
310+
if svc.Spec.Selector != nil {
311+
return nil
312+
}
313+
314+
endpointSlices, err := endpointSlicesMirroredForService(c.endpointSliceLister, namespace, name)
315+
if err != nil {
316+
return err
317+
}
318+
294319
err = c.reconciler.reconcile(endpoints, endpointSlices)
295320
if err != nil {
296-
c.eventRecorder.Eventf(endpoints, v1.EventTypeWarning, FailedToUpdateEndpointSlices,
297-
"Error updating EndpointSlices for Endpoints %s/%s: %v", endpoints.Namespace, endpoints.Name, err)
298321
return err
299322
}
300323

@@ -314,28 +337,54 @@ func (c *Controller) queueEndpoints(obj interface{}) {
314337

315338
// shouldMirror returns true if an Endpoints resource should be mirrored by this
316339
// controller. This will be false if:
340+
// - the Endpoints resource is nil.
317341
// - the Endpoints resource has a skip-mirror label.
318342
// - the Endpoints resource has a leader election annotation.
319-
// - the corresponding Service resource does not exist.
320-
// - the corresponding Service resource has a non-nil selector.
343+
// This does not ensure that a corresponding Service exists with a nil selector.
344+
// That check should be performed separately.
321345
func (c *Controller) shouldMirror(endpoints *v1.Endpoints) bool {
322346
if endpoints == nil || skipMirror(endpoints.Labels) || hasLeaderElection(endpoints.Annotations) {
323347
return false
324348
}
325349

326-
svc, err := c.serviceLister.Services(endpoints.Namespace).Get(endpoints.Name)
327-
if err != nil {
328-
if !apierrors.IsNotFound(err) {
329-
klog.Errorf("Error fetching %s/%s Service: %v", endpoints.Namespace, endpoints.Name, err)
330-
}
331-
return false
350+
return true
351+
}
352+
353+
// onServiceAdd queues a sync for the relevant Endpoints resource.
354+
func (c *Controller) onServiceAdd(obj interface{}) {
355+
service := obj.(*v1.Service)
356+
if service == nil {
357+
utilruntime.HandleError(fmt.Errorf("onServiceAdd() expected type v1.Service, got %T", obj))
358+
return
359+
}
360+
if service.Spec.Selector == nil {
361+
c.queueEndpoints(obj)
332362
}
363+
}
333364

334-
if svc.Spec.Selector != nil {
335-
return false
365+
// onServiceUpdate queues a sync for the relevant Endpoints resource.
366+
func (c *Controller) onServiceUpdate(prevObj, obj interface{}) {
367+
service := obj.(*v1.Service)
368+
prevService := prevObj.(*v1.Service)
369+
if service == nil || prevService == nil {
370+
utilruntime.HandleError(fmt.Errorf("onServiceUpdate() expected type v1.Endpoints, got %T, %T", prevObj, obj))
371+
return
336372
}
373+
if (service.Spec.Selector == nil) != (prevService.Spec.Selector == nil) {
374+
c.queueEndpoints(obj)
375+
}
376+
}
337377

338-
return true
378+
// onServiceDelete queues a sync for the relevant Endpoints resource.
379+
func (c *Controller) onServiceDelete(obj interface{}) {
380+
service := getServiceFromDeleteAction(obj)
381+
if service == nil {
382+
utilruntime.HandleError(fmt.Errorf("onServiceDelete() expected type v1.Service, got %T", obj))
383+
return
384+
}
385+
if service.Spec.Selector == nil {
386+
c.queueEndpoints(obj)
387+
}
339388
}
340389

341390
// onEndpointsAdd queues a sync for the relevant Endpoints resource.
@@ -437,6 +486,18 @@ func (c *Controller) queueEndpointsForEndpointSlice(endpointSlice *discovery.End
437486
c.queue.AddAfter(key, c.endpointUpdatesBatchPeriod)
438487
}
439488

489+
// deleteMirroredSlices will delete and EndpointSlices that have been mirrored
490+
// for Endpoints with this namespace and name.
491+
func (c *Controller) deleteMirroredSlices(namespace, name string) error {
492+
endpointSlices, err := endpointSlicesMirroredForService(c.endpointSliceLister, namespace, name)
493+
if err != nil {
494+
return err
495+
}
496+
497+
c.endpointSliceTracker.DeleteService(namespace, name)
498+
return c.reconciler.deleteEndpoints(namespace, name, endpointSlices)
499+
}
500+
440501
// endpointSlicesMirroredForService returns the EndpointSlices that have been
441502
// mirrored for a Service by this controller.
442503
func endpointSlicesMirroredForService(endpointSliceLister discoverylisters.EndpointSliceLister, namespace, name string) ([]*discovery.EndpointSlice, error) {

pkg/controller/endpointslicemirroring/endpointslicemirroring_controller_test.go

Lines changed: 49 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -74,12 +74,14 @@ func TestSyncEndpoints(t *testing.T) {
7474

7575
testCases := []struct {
7676
testName string
77+
service *v1.Service
7778
endpoints *v1.Endpoints
7879
endpointSlices []*discovery.EndpointSlice
7980
expectedNumActions int
8081
expectedNumSlices int
8182
}{{
8283
testName: "Endpoints with no addresses",
84+
service: &v1.Service{},
8385
endpoints: &v1.Endpoints{
8486
Subsets: []v1.EndpointSubset{{
8587
Ports: []v1.EndpointPort{{Port: 80}},
@@ -90,6 +92,7 @@ func TestSyncEndpoints(t *testing.T) {
9092
expectedNumSlices: 0,
9193
}, {
9294
testName: "Endpoints with skip label true",
95+
service: &v1.Service{},
9396
endpoints: &v1.Endpoints{
9497
ObjectMeta: metav1.ObjectMeta{
9598
Labels: map[string]string{discovery.LabelSkipMirror: "true"},
@@ -104,6 +107,7 @@ func TestSyncEndpoints(t *testing.T) {
104107
expectedNumSlices: 0,
105108
}, {
106109
testName: "Endpoints with skip label false",
110+
service: &v1.Service{},
107111
endpoints: &v1.Endpoints{
108112
ObjectMeta: metav1.ObjectMeta{
109113
Labels: map[string]string{discovery.LabelSkipMirror: "false"},
@@ -116,8 +120,37 @@ func TestSyncEndpoints(t *testing.T) {
116120
endpointSlices: []*discovery.EndpointSlice{},
117121
expectedNumActions: 1,
118122
expectedNumSlices: 1,
123+
}, {
124+
testName: "Endpoints with missing Service",
125+
service: nil,
126+
endpoints: &v1.Endpoints{
127+
Subsets: []v1.EndpointSubset{{
128+
Ports: []v1.EndpointPort{{Port: 80}},
129+
Addresses: []v1.EndpointAddress{{IP: "10.0.0.1"}},
130+
}},
131+
},
132+
endpointSlices: []*discovery.EndpointSlice{},
133+
expectedNumActions: 0,
134+
expectedNumSlices: 0,
135+
}, {
136+
testName: "Endpoints with Service with selector specified",
137+
service: &v1.Service{
138+
Spec: v1.ServiceSpec{
139+
Selector: map[string]string{"foo": "bar"},
140+
},
141+
},
142+
endpoints: &v1.Endpoints{
143+
Subsets: []v1.EndpointSubset{{
144+
Ports: []v1.EndpointPort{{Port: 80}},
145+
Addresses: []v1.EndpointAddress{{IP: "10.0.0.1"}},
146+
}},
147+
},
148+
endpointSlices: []*discovery.EndpointSlice{},
149+
expectedNumActions: 0,
150+
expectedNumSlices: 0,
119151
}, {
120152
testName: "Existing EndpointSlices that need to be cleaned up",
153+
service: &v1.Service{},
121154
endpoints: &v1.Endpoints{
122155
Subsets: []v1.EndpointSubset{{
123156
Ports: []v1.EndpointPort{{Port: 80}},
@@ -136,6 +169,7 @@ func TestSyncEndpoints(t *testing.T) {
136169
expectedNumSlices: 0,
137170
}, {
138171
testName: "Existing EndpointSlices managed by a different controller, no addresses to sync",
172+
service: &v1.Service{},
139173
endpoints: &v1.Endpoints{
140174
Subsets: []v1.EndpointSubset{{
141175
Ports: []v1.EndpointPort{{Port: 80}},
@@ -154,6 +188,7 @@ func TestSyncEndpoints(t *testing.T) {
154188
expectedNumSlices: 0,
155189
}, {
156190
testName: "Endpoints with 1000 addresses",
191+
service: &v1.Service{},
157192
endpoints: &v1.Endpoints{
158193
Subsets: []v1.EndpointSubset{{
159194
Ports: []v1.EndpointPort{{Port: 80}},
@@ -165,6 +200,7 @@ func TestSyncEndpoints(t *testing.T) {
165200
expectedNumSlices: 1,
166201
}, {
167202
testName: "Endpoints with 1001 addresses - 1 should not be mirrored",
203+
service: &v1.Service{},
168204
endpoints: &v1.Endpoints{
169205
Subsets: []v1.EndpointSubset{{
170206
Ports: []v1.EndpointPort{{Port: 80}},
@@ -182,10 +218,11 @@ func TestSyncEndpoints(t *testing.T) {
182218
tc.endpoints.Name = endpointsName
183219
tc.endpoints.Namespace = namespace
184220
esController.endpointsStore.Add(tc.endpoints)
185-
esController.serviceStore.Add(&v1.Service{ObjectMeta: metav1.ObjectMeta{
186-
Name: endpointsName,
187-
Namespace: namespace,
188-
}})
221+
if tc.service != nil {
222+
tc.service.Name = endpointsName
223+
tc.service.Namespace = namespace
224+
esController.serviceStore.Add(tc.service)
225+
}
189226

190227
for _, epSlice := range tc.endpointSlices {
191228
epSlice.Namespace = namespace
@@ -214,102 +251,51 @@ func TestSyncEndpoints(t *testing.T) {
214251
}
215252

216253
func TestShouldMirror(t *testing.T) {
217-
svcWithSelector := &v1.Service{
218-
ObjectMeta: metav1.ObjectMeta{
219-
Name: "with-selector",
220-
Namespace: "example1",
221-
},
222-
Spec: v1.ServiceSpec{
223-
Selector: map[string]string{"with": "selector"},
224-
},
225-
}
226-
svcWithoutSelector := &v1.Service{
227-
ObjectMeta: metav1.ObjectMeta{
228-
Name: "without-selector",
229-
Namespace: "example1",
230-
},
231-
Spec: v1.ServiceSpec{},
232-
}
233-
234254
testCases := []struct {
235255
testName string
236256
endpoints *v1.Endpoints
237-
service *v1.Service
238257
shouldMirror bool
239258
}{{
240-
testName: "Service without selector with matching endpoints",
241-
service: svcWithoutSelector,
259+
testName: "Standard Endpoints",
242260
endpoints: &v1.Endpoints{
243261
ObjectMeta: metav1.ObjectMeta{
244-
Name: svcWithoutSelector.Name,
245-
Namespace: svcWithoutSelector.Namespace,
262+
Name: "test-endpoints",
246263
},
247264
},
248265
shouldMirror: true,
249266
}, {
250-
testName: "Service without selector, matching Endpoints with skip-mirror=true",
251-
service: svcWithoutSelector,
267+
testName: "Endpoints with skip-mirror=true",
252268
endpoints: &v1.Endpoints{
253269
ObjectMeta: metav1.ObjectMeta{
254-
Name: svcWithSelector.Name,
255-
Namespace: svcWithSelector.Namespace,
270+
Name: "test-endpoints",
256271
Labels: map[string]string{
257272
discovery.LabelSkipMirror: "true",
258273
},
259274
},
260275
},
261276
shouldMirror: false,
262277
}, {
263-
testName: "Service without selector, matching Endpoints with skip-mirror=invalid",
264-
service: svcWithoutSelector,
278+
testName: "Endpoints with skip-mirror=invalid",
265279
endpoints: &v1.Endpoints{
266280
ObjectMeta: metav1.ObjectMeta{
267-
Name: svcWithoutSelector.Name,
268-
Namespace: svcWithoutSelector.Namespace,
281+
Name: "test-endpoints",
269282
Labels: map[string]string{
270283
discovery.LabelSkipMirror: "invalid",
271284
},
272285
},
273286
},
274287
shouldMirror: true,
275288
}, {
276-
testName: "Service without selector, matching Endpoints with leader election annotation",
277-
service: svcWithoutSelector,
289+
testName: "Endpoints with leader election annotation",
278290
endpoints: &v1.Endpoints{
279291
ObjectMeta: metav1.ObjectMeta{
280-
Name: svcWithSelector.Name,
281-
Namespace: svcWithSelector.Namespace,
292+
Name: "test-endpoints",
282293
Annotations: map[string]string{
283294
resourcelock.LeaderElectionRecordAnnotationKey: "",
284295
},
285296
},
286297
},
287298
shouldMirror: false,
288-
}, {
289-
testName: "Service without selector, matching Endpoints without skip label in different namespace",
290-
service: svcWithSelector,
291-
endpoints: &v1.Endpoints{
292-
ObjectMeta: metav1.ObjectMeta{
293-
Name: svcWithSelector.Name,
294-
Namespace: svcWithSelector.Namespace + "different",
295-
},
296-
},
297-
shouldMirror: false,
298-
}, {
299-
testName: "Service without selector or matching endpoints",
300-
service: svcWithoutSelector,
301-
endpoints: nil,
302-
shouldMirror: false,
303-
}, {
304-
testName: "Endpoints without matching Service",
305-
service: nil,
306-
endpoints: &v1.Endpoints{
307-
ObjectMeta: metav1.ObjectMeta{
308-
Name: svcWithoutSelector.Name,
309-
Namespace: svcWithoutSelector.Namespace,
310-
},
311-
},
312-
shouldMirror: false,
313299
}}
314300

315301
for _, tc := range testCases {
@@ -323,13 +309,6 @@ func TestShouldMirror(t *testing.T) {
323309
}
324310
}
325311

326-
if tc.service != nil {
327-
err := c.serviceStore.Add(tc.service)
328-
if err != nil {
329-
t.Fatalf("Error adding Service to store: %v", err)
330-
}
331-
}
332-
333312
shouldMirror := c.shouldMirror(tc.endpoints)
334313

335314
if shouldMirror != tc.shouldMirror {

0 commit comments

Comments
 (0)