Skip to content

Commit 9fb6e2e

Browse files
committed
Fix Endpoint/EndpointSlice pod change detection
The endpoint controllers responded to Pod changes by trying to figure out if the generated endpoint resource would change, rather than just checking if the Pod had changed, but since the set of Pod fields that need to be checked depend on the Service and Node as well, the code ended up only checking for a subset of the changes it should have. In particular, EndpointSliceController ended up only looking at IPv4 Pod IPs when processing Pod update events, so when a Pod went from having no IP to having only an IPv6 IP, EndpointSliceController would think it hadn't changed.
1 parent f9ad7db commit 9fb6e2e

File tree

8 files changed

+218
-311
lines changed

8 files changed

+218
-311
lines changed

pkg/controller/endpoint/BUILD

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,6 @@ go_test(
4848
"//pkg/api/v1/endpoints:go_default_library",
4949
"//pkg/apis/core:go_default_library",
5050
"//pkg/controller:go_default_library",
51-
"//pkg/controller/util/endpoint:go_default_library",
5251
"//pkg/features:go_default_library",
5352
"//staging/src/k8s.io/api/core/v1:go_default_library",
5453
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",

pkg/controller/endpoint/endpoints_controller.go

Lines changed: 25 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ package endpoint
1919
import (
2020
"context"
2121
"fmt"
22-
"reflect"
2322
"strconv"
2423
"time"
2524

@@ -213,64 +212,49 @@ func (e *EndpointController) addPod(obj interface{}) {
213212
}
214213

215214
func podToEndpointAddressForService(svc *v1.Service, pod *v1.Pod) (*v1.EndpointAddress, error) {
215+
var endpointIP string
216+
216217
if !utilfeature.DefaultFeatureGate.Enabled(features.IPv6DualStack) {
217-
return podToEndpointAddress(pod), nil
218-
}
219-
220-
// api-server service controller ensured that the service got the correct IP Family
221-
// according to user setup, here we only need to match EndPoint IPs' family to service
222-
// actual IP family. as in, we don't need to check service.IPFamily
223-
224-
ipv6ClusterIP := utilnet.IsIPv6String(svc.Spec.ClusterIP)
225-
for _, podIP := range pod.Status.PodIPs {
226-
ipv6PodIP := utilnet.IsIPv6String(podIP.IP)
227-
// same family?
228-
// TODO (khenidak) when we remove the max of 2 PodIP limit from pods
229-
// we will have to return multiple endpoint addresses
230-
if ipv6ClusterIP == ipv6PodIP {
231-
return &v1.EndpointAddress{
232-
IP: podIP.IP,
233-
NodeName: &pod.Spec.NodeName,
234-
TargetRef: &v1.ObjectReference{
235-
Kind: "Pod",
236-
Namespace: pod.ObjectMeta.Namespace,
237-
Name: pod.ObjectMeta.Name,
238-
UID: pod.ObjectMeta.UID,
239-
ResourceVersion: pod.ObjectMeta.ResourceVersion,
240-
}}, nil
218+
endpointIP = pod.Status.PodIP
219+
} else {
220+
// api-server service controller ensured that the service got the correct IP Family
221+
// according to user setup, here we only need to match EndPoint IPs' family to service
222+
// actual IP family. as in, we don't need to check service.IPFamily
223+
224+
ipv6ClusterIP := utilnet.IsIPv6String(svc.Spec.ClusterIP)
225+
for _, podIP := range pod.Status.PodIPs {
226+
ipv6PodIP := utilnet.IsIPv6String(podIP.IP)
227+
// same family?
228+
// TODO (khenidak) when we remove the max of 2 PodIP limit from pods
229+
// we will have to return multiple endpoint addresses
230+
if ipv6ClusterIP == ipv6PodIP {
231+
endpointIP = podIP.IP
232+
break
233+
}
234+
}
235+
if endpointIP == "" {
236+
return nil, fmt.Errorf("failed to find a matching endpoint for service %v", svc.Name)
241237
}
242238
}
243-
return nil, fmt.Errorf("failed to find a matching endpoint for service %v", svc.Name)
244-
}
245239

246-
func podToEndpointAddress(pod *v1.Pod) *v1.EndpointAddress {
247240
return &v1.EndpointAddress{
248-
IP: pod.Status.PodIP,
241+
IP: endpointIP,
249242
NodeName: &pod.Spec.NodeName,
250243
TargetRef: &v1.ObjectReference{
251244
Kind: "Pod",
252245
Namespace: pod.ObjectMeta.Namespace,
253246
Name: pod.ObjectMeta.Name,
254247
UID: pod.ObjectMeta.UID,
255248
ResourceVersion: pod.ObjectMeta.ResourceVersion,
256-
}}
257-
}
258-
259-
func endpointChanged(pod1, pod2 *v1.Pod) bool {
260-
endpointAddress1 := podToEndpointAddress(pod1)
261-
endpointAddress2 := podToEndpointAddress(pod2)
262-
263-
endpointAddress1.TargetRef.ResourceVersion = ""
264-
endpointAddress2.TargetRef.ResourceVersion = ""
265-
266-
return !reflect.DeepEqual(endpointAddress1, endpointAddress2)
249+
},
250+
}, nil
267251
}
268252

269253
// When a pod is updated, figure out what services it used to be a member of
270254
// and what services it will be a member of, and enqueue the union of these.
271255
// old and cur must be *v1.Pod types.
272256
func (e *EndpointController) updatePod(old, cur interface{}) {
273-
services := endpointutil.GetServicesToUpdateOnPodChange(e.serviceLister, e.serviceSelectorCache, old, cur, endpointChanged)
257+
services := endpointutil.GetServicesToUpdateOnPodChange(e.serviceLister, e.serviceSelectorCache, old, cur)
274258
for key := range services {
275259
e.queue.AddAfter(key, e.endpointUpdatesBatchPeriod)
276260
}

pkg/controller/endpoint/endpoints_controller_test.go

Lines changed: 4 additions & 178 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,6 @@ import (
4343
endptspkg "k8s.io/kubernetes/pkg/api/v1/endpoints"
4444
api "k8s.io/kubernetes/pkg/apis/core"
4545
"k8s.io/kubernetes/pkg/controller"
46-
endpointutil "k8s.io/kubernetes/pkg/controller/util/endpoint"
4746
"k8s.io/kubernetes/pkg/features"
4847
utilpointer "k8s.io/utils/pointer"
4948
)
@@ -67,7 +66,6 @@ func testPod(namespace string, id int, nPorts int, isReady bool, makeDualstack b
6766
Containers: []v1.Container{{Ports: []v1.ContainerPort{}}},
6867
},
6968
Status: v1.PodStatus{
70-
PodIP: fmt.Sprintf("1.2.3.%d", 4+id),
7169
Conditions: []v1.PodCondition{
7270
{
7371
Type: v1.PodReady,
@@ -83,16 +81,11 @@ func testPod(namespace string, id int, nPorts int, isReady bool, makeDualstack b
8381
p.Spec.Containers[0].Ports = append(p.Spec.Containers[0].Ports,
8482
v1.ContainerPort{Name: fmt.Sprintf("port%d", j), ContainerPort: int32(8080 + j)})
8583
}
84+
p.Status.PodIPs = append(p.Status.PodIPs, v1.PodIP{IP: fmt.Sprintf("1.2.3.%d", 4+id)})
8685
if makeDualstack {
87-
p.Status.PodIPs = []v1.PodIP{
88-
{
89-
IP: p.Status.PodIP,
90-
},
91-
{
92-
IP: fmt.Sprintf("2000::%d", id),
93-
},
94-
}
86+
p.Status.PodIPs = append(p.Status.PodIPs, v1.PodIP{IP: fmt.Sprintf("2000::%d", id)})
9587
}
88+
p.Status.PodIP = p.Status.PodIPs[0].IP
9689

9790
return p
9891
}
@@ -1238,169 +1231,6 @@ func TestPodToEndpointAddressForService(t *testing.T) {
12381231

12391232
}
12401233

1241-
func TestPodToEndpointAddress(t *testing.T) {
1242-
podStore := cache.NewStore(cache.DeletionHandlingMetaNamespaceKeyFunc)
1243-
ns := "test"
1244-
addPods(podStore, ns, 1, 1, 0, false)
1245-
pods := podStore.List()
1246-
if len(pods) != 1 {
1247-
t.Errorf("podStore size: expected: %d, got: %d", 1, len(pods))
1248-
return
1249-
}
1250-
pod := pods[0].(*v1.Pod)
1251-
epa := podToEndpointAddress(pod)
1252-
if epa.IP != pod.Status.PodIP {
1253-
t.Errorf("IP: expected: %s, got: %s", pod.Status.PodIP, epa.IP)
1254-
}
1255-
if *(epa.NodeName) != pod.Spec.NodeName {
1256-
t.Errorf("NodeName: expected: %s, got: %s", pod.Spec.NodeName, *(epa.NodeName))
1257-
}
1258-
if epa.TargetRef.Kind != "Pod" {
1259-
t.Errorf("TargetRef.Kind: expected: %s, got: %s", "Pod", epa.TargetRef.Kind)
1260-
}
1261-
if epa.TargetRef.Namespace != pod.ObjectMeta.Namespace {
1262-
t.Errorf("TargetRef.Kind: expected: %s, got: %s", pod.ObjectMeta.Namespace, epa.TargetRef.Namespace)
1263-
}
1264-
if epa.TargetRef.Name != pod.ObjectMeta.Name {
1265-
t.Errorf("TargetRef.Kind: expected: %s, got: %s", pod.ObjectMeta.Name, epa.TargetRef.Name)
1266-
}
1267-
if epa.TargetRef.UID != pod.ObjectMeta.UID {
1268-
t.Errorf("TargetRef.Kind: expected: %s, got: %s", pod.ObjectMeta.UID, epa.TargetRef.UID)
1269-
}
1270-
if epa.TargetRef.ResourceVersion != pod.ObjectMeta.ResourceVersion {
1271-
t.Errorf("TargetRef.Kind: expected: %s, got: %s", pod.ObjectMeta.ResourceVersion, epa.TargetRef.ResourceVersion)
1272-
}
1273-
}
1274-
1275-
func TestPodChanged(t *testing.T) {
1276-
podStore := cache.NewStore(cache.DeletionHandlingMetaNamespaceKeyFunc)
1277-
ns := "test"
1278-
addPods(podStore, ns, 1, 1, 0, false)
1279-
pods := podStore.List()
1280-
if len(pods) != 1 {
1281-
t.Errorf("podStore size: expected: %d, got: %d", 1, len(pods))
1282-
return
1283-
}
1284-
oldPod := pods[0].(*v1.Pod)
1285-
newPod := oldPod.DeepCopy()
1286-
1287-
if podChangedHelper(oldPod, newPod, endpointChanged) {
1288-
t.Errorf("Expected pod to be unchanged for copied pod")
1289-
}
1290-
1291-
newPod.Spec.NodeName = "changed"
1292-
if !podChangedHelper(oldPod, newPod, endpointChanged) {
1293-
t.Errorf("Expected pod to be changed for pod with NodeName changed")
1294-
}
1295-
newPod.Spec.NodeName = oldPod.Spec.NodeName
1296-
1297-
newPod.ObjectMeta.ResourceVersion = "changed"
1298-
if podChangedHelper(oldPod, newPod, endpointChanged) {
1299-
t.Errorf("Expected pod to be unchanged for pod with only ResourceVersion changed")
1300-
}
1301-
newPod.ObjectMeta.ResourceVersion = oldPod.ObjectMeta.ResourceVersion
1302-
1303-
newPod.Status.PodIP = "1.2.3.1"
1304-
if !podChangedHelper(oldPod, newPod, endpointChanged) {
1305-
t.Errorf("Expected pod to be changed with pod IP address change")
1306-
}
1307-
newPod.Status.PodIP = oldPod.Status.PodIP
1308-
1309-
/* dual stack tests */
1310-
// primary changes, because changing IPs is done by changing sandbox
1311-
// case 1: add new secondary IP
1312-
newPod.Status.PodIP = "1.1.3.1"
1313-
newPod.Status.PodIPs = []v1.PodIP{
1314-
{
1315-
IP: "1.1.3.1",
1316-
},
1317-
{
1318-
IP: "2000::1",
1319-
},
1320-
}
1321-
if !podChangedHelper(oldPod, newPod, endpointChanged) {
1322-
t.Errorf("Expected pod to be changed with adding secondary IP")
1323-
}
1324-
// reset
1325-
newPod.Status.PodIPs = nil
1326-
newPod.Status.PodIP = oldPod.Status.PodIP
1327-
1328-
// case 2: removing a secondary IP
1329-
saved := oldPod.Status.PodIP
1330-
oldPod.Status.PodIP = "1.1.3.1"
1331-
oldPod.Status.PodIPs = []v1.PodIP{
1332-
{
1333-
IP: "1.1.3.1",
1334-
},
1335-
{
1336-
IP: "2000::1",
1337-
},
1338-
}
1339-
1340-
newPod.Status.PodIP = "1.2.3.4"
1341-
newPod.Status.PodIPs = []v1.PodIP{
1342-
{
1343-
IP: "1.2.3.4",
1344-
},
1345-
}
1346-
1347-
// reset
1348-
oldPod.Status.PodIPs = nil
1349-
newPod.Status.PodIPs = nil
1350-
oldPod.Status.PodIP = saved
1351-
newPod.Status.PodIP = saved
1352-
// case 3: change secondary
1353-
// case 2: removing a secondary IP
1354-
saved = oldPod.Status.PodIP
1355-
oldPod.Status.PodIP = "1.1.3.1"
1356-
oldPod.Status.PodIPs = []v1.PodIP{
1357-
{
1358-
IP: "1.1.3.1",
1359-
},
1360-
{
1361-
IP: "2000::1",
1362-
},
1363-
}
1364-
1365-
newPod.Status.PodIP = "1.2.3.4"
1366-
newPod.Status.PodIPs = []v1.PodIP{
1367-
{
1368-
IP: "1.2.3.4",
1369-
},
1370-
{
1371-
IP: "2000::2",
1372-
},
1373-
}
1374-
1375-
// reset
1376-
oldPod.Status.PodIPs = nil
1377-
newPod.Status.PodIPs = nil
1378-
oldPod.Status.PodIP = saved
1379-
newPod.Status.PodIP = saved
1380-
1381-
/* end dual stack testing */
1382-
1383-
newPod.ObjectMeta.Name = "wrong-name"
1384-
if !podChangedHelper(oldPod, newPod, endpointChanged) {
1385-
t.Errorf("Expected pod to be changed with pod name change")
1386-
}
1387-
newPod.ObjectMeta.Name = oldPod.ObjectMeta.Name
1388-
1389-
saveConditions := oldPod.Status.Conditions
1390-
oldPod.Status.Conditions = nil
1391-
if !podChangedHelper(oldPod, newPod, endpointChanged) {
1392-
t.Errorf("Expected pod to be changed with pod readiness change")
1393-
}
1394-
oldPod.Status.Conditions = saveConditions
1395-
1396-
now := metav1.NewTime(time.Now().UTC())
1397-
newPod.ObjectMeta.DeletionTimestamp = &now
1398-
if !podChangedHelper(oldPod, newPod, endpointChanged) {
1399-
t.Errorf("Expected pod to be changed with DeletionTimestamp change")
1400-
}
1401-
newPod.ObjectMeta.DeletionTimestamp = oldPod.ObjectMeta.DeletionTimestamp.DeepCopy()
1402-
}
1403-
14041234
func TestLastTriggerChangeTimeAnnotation(t *testing.T) {
14051235
ns := "other"
14061236
testServer, endpointsHandler := makeTestServer(t, ns)
@@ -1680,6 +1510,7 @@ func TestPodUpdatesBatching(t *testing.T) {
16801510
oldPod := old.(*v1.Pod)
16811511
newPod := oldPod.DeepCopy()
16821512
newPod.Status.PodIP = update.podIP
1513+
newPod.Status.PodIPs[0].IP = update.podIP
16831514
newPod.ResourceVersion = strconv.Itoa(resourceVersion)
16841515
resourceVersion++
16851516

@@ -1996,8 +1827,3 @@ func stringVal(str *string) string {
19961827
}
19971828
return *str
19981829
}
1999-
2000-
func podChangedHelper(oldPod, newPod *v1.Pod, endpointChanged endpointutil.EndpointsMatch) bool {
2001-
podChanged, _ := endpointutil.PodChanged(oldPod, newPod, endpointChanged)
2002-
return podChanged
2003-
}

pkg/controller/endpointslice/endpointslice_controller.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -457,7 +457,7 @@ func (c *Controller) addPod(obj interface{}) {
457457
}
458458

459459
func (c *Controller) updatePod(old, cur interface{}) {
460-
services := endpointutil.GetServicesToUpdateOnPodChange(c.serviceLister, c.serviceSelectorCache, old, cur, podEndpointChanged)
460+
services := endpointutil.GetServicesToUpdateOnPodChange(c.serviceLister, c.serviceSelectorCache, old, cur)
461461
for key := range services {
462462
c.queue.AddAfter(key, c.endpointUpdatesBatchPeriod)
463463
}

pkg/controller/endpointslice/utils.go

Lines changed: 1 addition & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@ package endpointslice
1818

1919
import (
2020
"fmt"
21-
"reflect"
2221
"time"
2322

2423
corev1 "k8s.io/api/core/v1"
@@ -36,19 +35,7 @@ import (
3635
utilnet "k8s.io/utils/net"
3736
)
3837

39-
// podEndpointChanged returns true if the results of podToEndpoint are different
40-
// for the pods passed to this function.
41-
func podEndpointChanged(pod1, pod2 *corev1.Pod) bool {
42-
endpoint1 := podToEndpoint(pod1, &corev1.Node{}, &corev1.Service{Spec: corev1.ServiceSpec{}})
43-
endpoint2 := podToEndpoint(pod2, &corev1.Node{}, &corev1.Service{Spec: corev1.ServiceSpec{}})
44-
45-
endpoint1.TargetRef.ResourceVersion = ""
46-
endpoint2.TargetRef.ResourceVersion = ""
47-
48-
return !reflect.DeepEqual(endpoint1, endpoint2)
49-
}
50-
51-
// podToEndpoint returns an Endpoint object generated from a Pod and Node.
38+
// podToEndpoint returns an Endpoint object generated from pod, node, and service.
5239
func podToEndpoint(pod *corev1.Pod, node *corev1.Node, service *corev1.Service) discovery.Endpoint {
5340
// Build out topology information. This is currently limited to hostname,
5441
// zone, and region, but this will be expanded in the future.

0 commit comments

Comments
 (0)