1
1
/*
2
- Copyright 2017 The Kubernetes Authors.
2
+ Copyright 2017, 2018 The Kubernetes Authors.
3
3
Licensed under the Apache License, Version 2.0 (the "License");
4
4
you may not use this file except in compliance with the License.
5
5
You may obtain a copy of the License at
12
12
*/
13
13
package io .kubernetes .client ;
14
14
15
+ import com .google .common .io .CharStreams ;
16
+ import com .google .gson .reflect .TypeToken ;
15
17
import io .kubernetes .client .models .V1Pod ;
18
+ import io .kubernetes .client .models .V1Status ;
19
+ import io .kubernetes .client .models .V1StatusCause ;
20
+ import io .kubernetes .client .models .V1StatusDetails ;
16
21
import io .kubernetes .client .util .WebSocketStreamHandler ;
17
22
import io .kubernetes .client .util .WebSockets ;
18
23
import java .io .IOException ;
19
24
import java .io .InputStream ;
25
+ import java .io .InputStreamReader ;
20
26
import java .io .OutputStream ;
27
+ import java .io .Reader ;
28
+ import java .lang .reflect .Type ;
29
+ import java .util .HashMap ;
30
+ import java .util .List ;
31
+ import java .util .Map ;
32
+ import java .util .concurrent .TimeUnit ;
21
33
import org .apache .commons .lang3 .StringUtils ;
34
+ import org .slf4j .Logger ;
35
+ import org .slf4j .LoggerFactory ;
22
36
23
37
public class Exec {
38
+ private static final Logger log = LoggerFactory .getLogger (Exec .class );
39
+
24
40
private ApiClient apiClient ;
25
41
26
42
/** Simple Exec API constructor, uses default configuration */
@@ -170,20 +186,90 @@ public Process exec(
170
186
throws ApiException , IOException {
171
187
String path = makePath (namespace , name , command , container , stdin , tty );
172
188
173
- WebSocketStreamHandler handler = new WebSocketStreamHandler ();
174
- ExecProcess exec = new ExecProcess ( handler );
189
+ ExecProcess exec = new ExecProcess ();
190
+ WebSocketStreamHandler handler = exec . getHandler ( );
175
191
WebSockets .stream (path , "GET" , apiClient , handler );
176
192
177
193
return exec ;
178
194
}
179
195
180
- private static class ExecProcess extends Process {
181
- WebSocketStreamHandler streamHandler ;
182
- private int statusCode ;
196
+ static int parseExitCode (ApiClient client , InputStream inputStream ) {
197
+ try {
198
+ Type returnType = new TypeToken <V1Status >() {}.getType ();
199
+ String body ;
200
+ try (final Reader reader = new InputStreamReader (inputStream )) {
201
+ body = CharStreams .toString (reader );
202
+ }
203
+
204
+ V1Status status = client .getJSON ().deserialize (body , returnType );
205
+ if ("Success" .equals (status .getStatus ())) return 0 ;
206
+
207
+ if ("NonZeroExitCode" .equals (status .getReason ())) {
208
+ V1StatusDetails details = status .getDetails ();
209
+ if (details != null ) {
210
+ List <V1StatusCause > causes = details .getCauses ();
211
+ if (causes != null ) {
212
+ for (V1StatusCause cause : causes ) {
213
+ if ("ExitCode" .equals (cause .getReason ())) {
214
+ try {
215
+ return Integer .parseInt (cause .getMessage ());
216
+ } catch (NumberFormatException nfe ) {
217
+ log .error ("Error parsing exit code from status channel response" , nfe );
218
+ }
219
+ }
220
+ }
221
+ }
222
+ }
223
+ }
224
+ } catch (Throwable t ) {
225
+ log .error ("Error parsing exit code from status channel response" , t );
226
+ }
227
+
228
+ // Unable to parse the exit code from the content
229
+ return -1 ;
230
+ }
231
+
232
+ private class ExecProcess extends Process {
233
+ private final WebSocketStreamHandler streamHandler ;
234
+ private int statusCode = -1 ;
235
+ private boolean isAlive = true ;
236
+ private final Map <Integer , InputStream > input = new HashMap <>();
237
+
238
+ public ExecProcess () throws IOException {
239
+ this .streamHandler =
240
+ new WebSocketStreamHandler () {
241
+ @ Override
242
+ protected void handleMessage (int stream , InputStream inStream ) {
243
+ if (stream == 3 ) {
244
+ int exitCode = parseExitCode (apiClient , inStream );
245
+ if (exitCode >= 0 ) {
246
+ // notify of process completion
247
+ synchronized (ExecProcess .this ) {
248
+ statusCode = exitCode ;
249
+ isAlive = false ;
250
+ ExecProcess .this .notifyAll ();
251
+ }
252
+ }
253
+ } else super .handleMessage (stream , inStream );
254
+ }
255
+
256
+ @ Override
257
+ public void close () {
258
+ // notify of process completion
259
+ synchronized (ExecProcess .this ) {
260
+ if (isAlive ) {
261
+ isAlive = false ;
262
+ ExecProcess .this .notifyAll ();
263
+ }
264
+ }
183
265
184
- public ExecProcess (WebSocketStreamHandler handler ) throws IOException {
185
- this .streamHandler = handler ;
186
- this .statusCode = -1 ;
266
+ super .close ();
267
+ }
268
+ };
269
+ }
270
+
271
+ private WebSocketStreamHandler getHandler () {
272
+ return streamHandler ;
187
273
}
188
274
189
275
@ Override
@@ -193,28 +279,58 @@ public OutputStream getOutputStream() {
193
279
194
280
@ Override
195
281
public InputStream getInputStream () {
196
- return streamHandler . getInputStream (1 );
282
+ return getInputStream (1 );
197
283
}
198
284
199
285
@ Override
200
286
public InputStream getErrorStream () {
201
- return streamHandler .getInputStream (2 );
287
+ return getInputStream (2 );
288
+ }
289
+
290
+ private synchronized InputStream getInputStream (int stream ) {
291
+ if (!input .containsKey (stream )) {
292
+ input .put (stream , streamHandler .getInputStream (stream ));
293
+ }
294
+ return input .get (stream );
202
295
}
203
296
204
297
@ Override
205
298
public int waitFor () throws InterruptedException {
206
299
synchronized (this ) {
207
300
this .wait ();
301
+ return statusCode ;
302
+ }
303
+ }
304
+
305
+ @ Override
306
+ public boolean waitFor (long timeout , TimeUnit unit ) throws InterruptedException {
307
+ synchronized (this ) {
308
+ this .wait (TimeUnit .MILLISECONDS .convert (timeout , unit ));
309
+ return !isAlive ();
208
310
}
209
- return statusCode ;
210
311
}
211
312
212
- public int exitValue () {
313
+ @ Override
314
+ public synchronized int exitValue () {
315
+ if (isAlive ) throw new IllegalThreadStateException ();
213
316
return statusCode ;
214
317
}
215
318
319
+ @ Override
320
+ public synchronized boolean isAlive () {
321
+ return isAlive ;
322
+ }
323
+
324
+ @ Override
216
325
public void destroy () {
217
326
streamHandler .close ();
327
+ for (InputStream in : input .values ()) {
328
+ try {
329
+ in .close ();
330
+ } catch (IOException ex ) {
331
+ log .error ("Error on close" , ex );
332
+ }
333
+ }
218
334
}
219
335
}
220
336
}
0 commit comments