
Commit a6b08a8

Refactor "updates to ExternalTrafficPolicy" test
The LoadBalancer test "should handle updates to ExternalTrafficPolicy field" had a bunch of problems, but the biggest is that (without doing [Disruptive] things to the cluster or making unwarranted assumptions about source IPs) it's very hard to *prove* that the cloud load balancer is doing Cluster traffic policy semantics (distributing connections to all nodes) rather than Local (distributing only to nodes with endpoints).

So split the test into 2 new tests with more focused semantics:

- "should implement NodePort and HealthCheckNodePort correctly when ExternalTrafficPolicy changes" (in service.go) tests that the service proxy is correctly implementing the proxy side of Cluster-vs-Local traffic policy for LoadBalancer Services, without testing the cloud load balancer itself at all.

- "should target all nodes with endpoints" (in loadbalancer.go) complements the existing "should only target nodes with endpoints", to ensure that when a service has `externalTrafficPolicy: Local` and there are endpoints on multiple nodes, the cloud is correctly configured to target all of those endpoints, not just one.
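For context, the behavior these tests exercise hinges on a single Service field. A minimal sketch of the two policies (illustrative only; it uses the same upstream `k8s.io/api/core/v1` types and constants that the diff below uses, and is not part of this commit):

```go
package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
)

func main() {
	svc := v1.Service{
		Spec: v1.ServiceSpec{
			Type: v1.ServiceTypeLoadBalancer,
			// Local: the cloud LB should send traffic only to nodes with
			// local endpoints (preserving the client source IP), and the
			// apiserver allocates spec.healthCheckNodePort so the LB can
			// probe which nodes currently have endpoints.
			ExternalTrafficPolicy: v1.ServiceExternalTrafficPolicyLocal,
		},
	}
	// Cluster: traffic may land on any node and be forwarded (masqueraded)
	// to an endpoint elsewhere; healthCheckNodePort is cleared.
	svc.Spec.ExternalTrafficPolicy = v1.ServiceExternalTrafficPolicyCluster
	fmt.Println(svc.Spec.ExternalTrafficPolicy)
}
```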
Parent: 8e8bef6

File tree: 2 files changed (+210, −171 lines)

test/e2e/network/loadbalancer.go

Lines changed: 48 additions & 171 deletions
@@ -56,7 +56,6 @@ import (
 	"k8s.io/utils/ptr"
 
 	"github.com/onsi/ginkgo/v2"
-	"github.com/onsi/gomega"
 )
 
 // getInternalIP returns node internal IP
@@ -990,7 +989,7 @@ var _ = common.SIGDescribe("LoadBalancers", feature.LoadBalancer, func() {
 
 var _ = common.SIGDescribe("LoadBalancers ExternalTrafficPolicy: Local", feature.LoadBalancer, framework.WithSlow(), func() {
 	f := framework.NewDefaultFramework("esipp")
-	f.NamespacePodSecurityLevel = admissionapi.LevelBaseline
+	f.NamespacePodSecurityLevel = admissionapi.LevelPrivileged
 	var loadBalancerCreateTimeout time.Duration
 
 	var cs clientset.Interface
@@ -1151,6 +1150,53 @@ var _ = common.SIGDescribe("LoadBalancers ExternalTrafficPolicy: Local", feature
 		}
 	})
 
+	ginkgo.It("should target all nodes with endpoints", func(ctx context.Context) {
+		// FIXME: need a better platform-independent timeout
+		loadBalancerCreateTimeout := e2eservice.GetServiceLoadBalancerCreationTimeout(ctx, cs)
+
+		nodes, err := e2enode.GetBoundedReadySchedulableNodes(ctx, cs, 2)
+		framework.ExpectNoError(err)
+		if len(nodes.Items) == 1 {
+			e2eskipper.Skipf("Test requires multiple schedulable nodes")
+		}
+
+		namespace := f.Namespace.Name
+		serviceName := "external-local-update"
+		jig := e2eservice.NewTestJig(cs, namespace, serviceName)
+
+		ginkgo.By("creating the service")
+		svc, err := jig.CreateOnlyLocalLoadBalancerService(ctx, loadBalancerCreateTimeout, false, nil)
+		framework.ExpectNoError(err, "creating the service")
+		ingress := e2eservice.GetIngressPoint(&svc.Status.LoadBalancer.Ingress[0])
+		svcPort := int(svc.Spec.Ports[0].Port)
+		framework.Logf("ingress is %s:%d", ingress, svcPort)
+
+		ginkgo.By("creating endpoints on multiple nodes")
+		_, err = jig.Run(ctx, func(rc *v1.ReplicationController) {
+			rc.Spec.Replicas = ptr.To[int32](2)
+			rc.Spec.Template.Spec.Affinity = &v1.Affinity{
+				PodAntiAffinity: &v1.PodAntiAffinity{
+					RequiredDuringSchedulingIgnoredDuringExecution: []v1.PodAffinityTerm{
+						{
+							LabelSelector: &metav1.LabelSelector{MatchLabels: jig.Labels},
+							TopologyKey:   "kubernetes.io/hostname",
+						},
+					},
+				},
+			}
+		})
+		framework.ExpectNoError(err, "creating the endpoints")
+
+		ginkgo.By("ensuring that the LoadBalancer targets all endpoints")
+		// We're not testing affinity here, but we can use checkAffinity(false) to
+		// test that affinity *isn't* enabled, which is to say, that connecting to
+		// ingress:svcPort multiple times eventually reaches at least 2 different
+		// endpoints.
+		if !checkAffinity(ctx, cs, nil, ingress, svcPort, false) {
+			framework.Failf("Load balancer connections only reached one of the two endpoints")
+		}
+	})
+
 	ginkgo.It("should work from pods", func(ctx context.Context) {
 		var err error
 		namespace := f.Namespace.Name
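A note on the `checkAffinity(..., false)` trick in the new test above: passing `false` asserts that affinity is *not* in effect, i.e. that repeated connections eventually reach at least two distinct endpoints. Roughly, the property being polled for is the following (a hypothetical condensation for illustration; not the framework's actual implementation):

```go
// reachesMultipleEndpoints is an illustrative stand-in for what
// checkAffinity(ctx, cs, nil, ingress, svcPort, false) verifies here:
// repeated connections to ingress:svcPort observe >= 2 distinct backends.
func reachesMultipleEndpoints(dial func() (string, error), attempts int) bool {
	seen := make(map[string]bool)
	for i := 0; i < attempts; i++ {
		backend, err := dial() // e.g. an HTTP GET that echoes the serving pod
		if err != nil {
			continue // tolerate transient errors while the LB converges
		}
		seen[backend] = true
		if len(seen) >= 2 {
			return true
		}
	}
	return false
}
```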
@@ -1216,175 +1262,6 @@ var _ = common.SIGDescribe("LoadBalancers ExternalTrafficPolicy: Local", feature
 			framework.Failf("Source IP not preserved from %v, expected '%v' got '%v'", pausePod.Name, pausePod.Status.PodIP, srcIP)
 		}
 	})
-
-	ginkgo.It("should handle updates to ExternalTrafficPolicy field", func(ctx context.Context) {
-		namespace := f.Namespace.Name
-		serviceName := "external-local-update"
-		jig := e2eservice.NewTestJig(cs, namespace, serviceName)
-
-		nodes, err := e2enode.GetBoundedReadySchedulableNodes(ctx, cs, e2eservice.MaxNodesForEndpointsTests)
-		framework.ExpectNoError(err)
-		if len(nodes.Items) < 2 {
-			framework.Failf("Need at least 2 nodes to verify source ip from a node without endpoint")
-		}
-
-		svc, err := jig.CreateOnlyLocalLoadBalancerService(ctx, loadBalancerCreateTimeout, true, nil)
-		framework.ExpectNoError(err)
-		ginkgo.DeferCleanup(func(ctx context.Context) {
-			err = jig.ChangeServiceType(ctx, v1.ServiceTypeClusterIP, loadBalancerCreateTimeout)
-			framework.ExpectNoError(err)
-			err := cs.CoreV1().Services(svc.Namespace).Delete(ctx, svc.Name, metav1.DeleteOptions{})
-			framework.ExpectNoError(err)
-		})
-
-		// save the health check node port because it disappears when Local traffic policy is turned off.
-		healthCheckNodePort := int(svc.Spec.HealthCheckNodePort)
-
-		ginkgo.By("changing ExternalTrafficPolicy to Cluster")
-		svc, err = jig.UpdateService(ctx, func(svc *v1.Service) {
-			svc.Spec.ExternalTrafficPolicy = v1.ServiceExternalTrafficPolicyCluster
-		})
-		framework.ExpectNoError(err)
-		if svc.Spec.HealthCheckNodePort > 0 {
-			framework.Failf("Service HealthCheck NodePort still present")
-		}
-
-		epNodes, err := jig.ListNodesWithEndpoint(ctx)
-		framework.ExpectNoError(err)
-		// map from name of nodes with endpoint to internal ip
-		// it is assumed that there is only a single node with the endpoint
-		endpointNodeMap := make(map[string]string)
-		// map from name of nodes without endpoint to internal ip
-		noEndpointNodeMap := make(map[string]string)
-		for _, node := range epNodes {
-			ips := e2enode.GetAddresses(&node, v1.NodeInternalIP)
-			if len(ips) < 1 {
-				framework.Failf("No internal ip found for node %s", node.Name)
-			}
-			endpointNodeMap[node.Name] = ips[0]
-		}
-		for _, n := range nodes.Items {
-			ips := e2enode.GetAddresses(&n, v1.NodeInternalIP)
-			if len(ips) < 1 {
-				framework.Failf("No internal ip found for node %s", n.Name)
-			}
-			if _, ok := endpointNodeMap[n.Name]; !ok {
-				noEndpointNodeMap[n.Name] = ips[0]
-			}
-		}
-		gomega.Expect(endpointNodeMap).ToNot(gomega.BeEmpty())
-		gomega.Expect(noEndpointNodeMap).ToNot(gomega.BeEmpty())
-
-		svcTCPPort := int(svc.Spec.Ports[0].Port)
-		svcNodePort := int(svc.Spec.Ports[0].NodePort)
-		ingressIP := e2eservice.GetIngressPoint(&svc.Status.LoadBalancer.Ingress[0])
-		path := "/clientip"
-		dialCmd := "clientip"
-
-		config := e2enetwork.NewNetworkingTestConfig(ctx, f)
-
-		ginkgo.By(fmt.Sprintf("endpoints present on nodes %v, absent on nodes %v", endpointNodeMap, noEndpointNodeMap))
-		for nodeName, nodeIP := range noEndpointNodeMap {
-			ginkgo.By(fmt.Sprintf("Checking %v (%v:%v/%v) proxies to endpoints on another node", nodeName, nodeIP[0], svcNodePort, dialCmd))
-			_, err := GetHTTPContentFromTestContainer(ctx, config, nodeIP, svcNodePort, e2eservice.KubeProxyLagTimeout, dialCmd)
-			framework.ExpectNoError(err, "Could not reach HTTP service through %v:%v/%v after %v", nodeIP, svcNodePort, dialCmd, e2eservice.KubeProxyLagTimeout)
-		}
-
-		for nodeName, nodeIP := range endpointNodeMap {
-			ginkgo.By(fmt.Sprintf("checking kube-proxy health check fails on node with endpoint (%s), public IP %s", nodeName, nodeIP))
-			var body string
-			pollFn := func(ctx context.Context) (bool, error) {
-				// we expect connection failure here, but not other errors
-				resp, err := config.GetResponseFromTestContainer(ctx,
-					"http",
-					"healthz",
-					nodeIP,
-					healthCheckNodePort)
-				if err != nil {
-					return false, nil
-				}
-				if len(resp.Errors) > 0 {
-					return true, nil
-				}
-				if len(resp.Responses) > 0 {
-					body = resp.Responses[0]
-				}
-				return false, nil
-			}
-			if pollErr := wait.PollUntilContextTimeout(ctx, framework.Poll, e2eservice.TestTimeout, true, pollFn); pollErr != nil {
-				framework.Failf("Kube-proxy still exposing health check on node %v:%v, after traffic policy set to Cluster. body %s",
-					nodeName, healthCheckNodePort, body)
-			}
-		}
-
-		// Poll till kube-proxy re-adds the MASQUERADE rule on the node.
-		ginkgo.By(fmt.Sprintf("checking source ip is NOT preserved through loadbalancer %v", ingressIP))
-		var clientIP string
-		pollErr := wait.PollUntilContextTimeout(ctx, framework.Poll, 3*e2eservice.KubeProxyLagTimeout, true, func(ctx context.Context) (bool, error) {
-			clientIPPort, err := GetHTTPContent(ingressIP, svcTCPPort, e2eservice.KubeProxyLagTimeout, path)
-			if err != nil {
-				return false, nil
-			}
-			// The clientIPPort returned from GetHTTPContent is in this format: x.x.x.x:port or [xx:xx:xx::x]:port
-			host, _, err := net.SplitHostPort(clientIPPort)
-			if err != nil {
-				framework.Logf("SplitHostPort returned unexpected error: %q", clientIPPort)
-				return false, nil
-			}
-			ip := netutils.ParseIPSloppy(host)
-			if ip == nil {
-				framework.Logf("Invalid client IP address format: %q", host)
-				return false, nil
-			}
-			if subnetPrefix.Contains(ip) {
-				return true, nil
-			}
-			return false, nil
-		})
-		if pollErr != nil {
-			framework.Failf("Source IP WAS preserved with Cluster traffic policy. Got %v, expected a cluster ip.", clientIP)
-		}
-
-		// TODO: We need to attempt to create another service with the previously
-		// allocated healthcheck nodePort. If the health check nodePort has been
-		// freed, the new service creation will succeed, upon which we cleanup.
-		// If the health check nodePort has NOT been freed, the new service
-		// creation will fail.
-
-		ginkgo.By("setting ExternalTrafficPolicy back to Local")
-		svc, err = jig.UpdateService(ctx, func(svc *v1.Service) {
-			svc.Spec.ExternalTrafficPolicy = v1.ServiceExternalTrafficPolicyLocal
-			// Request the same healthCheckNodePort as before, to test the user-requested allocation path
-			svc.Spec.HealthCheckNodePort = int32(healthCheckNodePort)
-		})
-		framework.ExpectNoError(err)
-		loadBalancerPropagationTimeout := e2eservice.GetServiceLoadBalancerPropagationTimeout(ctx, cs)
-		pollErr = wait.PollUntilContextTimeout(ctx, framework.PollShortTimeout, loadBalancerPropagationTimeout, true, func(ctx context.Context) (bool, error) {
-			clientIPPort, err := GetHTTPContent(ingressIP, svcTCPPort, e2eservice.KubeProxyLagTimeout, path)
-			if err != nil {
-				return false, nil
-			}
-			ginkgo.By(fmt.Sprintf("Endpoint %v:%v%v returned client ip %v", ingressIP, svcTCPPort, path, clientIPPort))
-			// The clientIPPort returned from GetHTTPContent is in this format: x.x.x.x:port or [xx:xx:xx::x]:port
-			host, _, err := net.SplitHostPort(clientIPPort)
-			if err != nil {
-				framework.Logf("SplitHostPort returned unexpected error: %q", clientIPPort)
-				return false, nil
-			}
-			ip := netutils.ParseIPSloppy(host)
-			if ip == nil {
-				framework.Logf("Invalid client IP address format: %q", host)
-				return false, nil
-			}
-			if !subnetPrefix.Contains(ip) {
-				return true, nil
-			}
-			return false, nil
-		})
-		if pollErr != nil {
-			framework.Failf("Source IP (%v) is not the client IP after ExternalTrafficPolicy set back to Local, expected a public IP.", clientIP)
-		}
-	})
 })
 
 func ipToSourceRange(ip string) string {
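Both polling loops in the deleted test reduce to classifying the client address that the backend observed. A condensed sketch of that check (the helper name is ours; `net.SplitHostPort` and `netutils.ParseIPSloppy` are the actual calls the deleted code used):

```go
package main

import (
	"fmt"
	"net"

	netutils "k8s.io/utils/net"
)

// isMasqueradedSource reports whether the observed client address lies in
// the cluster's node subnet, i.e. whether the connection was SNATed
// (Cluster policy) rather than preserving the caller's IP (Local policy).
// This is the kind of source-IP assumption the commit message calls
// "unwarranted": it only holds if node IPs and external client IPs never
// share a prefix.
func isMasqueradedSource(clientIPPort string, subnetPrefix *net.IPNet) (bool, error) {
	// clientIPPort is "x.x.x.x:port" or "[xx:xx::x]:port".
	host, _, err := net.SplitHostPort(clientIPPort)
	if err != nil {
		return false, err
	}
	ip := netutils.ParseIPSloppy(host) // tolerant parse, e.g. of leading zeros
	if ip == nil {
		return false, fmt.Errorf("invalid client IP %q", host)
	}
	return subnetPrefix.Contains(ip), nil
}

func main() {
	_, subnet, _ := net.ParseCIDR("10.0.0.0/8") // example node subnet
	masq, err := isMasqueradedSource("10.1.2.3:54321", subnet)
	fmt.Println(masq, err) // true <nil>
}
```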
