@@ -25,6 +25,7 @@ import (
25
25
v1 "k8s.io/api/core/v1"
26
26
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
27
27
"k8s.io/apimachinery/pkg/util/intstr"
28
+ "k8s.io/apimachinery/pkg/util/wait"
28
29
clientset "k8s.io/client-go/kubernetes"
29
30
"k8s.io/kubernetes/test/e2e/framework"
30
31
e2enode "k8s.io/kubernetes/test/e2e/framework/node"
@@ -75,6 +76,20 @@ var _ = SIGDescribe("Conntrack", func() {
75
76
clientNodeInfo , serverNodeInfo nodeInfo
76
77
)
77
78
79
+ logContainsFn := func (text string ) wait.ConditionFunc {
80
+ return func () (bool , error ) {
81
+ logs , err := e2epod .GetPodLogs (cs , ns , podClient , podClient )
82
+ if err != nil {
83
+ // Retry the error next time.
84
+ return false , nil
85
+ }
86
+ if ! strings .Contains (string (logs ), text ) {
87
+ return false , nil
88
+ }
89
+ return true , nil
90
+ }
91
+ }
92
+
78
93
ginkgo .BeforeEach (func () {
79
94
cs = fr .ClientSet
80
95
ns = fr .Namespace .Name
@@ -117,7 +132,7 @@ var _ = SIGDescribe("Conntrack", func() {
117
132
ginkgo .By ("creating a client pod for probing the service " + serviceName )
118
133
clientPod := newAgnhostPod (podClient , "" )
119
134
clientPod .Spec .NodeName = clientNodeInfo .name
120
- cmd := fmt .Sprintf (`date; for i in $(seq 1 300 ); do echo "$(date) Try: ${i}"; echo hostname | nc -u -w 5 -p %d %s %d; echo; done` , srcPort , serverNodeInfo .nodeIP , udpService .Spec .Ports [0 ].NodePort )
135
+ cmd := fmt .Sprintf (`date; for i in $(seq 1 3000 ); do echo "$(date) Try: ${i}"; echo hostname | nc -u -w 5 -p %d %s %d; echo; done` , srcPort , serverNodeInfo .nodeIP , udpService .Spec .Ports [0 ].NodePort )
121
136
clientPod .Spec .Containers [0 ].Command = []string {"/bin/sh" , "-c" , cmd }
122
137
clientPod .Spec .Containers [0 ].Name = podClient
123
138
fr .PodClient ().CreateSync (clientPod )
@@ -138,15 +153,17 @@ var _ = SIGDescribe("Conntrack", func() {
138
153
err = validateEndpointsPorts (cs , ns , serviceName , portsByPodName {podBackend1 : {80 }})
139
154
framework .ExpectNoError (err , "failed to validate endpoints for service %s in namespace: %s" , serviceName , ns )
140
155
141
- // Check that the pod receives the traffic
142
- // UDP conntrack entries timeout is 30 sec by default
156
+ // Note that the fact that Endpoints object already exists, does NOT mean
157
+ // that iptables (or whatever else is used) was already programmed.
158
+ // Additionally take into account that UDP conntract entries timeout is
159
+ // 30 seconds by default.
160
+ // Based on the above check if the pod receives the traffic.
143
161
ginkgo .By ("checking client pod connected to the backend 1 on Node IP " + serverNodeInfo .nodeIP )
144
- time .Sleep (30 * time .Second )
145
- logs , err = e2epod .GetPodLogs (cs , ns , podClient , podClient )
146
- framework .ExpectNoError (err )
147
- framework .Logf ("Pod client logs: %s" , logs )
148
- if ! strings .Contains (string (logs ), podBackend1 ) {
149
- framework .Failf ("Failed to connecto to backend 1" )
162
+ if err := wait .PollImmediate (5 * time .Second , time .Minute , logContainsFn (podBackend1 )); err != nil {
163
+ logs , err = e2epod .GetPodLogs (cs , ns , podClient , podClient )
164
+ framework .ExpectNoError (err )
165
+ framework .Logf ("Pod client logs: %s" , logs )
166
+ framework .Failf ("Failed to connect to backend 1" )
150
167
}
151
168
152
169
// Create a second pod
@@ -160,21 +177,24 @@ var _ = SIGDescribe("Conntrack", func() {
160
177
framework .Logf ("Cleaning up %s pod" , podBackend1 )
161
178
fr .PodClient ().DeleteSync (podBackend1 , metav1.DeleteOptions {}, framework .DefaultPodDeletionTimeout )
162
179
180
+ // Waiting for service to expose endpoint.
181
+ err = validateEndpointsPorts (cs , ns , serviceName , portsByPodName {podBackend2 : {80 }})
182
+ framework .ExpectNoError (err , "failed to validate endpoints for service %s in namespace: %s" , serviceName , ns )
183
+
163
184
// Check that the second pod keeps receiving traffic
164
185
// UDP conntrack entries timeout is 30 sec by default
165
186
ginkgo .By ("checking client pod connected to the backend 2 on Node IP " + serverNodeInfo .nodeIP )
166
- time .Sleep (30 * time .Second )
167
- logs , err = e2epod .GetPodLogs (cs , ns , podClient , podClient )
168
- framework .ExpectNoError (err )
169
- framework .Logf ("Pod client logs: %s" , logs )
170
- if ! strings .Contains (string (logs ), podBackend2 ) {
171
- framework .Failf ("Failed to connecto to backend 2" )
187
+ if err := wait .PollImmediate (5 * time .Second , time .Minute , logContainsFn (podBackend2 )); err != nil {
188
+ logs , err = e2epod .GetPodLogs (cs , ns , podClient , podClient )
189
+ framework .ExpectNoError (err )
190
+ framework .Logf ("Pod client logs: %s" , logs )
191
+ framework .Failf ("Failed to connect to backend 2" )
172
192
}
173
193
})
174
194
175
195
ginkgo .It ("should be able to preserve UDP traffic when server pod cycles for a ClusterIP service" , func () {
176
196
177
- // Create a NodePort service
197
+ // Create a ClusterIP service
178
198
udpJig := e2eservice .NewTestJig (cs , ns , serviceName )
179
199
ginkgo .By ("creating a UDP service " + serviceName + " with type=ClusterIP in " + ns )
180
200
udpService , err := udpJig .CreateUDPService (func (svc * v1.Service ) {
@@ -185,11 +205,11 @@ var _ = SIGDescribe("Conntrack", func() {
185
205
})
186
206
framework .ExpectNoError (err )
187
207
188
- // Create a pod in one node to create the UDP traffic against the NodePort service every 5 seconds
208
+ // Create a pod in one node to create the UDP traffic against the ClusterIP service every 5 seconds
189
209
ginkgo .By ("creating a client pod for probing the service " + serviceName )
190
210
clientPod := newAgnhostPod (podClient , "" )
191
211
clientPod .Spec .NodeName = clientNodeInfo .name
192
- cmd := fmt .Sprintf (`date; for i in $(seq 1 300 ); do echo "$(date) Try: ${i}"; echo hostname | nc -u -w 5 -p %d %s %d; echo; done` , srcPort , udpService .Spec .ClusterIP , udpService .Spec .Ports [0 ].Port )
212
+ cmd := fmt .Sprintf (`date; for i in $(seq 1 3000 ); do echo "$(date) Try: ${i}"; echo hostname | nc -u -w 5 -p %d %s %d; echo; done` , srcPort , udpService .Spec .ClusterIP , udpService .Spec .Ports [0 ].Port )
193
213
clientPod .Spec .Containers [0 ].Command = []string {"/bin/sh" , "-c" , cmd }
194
214
clientPod .Spec .Containers [0 ].Name = podClient
195
215
fr .PodClient ().CreateSync (clientPod )
@@ -210,15 +230,17 @@ var _ = SIGDescribe("Conntrack", func() {
210
230
err = validateEndpointsPorts (cs , ns , serviceName , portsByPodName {podBackend1 : {80 }})
211
231
framework .ExpectNoError (err , "failed to validate endpoints for service %s in namespace: %s" , serviceName , ns )
212
232
213
- // Check that the pod receives the traffic
214
- // UDP conntrack entries timeout is 30 sec by default
233
+ // Note that the fact that Endpoints object already exists, does NOT mean
234
+ // that iptables (or whatever else is used) was already programmed.
235
+ // Additionally take into account that UDP conntract entries timeout is
236
+ // 30 seconds by default.
237
+ // Based on the above check if the pod receives the traffic.
215
238
ginkgo .By ("checking client pod connected to the backend 1 on Node IP " + serverNodeInfo .nodeIP )
216
- time .Sleep (30 * time .Second )
217
- logs , err = e2epod .GetPodLogs (cs , ns , podClient , podClient )
218
- framework .ExpectNoError (err )
219
- framework .Logf ("Pod client logs: %s" , logs )
220
- if ! strings .Contains (string (logs ), podBackend1 ) {
221
- framework .Failf ("Failed to connecto to backend 1" )
239
+ if err := wait .PollImmediate (5 * time .Second , time .Minute , logContainsFn (podBackend1 )); err != nil {
240
+ logs , err = e2epod .GetPodLogs (cs , ns , podClient , podClient )
241
+ framework .ExpectNoError (err )
242
+ framework .Logf ("Pod client logs: %s" , logs )
243
+ framework .Failf ("Failed to connect to backend 1" )
222
244
}
223
245
224
246
// Create a second pod
@@ -232,15 +254,18 @@ var _ = SIGDescribe("Conntrack", func() {
232
254
framework .Logf ("Cleaning up %s pod" , podBackend1 )
233
255
fr .PodClient ().DeleteSync (podBackend1 , metav1.DeleteOptions {}, framework .DefaultPodDeletionTimeout )
234
256
257
+ // Waiting for service to expose endpoint.
258
+ err = validateEndpointsPorts (cs , ns , serviceName , portsByPodName {podBackend2 : {80 }})
259
+ framework .ExpectNoError (err , "failed to validate endpoints for service %s in namespace: %s" , serviceName , ns )
260
+
235
261
// Check that the second pod keeps receiving traffic
236
262
// UDP conntrack entries timeout is 30 sec by default
237
263
ginkgo .By ("checking client pod connected to the backend 2 on Node IP " + serverNodeInfo .nodeIP )
238
- time .Sleep (30 * time .Second )
239
- logs , err = e2epod .GetPodLogs (cs , ns , podClient , podClient )
240
- framework .ExpectNoError (err )
241
- framework .Logf ("Pod client logs: %s" , logs )
242
- if ! strings .Contains (string (logs ), podBackend2 ) {
243
- framework .Failf ("Failed to connecto to backend 2" )
264
+ if err := wait .PollImmediate (5 * time .Second , time .Minute , logContainsFn (podBackend2 )); err != nil {
265
+ logs , err = e2epod .GetPodLogs (cs , ns , podClient , podClient )
266
+ framework .ExpectNoError (err )
267
+ framework .Logf ("Pod client logs: %s" , logs )
268
+ framework .Failf ("Failed to connect to backend 2" )
244
269
}
245
270
})
246
271
})
0 commit comments