Skip to content

Commit 25a8378

Browse files
committed
Notify on exec process end and read exit code
1 parent 3b53b68 commit 25a8378

File tree

1 file changed

+59
-7
lines changed
  • util/src/main/java/io/kubernetes/client

1 file changed

+59
-7
lines changed

util/src/main/java/io/kubernetes/client/Exec.java

Lines changed: 59 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
import java.io.IOException;
1919
import java.io.InputStream;
2020
import java.io.OutputStream;
21+
import java.util.concurrent.TimeUnit;
22+
2123
import org.apache.commons.lang3.StringUtils;
2224

2325
public class Exec {
@@ -170,20 +172,57 @@ public Process exec(
170172
throws ApiException, IOException {
171173
String path = makePath(namespace, name, command, container, stdin, tty);
172174

173-
WebSocketStreamHandler handler = new WebSocketStreamHandler();
174-
ExecProcess exec = new ExecProcess(handler);
175+
ExecProcess exec = new ExecProcess();
176+
WebSocketStreamHandler handler = exec.getHandler();
175177
WebSockets.stream(path, "GET", apiClient, handler);
176178

177179
return exec;
178180
}
179181

180182
private static class ExecProcess extends Process {
181-
WebSocketStreamHandler streamHandler;
182-
private int statusCode;
183+
private final WebSocketStreamHandler streamHandler;
184+
private volatile int statusCode;
183185

184-
public ExecProcess(WebSocketStreamHandler handler) throws IOException {
185-
this.streamHandler = handler;
186+
public ExecProcess() throws IOException {
186187
this.statusCode = -1;
188+
this.streamHandler = new WebSocketStreamHandler() {
189+
@Override
190+
public void close() {
191+
if (statusCode == -1) {
192+
InputStream inputStream = getInputStream(3);
193+
int exitCode = 0;
194+
try {
195+
int available = inputStream.available();
196+
if (available > 0) {
197+
byte[] b = new byte[available];
198+
inputStream.read(b);
199+
String result = new String(b, "UTF-8");
200+
int idx = result.lastIndexOf(':');
201+
if (idx > 0) {
202+
try {
203+
exitCode = Integer.parseInt(result.substring(idx + 1).trim());
204+
} catch (NumberFormatException nfe) {
205+
// no-op
206+
}
207+
}
208+
}
209+
} catch (IOException e) {
210+
}
211+
212+
// notify of process completion
213+
synchronized (ExecProcess.this) {
214+
statusCode = exitCode;
215+
ExecProcess.this.notifyAll();
216+
}
217+
}
218+
super.close();
219+
}
220+
221+
};
222+
}
223+
224+
private WebSocketStreamHandler getHandler() {
225+
return streamHandler;
187226
}
188227

189228
@Override
@@ -208,11 +247,24 @@ public int waitFor() throws InterruptedException {
208247
}
209248
return statusCode;
210249
}
211-
250+
251+
@Override
252+
public boolean waitFor(long timeout, TimeUnit unit)
253+
throws InterruptedException {
254+
synchronized (this) {
255+
this.wait(TimeUnit.MILLISECONDS.convert(timeout, unit));
256+
}
257+
return !isAlive();
258+
}
259+
260+
@Override
212261
public int exitValue() {
262+
if (statusCode == -1)
263+
throw new IllegalThreadStateException();
213264
return statusCode;
214265
}
215266

267+
@Override
216268
public void destroy() {
217269
streamHandler.close();
218270
}

0 commit comments

Comments
 (0)