Skip to content

Commit b7d8045

Browse files
author
Antonio Ojea
committed
endpoinslices must mirror services labels
Implement, in the endpoint slice controller, the same logic used for labels in the legacy endpoints controller. The labels in the endpoint and in the parent must be equivalent. Headless services add the well-known IsHeadlessService label. Slices must have two well known labels: LabelServiceName and LabelManagedBy.
1 parent 5f79e91 commit b7d8045

File tree

5 files changed

+684
-24
lines changed

5 files changed

+684
-24
lines changed

pkg/controller/endpointslice/BUILD

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ go_library(
1414
deps = [
1515
"//pkg/api/v1/pod:go_default_library",
1616
"//pkg/apis/core:go_default_library",
17+
"//pkg/apis/core/v1/helper:go_default_library",
1718
"//pkg/apis/discovery/validation:go_default_library",
1819
"//pkg/controller:go_default_library",
1920
"//pkg/controller/endpointslice/metrics:go_default_library",

pkg/controller/endpointslice/reconciler.go

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -246,9 +246,11 @@ func (r *reconciler) finalize(
246246

247247
// reconcileByPortMapping compares the endpoints found in existing slices with
248248
// the list of desired endpoints and returns lists of slices to create, update,
249-
// and delete. The logic is split up into several main steps:
249+
// and delete. It also checks that the slices mirror the parent services labels.
250+
// The logic is split up into several main steps:
250251
// 1. Iterate through existing slices, delete endpoints that are no longer
251-
// desired and update matching endpoints that have changed.
252+
// desired and update matching endpoints that have changed. It also checks
253+
// if the slices have the labels of the parent services, and updates them if not.
252254
// 2. Iterate through slices that have been modified in 1 and fill them up with
253255
// any remaining desired endpoints.
254256
// 3. If there still desired endpoints left, try to fit them into a previously
@@ -287,6 +289,9 @@ func (r *reconciler) reconcileByPortMapping(
287289
}
288290
}
289291

292+
// generate the slice labels and check if parent labels have changed
293+
labels, labelsChanged := setEndpointSliceLabels(existingSlice, service)
294+
290295
// If an endpoint was updated or removed, mark for update or delete
291296
if endpointUpdated || len(existingSlice.Endpoints) != len(newEndpoints) {
292297
if len(existingSlice.Endpoints) > len(newEndpoints) {
@@ -299,9 +304,16 @@ func (r *reconciler) reconcileByPortMapping(
299304
// otherwise, copy and mark for update
300305
epSlice := existingSlice.DeepCopy()
301306
epSlice.Endpoints = newEndpoints
307+
epSlice.Labels = labels
302308
slicesByName[existingSlice.Name] = epSlice
303309
sliceNamesToUpdate.Insert(epSlice.Name)
304310
}
311+
} else if labelsChanged {
312+
// if labels have changed, copy and mark for update
313+
epSlice := existingSlice.DeepCopy()
314+
epSlice.Labels = labels
315+
slicesByName[existingSlice.Name] = epSlice
316+
sliceNamesToUpdate.Insert(epSlice.Name)
305317
} else {
306318
// slices with no changes will be useful if there are leftover endpoints
307319
sliceNamesUnchanged.Insert(existingSlice.Name)

pkg/controller/endpointslice/reconciler_test.go

Lines changed: 168 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,13 @@ func TestReconcile1Pod(t *testing.T) {
7171
namespace := "test"
7272
ipv6Family := corev1.IPv6Protocol
7373
svcv4, _ := newServiceAndEndpointMeta("foo", namespace)
74+
svcv4ClusterIP, _ := newServiceAndEndpointMeta("foo", namespace)
75+
svcv4ClusterIP.Spec.ClusterIP = "1.1.1.1"
76+
svcv4Labels, _ := newServiceAndEndpointMeta("foo", namespace)
77+
svcv4Labels.Labels = map[string]string{"foo": "bar"}
78+
svcv4BadLabels, _ := newServiceAndEndpointMeta("foo", namespace)
79+
svcv4BadLabels.Labels = map[string]string{discovery.LabelServiceName: "bad",
80+
discovery.LabelManagedBy: "actor", corev1.IsHeadlessService: "invalid"}
7481
svcv6, _ := newServiceAndEndpointMeta("foo", namespace)
7582
svcv6.Spec.IPFamily = &ipv6Family
7683
svcv6ClusterIP, _ := newServiceAndEndpointMeta("foo", namespace)
@@ -93,6 +100,7 @@ func TestReconcile1Pod(t *testing.T) {
93100
service corev1.Service
94101
expectedAddressType discovery.AddressType
95102
expectedEndpoint discovery.Endpoint
103+
expectedLabels map[string]string
96104
}{
97105
"ipv4": {
98106
service: svcv4,
@@ -111,6 +119,80 @@ func TestReconcile1Pod(t *testing.T) {
111119
Name: "pod1",
112120
},
113121
},
122+
expectedLabels: map[string]string{
123+
discovery.LabelManagedBy: controllerName,
124+
discovery.LabelServiceName: "foo",
125+
corev1.IsHeadlessService: "",
126+
},
127+
},
128+
"ipv4-clusterip": {
129+
service: svcv4ClusterIP,
130+
expectedAddressType: discovery.AddressTypeIPv4,
131+
expectedEndpoint: discovery.Endpoint{
132+
Addresses: []string{"1.2.3.4"},
133+
Conditions: discovery.EndpointConditions{Ready: utilpointer.BoolPtr(true)},
134+
Topology: map[string]string{
135+
"kubernetes.io/hostname": "node-1",
136+
"topology.kubernetes.io/zone": "us-central1-a",
137+
"topology.kubernetes.io/region": "us-central1",
138+
},
139+
TargetRef: &corev1.ObjectReference{
140+
Kind: "Pod",
141+
Namespace: namespace,
142+
Name: "pod1",
143+
},
144+
},
145+
expectedLabels: map[string]string{
146+
discovery.LabelManagedBy: controllerName,
147+
discovery.LabelServiceName: "foo",
148+
},
149+
},
150+
"ipv4-labels": {
151+
service: svcv4Labels,
152+
expectedAddressType: discovery.AddressTypeIPv4,
153+
expectedEndpoint: discovery.Endpoint{
154+
Addresses: []string{"1.2.3.4"},
155+
Conditions: discovery.EndpointConditions{Ready: utilpointer.BoolPtr(true)},
156+
Topology: map[string]string{
157+
"kubernetes.io/hostname": "node-1",
158+
"topology.kubernetes.io/zone": "us-central1-a",
159+
"topology.kubernetes.io/region": "us-central1",
160+
},
161+
TargetRef: &corev1.ObjectReference{
162+
Kind: "Pod",
163+
Namespace: namespace,
164+
Name: "pod1",
165+
},
166+
},
167+
expectedLabels: map[string]string{
168+
discovery.LabelManagedBy: controllerName,
169+
discovery.LabelServiceName: "foo",
170+
"foo": "bar",
171+
corev1.IsHeadlessService: "",
172+
},
173+
},
174+
"ipv4-bad-labels": {
175+
service: svcv4BadLabels,
176+
expectedAddressType: discovery.AddressTypeIPv4,
177+
expectedEndpoint: discovery.Endpoint{
178+
Addresses: []string{"1.2.3.4"},
179+
Conditions: discovery.EndpointConditions{Ready: utilpointer.BoolPtr(true)},
180+
Topology: map[string]string{
181+
"kubernetes.io/hostname": "node-1",
182+
"topology.kubernetes.io/zone": "us-central1-a",
183+
"topology.kubernetes.io/region": "us-central1",
184+
},
185+
TargetRef: &corev1.ObjectReference{
186+
Kind: "Pod",
187+
Namespace: namespace,
188+
Name: "pod1",
189+
},
190+
},
191+
expectedLabels: map[string]string{
192+
discovery.LabelManagedBy: controllerName,
193+
discovery.LabelServiceName: "foo",
194+
corev1.IsHeadlessService: "",
195+
},
114196
},
115197
"ipv6": {
116198
service: svcv6,
@@ -129,6 +211,11 @@ func TestReconcile1Pod(t *testing.T) {
129211
Name: "pod1",
130212
},
131213
},
214+
expectedLabels: map[string]string{
215+
discovery.LabelManagedBy: controllerName,
216+
discovery.LabelServiceName: "foo",
217+
corev1.IsHeadlessService: "",
218+
},
132219
},
133220
"ipv6-clusterip": {
134221
service: svcv6ClusterIP,
@@ -147,6 +234,10 @@ func TestReconcile1Pod(t *testing.T) {
147234
Name: "pod1",
148235
},
149236
},
237+
expectedLabels: map[string]string{
238+
discovery.LabelManagedBy: controllerName,
239+
discovery.LabelServiceName: "foo",
240+
},
150241
},
151242
}
152243

@@ -173,8 +264,8 @@ func TestReconcile1Pod(t *testing.T) {
173264
t.Errorf("Expected EndpointSlice name to start with %s, got %s", testCase.service.Name, slice.Name)
174265
}
175266

176-
if slice.Labels[discovery.LabelServiceName] != testCase.service.Name {
177-
t.Errorf("Expected EndpointSlice to have label set with %s value, got %s", testCase.service.Name, slice.Labels[discovery.LabelServiceName])
267+
if !reflect.DeepEqual(testCase.expectedLabels, slice.Labels) {
268+
t.Errorf("Expected EndpointSlice to have labels: %v , got %v", testCase.expectedLabels, slice.Labels)
178269
}
179270

180271
if slice.Annotations[corev1.EndpointsLastChangeTriggerTime] != triggerTime.Format(time.RFC3339Nano) {
@@ -430,6 +521,81 @@ func TestReconcileEndpointSlicesUpdating(t *testing.T) {
430521
expectUnorderedSlicesWithLengths(t, fetchEndpointSlices(t, client, namespace), []int{100, 100, 50})
431522
}
432523

524+
// In some cases, such as service labels updates, all slices for that service will require a change
525+
// This test ensures that we are updating those slices and not calling create + delete for each
526+
func TestReconcileEndpointSlicesServicesLabelsUpdating(t *testing.T) {
527+
client := newClientset()
528+
namespace := "test"
529+
svc, _ := newServiceAndEndpointMeta("foo", namespace)
530+
531+
// start with 250 pods
532+
pods := []*corev1.Pod{}
533+
for i := 0; i < 250; i++ {
534+
ready := !(i%3 == 0)
535+
pods = append(pods, newPod(i, namespace, ready, 1))
536+
}
537+
538+
r := newReconciler(client, []*corev1.Node{{ObjectMeta: metav1.ObjectMeta{Name: "node-1"}}}, defaultMaxEndpointsPerSlice)
539+
reconcileHelper(t, r, &svc, pods, []*discovery.EndpointSlice{}, time.Now())
540+
numActionsExpected := 3
541+
assert.Len(t, client.Actions(), numActionsExpected, "Expected 3 additional clientset actions")
542+
543+
slices := fetchEndpointSlices(t, client, namespace)
544+
numActionsExpected++
545+
expectUnorderedSlicesWithLengths(t, slices, []int{100, 100, 50})
546+
547+
// update service with new labels
548+
svc.Labels = map[string]string{"foo": "bar"}
549+
reconcileHelper(t, r, &svc, pods, []*discovery.EndpointSlice{&slices[0], &slices[1], &slices[2]}, time.Now())
550+
551+
numActionsExpected += 3
552+
assert.Len(t, client.Actions(), numActionsExpected, "Expected 3 additional clientset actions")
553+
expectActions(t, client.Actions(), 3, "update", "endpointslices")
554+
555+
newSlices := fetchEndpointSlices(t, client, namespace)
556+
expectUnorderedSlicesWithLengths(t, newSlices, []int{100, 100, 50})
557+
// check that the labels were updated
558+
for _, slice := range newSlices {
559+
w, ok := slice.Labels["foo"]
560+
if !ok {
561+
t.Errorf("Expected label \"foo\" from parent service not found")
562+
} else if "bar" != w {
563+
t.Errorf("Expected EndpointSlice to have parent service labels: have %s value, expected bar", w)
564+
}
565+
}
566+
}
567+
568+
// In some cases, such as service labels updates, all slices for that service will require a change
569+
// However, this should not happen for reserved labels
570+
func TestReconcileEndpointSlicesServicesReservedLabels(t *testing.T) {
571+
client := newClientset()
572+
namespace := "test"
573+
svc, _ := newServiceAndEndpointMeta("foo", namespace)
574+
575+
// start with 250 pods
576+
pods := []*corev1.Pod{}
577+
for i := 0; i < 250; i++ {
578+
ready := !(i%3 == 0)
579+
pods = append(pods, newPod(i, namespace, ready, 1))
580+
}
581+
582+
r := newReconciler(client, []*corev1.Node{{ObjectMeta: metav1.ObjectMeta{Name: "node-1"}}}, defaultMaxEndpointsPerSlice)
583+
reconcileHelper(t, r, &svc, pods, []*discovery.EndpointSlice{}, time.Now())
584+
numActionsExpected := 3
585+
assert.Len(t, client.Actions(), numActionsExpected, "Expected 3 additional clientset actions")
586+
slices := fetchEndpointSlices(t, client, namespace)
587+
numActionsExpected++
588+
expectUnorderedSlicesWithLengths(t, slices, []int{100, 100, 50})
589+
590+
// update service with new labels
591+
svc.Labels = map[string]string{discovery.LabelServiceName: "bad", discovery.LabelManagedBy: "actor", corev1.IsHeadlessService: "invalid"}
592+
reconcileHelper(t, r, &svc, pods, []*discovery.EndpointSlice{&slices[0], &slices[1], &slices[2]}, time.Now())
593+
assert.Len(t, client.Actions(), numActionsExpected, "Expected no additional clientset actions")
594+
595+
newSlices := fetchEndpointSlices(t, client, namespace)
596+
expectUnorderedSlicesWithLengths(t, newSlices, []int{100, 100, 50})
597+
}
598+
433599
// In this test, we start with 10 slices that only have 30 endpoints each
434600
// An initial reconcile makes no changes (as desired to limit writes)
435601
// When we change a service port, all slices will need to be updated in some way

pkg/controller/endpointslice/utils.go

Lines changed: 64 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"time"
2222

2323
corev1 "k8s.io/api/core/v1"
24+
v1 "k8s.io/api/core/v1"
2425
discovery "k8s.io/api/discovery/v1beta1"
2526
apiequality "k8s.io/apimachinery/pkg/api/equality"
2627
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -30,6 +31,7 @@ import (
3031
"k8s.io/klog/v2"
3132
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
3233
api "k8s.io/kubernetes/pkg/apis/core"
34+
helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
3335
"k8s.io/kubernetes/pkg/apis/discovery/validation"
3436
endpointutil "k8s.io/kubernetes/pkg/controller/util/endpoint"
3537
utilnet "k8s.io/utils/net"
@@ -152,12 +154,9 @@ func endpointsEqualBeyondHash(ep1, ep2 *discovery.Endpoint) bool {
152154
func newEndpointSlice(service *corev1.Service, endpointMeta *endpointMeta) *discovery.EndpointSlice {
153155
gvk := schema.GroupVersionKind{Version: "v1", Kind: "Service"}
154156
ownerRef := metav1.NewControllerRef(service, gvk)
155-
return &discovery.EndpointSlice{
157+
epSlice := &discovery.EndpointSlice{
156158
ObjectMeta: metav1.ObjectMeta{
157-
Labels: map[string]string{
158-
discovery.LabelServiceName: service.Name,
159-
discovery.LabelManagedBy: controllerName,
160-
},
159+
Labels: map[string]string{},
161160
GenerateName: getEndpointSlicePrefix(service.Name),
162161
OwnerReferences: []metav1.OwnerReference{*ownerRef},
163162
Namespace: service.Namespace,
@@ -166,6 +165,10 @@ func newEndpointSlice(service *corev1.Service, endpointMeta *endpointMeta) *disc
166165
AddressType: endpointMeta.AddressType,
167166
Endpoints: []discovery.Endpoint{},
168167
}
168+
// add parent service labels
169+
epSlice.Labels, _ = setEndpointSliceLabels(epSlice, service)
170+
171+
return epSlice
169172
}
170173

171174
// getEndpointSlicePrefix returns a suitable prefix for an EndpointSlice name.
@@ -267,6 +270,62 @@ func serviceControllerKey(endpointSlice *discovery.EndpointSlice) (string, error
267270
return fmt.Sprintf("%s/%s", endpointSlice.Namespace, serviceName), nil
268271
}
269272

273+
// setEndpointSliceLabels returns a map with the new endpoint slices labels and true if there was an update.
274+
// Slices labels must be equivalent to the Service labels except for the reserved IsHeadlessService, LabelServiceName and LabelManagedBy labels
275+
// Changes to IsHeadlessService, LabelServiceName and LabelManagedBy labels on the Service do not result in updates to EndpointSlice labels.
276+
func setEndpointSliceLabels(epSlice *discovery.EndpointSlice, service *corev1.Service) (map[string]string, bool) {
277+
updated := false
278+
epLabels := make(map[string]string)
279+
svcLabels := make(map[string]string)
280+
281+
// check if the endpoint slice and the service have the same labels
282+
// clone current slice labels except the reserved labels
283+
for key, value := range epSlice.Labels {
284+
if IsReservedLabelKey(key) {
285+
continue
286+
}
287+
// copy endpoint slice labels
288+
epLabels[key] = value
289+
}
290+
291+
for key, value := range service.Labels {
292+
if IsReservedLabelKey(key) {
293+
klog.Warningf("Service %s/%s using reserved endpoint slices label, skipping label %s: %s", service.Namespace, service.Name, key, value)
294+
continue
295+
}
296+
// copy service labels
297+
svcLabels[key] = value
298+
}
299+
300+
// if the labels are not identical update the slice with the corresponding service labels
301+
if !apiequality.Semantic.DeepEqual(epLabels, svcLabels) {
302+
updated = true
303+
}
304+
305+
// add or remove headless label depending on the service Type
306+
if !helper.IsServiceIPSet(service) {
307+
svcLabels[v1.IsHeadlessService] = ""
308+
} else {
309+
delete(svcLabels, v1.IsHeadlessService)
310+
}
311+
312+
// override endpoint slices reserved labels
313+
svcLabels[discovery.LabelServiceName] = service.Name
314+
svcLabels[discovery.LabelManagedBy] = controllerName
315+
316+
return svcLabels, updated
317+
}
318+
319+
// IsReservedLabelKey return true if the label is one of the reserved label for slices
320+
func IsReservedLabelKey(label string) bool {
321+
if label == discovery.LabelServiceName ||
322+
label == discovery.LabelManagedBy ||
323+
label == v1.IsHeadlessService {
324+
return true
325+
}
326+
return false
327+
}
328+
270329
// endpointSliceEndpointLen helps sort endpoint slices by the number of
271330
// endpoints they contain.
272331
type endpointSliceEndpointLen []*discovery.EndpointSlice

0 commit comments

Comments
 (0)