Skip to content

Commit 360d19c

Browse files
committed
Fixes for reading exit code
1 parent 72de537 commit 360d19c

File tree

2 files changed

+229
-12
lines changed

2 files changed

+229
-12
lines changed

src/main/java/io/kubernetes/client/Exec.java

Lines changed: 46 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
import java.io.InputStream;
2020
import java.io.OutputStream;
2121
import java.util.concurrent.TimeUnit;
22-
2322
import org.apache.commons.lang3.StringUtils;
2423

2524
public class Exec {
@@ -172,20 +171,57 @@ public Process exec(
172171
throws ApiException, IOException {
173172
String path = makePath(namespace, name, command, container, stdin, tty);
174173

175-
WebSocketStreamHandler handler = new WebSocketStreamHandler();
176-
ExecProcess exec = new ExecProcess(handler);
174+
ExecProcess exec = new ExecProcess();
175+
WebSocketStreamHandler handler = exec.getHandler();
177176
WebSockets.stream(path, "GET", apiClient, handler);
178177

179178
return exec;
180179
}
181180

182181
private static class ExecProcess extends Process {
183-
WebSocketStreamHandler streamHandler;
182+
private final WebSocketStreamHandler streamHandler;
184183
private volatile int statusCode;
185184

186-
public ExecProcess(WebSocketStreamHandler handler) throws IOException {
187-
this.streamHandler = handler;
185+
public ExecProcess() throws IOException {
188186
this.statusCode = -1;
187+
this.streamHandler =
188+
new WebSocketStreamHandler() {
189+
@Override
190+
public void close() {
191+
if (statusCode == -1) {
192+
InputStream inputStream = getInputStream(3);
193+
int exitCode = 0;
194+
try {
195+
int available = inputStream.available();
196+
if (available > 0) {
197+
byte[] b = new byte[available];
198+
inputStream.read(b);
199+
String result = new String(b, "UTF-8");
200+
int idx = result.lastIndexOf(':');
201+
if (idx > 0) {
202+
try {
203+
exitCode = Integer.parseInt(result.substring(idx + 1).trim());
204+
} catch (NumberFormatException nfe) {
205+
// no-op
206+
}
207+
}
208+
}
209+
} catch (IOException e) {
210+
}
211+
212+
// notify of process completion
213+
synchronized (ExecProcess.this) {
214+
statusCode = exitCode;
215+
ExecProcess.this.notifyAll();
216+
}
217+
}
218+
super.close();
219+
}
220+
};
221+
}
222+
223+
private WebSocketStreamHandler getHandler() {
224+
return streamHandler;
189225
}
190226

191227
@Override
@@ -210,20 +246,18 @@ public int waitFor() throws InterruptedException {
210246
}
211247
return statusCode;
212248
}
213-
249+
214250
@Override
215-
public boolean waitFor(long timeout, TimeUnit unit)
216-
throws InterruptedException {
251+
public boolean waitFor(long timeout, TimeUnit unit) throws InterruptedException {
217252
synchronized (this) {
218253
this.wait(TimeUnit.MILLISECONDS.convert(timeout, unit));
219254
}
220255
return !isAlive();
221256
}
222-
257+
223258
@Override
224259
public int exitValue() {
225-
if (statusCode == -1)
226-
throw new IllegalThreadStateException();
260+
if (statusCode == -1) throw new IllegalThreadStateException();
227261
return statusCode;
228262
}
229263

Lines changed: 183 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,183 @@
1+
/*
2+
Copyright 2017 The Kubernetes Authors.
3+
Licensed under the Apache License, Version 2.0 (the "License");
4+
you may not use this file except in compliance with the License.
5+
You may obtain a copy of the License at
6+
http://www.apache.org/licenses/LICENSE-2.0
7+
Unless required by applicable law or agreed to in writing, software
8+
distributed under the License is distributed on an "AS IS" BASIS,
9+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
See the License for the specific language governing permissions and
11+
limitations under the License.
12+
*/
13+
package io.kubernetes.client.util;
14+
15+
import com.google.common.base.Charsets;
16+
import com.google.common.io.ByteStreams;
17+
import com.google.common.io.CharStreams;
18+
import com.squareup.okhttp.RequestBody;
19+
import com.squareup.okhttp.ws.WebSocket;
20+
import java.io.ByteArrayInputStream;
21+
import java.io.Closeable;
22+
import java.io.IOException;
23+
import java.io.InputStream;
24+
import java.io.OutputStream;
25+
import java.io.PipedInputStream;
26+
import java.io.PipedOutputStream;
27+
import java.io.Reader;
28+
import java.util.HashMap;
29+
import java.util.Map;
30+
import okio.ByteString;
31+
import org.slf4j.Logger;
32+
import org.slf4j.LoggerFactory;
33+
34+
/**
35+
* WebSocketStreamHandler understands the Kubernetes streaming protocol and separates a single
36+
* WebSockets stream into a number of different streams using that protocol.
37+
*/
38+
public class WebSocketStreamHandler implements WebSockets.SocketListener, Closeable {
39+
private static final Logger log = LoggerFactory.getLogger(WebSocketStreamHandler.class);
40+
41+
Map<Integer, PipedOutputStream> output;
42+
Map<Integer, PipedInputStream> input;
43+
WebSocket socket;
44+
String protocol;
45+
46+
public WebSocketStreamHandler() {
47+
output = new HashMap<>();
48+
input = new HashMap<>();
49+
}
50+
51+
@Override
52+
public void open(String protocol, WebSocket socket) {
53+
this.protocol = protocol;
54+
this.socket = socket;
55+
}
56+
57+
@Override
58+
public void bytesMessage(InputStream in) {
59+
try {
60+
OutputStream out = getSocketInputOutputStream(in.read());
61+
ByteStreams.copy(in, out);
62+
} catch (IOException ex) {
63+
log.error("Error writing message", ex);
64+
}
65+
}
66+
67+
@Override
68+
public void textMessage(Reader in) {
69+
try {
70+
OutputStream out = getSocketInputOutputStream(in.read());
71+
InputStream inStream =
72+
new ByteArrayInputStream(CharStreams.toString(in).getBytes(Charsets.UTF_8));
73+
ByteStreams.copy(inStream, out);
74+
} catch (IOException ex) {
75+
log.error("Error writing message", ex);
76+
}
77+
}
78+
79+
@Override
80+
public void close() {
81+
for (PipedOutputStream out : output.values()) {
82+
try {
83+
out.close();
84+
} catch (IOException ex) {
85+
// TODO use a logger here
86+
ex.printStackTrace();
87+
}
88+
}
89+
for (PipedInputStream in : input.values()) {
90+
try {
91+
in.close();
92+
} catch (IOException ex) {
93+
// TODO use a logger here
94+
ex.printStackTrace();
95+
}
96+
}
97+
}
98+
99+
/**
100+
* Get a specific input stream using its identifier
101+
*
102+
* @param stream The stream to return
103+
* @return The specified stream.
104+
*/
105+
public synchronized InputStream getInputStream(int stream) {
106+
if (!input.containsKey(stream)) {
107+
try {
108+
PipedInputStream pipeIn = new PipedInputStream();
109+
PipedOutputStream pipeOut = new PipedOutputStream(pipeIn);
110+
output.put(stream, pipeOut);
111+
input.put(stream, pipeIn);
112+
} catch (IOException ex) {
113+
// This is _very_ unlikely, as it requires the above constructor to fail.
114+
// don't force callers to catch, but still throw
115+
throw new IllegalStateException(ex);
116+
}
117+
}
118+
return input.get(stream);
119+
}
120+
121+
/**
122+
* Gets a specific output stream using it's identified
123+
*
124+
* @param stream The stream to return
125+
* @return The specified stream.
126+
*/
127+
public OutputStream getOutputStream(int stream) {
128+
return new WebSocketOutputStream(stream);
129+
}
130+
131+
/**
132+
* Get the pipe to write data to a specific InputStream. This is called when new data is read from
133+
* the web socket, to send the data on to the right stream.
134+
*
135+
* @param stream The stream to return
136+
* @return The specified stream.
137+
*/
138+
private synchronized OutputStream getSocketInputOutputStream(int stream) {
139+
if (!output.containsKey(stream)) {
140+
try {
141+
PipedInputStream pipeIn = new PipedInputStream();
142+
PipedOutputStream pipeOut = new PipedOutputStream(pipeIn);
143+
output.put(stream, pipeOut);
144+
input.put(stream, pipeIn);
145+
} catch (IOException ex) {
146+
// This is _very_ unlikely, as it requires the above constructor to fail.
147+
// don't force callers to catch, but still throw
148+
throw new IllegalStateException(ex);
149+
}
150+
}
151+
return output.get(stream);
152+
}
153+
154+
private class WebSocketOutputStream extends OutputStream {
155+
private byte stream;
156+
157+
public WebSocketOutputStream(int stream) {
158+
this.stream = (byte) stream;
159+
}
160+
161+
@Override
162+
public void write(int b) throws IOException {
163+
write(new byte[] {(byte) b});
164+
}
165+
166+
@Override
167+
public void write(byte[] b) throws IOException {
168+
this.write(b, 0, b.length);
169+
}
170+
171+
@Override
172+
public void write(byte[] b, int offset, int length) throws IOException {
173+
if (WebSocketStreamHandler.this.socket == null) {
174+
throw new IOException("No websocket connection!");
175+
}
176+
byte[] buffer = new byte[length + 1];
177+
buffer[0] = stream;
178+
System.arraycopy(b, offset, buffer, 1, length);
179+
WebSocketStreamHandler.this.socket.sendMessage(
180+
RequestBody.create(WebSocket.BINARY, ByteString.of(buffer)));
181+
}
182+
}
183+
}

0 commit comments

Comments
 (0)