Skip to content

Commit edfdfcc

Browse files
authored
Merge pull request #1322 from netikras/master
Exec.exec returning Future and hiding Process as an implementation detail
2 parents cdde1d4 + ee527eb commit edfdfcc

File tree

3 files changed

+567
-0
lines changed

3 files changed

+567
-0
lines changed
Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
/*
2+
Copyright 2020 The Kubernetes Authors.
3+
Licensed under the Apache License, Version 2.0 (the "License");
4+
you may not use this file except in compliance with the License.
5+
You may obtain a copy of the License at
6+
http://www.apache.org/licenses/LICENSE-2.0
7+
Unless required by applicable law or agreed to in writing, software
8+
distributed under the License is distributed on an "AS IS" BASIS,
9+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
See the License for the specific language governing permissions and
11+
limitations under the License.
12+
*/
13+
package io.kubernetes.client.custom;
14+
15+
import java.io.InputStream;
16+
import java.io.OutputStream;
17+
import java.util.ArrayList;
18+
import java.util.Collection;
19+
import java.util.function.BiConsumer;
20+
21+
/**
22+
* A collection of all the 3 main pipes used in stdio: STDIN, STDOUT and STDERR. As working with the
23+
* named pipes is usually carried out asynchronously, this collection also provides means to
24+
* initiate and handle the close() event. A close() initiator calls the {@link #close(int, long)}
25+
* method expressing its intent to close the communication channel. Handlers are notified of this
26+
* intent and its up to the handlers to decide what's to be done next. Calling {@link #close(int,
27+
* long)} does not close the streams or do anything else besides notifying the handlers.
28+
*/
29+
public class IOTrio {
30+
private InputStream stdout;
31+
private InputStream stderr;
32+
private OutputStream stdin;
33+
private final Collection<BiConsumer<Integer, Long>> closeHandlers;
34+
35+
public IOTrio() {
36+
closeHandlers = new ArrayList<>();
37+
}
38+
39+
public InputStream getStdout() {
40+
return stdout;
41+
}
42+
43+
public void setStdout(InputStream stdout) {
44+
this.stdout = stdout;
45+
}
46+
47+
public InputStream getStderr() {
48+
return stderr;
49+
}
50+
51+
public void setStderr(InputStream stderr) {
52+
this.stderr = stderr;
53+
}
54+
55+
public OutputStream getStdin() {
56+
return stdin;
57+
}
58+
59+
public void setStdin(OutputStream stdin) {
60+
this.stdin = stdin;
61+
}
62+
63+
/**
64+
* Capture the CLOSE intent and handle it accordingly.
65+
*
66+
* @param handler the handler that's invoked when someone intends to close this communication.
67+
* Multiple handlers can be registered
68+
*/
69+
public void onClose(BiConsumer<Integer, Long> handler) {
70+
closeHandlers.add(handler);
71+
}
72+
73+
/**
74+
* Express an intent to close this communication. This intent will be relayed to all the
75+
* registered handlers and it's up to them what to do with it.
76+
*
77+
* @param code proposed exit code
78+
* @param timeout time in milliseconds given to terminate this communication. Negative timeout
79+
* means no timeout (i.e. wait for as long as it takes). 0 means "stop it now".
80+
*/
81+
public void close(int code, long timeout) {
82+
closeHandlers.forEach(handler -> handler.accept(code, timeout));
83+
}
84+
}

util/src/main/java/io/kubernetes/client/Exec.java

Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
import com.google.common.io.CharStreams;
1818
import com.google.gson.reflect.TypeToken;
19+
import io.kubernetes.client.custom.IOTrio;
1920
import io.kubernetes.client.openapi.ApiClient;
2021
import io.kubernetes.client.openapi.ApiException;
2122
import io.kubernetes.client.openapi.Configuration;
@@ -34,11 +35,17 @@
3435
import java.io.UnsupportedEncodingException;
3536
import java.lang.reflect.Type;
3637
import java.net.URLEncoder;
38+
import java.util.Arrays;
3739
import java.util.HashMap;
3840
import java.util.List;
3941
import java.util.Map;
42+
import java.util.concurrent.CompletableFuture;
4043
import java.util.concurrent.CountDownLatch;
44+
import java.util.concurrent.Future;
4145
import java.util.concurrent.TimeUnit;
46+
import java.util.function.BiConsumer;
47+
import java.util.function.Consumer;
48+
import java.util.function.Supplier;
4249
import org.apache.commons.lang3.StringUtils;
4350
import org.slf4j.Logger;
4451
import org.slf4j.LoggerFactory;
@@ -191,6 +198,129 @@ public Process exec(
191198
.execute();
192199
}
193200

