Commit a89c2bd

fix flakes on e2e test TCP CLOSE_WAIT timeout
It turns out that the e2e test was not using the timeout meant to hold the CLOSE_WAIT status, so the test was flaky depending on how fast it checked the conntrack table. This PR replaces the dependency on ssh with a pod that checks the conntrack entries on the host in a loop, making the test more robust and reducing the flakiness caused by race conditions and/or ssh issues. It also fixes a bug in the grep for the conntrack entry, where the error was swallowed if no entry was found.
1 parent 788a073 commit a89c2bd
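
The core of the fix is the retry loop: instead of a single ssh round-trip racing the client connection, the conntrack lookup is polled until the entry shows up. Below is a minimal sketch of that pattern using the same k8s.io/apimachinery wait package the commit adopts; the lookupConntrackEntry helper is hypothetical.

package main

import (
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

// lookupConntrackEntry is a hypothetical stand-in for running
// `cat /proc/net/nf_conntrack | grep ...` on the host.
func lookupConntrackEntry() (string, error) {
	return "", fmt.Errorf("entry not found yet")
}

func main() {
	err := wait.PollImmediate(1*time.Second, 15*time.Second, func() (bool, error) {
		entry, err := lookupConntrackEntry()
		if err != nil {
			// Returning (false, nil) retries on the next tick instead of
			// swallowing the failure or aborting on a transient race.
			return false, nil
		}
		fmt.Println("found entry:", entry)
		return true, nil
	})
	if err != nil {
		fmt.Println("timed out waiting for conntrack entry:", err)
	}
}

Returning an error from the condition aborts the poll immediately, so the loop distinguishes "not there yet" (retry) from "there, but wrong" (fail fast).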

File tree

2 files changed: +123 −69 lines changed


test/e2e/network/BUILD

Lines changed: 0 additions & 1 deletion

@@ -80,7 +80,6 @@ go_library(
         "//test/e2e/framework/skipper:go_default_library",
         "//test/e2e/framework/ssh:go_default_library",
         "//test/e2e/network/scale:go_default_library",
-        "//test/images/agnhost/net/nat:go_default_library",
         "//test/utils:go_default_library",
         "//test/utils/image:go_default_library",
         "//vendor/github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud:go_default_library",

test/e2e/network/kube_proxy.go

Lines changed: 123 additions & 68 deletions
@@ -18,22 +18,22 @@ package network

 import (
     "context"
-    "encoding/json"
+    "encoding/hex"
     "fmt"
     "math"
+    "net"
     "strconv"
     "strings"
     "time"

     v1 "k8s.io/api/core/v1"
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+    "k8s.io/apimachinery/pkg/util/wait"

     "k8s.io/kubernetes/test/e2e/framework"
     e2enode "k8s.io/kubernetes/test/e2e/framework/node"
     e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
     e2eskipper "k8s.io/kubernetes/test/e2e/framework/skipper"
-    e2essh "k8s.io/kubernetes/test/e2e/framework/ssh"
-    "k8s.io/kubernetes/test/images/agnhost/net/nat"
     imageutils "k8s.io/kubernetes/test/utils/image"

     "github.com/onsi/ginkgo"
@@ -44,15 +44,15 @@ var kubeProxyE2eImage = imageutils.GetE2EImage(imageutils.Agnhost)

 var _ = SIGDescribe("Network", func() {
     const (
-        testDaemonHTTPPort    = 11301
-        testDaemonTCPPort     = 11302
-        timeoutSeconds        = 10
-        postFinTimeoutSeconds = 5
+        testDaemonHTTPPort     = 11301
+        testDaemonTCPPort      = 11302
+        deadlineTimeoutSeconds = 5
+        postFinTimeoutSeconds  = 15
     )

     fr := framework.NewDefaultFramework("network")

-    ginkgo.It("should set TCP CLOSE_WAIT timeout", func() {
+    ginkgo.It("should set TCP CLOSE_WAIT timeout [Privileged]", func() {
         nodes, err := e2enode.GetBoundedReadySchedulableNodes(fr.ClientSet, 2)
         framework.ExpectNoError(err)
         if len(nodes.Items) < 2 {
@@ -83,16 +83,63 @@ var _ = SIGDescribe("Network", func() {

         zero := int64(0)

+        // Create a pod to check the conntrack entries on the host node
+        // It mounts the host /proc/net folder to be able to access
+        // the nf_conntrack file with the host conntrack entries
+        privileged := true
+
+        hostExecPod := &v1.Pod{
+            ObjectMeta: metav1.ObjectMeta{
+                Name:      "e2e-net-exec",
+                Namespace: fr.Namespace.Name,
+                Labels:    map[string]string{"app": "e2e-net-exec"},
+            },
+            Spec: v1.PodSpec{
+                HostNetwork: true,
+                NodeName:    clientNodeInfo.name,
+                Containers: []v1.Container{
+                    {
+                        Name:            "e2e-net-exec",
+                        Image:           kubeProxyE2eImage,
+                        ImagePullPolicy: "Always",
+                        Args:            []string{"pause"},
+                        VolumeMounts: []v1.VolumeMount{
+                            {
+                                Name:      "proc-net",
+                                MountPath: "/rootfs/proc/net",
+                                ReadOnly:  true,
+                            },
+                        },
+                        SecurityContext: &v1.SecurityContext{
+                            Privileged: &privileged,
+                        },
+                    },
+                },
+                Volumes: []v1.Volume{
+                    {
+                        Name: "proc-net",
+                        VolumeSource: v1.VolumeSource{
+                            HostPath: &v1.HostPathVolumeSource{
+                                Path: "/proc/net",
+                            },
+                        },
+                    },
+                },
+                TerminationGracePeriodSeconds: &zero,
+            },
+        }
+        fr.PodClient().CreateSync(hostExecPod)
+        defer fr.PodClient().DeleteSync(hostExecPod.Name, metav1.DeleteOptions{}, framework.DefaultPodDeletionTimeout)
+
         // Some distributions (Ubuntu 16.04 etc.) don't support the proc file.
-        _, err = e2essh.IssueSSHCommandWithResult(
-            "ls /proc/net/nf_conntrack",
-            framework.TestContext.Provider,
-            clientNodeInfo.node)
+        _, err = framework.RunHostCmd(fr.Namespace.Name, "e2e-net-exec",
+            "ls /rootfs/proc/net/nf_conntrack")
         if err != nil && strings.Contains(err.Error(), "No such file or directory") {
             e2eskipper.Skipf("The node %s does not support /proc/net/nf_conntrack", clientNodeInfo.name)
         }
         framework.ExpectNoError(err)

+        // Create the client and server pods
         clientPodSpec := &v1.Pod{
             ObjectMeta: metav1.ObjectMeta{
                 Name: "e2e-net-client",
@@ -107,7 +154,13 @@ var _ = SIGDescribe("Network", func() {
                     Image:           kubeProxyE2eImage,
                     ImagePullPolicy: "Always",
                     Args: []string{
-                        "net", "--serve", fmt.Sprintf("0.0.0.0:%d", testDaemonHTTPPort),
+                        "net",
+                        "--runner", "nat-closewait-client",
+                        "--options",
+                        fmt.Sprintf(`{"RemoteAddr":"%v", "PostFinTimeoutSeconds":%v, "TimeoutSeconds":%v, "LeakConnection":true}`,
+                            net.JoinHostPort(serverNodeInfo.nodeIP, strconv.Itoa(testDaemonTCPPort)),
+                            postFinTimeoutSeconds,
+                            deadlineTimeoutSeconds),
                     },
                 },
             },
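
One detail worth calling out in the new client args: net.JoinHostPort keeps RemoteAddr valid when the node IP is IPv6. A standalone sketch with made-up node IPs, printing the --options payload the fmt.Sprintf above would produce:

package main

import (
	"fmt"
	"net"
	"strconv"
)

func main() {
	const testDaemonTCPPort = 11302
	for _, nodeIP := range []string{"10.0.0.2", "fd00:10::2"} {
		opts := fmt.Sprintf(`{"RemoteAddr":"%v", "PostFinTimeoutSeconds":%v, "TimeoutSeconds":%v, "LeakConnection":true}`,
			net.JoinHostPort(nodeIP, strconv.Itoa(testDaemonTCPPort)), 15, 5)
		fmt.Println(opts)
		// 10.0.0.2   -> "RemoteAddr":"10.0.0.2:11302"
		// fd00:10::2 -> "RemoteAddr":"[fd00:10::2]:11302"
	}
}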
@@ -132,7 +185,7 @@ var _ = SIGDescribe("Network", func() {
                     "net",
                     "--runner", "nat-closewait-server",
                     "--options",
-                    fmt.Sprintf(`{"LocalAddr":"0.0.0.0:%v", "PostFindTimeoutSeconds":%v}`,
+                    fmt.Sprintf(`{"LocalAddr":":%v", "PostFinTimeoutSeconds":%v}`,
                         testDaemonTCPPort,
                         postFinTimeoutSeconds),
                 },
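
Besides fixing the PostFindTimeoutSeconds field-name typo, the server's listen address changes from "0.0.0.0:%v" to ":%v": with an empty host, Go's net.Listen binds the wildcard for both IPv4 and IPv6, whereas "0.0.0.0" is IPv4-only. A quick sketch of the dual-stack form (port matches the test's testDaemonTCPPort):

package main

import (
	"fmt"
	"net"
)

func main() {
	// Empty host: listens on all interfaces, IPv4 and IPv6.
	ln, err := net.Listen("tcp", ":11302")
	if err != nil {
		fmt.Println("listen failed:", err)
		return
	}
	defer ln.Close()
	fmt.Println("listening on", ln.Addr())
}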
@@ -156,72 +209,55 @@ var _ = SIGDescribe("Network", func() {
             kubeProxyE2eImage))
         fr.PodClient().CreateSync(serverPodSpec)

+        // The server should be listening before spawning the client pod
+        <-time.After(time.Duration(2) * time.Second)
+        // Connect to the server and leak the connection
         ginkgo.By(fmt.Sprintf(
-            "Launching a client daemon on node %v (node ip: %v, image: %v)",
+            "Launching a client connection on node %v (node ip: %v, image: %v)",
             clientNodeInfo.name,
             clientNodeInfo.nodeIP,
             kubeProxyE2eImage))
         fr.PodClient().CreateSync(clientPodSpec)

-        ginkgo.By("Make client connect")
-
-        options := nat.CloseWaitClientOptions{
-            RemoteAddr: fmt.Sprintf("%v:%v",
-                serverNodeInfo.nodeIP, testDaemonTCPPort),
-            TimeoutSeconds:        timeoutSeconds,
-            PostFinTimeoutSeconds: 0,
-            LeakConnection:        true,
-        }
-
-        jsonBytes, err := json.Marshal(options)
-        framework.ExpectNoError(err, "could not marshal")
-
-        cmd := fmt.Sprintf(
-            `curl -X POST http://localhost:%v/run/nat-closewait-client -d `+
-                `'%v' 2>/dev/null`,
-            testDaemonHTTPPort,
-            string(jsonBytes))
-        framework.RunHostCmdOrDie(fr.Namespace.Name, "e2e-net-client", cmd)
-
-        <-time.After(time.Duration(1) * time.Second)
-
         ginkgo.By("Checking /proc/net/nf_conntrack for the timeout")
-        // If test flakes occur here, then this check should be performed
-        // in a loop as there may be a race with the client connecting.
-        e2essh.IssueSSHCommandWithResult(
-            fmt.Sprintf("sudo cat /proc/net/nf_conntrack | grep 'dport=%v'",
-                testDaemonTCPPort),
-            framework.TestContext.Provider,
-            clientNodeInfo.node)
-
-        // Timeout in seconds is available as the fifth column from
-        // /proc/net/nf_conntrack.
-        result, err := e2essh.IssueSSHCommandWithResult(
-            fmt.Sprintf(
-                "sudo cat /proc/net/nf_conntrack "+
-                    "| grep 'CLOSE_WAIT.*dst=%v.*dport=%v' "+
-                    "| tail -n 1"+
-                    "| awk '{print $5}' ",
-                serverNodeInfo.nodeIP,
-                testDaemonTCPPort),
-            framework.TestContext.Provider,
-            clientNodeInfo.node)
-        framework.ExpectNoError(err)
-
-        timeoutSeconds, err := strconv.Atoi(strings.TrimSpace(result.Stdout))
-        framework.ExpectNoError(err)
-
         // These must be synchronized from the default values set in
         // pkg/apis/../defaults.go ConntrackTCPCloseWaitTimeout. The
         // current defaults are hidden in the initialization code.
         const epsilonSeconds = 60
         const expectedTimeoutSeconds = 60 * 60
-
-        framework.Logf("conntrack entry timeout was: %v, expected: %v",
-            timeoutSeconds, expectedTimeoutSeconds)
-
-        gomega.Expect(math.Abs(float64(timeoutSeconds - expectedTimeoutSeconds))).Should(
-            gomega.BeNumerically("<", (epsilonSeconds)))
+        // the conntrack file uses the IPv6 expanded format
+        ip := fullIPv6(net.ParseIP(serverNodeInfo.nodeIP))
+        // Obtain the corresponding conntrack entry on the host checking
+        // the nf_conntrack file from the pod e2e-net-exec.
+        // It retries in a loop if the entry is not found.
+        cmd := fmt.Sprintf("cat /rootfs/proc/net/nf_conntrack "+
+            "| grep -m 1 'CLOSE_WAIT.*dst=%v.*dport=%v' ",
+            ip, testDaemonTCPPort)
+        if err := wait.PollImmediate(deadlineTimeoutSeconds, postFinTimeoutSeconds, func() (bool, error) {
+            result, err := framework.RunHostCmd(fr.Namespace.Name, "e2e-net-exec", cmd)
+            // retry if we can't obtain the conntrack entry
+            if err != nil {
+                framework.Logf("failed to obtain conntrack entry: %v %v", result, err)
+                return false, nil
+            }
+            framework.Logf("conntrack entry for node %v and port %v: %v", serverNodeInfo.nodeIP, testDaemonTCPPort, result)
+            // Timeout in seconds is available as the fifth column of
+            // the matched entry in /proc/net/nf_conntrack.
+            line := strings.Fields(result)
+            if len(line) < 5 {
+                return false, fmt.Errorf("conntrack entry does not have a timeout field: %v", line)
+            }
+            timeoutSeconds, err := strconv.Atoi(line[4])
+            if err != nil {
+                return false, fmt.Errorf("failed to convert matched timeout %s to integer: %v", line[4], err)
+            }
+            if math.Abs(float64(timeoutSeconds-expectedTimeoutSeconds)) < epsilonSeconds {
+                return true, nil
+            }
+            return false, fmt.Errorf("wrong TCP CLOSE_WAIT timeout: %v expected: %v", timeoutSeconds, expectedTimeoutSeconds)
+        }); err != nil {
+            framework.Failf("no conntrack entry for port %d on node %s", testDaemonTCPPort, serverNodeInfo.nodeIP)
+        }
     })

     // Regression test for #74839, where:
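
For reference, here is the parsing step from the new polled check in isolation: the timeout is the fifth whitespace-separated field of the matched /proc/net/nf_conntrack entry. The sample line below is representative, not captured from a real run:

package main

import (
	"fmt"
	"strconv"
	"strings"
)

func main() {
	// Representative nf_conntrack entry; field 5 is the timeout in seconds.
	entry := "ipv4 2 tcp 6 3600 CLOSE_WAIT src=10.244.1.5 dst=10.0.0.2 sport=40244 dport=11302 [ASSURED] use=1"
	fields := strings.Fields(entry)
	if len(fields) < 5 {
		fmt.Println("entry does not have a timeout field")
		return
	}
	timeoutSeconds, err := strconv.Atoi(fields[4])
	if err != nil {
		fmt.Println("failed to parse timeout:", err)
		return
	}
	fmt.Println("conntrack timeout in seconds:", timeoutSeconds) // 3600
}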
@@ -338,3 +374,22 @@ var _ = SIGDescribe("Network", func() {
         }
     })
 })
+
+// fullIPv6 returns a string with the IP representation
+// if IPv6 it returns the expanded address format
+// credit https://stackoverflow.com/a/52003106/4532704
+func fullIPv6(ip net.IP) string {
+    if ip.To4() == nil {
+        dst := make([]byte, hex.EncodedLen(len(ip)))
+        _ = hex.Encode(dst, ip)
+        return string(dst[0:4]) + ":" +
+            string(dst[4:8]) + ":" +
+            string(dst[8:12]) + ":" +
+            string(dst[12:16]) + ":" +
+            string(dst[16:20]) + ":" +
+            string(dst[20:24]) + ":" +
+            string(dst[24:28]) + ":" +
+            string(dst[28:])
+    }
+    return ip.String()
+}
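
A quick usage sketch for fullIPv6 (it assumes the helper defined just above is in scope, so it is not standalone): IPv4 addresses pass through via ip.String(), while IPv6 addresses come back fully expanded, matching the format used by /proc/net/nf_conntrack:

package main

import (
	"fmt"
	"net"
)

// fullIPv6 is the helper from the diff above, assumed available here.

func main() {
	fmt.Println(fullIPv6(net.ParseIP("10.0.0.2")))
	// -> 10.0.0.2
	fmt.Println(fullIPv6(net.ParseIP("fd00:10::2")))
	// -> fd00:0010:0000:0000:0000:0000:0000:0002
}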
