@@ -25,6 +25,7 @@ import (
25
25
"fmt"
26
26
"io"
27
27
"net"
28
+ "net/http"
28
29
"os/exec"
29
30
"regexp"
30
31
"strconv"
@@ -35,6 +36,9 @@ import (
35
36
"golang.org/x/net/websocket"
36
37
v1 "k8s.io/api/core/v1"
37
38
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
39
+ "k8s.io/apimachinery/pkg/util/intstr"
40
+ utilnet "k8s.io/apimachinery/pkg/util/net"
41
+ "k8s.io/apimachinery/pkg/util/rand"
38
42
"k8s.io/apimachinery/pkg/util/wait"
39
43
"k8s.io/kubernetes/test/e2e/framework"
40
44
e2ekubectl "k8s.io/kubernetes/test/e2e/framework/kubectl"
@@ -123,6 +127,71 @@ func pfPod(expectedClientData, chunks, chunkSize, chunkIntervalMillis string, bi
123
127
}
124
128
}
125
129
130
+ func pfNeverReadRequestBodyPod () * v1.Pod {
131
+ return & v1.Pod {
132
+ ObjectMeta : metav1.ObjectMeta {
133
+ Name : "issue-74551" ,
134
+ },
135
+ Spec : v1.PodSpec {
136
+ RestartPolicy : v1 .RestartPolicyNever ,
137
+ Containers : []v1.Container {
138
+ {
139
+ Name : "server" ,
140
+ Image : imageutils .GetE2EImage (imageutils .Agnhost ),
141
+ Args : []string {
142
+ "netexec" ,
143
+ "--http-port=80" ,
144
+ },
145
+ ReadinessProbe : & v1.Probe {
146
+ ProbeHandler : v1.ProbeHandler {
147
+ HTTPGet : & v1.HTTPGetAction {
148
+ Path : "/healthz" ,
149
+ Port : intstr.IntOrString {
150
+ IntVal : int32 (80 ),
151
+ },
152
+ Scheme : v1 .URISchemeHTTP ,
153
+ },
154
+ },
155
+ InitialDelaySeconds : 5 ,
156
+ TimeoutSeconds : 60 ,
157
+ PeriodSeconds : 1 ,
158
+ },
159
+ },
160
+ },
161
+ },
162
+ }
163
+ }
164
+
165
+ func testWebServerPod () * v1.Pod {
166
+ return & v1.Pod {
167
+ ObjectMeta : metav1.ObjectMeta {
168
+ Name : podName ,
169
+ Labels : map [string ]string {"name" : podName },
170
+ },
171
+ Spec : v1.PodSpec {
172
+ Containers : []v1.Container {
173
+ {
174
+ Name : "testwebserver" ,
175
+ Image : imageutils .GetE2EImage (imageutils .Agnhost ),
176
+ Args : []string {"test-webserver" },
177
+ Ports : []v1.ContainerPort {{ContainerPort : int32 (80 )}},
178
+ ReadinessProbe : & v1.Probe {
179
+ ProbeHandler : v1.ProbeHandler {
180
+ HTTPGet : & v1.HTTPGetAction {
181
+ Path : "/" ,
182
+ Port : intstr .FromInt32 (int32 (80 )),
183
+ },
184
+ },
185
+ InitialDelaySeconds : 5 ,
186
+ TimeoutSeconds : 3 ,
187
+ FailureThreshold : 10 ,
188
+ },
189
+ },
190
+ },
191
+ },
192
+ }
193
+ }
194
+
126
195
// WaitForTerminatedContainer waits till a given container be terminated for a given pod.
127
196
func WaitForTerminatedContainer (ctx context.Context , f * framework.Framework , pod * v1.Pod , containerName string ) error {
128
197
return e2epod .WaitForPodCondition (ctx , f .ClientSet , f .Namespace .Name , pod .Name , "container terminated" , framework .PodStartTimeout , func (pod * v1.Pod ) (bool , error ) {
@@ -493,6 +562,110 @@ var _ = SIGDescribe("Kubectl Port forwarding", func() {
493
562
doTestOverWebSockets (ctx , "localhost" , f )
494
563
})
495
564
})
565
+
566
+ ginkgo .Describe ("with a pod being removed" , func () {
567
+ ginkgo .It ("should stop port-forwarding" , func (ctx context.Context ) {
568
+ ginkgo .By ("Creating the target pod" )
569
+ pod := pfNeverReadRequestBodyPod ()
570
+ _ , err := f .ClientSet .CoreV1 ().Pods (f .Namespace .Name ).Create (ctx , pod , metav1.CreateOptions {})
571
+ framework .ExpectNoError (err , "couldn't create pod" )
572
+
573
+ err = e2epod .WaitTimeoutForPodReadyInNamespace (ctx , f .ClientSet , pod .Name , f .Namespace .Name , framework .PodStartTimeout )
574
+ framework .ExpectNoError (err , "pod did not start running" )
575
+
576
+ ginkgo .By ("Running 'kubectl port-forward'" )
577
+ cmd := runPortForward (f .Namespace .Name , pod .Name , 80 )
578
+ defer cmd .Stop ()
579
+
580
+ ginkgo .By ("Running port-forward client" )
581
+ reqChan := make (chan bool )
582
+ errorChan := make (chan error )
583
+ go func () {
584
+ defer ginkgo .GinkgoRecover ()
585
+
586
+ // try to mock a big request, which should take some time
587
+ for sentBodySize := 0 ; sentBodySize < 1024 * 1024 * 1024 ; {
588
+ size := rand .Intn (4 * 1024 * 1024 )
589
+ url := fmt .Sprintf ("http://localhost:%d/header" , cmd .port )
590
+ _ , err := post (url , strings .NewReader (strings .Repeat ("x" , size )), nil )
591
+ if err != nil {
592
+ errorChan <- err
593
+ }
594
+ ginkgo .By (fmt .Sprintf ("Sent %d chunk of data" , sentBodySize ))
595
+ if sentBodySize == 0 {
596
+ close (reqChan )
597
+ }
598
+ sentBodySize += size
599
+ }
600
+ }()
601
+
602
+ ginkgo .By ("Remove the forwarded pod after the first client request" )
603
+ <- reqChan
604
+ e2epod .DeletePodOrFail (ctx , f .ClientSet , f .Namespace .Name , pod .Name )
605
+
606
+ ginkgo .By ("Wait for client being interrupted" )
607
+ select {
608
+ case err = <- errorChan :
609
+ case <- time .After (e2epod .DefaultPodDeletionTimeout ):
610
+ }
611
+
612
+ ginkgo .By ("Check the client error" )
613
+ gomega .Expect (err ).To (gomega .HaveOccurred ())
614
+ gomega .Expect (err .Error ()).To (gomega .Or (gomega .ContainSubstring ("connection reset by peer" ), gomega .ContainSubstring ("EOF" )))
615
+
616
+ ginkgo .By ("Check kubectl port-forward exit code" )
617
+ gomega .Expect (cmd .cmd .ProcessState .ExitCode ()).To (gomega .BeNumerically ("<" , 0 ), "kubectl port-forward should finish with non-zero exit code" )
618
+ })
619
+ })
620
+
621
+ ginkgo .Describe ("Shutdown client connection while the remote stream is writing data to the port-forward connection" , func () {
622
+ ginkgo .It ("port-forward should keep working after detect broken connection" , func (ctx context.Context ) {
623
+ ginkgo .By ("Creating the target pod" )
624
+ pod := testWebServerPod ()
625
+ _ , err := f .ClientSet .CoreV1 ().Pods (f .Namespace .Name ).Create (ctx , pod , metav1.CreateOptions {})
626
+ framework .ExpectNoError (err , "couldn't create pod" )
627
+
628
+ err = e2epod .WaitTimeoutForPodReadyInNamespace (ctx , f .ClientSet , pod .Name , f .Namespace .Name , framework .PodStartTimeout )
629
+ framework .ExpectNoError (err , "pod did not start running" )
630
+
631
+ ginkgo .By ("Running 'kubectl port-forward'" )
632
+ cmd := runPortForward (f .Namespace .Name , pod .Name , 80 )
633
+ defer cmd .Stop ()
634
+
635
+ ginkgo .By ("Send a http request to verify port-forward working" )
636
+ client := http.Client {
637
+ Timeout : 10 * time .Second ,
638
+ }
639
+ resp , err := client .Get (fmt .Sprintf ("http://127.0.0.1:%d/" , cmd .port ))
640
+ framework .ExpectNoError (err , "couldn't get http response from port-forward" )
641
+ gomega .Expect (resp .StatusCode ).To (gomega .Equal (http .StatusOK ), "unexpected status code" )
642
+
643
+ ginkgo .By ("Dialing the local port" )
644
+ conn , err := net .Dial ("tcp" , fmt .Sprintf ("127.0.0.1:%d" , cmd .port ))
645
+ framework .ExpectNoError (err , "couldn't connect to port %d" , cmd .port )
646
+
647
+ // use raw tcp connection to emulate client close connection without reading response
648
+ ginkgo .By ("Request agnhost binary file (40MB+)" )
649
+ requestLines := []string {"GET /agnhost HTTP/1.1" , "Host: localhost" , "" }
650
+ for _ , line := range requestLines {
651
+ _ , err := conn .Write (append ([]byte (line ), []byte ("\r \n " )... ))
652
+ framework .ExpectNoError (err , "couldn't write http request to local connection" )
653
+ }
654
+
655
+ ginkgo .By ("Read only one byte from the connection" )
656
+ _ , err = conn .Read (make ([]byte , 1 ))
657
+ framework .ExpectNoError (err , "couldn't read from the local connection" )
658
+
659
+ ginkgo .By ("Close client connection without reading remain data" )
660
+ err = conn .Close ()
661
+ framework .ExpectNoError (err , "couldn't close local connection" )
662
+
663
+ ginkgo .By ("Send another http request through port-forward again" )
664
+ resp , err = client .Get (fmt .Sprintf ("http://127.0.0.1:%d/" , cmd .port ))
665
+ framework .ExpectNoError (err , "couldn't get http response from port-forward" )
666
+ gomega .Expect (resp .StatusCode ).To (gomega .Equal (http .StatusOK ), "unexpected status code" )
667
+ })
668
+ })
496
669
})
497
670
498
671
func wsRead (conn * websocket.Conn ) (byte , []byte , error ) {
@@ -521,3 +694,24 @@ func wsWrite(conn *websocket.Conn, channel byte, data []byte) error {
521
694
err := websocket .Message .Send (conn , frame )
522
695
return err
523
696
}
697
+
698
+ func post (url string , reader io.Reader , transport * http.Transport ) (string , error ) {
699
+ if transport == nil {
700
+ transport = utilnet .SetTransportDefaults (& http.Transport {})
701
+ }
702
+ client := & http.Client {Transport : transport }
703
+ req , err := http .NewRequest (http .MethodPost , url , reader )
704
+ if err != nil {
705
+ return "" , err
706
+ }
707
+ resp , err := client .Do (req )
708
+ if err != nil {
709
+ return "" , err
710
+ }
711
+ defer resp .Body .Close () //nolint: errcheck
712
+ body , err := io .ReadAll (resp .Body )
713
+ if err != nil {
714
+ return "" , err
715
+ }
716
+ return string (body ), nil
717
+ }
0 commit comments