Skip to content

Commit 5f9533a

Browse files
author
C
committed
onClose callback for LogWatch
1 parent af15556 commit 5f9533a

File tree

2 files changed

+30
-5
lines changed
  • kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/dsl
  • kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal

2 files changed

+30
-5
lines changed

kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/dsl/LogWatch.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
import java.io.Closeable;
1919
import java.io.InputStream;
2020
import java.io.OutputStream;
21+
import java.util.concurrent.CompletionStage;
22+
2123

2224
public interface LogWatch extends Closeable {
2325

@@ -29,9 +31,21 @@ public interface LogWatch extends Closeable {
2931
*/
3032
InputStream getOutput();
3133

34+
35+
/**
36+
* Returns a {@link CompletionStage} released when the log stream is closed.
37+
* If the stream is closed due to an exception (cf onFailure),
38+
* this exception will be passed as parameter, null otherwise
39+
*
40+
* @return a {@link CompletionStage} released when the log stream is closed
41+
*/
42+
CompletionStage<Throwable> onClose();
43+
3244
/**
3345
* Close the Watch.
3446
*/
3547
@Override
3648
void close();
49+
50+
3751
}

kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/LogWatchCallback.java

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import java.nio.channels.Channels;
3333
import java.nio.channels.WritableByteChannel;
3434
import java.util.concurrent.CompletableFuture;
35+
import java.util.concurrent.CompletionStage;
3536
import java.util.concurrent.atomic.AtomicBoolean;
3637

3738
public class LogWatchCallback implements LogWatch, AutoCloseable {
@@ -41,9 +42,11 @@ public class LogWatchCallback implements LogWatch, AutoCloseable {
4142
private final OutputStream out;
4243
private WritableByteChannel outChannel;
4344
private volatile InputStream output;
45+
4446

4547
private final AtomicBoolean closed = new AtomicBoolean(false);
4648
private final CompletableFuture<AsyncBody> asyncBody = new CompletableFuture<>();
49+
private final CompletableFuture<Throwable> onCloseFuture = new CompletableFuture<>();
4750
private final SerialExecutor serialExecutor;
4851

4952
public LogWatchCallback(OutputStream out, OperationContext context) {
@@ -53,18 +56,26 @@ public LogWatchCallback(OutputStream out, OperationContext context) {
5356
}
5457
this.serialExecutor = new SerialExecutor(context.getExecutor());
5558
}
59+
60+
@Override
61+
public CompletionStage<Throwable> onClose() {
62+
return onCloseFuture.minimalCompletionStage();
63+
}
5664

5765
@Override
5866
public void close() {
59-
cleanUp();
67+
cleanUp(null);
6068
}
6169

62-
private void cleanUp() {
70+
private void cleanUp(Throwable u) {
6371
if (!closed.compareAndSet(false, true)) {
6472
return;
6573
}
74+
6675
asyncBody.thenAccept(AsyncBody::cancel);
76+
onCloseFuture.complete(u);
6777
serialExecutor.shutdownNow();
78+
6879
}
6980

7081
public LogWatchCallback callAndWait(HttpClient client, URL url) {
@@ -111,7 +122,7 @@ public LogWatchCallback callAndWait(HttpClient client, URL url) {
111122
if (t != null) {
112123
onFailure(t);
113124
} else {
114-
cleanUp();
125+
cleanUp(null);
115126
}
116127
}, serialExecutor));
117128
}
@@ -131,9 +142,9 @@ public void onFailure(Throwable u) {
131142
if (closed.get()) {
132143
return;
133144
}
134-
145+
135146
LOGGER.error("Log Callback Failure.", u);
136-
cleanUp();
147+
cleanUp(u);
137148
}
138149

139150
}

0 commit comments

Comments
 (0)