Skip to content

Commit 8dbca91

Browse files
authored
Merge pull request kubernetes#91962 from aojea/udp1
Deflake Services should be able to preserve UDP traffic when server pod cycles for a NodePort service
2 parents 51aac92 + 27d3266 commit 8dbca91

File tree

4 files changed

+249
-180
lines changed

4 files changed

+249
-180
lines changed

test/e2e/network/BUILD

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ load(
88
go_library(
99
name = "go_default_library",
1010
srcs = [
11+
"conntrack.go",
1112
"dns.go",
1213
"dns_common.go",
1314
"dns_configmap.go",

test/e2e/network/conntrack.go

Lines changed: 246 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,246 @@
1+
/*
2+
Copyright 2020 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package network
18+
19+
import (
20+
"fmt"
21+
"strings"
22+
"time"
23+
24+
"github.com/onsi/ginkgo"
25+
v1 "k8s.io/api/core/v1"
26+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
27+
"k8s.io/apimachinery/pkg/util/intstr"
28+
clientset "k8s.io/client-go/kubernetes"
29+
"k8s.io/kubernetes/test/e2e/framework"
30+
e2enode "k8s.io/kubernetes/test/e2e/framework/node"
31+
e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
32+
e2eservice "k8s.io/kubernetes/test/e2e/framework/service"
33+
e2eskipper "k8s.io/kubernetes/test/e2e/framework/skipper"
34+
)
35+
36+
const (
	// serviceName is the UDP service under test.
	serviceName = "svc-udp"
	// podClient is the pod that generates the UDP traffic.
	podClient = "pod-client"
	// podBackend1 and podBackend2 are the backend pods cycled behind the service.
	podBackend1 = "pod-server-1"
	podBackend2 = "pod-server-2"
	// srcPort is the fixed UDP source port used by the client so that all
	// probes map onto the same conntrack entry.
	srcPort = 12345
)
43+
44+
// Linux NAT uses conntrack to perform NAT; every time a new
45+
// flow is seen, a connection is created in the conntrack table, and it
46+
// is being used by the NAT module.
47+
// Each entry in the conntrack table has associated a timeout, that removes
48+
// the connection once it expires.
49+
// UDP is a connectionless protocol, so the conntrack module tracking functions
50+
// are not very advanced.
51+
// It uses a short timeout (30 sec by default) that is renewed if there are new flows
52+
// matching the connection. Otherwise it expires the entry.
53+
// This behaviour can cause issues in Kubernetes when one entry on the conntrack table
54+
// is never expired because the sender does not stop sending traffic, but the pods or
55+
// endpoints were deleted, blackholing the traffic
56+
// In order to mitigate this problem, Kubernetes deletes the stale entries:
57+
// - when an endpoint is removed
58+
// - when a service goes from no endpoints to new endpoint
59+
60+
// Ref: https://api.semanticscholar.org/CorpusID:198903401
61+
// Boye, Magnus. “Netfilter Connection Tracking and NAT Implementation.” (2012).
62+
63+
var _ = SIGDescribe("Conntrack", func() {
64+
65+
fr := framework.NewDefaultFramework("conntrack")
66+
67+
type nodeInfo struct {
68+
name string
69+
nodeIP string
70+
}
71+
72+
var (
73+
cs clientset.Interface
74+
ns string
75+
clientNodeInfo, serverNodeInfo nodeInfo
76+
)
77+
78+
ginkgo.BeforeEach(func() {
79+
cs = fr.ClientSet
80+
ns = fr.Namespace.Name
81+
82+
nodes, err := e2enode.GetBoundedReadySchedulableNodes(cs, 2)
83+
framework.ExpectNoError(err)
84+
if len(nodes.Items) < 2 {
85+
e2eskipper.Skipf(
86+
"Test requires >= 2 Ready nodes, but there are only %v nodes",
87+
len(nodes.Items))
88+
}
89+
90+
ips := e2enode.CollectAddresses(nodes, v1.NodeInternalIP)
91+
92+
clientNodeInfo = nodeInfo{
93+
name: nodes.Items[0].Name,
94+
nodeIP: ips[0],
95+
}
96+
97+
serverNodeInfo = nodeInfo{
98+
name: nodes.Items[1].Name,
99+
nodeIP: ips[1],
100+
}
101+
})
102+
103+
ginkgo.It("should be able to preserve UDP traffic when server pod cycles for a NodePort service", func() {
104+
105+
// Create a NodePort service
106+
udpJig := e2eservice.NewTestJig(cs, ns, serviceName)
107+
ginkgo.By("creating a UDP service " + serviceName + " with type=NodePort in " + ns)
108+
udpService, err := udpJig.CreateUDPService(func(svc *v1.Service) {
109+
svc.Spec.Type = v1.ServiceTypeNodePort
110+
svc.Spec.Ports = []v1.ServicePort{
111+
{Port: 80, Name: "udp", Protocol: v1.ProtocolUDP, TargetPort: intstr.FromInt(80)},
112+
}
113+
})
114+
framework.ExpectNoError(err)
115+
116+
// Create a pod in one node to create the UDP traffic against the NodePort service every 5 seconds
117+
ginkgo.By("creating a client pod for probing the service " + serviceName)
118+
clientPod := newAgnhostPod(podClient, "")
119+
clientPod.Spec.NodeName = clientNodeInfo.name
120+
cmd := fmt.Sprintf(`date; for i in $(seq 1 300); do echo "$(date) Try: ${i}"; echo hostname | nc -u -w 5 -p %d %s %d; echo; done`, srcPort, serverNodeInfo.nodeIP, udpService.Spec.Ports[0].NodePort)
121+
clientPod.Spec.Containers[0].Command = []string{"/bin/sh", "-c", cmd}
122+
clientPod.Spec.Containers[0].Name = podClient
123+
fr.PodClient().CreateSync(clientPod)
124+
125+
// Read the client pod logs
126+
logs, err := e2epod.GetPodLogs(cs, ns, podClient, podClient)
127+
framework.ExpectNoError(err)
128+
framework.Logf("Pod client logs: %s", logs)
129+
130+
// Add a backend pod to the service in the other node
131+
ginkgo.By("creating a backend pod " + podBackend1 + " for the service " + serviceName)
132+
serverPod1 := newAgnhostPod(podBackend1, "netexec", fmt.Sprintf("--udp-port=%d", 80))
133+
serverPod1.Labels = udpJig.Labels
134+
serverPod1.Spec.NodeName = serverNodeInfo.name
135+
fr.PodClient().CreateSync(serverPod1)
136+
137+
// Waiting for service to expose endpoint.
138+
err = validateEndpointsPorts(cs, ns, serviceName, portsByPodName{podBackend1: {80}})
139+
framework.ExpectNoError(err, "failed to validate endpoints for service %s in namespace: %s", serviceName, ns)
140+
141+
// Check that the pod receives the traffic
142+
// UDP conntrack entries timeout is 30 sec by default
143+
ginkgo.By("checking client pod connected to the backend 1 on Node IP " + serverNodeInfo.nodeIP)
144+
time.Sleep(30 * time.Second)
145+
logs, err = e2epod.GetPodLogs(cs, ns, podClient, podClient)
146+
framework.ExpectNoError(err)
147+
framework.Logf("Pod client logs: %s", logs)
148+
if !strings.Contains(string(logs), podBackend1) {
149+
framework.Failf("Failed to connecto to backend 1")
150+
}
151+
152+
// Create a second pod
153+
ginkgo.By("creating a second backend pod " + podBackend2 + " for the service " + serviceName)
154+
serverPod2 := newAgnhostPod(podBackend2, "netexec", fmt.Sprintf("--udp-port=%d", 80))
155+
serverPod2.Labels = udpJig.Labels
156+
serverPod2.Spec.NodeName = serverNodeInfo.name
157+
fr.PodClient().CreateSync(serverPod2)
158+
159+
// and delete the first pod
160+
framework.Logf("Cleaning up %s pod", podBackend1)
161+
fr.PodClient().DeleteSync(podBackend1, metav1.DeleteOptions{}, framework.DefaultPodDeletionTimeout)
162+
163+
// Check that the second pod keeps receiving traffic
164+
// UDP conntrack entries timeout is 30 sec by default
165+
ginkgo.By("checking client pod connected to the backend 2 on Node IP " + serverNodeInfo.nodeIP)
166+
time.Sleep(30 * time.Second)
167+
logs, err = e2epod.GetPodLogs(cs, ns, podClient, podClient)
168+
framework.ExpectNoError(err)
169+
framework.Logf("Pod client logs: %s", logs)
170+
if !strings.Contains(string(logs), podBackend2) {
171+
framework.Failf("Failed to connecto to backend 2")
172+
}
173+
})
174+
175+
ginkgo.It("should be able to preserve UDP traffic when server pod cycles for a ClusterIP service", func() {
176+
177+
// Create a NodePort service
178+
udpJig := e2eservice.NewTestJig(cs, ns, serviceName)
179+
ginkgo.By("creating a UDP service " + serviceName + " with type=ClusterIP in " + ns)
180+
udpService, err := udpJig.CreateUDPService(func(svc *v1.Service) {
181+
svc.Spec.Type = v1.ServiceTypeClusterIP
182+
svc.Spec.Ports = []v1.ServicePort{
183+
{Port: 80, Name: "udp", Protocol: v1.ProtocolUDP, TargetPort: intstr.FromInt(80)},
184+
}
185+
})
186+
framework.ExpectNoError(err)
187+
188+
// Create a pod in one node to create the UDP traffic against the NodePort service every 5 seconds
189+
ginkgo.By("creating a client pod for probing the service " + serviceName)
190+
clientPod := newAgnhostPod(podClient, "")
191+
clientPod.Spec.NodeName = clientNodeInfo.name
192+
cmd := fmt.Sprintf(`date; for i in $(seq 1 300); do echo "$(date) Try: ${i}"; echo hostname | nc -u -w 5 -p %d %s %d; echo; done`, srcPort, udpService.Spec.ClusterIP, udpService.Spec.Ports[0].Port)
193+
clientPod.Spec.Containers[0].Command = []string{"/bin/sh", "-c", cmd}
194+
clientPod.Spec.Containers[0].Name = podClient
195+
fr.PodClient().CreateSync(clientPod)
196+
197+
// Read the client pod logs
198+
logs, err := e2epod.GetPodLogs(cs, ns, podClient, podClient)
199+
framework.ExpectNoError(err)
200+
framework.Logf("Pod client logs: %s", logs)
201+
202+
// Add a backend pod to the service in the other node
203+
ginkgo.By("creating a backend pod " + podBackend1 + " for the service " + serviceName)
204+
serverPod1 := newAgnhostPod(podBackend1, "netexec", fmt.Sprintf("--udp-port=%d", 80))
205+
serverPod1.Labels = udpJig.Labels
206+
serverPod1.Spec.NodeName = serverNodeInfo.name
207+
fr.PodClient().CreateSync(serverPod1)
208+
209+
// Waiting for service to expose endpoint.
210+
err = validateEndpointsPorts(cs, ns, serviceName, portsByPodName{podBackend1: {80}})
211+
framework.ExpectNoError(err, "failed to validate endpoints for service %s in namespace: %s", serviceName, ns)
212+
213+
// Check that the pod receives the traffic
214+
// UDP conntrack entries timeout is 30 sec by default
215+
ginkgo.By("checking client pod connected to the backend 1 on Node IP " + serverNodeInfo.nodeIP)
216+
time.Sleep(30 * time.Second)
217+
logs, err = e2epod.GetPodLogs(cs, ns, podClient, podClient)
218+
framework.ExpectNoError(err)
219+
framework.Logf("Pod client logs: %s", logs)
220+
if !strings.Contains(string(logs), podBackend1) {
221+
framework.Failf("Failed to connecto to backend 1")
222+
}
223+
224+
// Create a second pod
225+
ginkgo.By("creating a second backend pod " + podBackend2 + " for the service " + serviceName)
226+
serverPod2 := newAgnhostPod(podBackend2, "netexec", fmt.Sprintf("--udp-port=%d", 80))
227+
serverPod2.Labels = udpJig.Labels
228+
serverPod2.Spec.NodeName = serverNodeInfo.name
229+
fr.PodClient().CreateSync(serverPod2)
230+
231+
// and delete the first pod
232+
framework.Logf("Cleaning up %s pod", podBackend1)
233+
fr.PodClient().DeleteSync(podBackend1, metav1.DeleteOptions{}, framework.DefaultPodDeletionTimeout)
234+
235+
// Check that the second pod keeps receiving traffic
236+
// UDP conntrack entries timeout is 30 sec by default
237+
ginkgo.By("checking client pod connected to the backend 2 on Node IP " + serverNodeInfo.nodeIP)
238+
time.Sleep(30 * time.Second)
239+
logs, err = e2epod.GetPodLogs(cs, ns, podClient, podClient)
240+
framework.ExpectNoError(err)
241+
framework.Logf("Pod client logs: %s", logs)
242+
if !strings.Contains(string(logs), podBackend2) {
243+
framework.Failf("Failed to connecto to backend 2")
244+
}
245+
})
246+
})

0 commit comments

Comments
 (0)