@@ -14,13 +14,12 @@ import (
14
14
corev1 "k8s.io/api/core/v1"
15
15
rbacv1 "k8s.io/api/rbac/v1"
16
16
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
17
- "k8s.io/cli-runtime/pkg/genericclioptions"
18
17
"k8s.io/client-go/kubernetes"
18
+ "k8s.io/client-go/kubernetes/scheme"
19
19
"k8s.io/client-go/rest"
20
20
"k8s.io/client-go/tools/portforward"
21
+ "k8s.io/client-go/tools/remotecommand"
21
22
"k8s.io/client-go/transport/spdy"
22
- "k8s.io/kubernetes/pkg/kubectl/cmd/exec"
23
- cmdportforward "k8s.io/kubernetes/pkg/kubectl/cmd/portforward"
24
23
25
24
"github.com/jetstack/kube-oidc-proxy/pkg/utils"
26
25
)
@@ -119,97 +118,108 @@ func Test_Upgrade(t *testing.T) {
119
118
t .Fatal (err )
120
119
}
121
120
121
+ // wait for our echo server to become ready
122
122
err = utils .WaitForPodReady (e2eSuite .kubeclient ,
123
123
"echoserver" , namespaceUpgradeTest )
124
124
if err != nil {
125
125
t .Fatal (err )
126
126
}
127
127
128
- var execOut bytes.Buffer
129
- var execErr bytes.Buffer
128
+ RESTClient , err := rest .RESTClientFor (restConfig )
129
+ if err != nil {
130
+ t .Fatal (err )
131
+ }
130
132
131
133
// curl echo server from within pod
132
- ioStreams := genericclioptions.IOStreams {In : nil , Out : & execOut , ErrOut : & execErr }
133
- execOptions := & exec.ExecOptions {
134
- StreamOptions : exec.StreamOptions {
135
- IOStreams : ioStreams ,
136
- PodName : pod .Name ,
137
- Namespace : pod .Namespace ,
138
- },
139
- Command : []string {
140
- "curl" , "127.0.0.1:8080" , "-s" , "-d" , "hello world" ,
141
- },
142
- PodClient : kubeclient .CoreV1 (),
143
- Config : restConfig ,
144
- Executor : & exec.DefaultRemoteExecutor {},
134
+ req := RESTClient .Post ().
135
+ Resource ("pods" ).
136
+ Name (pod .Name ).
137
+ Namespace (pod .Namespace ).
138
+ SubResource ("exec" ).
139
+ VersionedParams (& corev1.PodExecOptions {
140
+ Container : "echoserver" ,
141
+ Command : []string {
142
+ "curl" , "127.0.0.1:8080" , "-s" , "-d" , "hello world" ,
143
+ },
144
+ Stdin : false ,
145
+ Stdout : true ,
146
+ Stderr : true ,
147
+ TTY : false ,
148
+ }, scheme .ParameterCodec )
149
+
150
+ exec , err := remotecommand .NewSPDYExecutor (restConfig , "POST" , req .URL ())
151
+ if err != nil {
152
+ t .Fatalf ("failed to create SPDY executor: %s" , err )
145
153
}
154
+ execOut := & bytes.Buffer {}
155
+ execErr := & bytes.Buffer {}
146
156
147
- if err := execOptions .Validate (); err != nil {
148
- t .Fatal (err )
157
+ sopt := remotecommand.StreamOptions {
158
+ Stdout : execOut ,
159
+ Stderr : execErr ,
160
+ Tty : false ,
149
161
}
150
162
151
- if err := execOptions .Run (); err != nil {
152
- t .Fatal (err )
163
+ err = exec .Stream (sopt )
164
+ if err != nil {
165
+ t .Fatalf ("failed to execute stream command: %s" , err )
153
166
}
154
167
155
168
// should have no stderr output
156
169
if execErr .String () != "" {
157
170
t .Errorf ("got curl error: %s" , execErr .String ())
158
171
}
159
172
173
+ t .Logf ("%s/%s: %s" , pod .Namespace , pod .Name , execOut .String ())
174
+
160
175
// should have correct stdout output from echo server
161
176
if ! strings .HasSuffix (execOut .String (), "BODY:\n hello world" ) {
162
177
t .Errorf ("got unexpected echoserver response: exp=...hello world got=%s" ,
163
178
execOut .String ())
164
179
}
165
180
166
181
// test we can port forward to the echo server and curl on localhost to it
167
-
168
- var portOut bytes.Buffer
169
- var portErr bytes.Buffer
182
+ portOut := & bytes.Buffer {}
183
+ portErr := & bytes.Buffer {}
170
184
171
185
freePort , err := utils .FreePort ()
172
186
if err != nil {
173
187
t .Fatal (err )
174
188
}
175
189
176
- RESTClient , err := rest .RESTClientFor (restConfig )
177
- if err != nil {
178
- t .Fatal (err )
179
- }
180
-
181
- ioStreams = genericclioptions.IOStreams {In : nil , Out : & portOut , ErrOut : & portErr }
182
-
183
- portForwardOptions := & cmdportforward.PortForwardOptions {
184
- Namespace : pod .Namespace ,
185
- PodName : pod .Name ,
186
- RESTClient : RESTClient ,
187
- Config : restConfig ,
188
- PodClient : kubeclient .CoreV1 (),
189
- Address : []string {"127.0.0.1" },
190
- Ports : []string {freePort + ":8080" },
191
- PortForwarder : & defaultPortForwarder {
192
- IOStreams : ioStreams ,
193
- },
194
- StopChannel : make (chan struct {}, 1 ),
195
- ReadyChannel : make (chan struct {}),
190
+ req = RESTClient .Post ().
191
+ Resource ("pods" ).
192
+ Namespace (pod .Namespace ).
193
+ Name (pod .Name ).
194
+ SubResource ("portforward" )
195
+
196
+ pfopts := & portForwardOptions {
197
+ address : []string {"127.0.0.1" },
198
+ ports : []string {freePort + ":8080" },
199
+ stopCh : make (chan struct {}, 1 ),
200
+ readyCh : make (chan struct {}),
201
+ outBuf : portOut ,
202
+ errBuf : portErr ,
203
+ restConfig : restConfig ,
196
204
}
197
205
198
206
go func () {
199
- defer close (portForwardOptions . StopChannel )
207
+ defer close (pfopts . stopCh )
200
208
201
209
// give a chance to establish a connection
202
210
time .Sleep (time .Second * 2 )
203
211
204
212
portInR := bytes .NewReader ([]byte ("hello world" ))
205
213
214
+ // send message through port forward
206
215
resp , err := http .Post (
207
216
fmt .Sprintf ("http://127.0.0.1:%s" , freePort ), "" , portInR )
208
217
if err != nil {
209
218
t .Errorf ("failed to request echoserver from port forward: %s" , err )
210
219
return
211
220
}
212
221
222
+ // expect 200 resp and correct body
213
223
if resp .StatusCode != 200 {
214
224
t .Errorf ("got unexpected response code from server, exp=200 got=%d" ,
215
225
resp .StatusCode )
@@ -228,31 +238,31 @@ func Test_Upgrade(t *testing.T) {
228
238
}
229
239
}()
230
240
231
- if err := portForwardOptions .Validate (); err != nil {
232
- t .Fatal (err )
241
+ if err := forwardPorts ("POST" , req .URL (), pfopts ); err != nil {
242
+ t .Error (err )
243
+ return
233
244
}
234
245
235
- if err := portForwardOptions .RunPortForward (); err != nil {
236
- t .Fatal (err )
237
- }
238
246
}
239
247
240
- // taken from k8s.io/kubernetes/pkg/kubectl/cmd/portforward
241
- type defaultPortForwarder struct {
242
- genericclioptions.IOStreams
248
+ type portForwardOptions struct {
249
+ address , ports []string
250
+ readyCh , stopCh chan struct {}
251
+ outBuf , errBuf * bytes.Buffer
252
+ restConfig * rest.Config
243
253
}
244
254
245
- func ( f * defaultPortForwarder ) ForwardPorts ( method string , url * url.URL , opts cmdportforward. PortForwardOptions ) error {
246
- transport , upgrader , err := spdy .RoundTripperFor (opts .Config )
255
+ func forwardPorts ( method string , url * url.URL , opts * portForwardOptions ) error {
256
+ transport , upgrader , err := spdy .RoundTripperFor (opts .restConfig )
247
257
if err != nil {
248
258
return err
249
259
}
250
260
dialer := spdy .NewDialer (upgrader , & http.Client {Transport : transport }, method , url )
251
- fw , err := portforward .NewOnAddresses (dialer , opts .Address , opts .Ports , opts .StopChannel , opts .ReadyChannel , f .Out , f .ErrOut )
261
+ fw , err := portforward .NewOnAddresses (dialer , opts .address ,
262
+ opts .ports , opts .stopCh , opts .readyCh , opts .outBuf , opts .errBuf )
252
263
if err != nil {
253
264
return err
254
265
}
266
+
255
267
return fw .ForwardPorts ()
256
268
}
257
-
258
- //
0 commit comments