Commit 704ec25

fix flakes on e2e test TCP CLOSE_WAIT timeout
It turns out that the e2e test was not using the timeout that holds the connection in the CLOSE_WAIT state, so the test was flaky depending on how fast it checked the conntrack table. This PR replaces the dependency on ssh with a pod that checks the conntrack entries on the host in a loop, making the test more robust and reducing the flakiness caused by race conditions and/or ssh issues. It also fixes a bug in the command that greps the conntrack entry, where the error was swallowed if no entry was found.
1 parent db1990f commit 704ec25
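
For background, a minimal self-contained sketch of what the new check parses (the sample entry below is illustrative, not taken from the test): in /proc/net/nf_conntrack the remaining timeout is the fifth whitespace-separated field of an entry, so a CLOSE_WAIT entry governed by kube-proxy's default ConntrackTCPCloseWaitTimeout of one hour should show a value close to 3600 seconds.

package main

import (
	"fmt"
	"strconv"
	"strings"
)

func main() {
	// Illustrative nf_conntrack line (tuples shortened); the leading fields are
	// family, family number, protocol, protocol number, timeout, state, tuples...
	entry := "ipv4     2 tcp      6 3598 CLOSE_WAIT src=10.0.0.5 dst=10.0.0.6 sport=39874 dport=11302"

	fields := strings.Fields(entry)
	timeoutSeconds, err := strconv.Atoi(fields[4]) // fifth column = remaining timeout
	if err != nil {
		panic(err)
	}
	fmt.Println(timeoutSeconds) // 3598, i.e. within epsilon of the 3600s default
}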

File tree

1 file changed (+123, -47 lines)


test/e2e/network/kube_proxy.go

Lines changed: 123 additions & 47 deletions
@@ -17,20 +17,22 @@ limitations under the License.
 package network
 
 import (
+	"encoding/hex"
 	"encoding/json"
 	"fmt"
 	"math"
+	"net"
 	"strconv"
 	"strings"
 	"time"
 
 	v1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/util/wait"
 
 	"k8s.io/kubernetes/test/e2e/framework"
 	e2enode "k8s.io/kubernetes/test/e2e/framework/node"
 	e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
-	e2essh "k8s.io/kubernetes/test/e2e/framework/ssh"
 	"k8s.io/kubernetes/test/images/agnhost/net/nat"
 	imageutils "k8s.io/kubernetes/test/utils/image"
 
@@ -42,10 +44,10 @@ var kubeProxyE2eImage = imageutils.GetE2EImage(imageutils.Agnhost)
 
 var _ = SIGDescribe("Network", func() {
 	const (
-		testDaemonHTTPPort    = 11301
-		testDaemonTCPPort     = 11302
-		timeoutSeconds        = 10
-		postFinTimeoutSeconds = 5
+		testDaemonHTTPPort     = 11301
+		testDaemonTCPPort      = 11302
+		deadlineTimeoutSeconds = 10
+		postFinTimeoutSeconds  = 30
 	)
 
 	fr := framework.NewDefaultFramework("network")
@@ -81,16 +83,63 @@ var _ = SIGDescribe("Network", func() {
 
 		zero := int64(0)
 
+		// Create a pod to check the conntrack entries on the host node
+		// It mounts the host /proc/net folder to be able to access
+		// the nf_conntrack file with the host conntrack entries
+		privileged := true
+
+		hostExecPod := &v1.Pod{
+			ObjectMeta: metav1.ObjectMeta{
+				Name:      "e2e-net-exec",
+				Namespace: fr.Namespace.Name,
+				Labels:    map[string]string{"app": "e2e-net-exec"},
+			},
+			Spec: v1.PodSpec{
+				HostNetwork: true,
+				NodeName:    clientNodeInfo.name,
+				Containers: []v1.Container{
+					{
+						Name:            "e2e-net-exec",
+						Image:           kubeProxyE2eImage,
+						ImagePullPolicy: "Always",
+						Args:            []string{"pause"},
+						VolumeMounts: []v1.VolumeMount{
+							{
+								Name:      "proc-net",
+								MountPath: "/rootfs/proc/net",
+								ReadOnly:  true,
+							},
+						},
+						SecurityContext: &v1.SecurityContext{
+							Privileged: &privileged,
+						},
+					},
+				},
+				Volumes: []v1.Volume{
+					{
+						Name: "proc-net",
+						VolumeSource: v1.VolumeSource{
+							HostPath: &v1.HostPathVolumeSource{
+								Path: "/proc/net",
+							},
+						},
+					},
+				},
+				TerminationGracePeriodSeconds: &zero,
+			},
+		}
+		fr.PodClient().CreateSync(hostExecPod)
+		defer fr.PodClient().DeleteSync(hostExecPod.Name, &metav1.DeleteOptions{}, framework.DefaultPodDeletionTimeout)
+
 		// Some distributions (Ubuntu 16.04 etc.) don't support the proc file.
-		_, err = e2essh.IssueSSHCommandWithResult(
-			"ls /proc/net/nf_conntrack",
-			framework.TestContext.Provider,
-			clientNodeInfo.node)
+		_, err = framework.RunHostCmd(fr.Namespace.Name, "e2e-net-exec",
+			"ls /rootfs/proc/net/nf_conntrack")
 		if err != nil && strings.Contains(err.Error(), "No such file or directory") {
 			framework.Skipf("The node %s does not support /proc/net/nf_conntrack", clientNodeInfo.name)
 		}
 		framework.ExpectNoError(err)
 
+		// Create the client and server pods
 		clientPodSpec := &v1.Pod{
 			ObjectMeta: metav1.ObjectMeta{
 				Name: "e2e-net-client",
@@ -105,7 +154,7 @@ var _ = SIGDescribe("Network", func() {
 						Image:           kubeProxyE2eImage,
 						ImagePullPolicy: "Always",
 						Args: []string{
-							"net", "--serve", fmt.Sprintf("0.0.0.0:%d", testDaemonHTTPPort),
+							"net", "--serve", fmt.Sprintf(":%d", testDaemonHTTPPort),
 						},
 					},
 				},
@@ -130,7 +179,7 @@ var _ = SIGDescribe("Network", func() {
 						"net",
 						"--runner", "nat-closewait-server",
 						"--options",
-						fmt.Sprintf(`{"LocalAddr":"0.0.0.0:%v", "PostFindTimeoutSeconds":%v}`,
+						fmt.Sprintf(`{"LocalAddr":":%v", "PostFinTimeoutSeconds":%v}`,
 							testDaemonTCPPort,
 							postFinTimeoutSeconds),
 					},
@@ -164,10 +213,10 @@ var _ = SIGDescribe("Network", func() {
 		ginkgo.By("Make client connect")
 
 		options := nat.CloseWaitClientOptions{
-			RemoteAddr: fmt.Sprintf("%v:%v",
-				serverNodeInfo.nodeIP, testDaemonTCPPort),
-			TimeoutSeconds:        timeoutSeconds,
-			PostFinTimeoutSeconds: 0,
+
+			RemoteAddr:            net.JoinHostPort(serverNodeInfo.nodeIP, strconv.Itoa(testDaemonTCPPort)),
+			TimeoutSeconds:        deadlineTimeoutSeconds,
+			PostFinTimeoutSeconds: postFinTimeoutSeconds,
 			LeakConnection:        true,
 		}
 
@@ -179,47 +228,55 @@ var _ = SIGDescribe("Network", func() {
 			`'%v' 2>/dev/null`,
 			testDaemonHTTPPort,
 			string(jsonBytes))
-		framework.RunHostCmdOrDie(fr.Namespace.Name, "e2e-net-client", cmd)
+		// Run the closewait command in a goroutine so it keeps waiting during postFinTimeoutSeconds,
+		// otherwise the pod is deleted and the connection is closed, losing the conntrack entry
+		go func() {
+			defer ginkgo.GinkgoRecover()
+			_, err = framework.RunHostCmd(fr.Namespace.Name, "e2e-net-client", cmd)
+			framework.ExpectNoError(err)
+		}()
 
 		<-time.After(time.Duration(1) * time.Second)
 
 		ginkgo.By("Checking /proc/net/nf_conntrack for the timeout")
-		// If test flakes occur here, then this check should be performed
-		// in a loop as there may be a race with the client connecting.
-		e2essh.IssueSSHCommandWithResult(
-			fmt.Sprintf("sudo cat /proc/net/nf_conntrack | grep 'dport=%v'",
-				testDaemonTCPPort),
-			framework.TestContext.Provider,
-			clientNodeInfo.node)
-
-		// Timeout in seconds is available as the fifth column from
-		// /proc/net/nf_conntrack.
-		result, err := e2essh.IssueSSHCommandWithResult(
-			fmt.Sprintf(
-				"sudo cat /proc/net/nf_conntrack "+
-					"| grep 'CLOSE_WAIT.*dst=%v.*dport=%v' "+
-					"| tail -n 1"+
-					"| awk '{print $5}' ",
-				serverNodeInfo.nodeIP,
-				testDaemonTCPPort),
-			framework.TestContext.Provider,
-			clientNodeInfo.node)
-		framework.ExpectNoError(err)
-
-		timeoutSeconds, err := strconv.Atoi(strings.TrimSpace(result.Stdout))
-		framework.ExpectNoError(err)
-
 		// These must be synchronized from the default values set in
 		// pkg/apis/../defaults.go ConntrackTCPCloseWaitTimeout. The
 		// current defaults are hidden in the initialization code.
 		const epsilonSeconds = 60
 		const expectedTimeoutSeconds = 60 * 60
-
-		framework.Logf("conntrack entry timeout was: %v, expected: %v",
-			timeoutSeconds, expectedTimeoutSeconds)
-
-		gomega.Expect(math.Abs(float64(timeoutSeconds - expectedTimeoutSeconds))).Should(
-			gomega.BeNumerically("<", (epsilonSeconds)))
+		// the conntrack file uses the IPv6 expanded format
+		ip := fullIPv6(net.ParseIP(serverNodeInfo.nodeIP))
+		// Obtain the corresponding conntrack entry on the host checking
+		// the nf_conntrack file from the pod e2e-net-exec.
+		// It retries in a loop if the entry is not found.
+		cmd = fmt.Sprintf("cat /rootfs/proc/net/nf_conntrack "+
+			"| grep -m 1 'CLOSE_WAIT.*dst=%v.*dport=%v' ",
+			ip, testDaemonTCPPort)
+		if err := wait.PollImmediate(5*time.Second, 30*time.Second, func() (bool, error) {
+			result, err := framework.RunHostCmd(fr.Namespace.Name, "e2e-net-exec", cmd)
+			// retry if we can't obtain the conntrack entry
+			if err != nil {
+				framework.Logf("failed to obtain conntrack entry: %v %v", result, err)
+				return false, nil
+			}
+			framework.Logf("conntrack entry for node %v and port %v: %v", serverNodeInfo.nodeIP, testDaemonTCPPort, result)
+			// Timeout in seconds is available as the fifth column of
+			// the matched entry in /proc/net/nf_conntrack.
+			line := strings.Fields(result)
+			if len(line) < 5 {
+				return false, fmt.Errorf("conntrack entry does not have a timeout field: %v", line)
+			}
+			timeoutSeconds, err := strconv.Atoi(line[4])
+			if err != nil {
+				return false, fmt.Errorf("failed to convert matched timeout %s to integer: %v", line[4], err)
+			}
+			if math.Abs(float64(timeoutSeconds-expectedTimeoutSeconds)) < epsilonSeconds {
+				return true, nil
+			}
+			return false, fmt.Errorf("wrong TCP CLOSE_WAIT timeout: %v expected: %v", timeoutSeconds, expectedTimeoutSeconds)
+		}); err != nil {
+			framework.Failf("no conntrack entry for port %d on node %s", testDaemonTCPPort, serverNodeInfo.nodeIP)
+		}
 	})
 
 	// Regression test for #74839, where:
@@ -336,3 +393,22 @@ var _ = SIGDescribe("Network", func() {
 		}
 	})
 })
+
+// fullIPv6 returns a string with the IP representation
+// if IPv6 it returns the expanded address format
+// credit https://stackoverflow.com/a/52003106/4532704
+func fullIPv6(ip net.IP) string {
+	if ip.To4() == nil {
+		dst := make([]byte, hex.EncodedLen(len(ip)))
+		_ = hex.Encode(dst, ip)
+		return string(dst[0:4]) + ":" +
+			string(dst[4:8]) + ":" +
+			string(dst[8:12]) + ":" +
+			string(dst[12:16]) + ":" +
+			string(dst[16:20]) + ":" +
+			string(dst[20:24]) + ":" +
+			string(dst[24:28]) + ":" +
+			string(dst[28:])
+	}
+	return ip.String()
+}
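
As a usage note, a small sketch of the fullIPv6 helper added above (the addresses below are made up): it returns IPv4 addresses unchanged and expands IPv6 addresses into the zero-padded form that nf_conntrack prints, which is why the test runs the dst= grep against the expanded address.

package main

import (
	"encoding/hex"
	"fmt"
	"net"
)

// fullIPv6 mirrors the helper introduced in this commit.
func fullIPv6(ip net.IP) string {
	if ip.To4() == nil {
		dst := make([]byte, hex.EncodedLen(len(ip)))
		_ = hex.Encode(dst, ip)
		return string(dst[0:4]) + ":" + string(dst[4:8]) + ":" + string(dst[8:12]) + ":" +
			string(dst[12:16]) + ":" + string(dst[16:20]) + ":" + string(dst[20:24]) + ":" +
			string(dst[24:28]) + ":" + string(dst[28:])
	}
	return ip.String()
}

func main() {
	fmt.Println(fullIPv6(net.ParseIP("10.0.0.6")))    // 10.0.0.6
	fmt.Println(fullIPv6(net.ParseIP("2001:db8::1"))) // 2001:0db8:0000:0000:0000:0000:0000:0001
}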
