Skip to content

Commit c601e95

Browse files
committed
Additional race conditions and resolve code review comments
1 parent b73118e commit c601e95

File tree

4 files changed

+205
-76
lines changed

4 files changed

+205
-76
lines changed

examples/src/main/java/io/kubernetes/client/examples/ExecExample.java

Lines changed: 41 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
Copyright 2017 The Kubernetes Authors.
2+
Copyright 2017, 2018 The Kubernetes Authors.
33
Licensed under the Apache License, Version 2.0 (the "License");
44
you may not use this file except in compliance with the License.
55
You may obtain a copy of the License at
@@ -19,6 +19,9 @@
1919
import io.kubernetes.client.Exec;
2020
import io.kubernetes.client.util.Config;
2121
import java.io.IOException;
22+
import java.io.InterruptedIOException;
23+
import java.util.ArrayList;
24+
import java.util.List;
2225

2326
/**
2427
* A simple example of how to use the Java API
@@ -30,47 +33,68 @@
3033
*/
3134
public class ExecExample {
3235
public static void main(String[] args) throws IOException, ApiException, InterruptedException {
36+
String podName = "nginx-4217019353-k5sn9";
37+
String namespace = "default";
38+
List<String> commands = new ArrayList<>();
39+
40+
int len = args.length;
41+
if (len >= 1) podName = args[0];
42+
if (len >= 2) namespace = args[1];
43+
for (int i = 2; i < len; i++) {
44+
commands.add(args[i]);
45+
}
46+
3347
ApiClient client = Config.defaultClient();
3448
Configuration.setDefaultApiClient(client);
3549

3650
Exec exec = new Exec();
37-
boolean tty = System.console() != null;
38-
// final Process proc = exec.exec("default", "nginx-2371676037-czqx3", new String[]
39-
// {"sh", "-c", "echo foo"}, true, tty);
51+
boolean tty = true; // Optional: System.console() != null;
52+
// final Process proc = exec.exec("default", "nginx-4217019353-k5sn9", new String[]
53+
// {"sh", "-c", "echo foo"}, true, tty);
4054
final Process proc =
41-
exec.exec("default", "nginx-4217019353-k5sn9", new String[] {"sh"}, true, tty);
55+
exec.exec(
56+
namespace,
57+
podName,
58+
commands.isEmpty()
59+
? new String[] {"sh"}
60+
: commands.toArray(new String[commands.size()]),
61+
true,
62+
tty);
4263

43-
new Thread(
64+
Thread in =
65+
new Thread(
4466
new Runnable() {
4567
public void run() {
4668
try {
4769
ByteStreams.copy(System.in, proc.getOutputStream());
70+
} catch (InterruptedIOException ie) {
71+
// no-op
4872
} catch (IOException ex) {
4973
ex.printStackTrace();
5074
}
5175
}
52-
})
53-
.start();
76+
});
77+
in.start();
5478

55-
new Thread(
79+
Thread out =
80+
new Thread(
5681
new Runnable() {
5782
public void run() {
5883
try {
5984
ByteStreams.copy(proc.getInputStream(), System.out);
85+
} catch (InterruptedIOException ie) {
86+
// no-op
6087
} catch (IOException ex) {
6188
ex.printStackTrace();
6289
}
6390
}
64-
})
65-
.start();
91+
});
92+
out.start();
6693

6794
proc.waitFor();
68-
try {
69-
// Wait for buffers to flush.
70-
Thread.sleep(2000);
71-
} catch (InterruptedException ex) {
72-
ex.printStackTrace();
73-
}
95+
96+
// wait for any last output; no need to wait for input thread
97+
out.join();
7498

7599
proc.destroy();
76100

util/src/main/java/io/kubernetes/client/Exec.java