201+
/**
202+
* A convenience method. Executes a command remotely on a pod and monitors for events in that
203+
* execution. The monitored events are: <br>
204+
* - connection established (onOpen) <br>
205+
* - connection closed (onClosed) <br>
206+
* - execution error occurred (onError) <br>
207+
* This method also allows to specify a MAX timeout for the execution and returns a future in
208+
* order to monitor the execution flow. <br>
209+
* onError and onClosed callbacks are invoked asynchronously, in a separate thread. <br>
210+
*
211+
* @param namespace a namespace the target pod "lives" in
212+
* @param podName a name of the pod to exec the command on
213+
* @param onOpen a callback invoked upon the connection established event.
214+
* @param onClosed a callback invoked upon the process termination. Return code might not always
215+
* be there. N.B. this callback is invoked before the returned {@link Future} is completed.
216+
* @param onError a callback to handle k8s errors (NOT the command errors/stderr!)
217+
* @param timeoutMs timeout in milliseconds for the execution. I.e. the execution will take this
218+
* many ms or less. If the timeout command is running longer than the allowed timeout, the
219+
* command will be "asked" to terminate gracefully. If the command is still running after the
220+
* grace period, the sigkill will be issued. If null is passed, the timeout will not be used
221+
* and will wait for process to exit itself.
222+
* @param tty whether you need tty to pipe the data. TTY mode will trim some binary data in order
223+
* to make it possible to show on screen (tty)
224+
* @param command a tokenized command to run on the pod
225+
* @return a {@link Future} promise representing this execution. Unless something goes south, the
226+
* promise will contain the process return exit code. If the timeoutMs is non-null and the
227+
* timeout expires before the process exits, promise will contain {@link Integer#MAX_VALUE}.
228+
* @throws IOException
229+
*/
230+
public Future<Integer> exec(
231+
String namespace,
232+
String podName,
233+
Consumer<IOTrio> onOpen,
234+
BiConsumer<Integer, IOTrio> onClosed,
235+
BiConsumer<Throwable, IOTrio> onError,
236+
Long timeoutMs,
237+
boolean tty,
238+
String... command)
239+
throws IOException {
240+
CompletableFuture<Integer> future = new CompletableFuture<>();
241+
IOTrio io = new IOTrio();
242+
String cmdStr = Arrays.toString(command);
243+
BiConsumer<Throwable, IOTrio> errHandler =
244+
(err, errIO) -> {
245+
if (onError != null) {
246+
onError.accept(err, errIO);
247+
}
248+
};
249+
try {
250+
Process process = exec(namespace, podName, command, null, true, tty);
251+
252+
io.onClose(
253+
(code, timeout) -> {
254+
process.destroy();
255+
waitForProcessToExit(process, timeout, cmdStr, err -> errHandler.accept(err, io));
256+
// processWaitingThread will handle the rest
257+
});
258+
io.setStdin(process.getOutputStream());
259+
io.setStdout(process.getInputStream());
260+
io.setStderr(process.getErrorStream());
261+
runAsync(
262+
"Process-Waiting-Thread-" + command[0] + "-" + podName,
263+
() -> {
264+
Supplier<Integer> returnCode = process::exitValue;
265+
try {
266+
log.debug("Waiting for process to close in {} ms: {}", timeoutMs, cmdStr);
267+
boolean beforeTimout =
268+
waitForProcessToExit(
269+
process, timeoutMs, cmdStr, err -> errHandler.accept(err, io));
270+
if (!beforeTimout) {
271+
returnCode = () -> Integer.MAX_VALUE;
272+
}
273+
} catch (Exception e) {
274+
errHandler.accept(e, io);
275+
}
276+
log.debug("process.onExit({}): {}", returnCode.get(), cmdStr);
277+
if (onClosed != null) {
278+
onClosed.accept(returnCode.get(), io);
279+
}
280+
future.complete(returnCode.get());
281+
});
282+
if (onOpen != null) {
283+
onOpen.accept(io);
284+
}
285+
} catch (ApiException e) {
286+
errHandler.accept(e, io);
287+
future.completeExceptionally(e);
288+
}
289+
return future;
290+
}
291+
292+
protected void runAsync(String taskName, Runnable task) {
293+
Thread thread = new Thread(task);
294+
thread.setName(taskName);
295+
thread.start();
296+
}
297+
298+
private boolean waitForProcessToExit(
299+
Process process, Long timeoutMs, String cmdStr, Consumer<Exception> onError) {
300+
boolean beforeTimeout = true;
301+
if (timeoutMs != null && timeoutMs >= 0) {
302+
boolean exited = false;
303+
try {
304+
exited = process.waitFor(timeoutMs, TimeUnit.MILLISECONDS);
305+
} catch (InterruptedException e) {
306+
onError.accept(e);
307+
}
308+
log.debug("Process closed={}: {}", exited, cmdStr);
309+
if (!exited && process.isAlive()) {
310+
beforeTimeout = false;
311+
log.warn("Process timed out after {} ms. Shutting down: {}", timeoutMs, cmdStr);
312+
process.destroy();
313+
}
314+
} else {
315+
try {
316+
process.waitFor();
317+
} catch (InterruptedException e) {
318+
onError.accept(e);
319+
}
320+
}
321+
return beforeTimeout;
322+
}
323+
194324
public final class ExecutionBuilder {
195325
private final String namespace;
196326
private final String name;

0 commit comments

Comments
 (0)