Skip to content

Commit 176c73d

Browse files
author
C
committed
onClose callback for LogWatch
1 parent af15556 commit 176c73d

File tree

2 files changed

+22
-4
lines changed
  • kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/dsl
  • kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal

2 files changed

+22
-4
lines changed

kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/dsl/LogWatch.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import java.io.Closeable;
1919
import java.io.InputStream;
2020
import java.io.OutputStream;
21+
import java.util.concurrent.CompletionStage;
2122

2223
public interface LogWatch extends Closeable {
2324

@@ -29,6 +30,15 @@ public interface LogWatch extends Closeable {
2930
*/
3031
InputStream getOutput();
3132

33+
/**
34+
* Returns a {@link CompletionStage} released when the log stream is closed.
35+
* If the stream is closed due to an exception (cf onFailure),
36+
* this exception will be passed as parameter, null otherwise
37+
*
38+
* @return a {@link CompletionStage} released when the log stream is closed
39+
*/
40+
CompletionStage<Throwable> onClose();
41+
3242
/**
3343
* Close the Watch.
3444
*/

kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/LogWatchCallback.java

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import java.nio.channels.Channels;
3333
import java.nio.channels.WritableByteChannel;
3434
import java.util.concurrent.CompletableFuture;
35+
import java.util.concurrent.CompletionStage;
3536
import java.util.concurrent.atomic.AtomicBoolean;
3637

3738
public class LogWatchCallback implements LogWatch, AutoCloseable {
@@ -44,6 +45,7 @@ public class LogWatchCallback implements LogWatch, AutoCloseable {
4445

4546
private final AtomicBoolean closed = new AtomicBoolean(false);
4647
private final CompletableFuture<AsyncBody> asyncBody = new CompletableFuture<>();
48+
private final CompletableFuture<Throwable> onCloseFuture = new CompletableFuture<>();
4749
private final SerialExecutor serialExecutor;
4850

4951
public LogWatchCallback(OutputStream out, OperationContext context) {
@@ -54,16 +56,22 @@ public LogWatchCallback(OutputStream out, OperationContext context) {
5456
this.serialExecutor = new SerialExecutor(context.getExecutor());
5557
}
5658

59+
@Override
60+
public CompletionStage<Throwable> onClose() {
61+
return onCloseFuture.minimalCompletionStage();
62+
}
63+
5764
@Override
5865
public void close() {
59-
cleanUp();
66+
cleanUp(null);
6067
}
6168

62-
private void cleanUp() {
69+
private void cleanUp(Throwable u) {
6370
if (!closed.compareAndSet(false, true)) {
6471
return;
6572
}
6673
asyncBody.thenAccept(AsyncBody::cancel);
74+
onCloseFuture.complete(u);
6775
serialExecutor.shutdownNow();
6876
}
6977

@@ -111,7 +119,7 @@ public LogWatchCallback callAndWait(HttpClient client, URL url) {
111119
if (t != null) {
112120
onFailure(t);
113121
} else {
114-
cleanUp();
122+
cleanUp(null);
115123
}
116124
}, serialExecutor));
117125
}
@@ -133,7 +141,7 @@ public void onFailure(Throwable u) {
133141
}
134142

135143
LOGGER.error("Log Callback Failure.", u);
136-
cleanUp();
144+
cleanUp(u);
137145
}
138146

139147
}

0 commit comments

Comments
 (0)