Skip to content

Commit 382107e

Browse files
authored
Merge pull request kubernetes#93441 from robscott/endpointslicemirroring-tracker-fix
Fixing memory leak in EndpointSliceMirroring EndpointSlice tracker
2 parents c94242a + 98b63ad commit 382107e

File tree

5 files changed

+153
-45
lines changed

5 files changed

+153
-45
lines changed

pkg/controller/endpointslicemirroring/BUILD

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@ go_test(
6464
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
6565
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
6666
"//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
67+
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
6768
"//staging/src/k8s.io/apimachinery/pkg/util/rand:go_default_library",
6869
"//staging/src/k8s.io/client-go/informers:go_default_library",
6970
"//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library",

pkg/controller/endpointslicemirroring/endpointslice_tracker.go

Lines changed: 39 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -45,61 +45,76 @@ func newEndpointSliceTracker() *endpointSliceTracker {
4545
}
4646
}
4747

48-
// has returns true if the endpointSliceTracker has a resource version for the
48+
// Has returns true if the endpointSliceTracker has a resource version for the
4949
// provided EndpointSlice.
50-
func (est *endpointSliceTracker) has(endpointSlice *discovery.EndpointSlice) bool {
50+
func (est *endpointSliceTracker) Has(endpointSlice *discovery.EndpointSlice) bool {
5151
est.lock.Lock()
5252
defer est.lock.Unlock()
5353

54-
rrv := est.relatedResourceVersions(endpointSlice)
55-
_, ok := rrv[endpointSlice.Name]
54+
rrv, ok := est.relatedResourceVersions(endpointSlice)
55+
if !ok {
56+
return false
57+
}
58+
_, ok = rrv[endpointSlice.Name]
5659
return ok
5760
}
5861

59-
// stale returns true if this endpointSliceTracker does not have a resource
62+
// Stale returns true if this endpointSliceTracker does not have a resource
6063
// version for the provided EndpointSlice or it does not match the resource
6164
// version of the provided EndpointSlice.
62-
func (est *endpointSliceTracker) stale(endpointSlice *discovery.EndpointSlice) bool {
65+
func (est *endpointSliceTracker) Stale(endpointSlice *discovery.EndpointSlice) bool {
6366
est.lock.Lock()
6467
defer est.lock.Unlock()
6568

66-
rrv := est.relatedResourceVersions(endpointSlice)
69+
rrv, ok := est.relatedResourceVersions(endpointSlice)
70+
if !ok {
71+
return true
72+
}
6773
return rrv[endpointSlice.Name] != endpointSlice.ResourceVersion
6874
}
6975

70-
// update adds or updates the resource version in this endpointSliceTracker for
76+
// Update adds or updates the resource version in this endpointSliceTracker for
7177
// the provided EndpointSlice.
72-
func (est *endpointSliceTracker) update(endpointSlice *discovery.EndpointSlice) {
78+
func (est *endpointSliceTracker) Update(endpointSlice *discovery.EndpointSlice) {
7379
est.lock.Lock()
7480
defer est.lock.Unlock()
7581

76-
rrv := est.relatedResourceVersions(endpointSlice)
82+
rrv, ok := est.relatedResourceVersions(endpointSlice)
83+
if !ok {
84+
rrv = endpointSliceResourceVersions{}
85+
est.resourceVersionsByService[getServiceNN(endpointSlice)] = rrv
86+
}
7787
rrv[endpointSlice.Name] = endpointSlice.ResourceVersion
7888
}
7989

80-
// delete removes the resource version in this endpointSliceTracker for the
90+
// DeleteService removes the set of resource versions tracked for the Service.
91+
func (est *endpointSliceTracker) DeleteService(namespace, name string) {
92+
est.lock.Lock()
93+
defer est.lock.Unlock()
94+
95+
serviceNN := types.NamespacedName{Name: name, Namespace: namespace}
96+
delete(est.resourceVersionsByService, serviceNN)
97+
}
98+
99+
// Delete removes the resource version in this endpointSliceTracker for the
81100
// provided EndpointSlice.
82-
func (est *endpointSliceTracker) delete(endpointSlice *discovery.EndpointSlice) {
101+
func (est *endpointSliceTracker) Delete(endpointSlice *discovery.EndpointSlice) {
83102
est.lock.Lock()
84103
defer est.lock.Unlock()
85104

86-
rrv := est.relatedResourceVersions(endpointSlice)
87-
delete(rrv, endpointSlice.Name)
105+
rrv, ok := est.relatedResourceVersions(endpointSlice)
106+
if ok {
107+
delete(rrv, endpointSlice.Name)
108+
}
88109
}
89110

90111
// relatedResourceVersions returns the set of resource versions tracked for the
91-
// Service corresponding to the provided EndpointSlice. If no resource versions
92-
// are currently tracked for this service, an empty set is initialized.
93-
func (est *endpointSliceTracker) relatedResourceVersions(endpointSlice *discovery.EndpointSlice) endpointSliceResourceVersions {
112+
// Service corresponding to the provided EndpointSlice, and a bool to indicate
113+
// if it exists.
114+
func (est *endpointSliceTracker) relatedResourceVersions(endpointSlice *discovery.EndpointSlice) (endpointSliceResourceVersions, bool) {
94115
serviceNN := getServiceNN(endpointSlice)
95116
vers, ok := est.resourceVersionsByService[serviceNN]
96-
97-
if !ok {
98-
vers = endpointSliceResourceVersions{}
99-
est.resourceVersionsByService[serviceNN] = vers
100-
}
101-
102-
return vers
117+
return vers, ok
103118
}
104119

105120
// getServiceNN returns a namespaced name for the Service corresponding to the

pkg/controller/endpointslicemirroring/endpointslice_tracker_test.go

Lines changed: 106 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,11 @@ package endpointslicemirroring
1919
import (
2020
"testing"
2121

22+
"github.com/stretchr/testify/assert"
23+
2224
discovery "k8s.io/api/discovery/v1beta1"
2325
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
26+
"k8s.io/apimachinery/pkg/types"
2427
)
2528

2629
func TestEndpointSliceTrackerUpdate(t *testing.T) {
@@ -43,47 +46,69 @@ func TestEndpointSliceTrackerUpdate(t *testing.T) {
4346
epSlice1DifferentRV.ResourceVersion = "rv2"
4447

4548
testCases := map[string]struct {
46-
updateParam *discovery.EndpointSlice
47-
checksParam *discovery.EndpointSlice
48-
expectHas bool
49-
expectStale bool
49+
updateParam *discovery.EndpointSlice
50+
checksParam *discovery.EndpointSlice
51+
expectHas bool
52+
expectStale bool
53+
expectResourceVersionsByService map[types.NamespacedName]endpointSliceResourceVersions
5054
}{
5155
"same slice": {
5256
updateParam: epSlice1,
5357
checksParam: epSlice1,
5458
expectHas: true,
5559
expectStale: false,
60+
expectResourceVersionsByService: map[types.NamespacedName]endpointSliceResourceVersions{
61+
{Namespace: epSlice1.Namespace, Name: "svc1"}: {
62+
epSlice1.Name: epSlice1.ResourceVersion,
63+
},
64+
},
5665
},
5766
"different namespace": {
5867
updateParam: epSlice1,
5968
checksParam: epSlice1DifferentNS,
6069
expectHas: false,
6170
expectStale: true,
71+
expectResourceVersionsByService: map[types.NamespacedName]endpointSliceResourceVersions{
72+
{Namespace: epSlice1.Namespace, Name: "svc1"}: {
73+
epSlice1.Name: epSlice1.ResourceVersion,
74+
},
75+
},
6276
},
6377
"different service": {
6478
updateParam: epSlice1,
6579
checksParam: epSlice1DifferentService,
6680
expectHas: false,
6781
expectStale: true,
82+
expectResourceVersionsByService: map[types.NamespacedName]endpointSliceResourceVersions{
83+
{Namespace: epSlice1.Namespace, Name: "svc1"}: {
84+
epSlice1.Name: epSlice1.ResourceVersion,
85+
},
86+
},
6887
},
6988
"different resource version": {
7089
updateParam: epSlice1,
7190
checksParam: epSlice1DifferentRV,
7291
expectHas: true,
7392
expectStale: true,
93+
expectResourceVersionsByService: map[types.NamespacedName]endpointSliceResourceVersions{
94+
{Namespace: epSlice1.Namespace, Name: "svc1"}: {
95+
epSlice1.Name: epSlice1.ResourceVersion,
96+
},
97+
},
7498
},
7599
}
76100

77101
for name, tc := range testCases {
78102
t.Run(name, func(t *testing.T) {
79103
esTracker := newEndpointSliceTracker()
80-
esTracker.update(tc.updateParam)
81-
if esTracker.has(tc.checksParam) != tc.expectHas {
82-
t.Errorf("tc.tracker.has(%+v) == %t, expected %t", tc.checksParam, esTracker.has(tc.checksParam), tc.expectHas)
104+
esTracker.Update(tc.updateParam)
105+
if esTracker.Has(tc.checksParam) != tc.expectHas {
106+
t.Errorf("tc.tracker.Has(%+v) == %t, expected %t", tc.checksParam, esTracker.Has(tc.checksParam), tc.expectHas)
83107
}
84-
if esTracker.stale(tc.checksParam) != tc.expectStale {
85-
t.Errorf("tc.tracker.stale(%+v) == %t, expected %t", tc.checksParam, esTracker.stale(tc.checksParam), tc.expectStale)
108+
if esTracker.Stale(tc.checksParam) != tc.expectStale {
109+
t.Errorf("tc.tracker.Stale(%+v) == %t, expected %t", tc.checksParam, esTracker.Stale(tc.checksParam), tc.expectStale)
86110
}
111+
assert.Equal(t, tc.expectResourceVersionsByService, esTracker.resourceVersionsByService)
87112
})
88113
}
89114
}
@@ -160,15 +185,81 @@ func TestEndpointSliceTrackerDelete(t *testing.T) {
160185
for name, tc := range testCases {
161186
t.Run(name, func(t *testing.T) {
162187
esTracker := newEndpointSliceTracker()
163-
esTracker.update(epSlice1)
188+
esTracker.Update(epSlice1)
164189

165-
esTracker.delete(tc.deleteParam)
166-
if esTracker.has(tc.checksParam) != tc.expectHas {
167-
t.Errorf("esTracker.has(%+v) == %t, expected %t", tc.checksParam, esTracker.has(tc.checksParam), tc.expectHas)
190+
esTracker.Delete(tc.deleteParam)
191+
if esTracker.Has(tc.checksParam) != tc.expectHas {
192+
t.Errorf("esTracker.Has(%+v) == %t, expected %t", tc.checksParam, esTracker.Has(tc.checksParam), tc.expectHas)
193+
}
194+
if esTracker.Stale(tc.checksParam) != tc.expectStale {
195+
t.Errorf("esTracker.Stale(%+v) == %t, expected %t", tc.checksParam, esTracker.Stale(tc.checksParam), tc.expectStale)
196+
}
197+
})
198+
}
199+
}
200+
201+
func TestEndpointSliceTrackerDeleteService(t *testing.T) {
202+
svcName1, svcNS1 := "svc1", "ns1"
203+
svcName2, svcNS2 := "svc2", "ns2"
204+
epSlice1 := &discovery.EndpointSlice{
205+
ObjectMeta: metav1.ObjectMeta{
206+
Name: "example-1",
207+
Namespace: svcNS1,
208+
ResourceVersion: "rv1",
209+
Labels: map[string]string{discovery.LabelServiceName: svcName1},
210+
},
211+
}
212+
213+
testCases := map[string]struct {
214+
updateParam *discovery.EndpointSlice
215+
deleteServiceParam *types.NamespacedName
216+
expectHas bool
217+
expectStale bool
218+
expectResourceVersionsByService map[types.NamespacedName]endpointSliceResourceVersions
219+
}{
220+
"same service": {
221+
updateParam: epSlice1,
222+
deleteServiceParam: &types.NamespacedName{Namespace: svcNS1, Name: svcName1},
223+
expectHas: false,
224+
expectStale: true,
225+
expectResourceVersionsByService: map[types.NamespacedName]endpointSliceResourceVersions{},
226+
},
227+
"different namespace": {
228+
updateParam: epSlice1,
229+
deleteServiceParam: &types.NamespacedName{Namespace: svcNS2, Name: svcName1},
230+
expectHas: true,
231+
expectStale: false,
232+
expectResourceVersionsByService: map[types.NamespacedName]endpointSliceResourceVersions{
233+
{Namespace: epSlice1.Namespace, Name: "svc1"}: {
234+
epSlice1.Name: epSlice1.ResourceVersion,
235+
},
236+
},
237+
},
238+
"different service": {
239+
updateParam: epSlice1,
240+
deleteServiceParam: &types.NamespacedName{Namespace: svcNS1, Name: svcName2},
241+
expectHas: true,
242+
expectStale: false,
243+
expectResourceVersionsByService: map[types.NamespacedName]endpointSliceResourceVersions{
244+
{Namespace: epSlice1.Namespace, Name: "svc1"}: {
245+
epSlice1.Name: epSlice1.ResourceVersion,
246+
},
247+
},
248+
},
249+
}
250+
251+
for name, tc := range testCases {
252+
t.Run(name, func(t *testing.T) {
253+
esTracker := newEndpointSliceTracker()
254+
esTracker.Update(tc.updateParam)
255+
esTracker.DeleteService(tc.deleteServiceParam.Namespace, tc.deleteServiceParam.Name)
256+
if esTracker.Has(tc.updateParam) != tc.expectHas {
257+
t.Errorf("tc.tracker.Has(%+v) == %t, expected %t", tc.updateParam, esTracker.Has(tc.updateParam), tc.expectHas)
168258
}
169-
if esTracker.stale(tc.checksParam) != tc.expectStale {
170-
t.Errorf("esTracker.stale(%+v) == %t, expected %t", tc.checksParam, esTracker.stale(tc.checksParam), tc.expectStale)
259+
if esTracker.Stale(tc.updateParam) != tc.expectStale {
260+
t.Errorf("tc.tracker.Stale(%+v) == %t, expected %t", tc.updateParam, esTracker.Stale(tc.updateParam), tc.expectStale)
171261
}
262+
assert.Equal(t, tc.expectResourceVersionsByService, esTracker.resourceVersionsByService)
172263
})
173264
}
174265
}

pkg/controller/endpointslicemirroring/endpointslicemirroring_controller.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -285,6 +285,7 @@ func (c *Controller) syncEndpoints(key string) error {
285285
endpoints, err := c.endpointsLister.Endpoints(namespace).Get(name)
286286
if err != nil || !c.shouldMirror(endpoints) {
287287
if apierrors.IsNotFound(err) || !c.shouldMirror(endpoints) {
288+
c.endpointSliceTracker.DeleteService(namespace, name)
288289
return c.reconciler.deleteEndpoints(namespace, name, endpointSlices)
289290
}
290291
return err
@@ -389,7 +390,7 @@ func (c *Controller) onEndpointSliceAdd(obj interface{}) {
389390
utilruntime.HandleError(fmt.Errorf("onEndpointSliceAdd() expected type discovery.EndpointSlice, got %T", obj))
390391
return
391392
}
392-
if managedByController(endpointSlice) && c.endpointSliceTracker.stale(endpointSlice) {
393+
if managedByController(endpointSlice) && c.endpointSliceTracker.Stale(endpointSlice) {
393394
c.queueEndpointsForEndpointSlice(endpointSlice)
394395
}
395396
}
@@ -405,7 +406,7 @@ func (c *Controller) onEndpointSliceUpdate(prevObj, obj interface{}) {
405406
utilruntime.HandleError(fmt.Errorf("onEndpointSliceUpdated() expected type discovery.EndpointSlice, got %T, %T", prevObj, obj))
406407
return
407408
}
408-
if managedByChanged(prevEndpointSlice, endpointSlice) || (managedByController(endpointSlice) && c.endpointSliceTracker.stale(endpointSlice)) {
409+
if managedByChanged(prevEndpointSlice, endpointSlice) || (managedByController(endpointSlice) && c.endpointSliceTracker.Stale(endpointSlice)) {
409410
c.queueEndpointsForEndpointSlice(endpointSlice)
410411
}
411412
}
@@ -419,7 +420,7 @@ func (c *Controller) onEndpointSliceDelete(obj interface{}) {
419420
utilruntime.HandleError(fmt.Errorf("onEndpointSliceDelete() expected type discovery.EndpointSlice, got %T", obj))
420421
return
421422
}
422-
if managedByController(endpointSlice) && c.endpointSliceTracker.has(endpointSlice) {
423+
if managedByController(endpointSlice) && c.endpointSliceTracker.Has(endpointSlice) {
423424
c.queueEndpointsForEndpointSlice(endpointSlice)
424425
}
425426
}

pkg/controller/endpointslicemirroring/reconciler.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -246,7 +246,7 @@ func (r *reconciler) finalize(endpoints *corev1.Endpoints, slices slicesByAction
246246
}
247247
errs = append(errs, fmt.Errorf("Error creating EndpointSlice for Endpoints %s/%s: %v", endpoints.Namespace, endpoints.Name, err))
248248
} else {
249-
r.endpointSliceTracker.update(createdSlice)
249+
r.endpointSliceTracker.Update(createdSlice)
250250
metrics.EndpointSliceChanges.WithLabelValues("create").Inc()
251251
}
252252
}
@@ -257,7 +257,7 @@ func (r *reconciler) finalize(endpoints *corev1.Endpoints, slices slicesByAction
257257
if err != nil {
258258
errs = append(errs, fmt.Errorf("Error updating %s EndpointSlice for Endpoints %s/%s: %v", endpointSlice.Name, endpoints.Namespace, endpoints.Name, err))
259259
} else {
260-
r.endpointSliceTracker.update(updatedSlice)
260+
r.endpointSliceTracker.Update(updatedSlice)
261261
metrics.EndpointSliceChanges.WithLabelValues("update").Inc()
262262
}
263263
}
@@ -267,7 +267,7 @@ func (r *reconciler) finalize(endpoints *corev1.Endpoints, slices slicesByAction
267267
if err != nil {
268268
errs = append(errs, fmt.Errorf("Error deleting %s EndpointSlice for Endpoints %s/%s: %v", endpointSlice.Name, endpoints.Namespace, endpoints.Name, err))
269269
} else {
270-
r.endpointSliceTracker.delete(endpointSlice)
270+
r.endpointSliceTracker.Delete(endpointSlice)
271271
metrics.EndpointSliceChanges.WithLabelValues("delete").Inc()
272272
}
273273
}

0 commit comments

Comments
 (0)