Skip to content

Commit b924512

Browse files
author
C
committed
onClose callback for LogWatch
1 parent 0a2c2c4 commit b924512

File tree

2 files changed

+31
-5
lines changed
  • kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/dsl
  • kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal

2 files changed

+31
-5
lines changed

kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/dsl/LogWatch.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
import java.io.Closeable;
1919
import java.io.InputStream;
2020
import java.io.OutputStream;
21+
import java.util.concurrent.CompletionStage;
22+
2123

2224
public interface LogWatch extends Closeable {
2325

@@ -29,9 +31,21 @@ public interface LogWatch extends Closeable {
2931
*/
3032
InputStream getOutput();
3133

34+
35+
/**
36+
* Returns a {@link CompletionStage} released when the log stream is closed.
37+
* If the stream is closed due to an exception (cf onFailure),
38+
* this exception will be passed as parameter, null otherwise
39+
*
40+
* @return a {@link CompletionStage} released when the log stream is closed
41+
*/
42+
CompletionStage<Throwable> onClose();
43+
3244
/**
3345
* Close the Watch.
3446
*/
3547
@Override
3648
void close();
49+
50+
3751
}

kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/LogWatchCallback.java

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,9 @@
3232
import java.nio.channels.Channels;
3333
import java.nio.channels.WritableByteChannel;
3434
import java.util.concurrent.CompletableFuture;
35+
import java.util.concurrent.CompletionStage;
3536
import java.util.concurrent.atomic.AtomicBoolean;
37+
import java.util.function.Consumer;
3638

3739
public class LogWatchCallback implements LogWatch, AutoCloseable {
3840

@@ -41,9 +43,11 @@ public class LogWatchCallback implements LogWatch, AutoCloseable {
4143
private final OutputStream out;
4244
private WritableByteChannel outChannel;
4345
private volatile InputStream output;
46+
4447

4548
private final AtomicBoolean closed = new AtomicBoolean(false);
4649
private final CompletableFuture<AsyncBody> asyncBody = new CompletableFuture<>();
50+
private final CompletableFuture<Throwable> onCloseFuture = new CompletableFuture<>();
4751
private final SerialExecutor serialExecutor;
4852

4953
public LogWatchCallback(OutputStream out, OperationContext context) {
@@ -53,18 +57,26 @@ public LogWatchCallback(OutputStream out, OperationContext context) {
5357
}
5458
this.serialExecutor = new SerialExecutor(context.getExecutor());
5559
}
60+
61+
@Override
62+
public CompletionStage<Throwable> onClose() {
63+
return onCloseFuture.minimalCompletionStage();
64+
}
5665

5766
@Override
5867
public void close() {
59-
cleanUp();
68+
cleanUp(null);
6069
}
6170

62-
private void cleanUp() {
71+
private void cleanUp(Throwable u) {
6372
if (!closed.compareAndSet(false, true)) {
6473
return;
6574
}
75+
6676
asyncBody.thenAccept(AsyncBody::cancel);
77+
onCloseFuture.complete(u);
6778
serialExecutor.shutdownNow();
79+
6880
}
6981

7082
public LogWatchCallback callAndWait(HttpClient client, URL url) {
@@ -111,7 +123,7 @@ public LogWatchCallback callAndWait(HttpClient client, URL url) {
111123
if (t != null) {
112124
onFailure(t);
113125
} else {
114-
cleanUp();
126+
cleanUp(null);
115127
}
116128
}, serialExecutor));
117129
}
@@ -131,9 +143,9 @@ public void onFailure(Throwable u) {
131143
if (closed.get()) {
132144
return;
133145
}
134-
146+
135147
LOGGER.error("Log Callback Failure.", u);
136-
cleanUp();
148+
cleanUp(u);
137149
}
138150

139151
}

0 commit comments

Comments
 (0)