Skip to content

Commit f49b4b5

Browse files
committed
Exec.exec returning Future and hiding Process as an implementation detail
1 parent fbf4652 commit f49b4b5

File tree

3 files changed

+411
-8
lines changed

3 files changed

+411
-8
lines changed
Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,123 @@
1+
/*
2+
Copyright 2020 The Kubernetes Authors.
3+
Licensed under the Apache License, Version 2.0 (the "License");
4+
you may not use this file except in compliance with the License.
5+
You may obtain a copy of the License at
6+
http://www.apache.org/licenses/LICENSE-2.0
7+
Unless required by applicable law or agreed to in writing, software
8+
distributed under the License is distributed on an "AS IS" BASIS,
9+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
See the License for the specific language governing permissions and
11+
limitations under the License.
12+
*/
13+
package io.kubernetes.client.custom;
14+
15+
import java.io.InputStream;
16+
import java.util.concurrent.*;
17+
18+
public class AsyncPump implements Runnable {
19+
20+
private Thread thread;
21+
private ExecutorService executorService;
22+
private byte[] buff;
23+
private InputStream is;
24+
private CompletableFuture<?> promise;
25+
private NoisyConsumer<byte[]> consumer;
26+
private NoisyConsumer<Exception> errorHandler;
27+
private NoisyRunnable onFinish;
28+
29+
public AsyncPump(NoisyConsumer<byte[]> consumer, NoisyConsumer<Exception> errorHandler, NoisyRunnable onFinish) {
30+
buff = new byte[1024];
31+
this.consumer = consumer != null ? consumer : NoisyConsumer.NOP;
32+
this.errorHandler = errorHandler != null ? errorHandler : NoisyConsumer.NOP;
33+
this.onFinish = onFinish != null ? onFinish : NoisyRunnable.NOP;
34+
this.promise = new CompletableFuture<>();
35+
}
36+
37+
public AsyncPump(NoisyConsumer<byte[]> consumer, NoisyRunnable onFinish) {
38+
this(consumer, Exception::printStackTrace, onFinish);
39+
}
40+
41+
public AsyncPump(NoisyConsumer<byte[]> consumer) {
42+
this(consumer, NoisyRunnable.NOP);
43+
}
44+
45+
public AsyncPump setExecutorService(ExecutorService executorService) {
46+
this.executorService = executorService;
47+
return this;
48+
}
49+
50+
public AsyncPump stop() {
51+
if (thread != null)
52+
thread.interrupt();
53+
return this;
54+
}
55+
56+
public AsyncPump startAsync(InputStream is) {
57+
this.is = is;
58+
59+
if (executorService != null) {
60+
executorService.submit(this);
61+
} else {
62+
thread = new Thread(this);
63+
thread.setName("Async pump");
64+
thread.start();
65+
}
66+
return this;
67+
}
68+
69+
public boolean waitForExit() {
70+
try {
71+
promise.get();
72+
return true;
73+
} catch (InterruptedException | ExecutionException e) {
74+
return false;
75+
}
76+
}
77+
78+
public boolean waitForExit(long timeoutMs) {
79+
try {
80+
promise.get(timeoutMs, TimeUnit.MILLISECONDS);
81+
return true;
82+
} catch (InterruptedException | ExecutionException | TimeoutException e) {
83+
return false;
84+
}
85+
}
86+
87+
public AsyncPump start(InputStream is) {
88+
this.is = is;
89+
return this;
90+
}
91+
92+
@Override
93+
public void run() {
94+
int nRead;
95+
try {
96+
while ((nRead = is.read(buff)) > -1) {
97+
byte[] giveAway = new byte[nRead];
98+
System.arraycopy(buff, 0, giveAway, 0, nRead);
99+
consumer.accept(giveAway);
100+
}
101+
onFinish.run();
102+
} catch (Exception e) {
103+
try {
104+
errorHandler.accept(e);
105+
} catch (Exception exception) {
106+
promise.completeExceptionally(exception);
107+
}
108+
}
109+
promise.complete(null);
110+
}
111+
112+
public interface NoisyConsumer<T> {
113+
NoisyConsumer NOP = t -> { };
114+
115+
void accept(T t) throws Exception;
116+
}
117+
118+
public interface NoisyRunnable {
119+
NoisyRunnable NOP = () -> { };
120+
121+
void run() throws Exception;
122+
}
123+
}
Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
/*
2+
Copyright 2020 The Kubernetes Authors.
3+
Licensed under the Apache License, Version 2.0 (the "License");
4+
you may not use this file except in compliance with the License.
5+
You may obtain a copy of the License at
6+
http://www.apache.org/licenses/LICENSE-2.0
7+
Unless required by applicable law or agreed to in writing, software
8+
distributed under the License is distributed on an "AS IS" BASIS,
9+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
See the License for the specific language governing permissions and
11+
limitations under the License.
12+
*/
13+
package io.kubernetes.client.custom;
14+
15+
import java.io.InputStream;
16+
import java.io.OutputStream;
17+
import java.util.ArrayList;
18+
import java.util.Collection;
19+
import java.util.function.BiConsumer;
20+
21+
/**
22+
* A collection of all the 3 main pipes used in stdio: STDIN, STDOUT and STDERR. As working with the named pipes is usually carried out
23+
* asynchronously, this collection also provides means to initiate and handle the close() event. A close() initiator calls the
24+
* {@link #close(int, long)} method expressing its intent to close the communication channel. Handlers are notified of this intent and
25+
* its up to the handlers to decide what's to be done next. Calling {@link #close(int, long)} does not close the streams or do anything
26+
* else besides notifying the handlers.
27+
*/
28+
public class IOTrio {
29+
private InputStream stdout;
30+
private InputStream stderr;
31+
private OutputStream stdin;
32+
private final Collection<BiConsumer<Integer, Long>> closeHandlers;
33+
34+
public IOTrio() {
35+
closeHandlers = new ArrayList<>();
36+
}
37+
38+
public InputStream getStdout() {
39+
return stdout;
40+
}
41+
42+
public void setStdout(InputStream stdout) {
43+
this.stdout = stdout;
44+
}
45+
46+
public InputStream getStderr() {
47+
return stderr;
48+
}
49+
50+
public void setStderr(InputStream stderr) {
51+
this.stderr = stderr;
52+
}
53+
54+
public OutputStream getStdin() {
55+
return stdin;
56+
}
57+
58+
public void setStdin(OutputStream stdin) {
59+
this.stdin = stdin;
60+
}
61+
62+
/**
63+
* Capture the CLOSE intent and handle it accordingly.
64+
*
65+
* @param handler the handler that's invoked when someone intends to close this communication. Multiple handlers can be registered
66+
*/
67+
public void onClose(BiConsumer<Integer, Long> handler) {
68+
closeHandlers.add(handler);
69+
}
70+
71+
/**
72+
* Express an intent to close this communication. This intent will be relayed to all the registered handlers and it's up to them what
73+
* to do with it.
74+
* @param code proposed exit code
75+
* @param timeout time in milliseconds given to terminate this communication. Negative timeout means no timeout (i.e. wait for as long
76+
* as it takes). 0 means "stop it now".
77+
*/
78+
public void close(int code, long timeout) {
79+
closeHandlers.forEach(handler -> handler.accept(code, timeout));
80+
}
81+
}

0 commit comments

Comments
 (0)