Skip to content

Commit 2fd1556

Browse files
authored
Merge pull request kubernetes#77720 from jiatongw/e2e/framework/service_util
Move service_util endpoints related functions to framework/endpoints/ports.go
2 parents ae2a162 + 76f7645 commit 2fd1556

File tree

6 files changed

+172
-115
lines changed

6 files changed

+172
-115
lines changed

test/e2e/framework/endpoints/BUILD

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,21 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library")
22

33
go_library(
44
name = "go_default_library",
5-
srcs = ["wait.go"],
5+
srcs = [
6+
"ports.go",
7+
"wait.go",
8+
],
69
importpath = "k8s.io/kubernetes/test/e2e/framework/endpoints",
710
visibility = ["//visibility:public"],
811
deps = [
12+
"//staging/src/k8s.io/api/core/v1:go_default_library",
913
"//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
1014
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
15+
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
1116
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
1217
"//test/e2e/framework:go_default_library",
18+
"//test/e2e/framework/log:go_default_library",
19+
"//vendor/github.com/onsi/ginkgo:go_default_library",
1320
],
1421
)
1522

Lines changed: 136 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,136 @@
1+
/*
2+
Copyright 2019 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
/*
18+
This soak tests places a specified number of pods on each node and then
19+
repeatedly sends queries to a service running on these pods via
20+
a serivce
21+
*/
22+
23+
package endpoints
24+
25+
import (
26+
"fmt"
27+
"sort"
28+
"time"
29+
30+
"github.com/onsi/ginkgo"
31+
v1 "k8s.io/api/core/v1"
32+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
33+
"k8s.io/apimachinery/pkg/types"
34+
clientset "k8s.io/client-go/kubernetes"
35+
e2elog "k8s.io/kubernetes/test/e2e/framework/log"
36+
)
37+
38+
// ServiceStartTimeout is how long to wait for a service endpoint to be resolvable.
39+
const ServiceStartTimeout = 3 * time.Minute
40+
41+
// PortsByPodName is a map that maps pod name to container ports.
42+
type PortsByPodName map[string][]int
43+
44+
// PortsByPodUID is a map that maps pod UID to container ports.
45+
type PortsByPodUID map[types.UID][]int
46+
47+
// GetContainerPortsByPodUID returns a PortsByPodUID map on the given endpoints.
48+
func GetContainerPortsByPodUID(ep *v1.Endpoints) PortsByPodUID {
49+
m := PortsByPodUID{}
50+
for _, ss := range ep.Subsets {
51+
for _, port := range ss.Ports {
52+
for _, addr := range ss.Addresses {
53+
containerPort := port.Port
54+
if _, ok := m[addr.TargetRef.UID]; !ok {
55+
m[addr.TargetRef.UID] = make([]int, 0)
56+
}
57+
m[addr.TargetRef.UID] = append(m[addr.TargetRef.UID], int(containerPort))
58+
}
59+
}
60+
}
61+
return m
62+
}
63+
64+
func translatePodNameToUID(c clientset.Interface, ns string, expectedEndpoints PortsByPodName) (PortsByPodUID, error) {
65+
portsByUID := make(PortsByPodUID)
66+
for name, portList := range expectedEndpoints {
67+
pod, err := c.CoreV1().Pods(ns).Get(name, metav1.GetOptions{})
68+
if err != nil {
69+
return nil, fmt.Errorf("failed to get pod %s, that's pretty weird. validation failed: %s", name, err)
70+
}
71+
portsByUID[pod.ObjectMeta.UID] = portList
72+
}
73+
return portsByUID, nil
74+
}
75+
76+
func validatePorts(ep PortsByPodUID, expectedEndpoints PortsByPodUID) error {
77+
if len(ep) != len(expectedEndpoints) {
78+
// should not happen because we check this condition before
79+
return fmt.Errorf("invalid number of endpoints got %v, expected %v", ep, expectedEndpoints)
80+
}
81+
for podUID := range expectedEndpoints {
82+
if _, ok := ep[podUID]; !ok {
83+
return fmt.Errorf("endpoint %v not found", podUID)
84+
}
85+
if len(ep[podUID]) != len(expectedEndpoints[podUID]) {
86+
return fmt.Errorf("invalid list of ports for uid %v. Got %v, expected %v", podUID, ep[podUID], expectedEndpoints[podUID])
87+
}
88+
sort.Ints(ep[podUID])
89+
sort.Ints(expectedEndpoints[podUID])
90+
for index := range ep[podUID] {
91+
if ep[podUID][index] != expectedEndpoints[podUID][index] {
92+
return fmt.Errorf("invalid list of ports for uid %v. Got %v, expected %v", podUID, ep[podUID], expectedEndpoints[podUID])
93+
}
94+
}
95+
}
96+
return nil
97+
}
98+
99+
// ValidateEndpointsPorts validates that the given service exists and is served by the given expectedEndpoints.
100+
func ValidateEndpointsPorts(c clientset.Interface, namespace, serviceName string, expectedEndpoints PortsByPodName) error {
101+
ginkgo.By(fmt.Sprintf("waiting up to %v for service %s in namespace %s to expose endpoints %v", ServiceStartTimeout, serviceName, namespace, expectedEndpoints))
102+
i := 1
103+
for start := time.Now(); time.Since(start) < ServiceStartTimeout; time.Sleep(1 * time.Second) {
104+
ep, err := c.CoreV1().Endpoints(namespace).Get(serviceName, metav1.GetOptions{})
105+
if err != nil {
106+
e2elog.Logf("Get endpoints failed (%v elapsed, ignoring for 5s): %v", time.Since(start), err)
107+
continue
108+
}
109+
portsByPodUID := GetContainerPortsByPodUID(ep)
110+
expectedPortsByPodUID, err := translatePodNameToUID(c, namespace, expectedEndpoints)
111+
if err != nil {
112+
return err
113+
}
114+
if len(portsByPodUID) == len(expectedEndpoints) {
115+
err := validatePorts(portsByPodUID, expectedPortsByPodUID)
116+
if err != nil {
117+
return err
118+
}
119+
e2elog.Logf("successfully validated that service %s in namespace %s exposes endpoints %v (%v elapsed)",
120+
serviceName, namespace, expectedEndpoints, time.Since(start))
121+
return nil
122+
}
123+
if i%5 == 0 {
124+
e2elog.Logf("Unexpected endpoints: found %v, expected %v (%v elapsed, will retry)", portsByPodUID, expectedEndpoints, time.Since(start))
125+
}
126+
i++
127+
}
128+
if pods, err := c.CoreV1().Pods(metav1.NamespaceAll).List(metav1.ListOptions{}); err == nil {
129+
for _, pod := range pods.Items {
130+
e2elog.Logf("Pod %s\t%s\t%s\t%s", pod.Namespace, pod.Name, pod.Spec.NodeName, pod.DeletionTimestamp)
131+
}
132+
} else {
133+
e2elog.Logf("Can't list pod debug info: %v", err)
134+
}
135+
return fmt.Errorf("Timed out waiting for service %s in namespace %s to expose endpoints %v (%v elapsed)", serviceName, namespace, expectedEndpoints, ServiceStartTimeout)
136+
}