Lines changed: 50 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
Copyright 2017 The Kubernetes Authors.
2+
Copyright 2017, 2018 The Kubernetes Authors.
33
Licensed under the Apache License, Version 2.0 (the "License");
44
you may not use this file except in compliance with the License.
55
You may obtain a copy of the License at
@@ -18,10 +18,16 @@
1818
import java.io.IOException;
1919
import java.io.InputStream;
2020
import java.io.OutputStream;
21+
import java.util.HashMap;
22+
import java.util.Map;
2123
import java.util.concurrent.TimeUnit;
2224
import org.apache.commons.lang3.StringUtils;
25+
import org.slf4j.Logger;
26+
import org.slf4j.LoggerFactory;
2327

2428
public class Exec {
29+
private static final Logger log = LoggerFactory.getLogger(Exec.class);
30+
2531
private ApiClient apiClient;
2632

2733
/** Simple Exec API constructor, uses default configuration */
@@ -178,10 +184,34 @@ public Process exec(
178184
return exec;
179185
}
180186

187+
static int parseExitCode(InputStream inputStream) {
188+
int exitCode = 0;
189+
try {
190+
int available = inputStream.available();
191+
if (available > 0) {
192+
byte[] b = new byte[available];
193+
inputStream.read(b);
194+
String result = new String(b, "UTF-8");
195+
int idx = result.lastIndexOf(':');
196+
if (idx > 0) {
197+
try {
198+
exitCode = Integer.parseInt(result.substring(idx + 1).trim());
199+
} catch (NumberFormatException nfe) {
200+
log.error("Error parsing exit code from status channel response", nfe);
201+
}
202+
}
203+
}
204+
} catch (IOException io) {
205+
log.error("Error parsing exit code from status channel response", io);
206+
}
207+
208+
return exitCode;
209+
}
210+
181211
private static class ExecProcess extends Process {
182212
private final WebSocketStreamHandler streamHandler;
183213
private volatile int statusCode;
184-
private volatile boolean isDestroyed = false;
214+
private final Map<Integer, InputStream> input = new HashMap<>();
185215

186216
public ExecProcess() throws IOException {
187217
this.statusCode = -1;
@@ -190,25 +220,7 @@ public ExecProcess() throws IOException {
190220
@Override
191221
public void close() {
192222
if (statusCode == -1) {
193-
InputStream inputStream = getInputStream(3);
194-
int exitCode = 0;
195-
try {
196-
int available = inputStream.available();
197-
if (available > 0) {
198-
byte[] b = new byte[available];
199-
inputStream.read(b);
200-
String result = new String(b, "UTF-8");
201-
int idx = result.lastIndexOf(':');
202-
if (idx > 0) {
203-
try {
204-
exitCode = Integer.parseInt(result.substring(idx + 1).trim());
205-
} catch (NumberFormatException nfe) {
206-
// no-op
207-
}
208-
}
209-
}
210-
} catch (IOException e) {
211-
}
223+
int exitCode = parseExitCode(ExecProcess.this.getInputStream(3));
212224

213225
// notify of process completion
214226
synchronized (ExecProcess.this) {
@@ -217,7 +229,7 @@ public void close() {
217229
}
218230
}
219231

220-
if (isDestroyed) super.close();
232+
super.close();
221233
}
222234
};
223235
}
@@ -233,12 +245,19 @@ public OutputStream getOutputStream() {
233245

234246
@Override
235247
public InputStream getInputStream() {
236-
return streamHandler.getInputStream(1);
248+
return getInputStream(1);
237249
}
238250

239251
@Override
240252
public InputStream getErrorStream() {
241-
return streamHandler.getInputStream(2);
253+
return getInputStream(2);
254+
}
255+
256+
private synchronized InputStream getInputStream(int stream) {
257+
if (!input.containsKey(stream)) {
258+
input.put(stream, streamHandler.getInputStream(stream));
259+
}
260+
return input.get(stream);
242261
}
243262

244263
@Override
@@ -265,8 +284,14 @@ public int exitValue() {
265284

266285
@Override
267286
public void destroy() {
268-
isDestroyed = true;
269287
streamHandler.close();
288+
for (InputStream in : input.values()) {
289+
try {
290+
in.close();
291+
} catch (IOException ex) {
292+
log.error("Error on close", ex);
293+
}
294+
}
270295
}
271296
}
272297
}

0 commit comments

Comments
 (0)