Skip to content

Commit 442107b

Browse files
authored
Merge pull request kubernetes#86092 from robscott/endpointslice-proxy-cache-fix
Ensuring kube-proxy does not mutate shared EndpointSlices
2 parents ea7327a + 49e4bd1 commit 442107b

File tree

3 files changed

+53
-1
lines changed

3 files changed

+53
-1
lines changed

pkg/proxy/BUILD

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ go_test(
7070
"//staging/src/k8s.io/api/core/v1:go_default_library",
7171
"//staging/src/k8s.io/api/discovery/v1beta1:go_default_library",
7272
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
73+
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
7374
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
7475
"//staging/src/k8s.io/apimachinery/pkg/util/intstr:go_default_library",
7576
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",

pkg/proxy/endpointslicecache.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -108,11 +108,13 @@ func newEndpointSliceTracker() *endpointSliceTracker {
108108
// newEndpointSliceInfo generates endpointSliceInfo from an EndpointSlice.
109109
func newEndpointSliceInfo(endpointSlice *discovery.EndpointSlice, remove bool) *endpointSliceInfo {
110110
esInfo := &endpointSliceInfo{
111-
Ports: endpointSlice.Ports,
111+
Ports: make([]discovery.EndpointPort, len(endpointSlice.Ports)),
112112
Endpoints: []*endpointInfo{},
113113
Remove: remove,
114114
}
115115

116+
// copy here to avoid mutating shared EndpointSlice object.
117+
copy(esInfo.Ports, endpointSlice.Ports)
116118
sort.Sort(byPort(esInfo.Ports))
117119

118120
if !remove {

pkg/proxy/endpointslicecache_test.go

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
"k8s.io/api/core/v1"
2525
discovery "k8s.io/api/discovery/v1beta1"
2626
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
27+
"k8s.io/apimachinery/pkg/runtime"
2728
"k8s.io/apimachinery/pkg/types"
2829
utilpointer "k8s.io/utils/pointer"
2930
)
@@ -152,11 +153,13 @@ func TestEndpointsMapFromESC(t *testing.T) {
152153
t.Run(name, func(t *testing.T) {
153154
esCache := NewEndpointSliceCache(tc.hostname, nil, nil, nil)
154155

156+
cmc := newCacheMutationCheck(tc.endpointSlices)
155157
for _, endpointSlice := range tc.endpointSlices {
156158
esCache.updatePending(endpointSlice, false)
157159
}
158160

159161
compareEndpointsMapsStr(t, esCache.getEndpointsMap(tc.namespacedName, esCache.trackerByServiceMap[tc.namespacedName].pending), tc.expectedMap)
162+
cmc.Check(t)
160163
})
161164
}
162165
}
@@ -315,6 +318,8 @@ func TestEsInfoChanged(t *testing.T) {
315318

316319
for name, tc := range testCases {
317320
t.Run(name, func(t *testing.T) {
321+
cmc := newCacheMutationCheck([]*discovery.EndpointSlice{tc.initialSlice})
322+
318323
if tc.initialSlice != nil {
319324
tc.cache.updatePending(tc.initialSlice, false)
320325
tc.cache.checkoutChanges()
@@ -331,6 +336,8 @@ func TestEsInfoChanged(t *testing.T) {
331336
if tc.expectChanged != changed {
332337
t.Errorf("Expected esInfoChanged() to return %t, got %t", tc.expectChanged, changed)
333338
}
339+
340+
cmc.Check(t)
334341
})
335342
}
336343
}
@@ -378,3 +385,45 @@ func generateEndpointSliceWithOffset(serviceName, namespace string, sliceNum, of
378385
func generateEndpointSlice(serviceName, namespace string, sliceNum, numEndpoints, unreadyMod int, hosts []string, portNums []*int32) *discovery.EndpointSlice {
379386
return generateEndpointSliceWithOffset(serviceName, namespace, sliceNum, sliceNum, numEndpoints, unreadyMod, hosts, portNums)
380387
}
388+
389+
// cacheMutationCheck helps ensure that cached objects have not been changed
390+
// in any way throughout a test run.
391+
type cacheMutationCheck struct {
392+
objects []cacheObject
393+
}
394+
395+
// cacheObject stores a reference to an original object as well as a deep copy
396+
// of that object to track any mutations in the original object.
397+
type cacheObject struct {
398+
original runtime.Object
399+
deepCopy runtime.Object
400+
}
401+
402+
// newCacheMutationCheck initializes a cacheMutationCheck with EndpointSlices.
403+
func newCacheMutationCheck(endpointSlices []*discovery.EndpointSlice) cacheMutationCheck {
404+
cmc := cacheMutationCheck{}
405+
for _, endpointSlice := range endpointSlices {
406+
cmc.Add(endpointSlice)
407+
}
408+
return cmc
409+
}
410+
411+
// Add appends a runtime.Object and a deep copy of that object into the
412+
// cacheMutationCheck.
413+
func (cmc *cacheMutationCheck) Add(o runtime.Object) {
414+
cmc.objects = append(cmc.objects, cacheObject{
415+
original: o,
416+
deepCopy: o.DeepCopyObject(),
417+
})
418+
}
419+
420+
// Check verifies that no objects in the cacheMutationCheck have been mutated.
421+
func (cmc *cacheMutationCheck) Check(t *testing.T) {
422+
for _, o := range cmc.objects {
423+
if !reflect.DeepEqual(o.original, o.deepCopy) {
424+
// Cached objects can't be safely mutated and instead should be deep
425+
// copied before changed in any way.
426+
t.Errorf("Cached object was unexpectedly mutated. Original: %+v, Mutated: %+v", o.deepCopy, o.original)
427+
}
428+
}
429+
}

0 commit comments

Comments
 (0)