Skip to content

Commit 6adabac

Browse files
committed
Move exit value parsing to message handling for channel 3
1 parent bef5d7a commit 6adabac

File tree

3 files changed

+53
-30
lines changed

3 files changed

+53
-30
lines changed

examples/src/main/java/io/kubernetes/client/examples/ExecExample.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ public static void main(String[] args) throws IOException, ApiException, Interru
4747
Configuration.setDefaultApiClient(client);
4848

4949
Exec exec = new Exec();
50-
boolean tty = true; // Optional: System.console() != null;
50+
boolean tty = System.console() != null;
5151
// final Process proc = exec.exec("default", "nginx-4217019353-k5sn9", new String[]
5252
// {"sh", "-c", "echo foo"}, true, tty);
5353
final Process proc =
@@ -93,6 +93,6 @@ public void run() {
9393

9494
proc.destroy();
9595

96-
System.exit(0);
96+
System.exit(proc.exitValue());
9797
}
9898
}

util/src/main/java/io/kubernetes/client/Exec.java

Lines changed: 38 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
*/
1313
package io.kubernetes.client;
1414

15+
import com.google.common.io.CharStreams;
1516
import com.google.gson.reflect.TypeToken;
1617
import io.kubernetes.client.models.V1Pod;
1718
import io.kubernetes.client.models.V1Status;
@@ -21,7 +22,9 @@
2122
import io.kubernetes.client.util.WebSockets;
2223
import java.io.IOException;
2324
import java.io.InputStream;
25+
import java.io.InputStreamReader;
2426
import java.io.OutputStream;
27+
import java.io.Reader;
2528
import java.lang.reflect.Type;
2629
import java.util.HashMap;
2730
import java.util.List;
@@ -192,16 +195,12 @@ public Process exec(
192195

193196
static int parseExitCode(ApiClient client, InputStream inputStream) {
194197
try {
195-
int available = inputStream.available();
196-
197-
// Kubernetes returns no content when the exit code is 0
198-
if (available == 0) return 0;
199-
200-
byte[] b = new byte[available];
201-
inputStream.read(b);
202-
String body = new String(b, "UTF-8");
203-
204198
Type returnType = new TypeToken<V1Status>() {}.getType();
199+
String body;
200+
try (final Reader reader = new InputStreamReader(inputStream)) {
201+
body = CharStreams.toString(reader);
202+
}
203+
205204
V1Status status = client.getJSON().deserialize(body, returnType);
206205
if ("Success".equals(status.getStatus())) return 0;
207206

@@ -232,21 +231,34 @@ static int parseExitCode(ApiClient client, InputStream inputStream) {
232231

233232
private class ExecProcess extends Process {
234233
private final WebSocketStreamHandler streamHandler;
235-
private volatile int statusCode;
234+
private int statusCode = -1;
235+
private boolean isAlive = true;
236236
private final Map<Integer, InputStream> input = new HashMap<>();
237237

238238
public ExecProcess() throws IOException {
239-
this.statusCode = -1;
240239
this.streamHandler =
241240
new WebSocketStreamHandler() {
242241
@Override
243-
public void close() {
244-
if (statusCode == -1) {
245-
int exitCode = parseExitCode(apiClient, ExecProcess.this.getInputStream(3));
242+
protected void handleMessage(int stream, InputStream inStream) {
243+
if (stream == 3) {
244+
int exitCode = parseExitCode(apiClient, inStream);
245+
if (exitCode >= 0) {
246+
// notify of process completion
247+
synchronized (ExecProcess.this) {
248+
statusCode = exitCode;
249+
isAlive = false;
250+
ExecProcess.this.notifyAll();
251+
}
252+
}
253+
} else super.handleMessage(stream, inStream);
254+
}
246255

247-
// notify of process completion
248-
synchronized (ExecProcess.this) {
249-
statusCode = exitCode;
256+
@Override
257+
public void close() {
258+
// notify of process completion
259+
synchronized (ExecProcess.this) {
260+
if (isAlive) {
261+
isAlive = false;
250262
ExecProcess.this.notifyAll();
251263
}
252264
}
@@ -286,24 +298,29 @@ private synchronized InputStream getInputStream(int stream) {
286298
public int waitFor() throws InterruptedException {
287299
synchronized (this) {
288300
this.wait();
301+
return statusCode;
289302
}
290-
return statusCode;
291303
}
292304

293305
@Override
294306
public boolean waitFor(long timeout, TimeUnit unit) throws InterruptedException {
295307
synchronized (this) {
296308
this.wait(TimeUnit.MILLISECONDS.convert(timeout, unit));
309+
return !isAlive();
297310
}
298-
return !isAlive();
299311
}
300312

301313
@Override
302-
public int exitValue() {
303-
if (statusCode == -1) throw new IllegalThreadStateException();
314+
public synchronized int exitValue() {
315+
if (isAlive) throw new IllegalThreadStateException();
304316
return statusCode;
305317
}
306318

319+
@Override
320+
public synchronized boolean isAlive() {
321+
return isAlive;
322+
}
323+
307324
@Override
308325
public void destroy() {
309326
streamHandler.close();

util/src/main/java/io/kubernetes/client/util/WebSocketStreamHandler.java

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -67,25 +67,31 @@ public synchronized void open(String protocol, WebSocket socket) {
6767
@Override
6868
public void bytesMessage(InputStream in) {
6969
try {
70-
OutputStream out = getSocketInputOutputStream(in.read());
71-
ByteStreams.copy(in, out);
70+
handleMessage(in.read(), in);
7271
} catch (IOException ex) {
73-
log.error("Error writing message", ex);
72+
log.error("Error reading message channel", ex);
7473
}
7574
}
7675

7776
@Override
7877
public void textMessage(Reader in) {
7978
try {
80-
OutputStream out = getSocketInputOutputStream(in.read());
81-
InputStream inStream =
82-
new ByteArrayInputStream(CharStreams.toString(in).getBytes(Charsets.UTF_8));
83-
ByteStreams.copy(inStream, out);
79+
handleMessage(
80+
in.read(), new ByteArrayInputStream(CharStreams.toString(in).getBytes(Charsets.UTF_8)));
8481
} catch (IOException ex) {
8582
log.error("Error writing message", ex);
8683
}
8784
}
8885

86+
protected void handleMessage(int stream, InputStream inStream) {
87+
try {
88+
OutputStream out = getSocketInputOutputStream(stream);
89+
ByteStreams.copy(inStream, out);
90+
} catch (IOException ex) {
91+
log.error("Error handling message", ex);
92+
}
93+
}
94+
8995
@Override
9096
public synchronized void close() {
9197
if (state != State.CLOSED) {

0 commit comments

Comments
 (0)