3232import java .nio .channels .Channels ;
3333import java .nio .channels .WritableByteChannel ;
3434import java .util .concurrent .CompletableFuture ;
35+ import java .util .concurrent .CompletionStage ;
3536import java .util .concurrent .atomic .AtomicBoolean ;
3637
3738public class LogWatchCallback implements LogWatch , AutoCloseable {
@@ -44,6 +45,7 @@ public class LogWatchCallback implements LogWatch, AutoCloseable {
4445
4546 private final AtomicBoolean closed = new AtomicBoolean (false );
4647 private final CompletableFuture <AsyncBody > asyncBody = new CompletableFuture <>();
48+ private final CompletableFuture <Throwable > onCloseFuture = new CompletableFuture <>();
4749 private final SerialExecutor serialExecutor ;
4850
4951 public LogWatchCallback (OutputStream out , OperationContext context ) {
@@ -54,16 +56,22 @@ public LogWatchCallback(OutputStream out, OperationContext context) {
5456 this .serialExecutor = new SerialExecutor (context .getExecutor ());
5557 }
5658
59+ @ Override
60+ public CompletionStage <Throwable > onClose () {
61+ return onCloseFuture .minimalCompletionStage ();
62+ }
63+
5764 @ Override
5865 public void close () {
59- cleanUp ();
66+ cleanUp (null );
6067 }
6168
62- private void cleanUp () {
69+ private void cleanUp (Throwable u ) {
6370 if (!closed .compareAndSet (false , true )) {
6471 return ;
6572 }
6673 asyncBody .thenAccept (AsyncBody ::cancel );
74+ onCloseFuture .complete (u );
6775 serialExecutor .shutdownNow ();
6876 }
6977
@@ -111,7 +119,7 @@ public LogWatchCallback callAndWait(HttpClient client, URL url) {
111119 if (t != null ) {
112120 onFailure (t );
113121 } else {
114- cleanUp ();
122+ cleanUp (null );
115123 }
116124 }, serialExecutor ));
117125 }
@@ -133,7 +141,7 @@ public void onFailure(Throwable u) {
133141 }
134142
135143 LOGGER .error ("Log Callback Failure." , u );
136- cleanUp ();
144+ cleanUp (u );
137145 }
138146
139147}
0 commit comments