test/e2e/framework/service_util.go

Lines changed: 0 additions & 99 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@ import (
3030
"k8s.io/apimachinery/pkg/api/errors"
3131
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
3232
"k8s.io/apimachinery/pkg/labels"
33-
"k8s.io/apimachinery/pkg/types"
3433
"k8s.io/apimachinery/pkg/util/intstr"
3534
utilnet "k8s.io/apimachinery/pkg/util/net"
3635
"k8s.io/apimachinery/pkg/util/sets"
@@ -1238,104 +1237,6 @@ func UpdateService(c clientset.Interface, namespace, serviceName string, update
12381237
return service, err
12391238
}
12401239

1241-
// GetContainerPortsByPodUID returns a PortsByPodUID map on the given endpoints.
1242-
func GetContainerPortsByPodUID(endpoints *v1.Endpoints) PortsByPodUID {
1243-
m := PortsByPodUID{}
1244-
for _, ss := range endpoints.Subsets {
1245-
for _, port := range ss.Ports {
1246-
for _, addr := range ss.Addresses {
1247-
containerPort := port.Port
1248-
if _, ok := m[addr.TargetRef.UID]; !ok {
1249-
m[addr.TargetRef.UID] = make([]int, 0)
1250-
}
1251-
m[addr.TargetRef.UID] = append(m[addr.TargetRef.UID], int(containerPort))
1252-
}
1253-
}
1254-
}
1255-
return m
1256-
}
1257-
1258-
// PortsByPodName maps pod name to ports.
1259-
type PortsByPodName map[string][]int
1260-
1261-
// PortsByPodUID maps UID to ports.
1262-
type PortsByPodUID map[types.UID][]int
1263-
1264-
func translatePodNameToUIDOrFail(c clientset.Interface, ns string, expectedEndpoints PortsByPodName) PortsByPodUID {
1265-
portsByUID := make(PortsByPodUID)
1266-
1267-
for name, portList := range expectedEndpoints {
1268-
pod, err := c.CoreV1().Pods(ns).Get(name, metav1.GetOptions{})
1269-
if err != nil {
1270-
Failf("failed to get pod %s, that's pretty weird. validation failed: %s", name, err)
1271-
}
1272-
portsByUID[pod.ObjectMeta.UID] = portList
1273-
}
1274-
// Logf("successfully translated pod names to UIDs: %v -> %v on namespace %s", expectedEndpoints, portsByUID, ns)
1275-
return portsByUID
1276-
}
1277-
1278-
func validatePortsOrFail(endpoints PortsByPodUID, expectedEndpoints PortsByPodUID) {
1279-
if len(endpoints) != len(expectedEndpoints) {
1280-
// should not happen because we check this condition before
1281-
Failf("invalid number of endpoints got %v, expected %v", endpoints, expectedEndpoints)
1282-
}
1283-
for podUID := range expectedEndpoints {
1284-
if _, ok := endpoints[podUID]; !ok {
1285-
Failf("endpoint %v not found", podUID)
1286-
}
1287-
if len(endpoints[podUID]) != len(expectedEndpoints[podUID]) {
1288-
Failf("invalid list of ports for uid %v. Got %v, expected %v", podUID, endpoints[podUID], expectedEndpoints[podUID])
1289-
}
1290-
sort.Ints(endpoints[podUID])
1291-
sort.Ints(expectedEndpoints[podUID])
1292-
for index := range endpoints[podUID] {
1293-
if endpoints[podUID][index] != expectedEndpoints[podUID][index] {
1294-
Failf("invalid list of ports for uid %v. Got %v, expected %v", podUID, endpoints[podUID], expectedEndpoints[podUID])
1295-
}
1296-
}
1297-
}
1298-
}
1299-
1300-
// ValidateEndpointsOrFail validates that the given service exists and is served by the given expectedEndpoints.
1301-
func ValidateEndpointsOrFail(c clientset.Interface, namespace, serviceName string, expectedEndpoints PortsByPodName) {
1302-
ginkgo.By(fmt.Sprintf("waiting up to %v for service %s in namespace %s to expose endpoints %v", ServiceStartTimeout, serviceName, namespace, expectedEndpoints))
1303-
i := 1
1304-
for start := time.Now(); time.Since(start) < ServiceStartTimeout; time.Sleep(1 * time.Second) {
1305-
endpoints, err := c.CoreV1().Endpoints(namespace).Get(serviceName, metav1.GetOptions{})
1306-
if err != nil {
1307-
Logf("Get endpoints failed (%v elapsed, ignoring for 5s): %v", time.Since(start), err)
1308-
continue
1309-
}
1310-
// Logf("Found endpoints %v", endpoints)
1311-
1312-
portsByPodUID := GetContainerPortsByPodUID(endpoints)
1313-
// Logf("Found port by pod UID %v", portsByPodUID)
1314-
1315-
expectedPortsByPodUID := translatePodNameToUIDOrFail(c, namespace, expectedEndpoints)
1316-
if len(portsByPodUID) == len(expectedEndpoints) {
1317-
validatePortsOrFail(portsByPodUID, expectedPortsByPodUID)
1318-
Logf("successfully validated that service %s in namespace %s exposes endpoints %v (%v elapsed)",
1319-
serviceName, namespace, expectedEndpoints, time.Since(start))
1320-
return
1321-
}
1322-
1323-
if i%5 == 0 {
1324-
Logf("Unexpected endpoints: found %v, expected %v (%v elapsed, will retry)", portsByPodUID, expectedEndpoints, time.Since(start))
1325-
}
1326-
i++
1327-
}
1328-
1329-
if pods, err := c.CoreV1().Pods(metav1.NamespaceAll).List(metav1.ListOptions{}); err == nil {
1330-
for _, pod := range pods.Items {
1331-
Logf("Pod %s\t%s\t%s\t%s", pod.Namespace, pod.Name, pod.Spec.NodeName, pod.DeletionTimestamp)
1332-
}
1333-
} else {
1334-
Logf("Can't list pod debug info: %v", err)
1335-
}
1336-
Failf("Timed out waiting for service %s in namespace %s to expose endpoints %v (%v elapsed)", serviceName, namespace, expectedEndpoints, ServiceStartTimeout)
1337-
}
1338-
13391240
// StartServeHostnameService creates a replication controller that serves its
13401241
// hostname and a service on top of it.
13411242
func StartServeHostnameService(c clientset.Interface, svc *v1.Service, ns string, replicas int) ([]string, string, error) {

test/e2e/kubectl/BUILD

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ go_library(
3333
"//test/e2e/common:go_default_library",
3434
"//test/e2e/framework:go_default_library",
3535
"//test/e2e/framework/auth:go_default_library",
36+
"//test/e2e/framework/endpoints:go_default_library",
3637
"//test/e2e/framework/job:go_default_library",
3738
"//test/e2e/framework/log:go_default_library",
3839
"//test/e2e/framework/testfiles:go_default_library",

test/e2e/kubectl/kubectl.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -40,8 +40,6 @@ import (
4040
"time"
4141

4242
"github.com/elazarl/goproxy"
43-
"sigs.k8s.io/yaml"
44-
4543
v1 "k8s.io/api/core/v1"
4644
rbacv1beta1 "k8s.io/api/rbac/v1beta1"
4745
"k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1"
@@ -60,13 +58,15 @@ import (
6058
commonutils "k8s.io/kubernetes/test/e2e/common"
6159
"k8s.io/kubernetes/test/e2e/framework"
6260
"k8s.io/kubernetes/test/e2e/framework/auth"
61+
e2eendpoints "k8s.io/kubernetes/test/e2e/framework/endpoints"
6362
jobutil "k8s.io/kubernetes/test/e2e/framework/job"
6463
e2elog "k8s.io/kubernetes/test/e2e/framework/log"
6564
"k8s.io/kubernetes/test/e2e/framework/testfiles"
6665
"k8s.io/kubernetes/test/e2e/scheduling"
6766
testutils "k8s.io/kubernetes/test/utils"
6867
"k8s.io/kubernetes/test/utils/crd"
6968
uexec "k8s.io/utils/exec"
69+
"sigs.k8s.io/yaml"
7070

7171
"github.com/onsi/ginkgo"
7272
"github.com/onsi/gomega"
@@ -1075,7 +1075,7 @@ metadata:
10751075
})
10761076
validateService := func(name string, servicePort int, timeout time.Duration) {
10771077
err := wait.Poll(framework.Poll, timeout, func() (bool, error) {
1078-
endpoints, err := c.CoreV1().Endpoints(ns).Get(name, metav1.GetOptions{})
1078+
ep, err := c.CoreV1().Endpoints(ns).Get(name, metav1.GetOptions{})
10791079
if err != nil {
10801080
// log the real error
10811081
e2elog.Logf("Get endpoints failed (interval %v): %v", framework.Poll, err)
@@ -1089,7 +1089,7 @@ metadata:
10891089
return false, err
10901090
}
10911091

1092-
uidToPort := framework.GetContainerPortsByPodUID(endpoints)
1092+
uidToPort := e2eendpoints.GetContainerPortsByPodUID(ep)
10931093
if len(uidToPort) == 0 {
10941094
e2elog.Logf("No endpoint found, retrying")
10951095
return false, nil

0 commit comments

Comments
 (0)