Skip to content

Commit 9905a33

Browse files
authored
Merge pull request kubernetes#85368 from robscott/endpointslice-deepcopy
Deep copying EndpointSlices in reconciler before modifying them
2 parents 2343a67 + 4229b99 commit 9905a33

File tree

3 files changed

+83
-15
lines changed

3 files changed

+83
-15
lines changed

pkg/controller/endpointslice/endpointslice_controller_test.go

Lines changed: 54 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,15 @@ package endpointslice
1818

1919
import (
2020
"fmt"
21+
"reflect"
2122
"testing"
2223
"time"
2324

2425
"github.com/stretchr/testify/assert"
2526
v1 "k8s.io/api/core/v1"
2627
discovery "k8s.io/api/discovery/v1beta1"
2728
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
29+
"k8s.io/apimachinery/pkg/runtime"
2830
"k8s.io/apimachinery/pkg/util/intstr"
2931
"k8s.io/client-go/informers"
3032
"k8s.io/client-go/kubernetes/fake"
@@ -239,6 +241,8 @@ func TestSyncServiceEndpointSliceLabelSelection(t *testing.T) {
239241
AddressType: discovery.AddressTypeIPv4,
240242
}}
241243

244+
cmc := newCacheMutationCheck(endpointSlices)
245+
242246
// need to add them to both store and fake clientset
243247
for _, endpointSlice := range endpointSlices {
244248
err := esController.endpointSliceStore.Add(endpointSlice)
@@ -262,6 +266,9 @@ func TestSyncServiceEndpointSliceLabelSelection(t *testing.T) {
262266
// only 2 slices should match, 2 should be deleted, 1 should be updated as a placeholder
263267
expectAction(t, client.Actions(), numActionsBefore, "update", "endpointslices")
264268
expectAction(t, client.Actions(), numActionsBefore+1, "delete", "endpointslices")
269+
270+
// ensure cache mutation has not occurred
271+
cmc.Check(t)
265272
}
266273

267274
// Ensure SyncService handles a variety of protocols and IPs appropriately.
@@ -316,17 +323,17 @@ func TestSyncServiceFull(t *testing.T) {
316323
assert.Len(t, slice.Endpoints, 1, "Expected 1 endpoints in first slice")
317324
assert.Equal(t, slice.Annotations["endpoints.kubernetes.io/last-change-trigger-time"], serviceCreateTime.Format(time.RFC3339Nano))
318325
assert.EqualValues(t, []discovery.EndpointPort{{
319-
Name: strPtr("sctp-example"),
326+
Name: utilpointer.StringPtr("sctp-example"),
320327
Protocol: protoPtr(v1.ProtocolSCTP),
321-
Port: int32Ptr(int32(3456)),
328+
Port: utilpointer.Int32Ptr(int32(3456)),
322329
}, {
323-
Name: strPtr("udp-example"),
330+
Name: utilpointer.StringPtr("udp-example"),
324331
Protocol: protoPtr(v1.ProtocolUDP),
325-
Port: int32Ptr(int32(161)),
332+
Port: utilpointer.Int32Ptr(int32(161)),
326333
}, {
327-
Name: strPtr("tcp-example"),
334+
Name: utilpointer.StringPtr("tcp-example"),
328335
Protocol: protoPtr(v1.ProtocolTCP),
329-
Port: int32Ptr(int32(80)),
336+
Port: utilpointer.Int32Ptr(int32(80)),
330337
}}, slice.Ports)
331338

332339
assert.ElementsMatch(t, []discovery.Endpoint{{
@@ -382,14 +389,49 @@ func expectAction(t *testing.T, actions []k8stesting.Action, index int, verb, re
382389
}
383390
}
384391

385-
func strPtr(str string) *string {
386-
return &str
387-
}
388-
392+
// protoPtr takes a Protocol and returns a pointer to it.
389393
func protoPtr(proto v1.Protocol) *v1.Protocol {
390394
return &proto
391395
}
392396

393-
func int32Ptr(num int32) *int32 {
394-
return &num
397+
// cacheMutationCheck helps ensure that cached objects have not been changed
398+
// in any way throughout a test run.
399+
type cacheMutationCheck struct {
400+
objects []cacheObject
401+
}
402+
403+
// cacheObject stores a reference to an original object as well as a deep copy
404+
// of that object to track any mutations in the original object.
405+
type cacheObject struct {
406+
original runtime.Object
407+
deepCopy runtime.Object
408+
}
409+
410+
// newCacheMutationCheck initializes a cacheMutationCheck with EndpointSlices.
411+
func newCacheMutationCheck(endpointSlices []*discovery.EndpointSlice) cacheMutationCheck {
412+
cmc := cacheMutationCheck{}
413+
for _, endpointSlice := range endpointSlices {
414+
cmc.Add(endpointSlice)
415+
}
416+
return cmc
417+
}
418+
419+
// Add appends a runtime.Object and a deep copy of that object into the
420+
// cacheMutationCheck.
421+
func (cmc *cacheMutationCheck) Add(o runtime.Object) {
422+
cmc.objects = append(cmc.objects, cacheObject{
423+
original: o,
424+
deepCopy: o.DeepCopyObject(),
425+
})
426+
}
427+
428+
// Check verifies that no objects in the cacheMutationCheck have been mutated.
429+
func (cmc *cacheMutationCheck) Check(t *testing.T) {
430+
for _, o := range cmc.objects {
431+
if !reflect.DeepEqual(o.original, o.deepCopy) {
432+
// Cached objects can't be safely mutated and instead should be deep
433+
// copied before changed in any way.
434+
t.Errorf("Cached object was unexpectedly mutated. Original: %+v, Mutated: %+v", o.deepCopy, o.original)
435+
}
436+
}
395437
}

pkg/controller/endpointslice/reconciler.go

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -290,9 +290,11 @@ func (r *reconciler) reconcileByPortMapping(
290290
// if no endpoints desired in this slice, mark for deletion
291291
sliceNamesToDelete.Insert(existingSlice.Name)
292292
} else {
293-
// otherwise, mark for update
294-
existingSlice.Endpoints = newEndpoints
295-
sliceNamesToUpdate.Insert(existingSlice.Name)
293+
// otherwise, copy and mark for update
294+
epSlice := existingSlice.DeepCopy()
295+
epSlice.Endpoints = newEndpoints
296+
slicesByName[existingSlice.Name] = epSlice
297+
sliceNamesToUpdate.Insert(epSlice.Name)
296298
}
297299
} else {
298300
// slices with no changes will be useful if there are leftover endpoints
@@ -344,6 +346,10 @@ func (r *reconciler) reconcileByPortMapping(
344346
// If we didn't find a sliceToFill, generate a new empty one.
345347
if sliceToFill == nil {
346348
sliceToFill = newEndpointSlice(service, endpointMeta)
349+
} else {
350+
// deep copy required to modify this slice.
351+
sliceToFill = sliceToFill.DeepCopy()
352+
slicesByName[sliceToFill.Name] = sliceToFill
347353
}
348354

349355
// Fill the slice up with remaining endpoints.

pkg/controller/endpointslice/reconciler_test.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -318,6 +318,7 @@ func TestReconcileEndpointSlicesSomePreexisting(t *testing.T) {
318318
}
319319

320320
existingSlices := []*discovery.EndpointSlice{endpointSlice1, endpointSlice2}
321+
cmc := newCacheMutationCheck(existingSlices)
321322
createEndpointSlices(t, client, namespace, existingSlices)
322323

323324
numActionsBefore := len(client.Actions())
@@ -332,6 +333,9 @@ func TestReconcileEndpointSlicesSomePreexisting(t *testing.T) {
332333
// 1 new slice (0->100) + 1 updated slice (62->89)
333334
expectUnorderedSlicesWithLengths(t, fetchEndpointSlices(t, client, namespace), []int{89, 61, 100})
334335
expectMetrics(t, expectedMetrics{desiredSlices: 3, actualSlices: 3, desiredEndpoints: 250, addedPerSync: 127, removedPerSync: 0, numCreated: 1, numUpdated: 1, numDeleted: 0})
336+
337+
// ensure cache mutation has not occurred
338+
cmc.Check(t)
335339
}
336340

337341
// now with preexisting slices, we have 300 pods matching a service
@@ -370,6 +374,7 @@ func TestReconcileEndpointSlicesSomePreexistingWorseAllocation(t *testing.T) {
370374
}
371375

372376
existingSlices := []*discovery.EndpointSlice{endpointSlice1, endpointSlice2}
377+
cmc := newCacheMutationCheck(existingSlices)
373378
createEndpointSlices(t, client, namespace, existingSlices)
374379

375380
numActionsBefore := len(client.Actions())
@@ -383,6 +388,9 @@ func TestReconcileEndpointSlicesSomePreexistingWorseAllocation(t *testing.T) {
383388
// 2 new slices (100, 52) in addition to existing slices (74, 74)
384389
expectUnorderedSlicesWithLengths(t, fetchEndpointSlices(t, client, namespace), []int{74, 74, 100, 52})
385390
expectMetrics(t, expectedMetrics{desiredSlices: 3, actualSlices: 4, desiredEndpoints: 300, addedPerSync: 152, removedPerSync: 0, numCreated: 2, numUpdated: 0, numDeleted: 0})
391+
392+
// ensure cache mutation has not occurred
393+
cmc.Check(t)
386394
}
387395

388396
// In some cases, such as a service port change, all slices for that service will require a change
@@ -445,6 +453,7 @@ func TestReconcileEndpointSlicesRecycling(t *testing.T) {
445453
existingSlices[sliceNum].Endpoints = append(existingSlices[sliceNum].Endpoints, podToEndpoint(pod, &corev1.Node{}, &svc))
446454
}
447455

456+
cmc := newCacheMutationCheck(existingSlices)
448457
createEndpointSlices(t, client, namespace, existingSlices)
449458

450459
numActionsBefore := len(client.Actions())
@@ -463,6 +472,9 @@ func TestReconcileEndpointSlicesRecycling(t *testing.T) {
463472
// thanks to recycling, we get a free repack of endpoints, resulting in 3 full slices instead of 10 mostly empty slices
464473
expectUnorderedSlicesWithLengths(t, fetchEndpointSlices(t, client, namespace), []int{100, 100, 100})
465474
expectMetrics(t, expectedMetrics{desiredSlices: 3, actualSlices: 3, desiredEndpoints: 300, addedPerSync: 300, removedPerSync: 0, numCreated: 0, numUpdated: 3, numDeleted: 7})
475+
476+
// ensure cache mutation has not occurred
477+
cmc.Check(t)
466478
}
467479

468480
// In this test, we want to verify that endpoints are added to a slice that will
@@ -493,6 +505,7 @@ func TestReconcileEndpointSlicesUpdatePacking(t *testing.T) {
493505
}
494506
existingSlices = append(existingSlices, slice2)
495507

508+
cmc := newCacheMutationCheck(existingSlices)
496509
createEndpointSlices(t, client, namespace, existingSlices)
497510

498511
// ensure that endpoints in each slice will be marked for update.
@@ -519,6 +532,9 @@ func TestReconcileEndpointSlicesUpdatePacking(t *testing.T) {
519532

520533
// additional pods should get added to fuller slice
521534
expectUnorderedSlicesWithLengths(t, fetchEndpointSlices(t, client, namespace), []int{95, 20})
535+
536+
// ensure cache mutation has not occurred
537+
cmc.Check(t)
522538
}
523539

524540
// In this test, we want to verify that old EndpointSlices with a deprecated IP
@@ -552,6 +568,7 @@ func TestReconcileEndpointSlicesReplaceDeprecated(t *testing.T) {
552568

553569
createEndpointSlices(t, client, namespace, existingSlices)
554570

571+
cmc := newCacheMutationCheck(existingSlices)
555572
r := newReconciler(client, []*corev1.Node{{ObjectMeta: metav1.ObjectMeta{Name: "node-1"}}}, defaultMaxEndpointsPerSlice)
556573
reconcileHelper(t, r, &svc, pods, existingSlices, time.Now())
557574

@@ -569,6 +586,9 @@ func TestReconcileEndpointSlicesReplaceDeprecated(t *testing.T) {
569586
t.Errorf("Expected address type to be IPv4, got %s", endpointSlice.AddressType)
570587
}
571588
}
589+
590+
// ensure cache mutation has not occurred
591+
cmc.Check(t)
572592
}
573593

574594
// Named ports can map to different port numbers on different pods.

0 commit comments

Comments
 (0)