Skip to content

Commit 0603720

Browse files
Chr031Cshawkins
authored
feat: LogWatch interface provides listeners on close stream event
* onClose callback for LogWatch * onClose callback for LogWatch - unit tests * Update CHANGELOG.md Issue #6880 referenced * Update LogWatchCallbackTest.java header * Check style fixes --------- Co-authored-by: C <[email protected]> Co-authored-by: Steven Hawkins <[email protected]>
1 parent 303a5c7 commit 0603720

File tree

4 files changed

+141
-4
lines changed

4 files changed

+141
-4
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
#### Improvements
1010
* Fix #6863: ensuring SerialExecutor does not throw RejectedExecutionException to prevent unnecessary error logs
1111
* Fix #6763: (crd-generator) YAML output customization
12+
* Fix #6880: LogWatch interface provides listeners on close stream event
1213

1314
#### Dependency Upgrade
1415

kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/dsl/LogWatch.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import java.io.Closeable;
1919
import java.io.InputStream;
2020
import java.io.OutputStream;
21+
import java.util.concurrent.CompletionStage;
2122

2223
public interface LogWatch extends Closeable {
2324

@@ -29,6 +30,15 @@ public interface LogWatch extends Closeable {
2930
*/
3031
InputStream getOutput();
3132

33+
/**
34+
* Returns a {@link CompletionStage} released when the log stream is closed.
35+
* If the stream is closed due to an exception (cf onFailure),
36+
* this exception will be passed as parameter, null otherwise
37+
*
38+
* @return a {@link CompletionStage} released when the log stream is closed
39+
*/
40+
CompletionStage<Throwable> onClose();
41+
3242
/**
3343
* Close the Watch.
3444
*/

kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/LogWatchCallback.java

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import java.nio.channels.Channels;
3333
import java.nio.channels.WritableByteChannel;
3434
import java.util.concurrent.CompletableFuture;
35+
import java.util.concurrent.CompletionStage;
3536
import java.util.concurrent.atomic.AtomicBoolean;
3637

3738
public class LogWatchCallback implements LogWatch, AutoCloseable {
@@ -44,6 +45,7 @@ public class LogWatchCallback implements LogWatch, AutoCloseable {
4445

4546
private final AtomicBoolean closed = new AtomicBoolean(false);
4647
private final CompletableFuture<AsyncBody> asyncBody = new CompletableFuture<>();
48+
private final CompletableFuture<Throwable> onCloseFuture = new CompletableFuture<>();
4749
private final SerialExecutor serialExecutor;
4850

4951
public LogWatchCallback(OutputStream out, OperationContext context) {
@@ -54,16 +56,22 @@ public LogWatchCallback(OutputStream out, OperationContext context) {
5456
this.serialExecutor = new SerialExecutor(context.getExecutor());
5557
}
5658

59+
@Override
60+
public CompletionStage<Throwable> onClose() {
61+
return onCloseFuture.minimalCompletionStage();
62+
}
63+
5764
@Override
5865
public void close() {
59-
cleanUp();
66+
cleanUp(null);
6067
}
6168

62-
private void cleanUp() {
69+
private void cleanUp(Throwable u) {
6370
if (!closed.compareAndSet(false, true)) {
6471
return;
6572
}
6673
asyncBody.thenAccept(AsyncBody::cancel);
74+
onCloseFuture.complete(u);
6775
serialExecutor.shutdownNow();
6876
}
6977

@@ -111,7 +119,7 @@ public LogWatchCallback callAndWait(HttpClient client, URL url) {
111119
if (t != null) {
112120
onFailure(t);
113121
} else {
114-
cleanUp();
122+
cleanUp(null);
115123
}
116124
}, serialExecutor));
117125
}
@@ -133,7 +141,7 @@ public void onFailure(Throwable u) {
133141
}
134142

135143
LOGGER.error("Log Callback Failure.", u);
136-
cleanUp();
144+
cleanUp(u);
137145
}
138146

139147
}
Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
/*
2+
* Copyright (C) 2015 Red Hat, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.fabric8.kubernetes.client.dsl.internal;
18+
19+
import io.fabric8.kubernetes.client.http.AsyncBody;
20+
import io.fabric8.kubernetes.client.http.HttpClient;
21+
import io.fabric8.kubernetes.client.http.HttpRequest;
22+
import io.fabric8.kubernetes.client.http.HttpResponse;
23+
import io.fabric8.kubernetes.client.http.TestAsyncBody;
24+
import io.fabric8.kubernetes.client.http.TestHttpResponse;
25+
import io.fabric8.kubernetes.client.impl.BaseClient;
26+
import io.fabric8.kubernetes.client.utils.KubernetesSerialization;
27+
import org.junit.jupiter.api.BeforeEach;
28+
import org.junit.jupiter.api.Test;
29+
import org.mockito.Mockito;
30+
31+
import java.io.ByteArrayOutputStream;
32+
import java.net.HttpURLConnection;
33+
import java.net.MalformedURLException;
34+
import java.net.URL;
35+
import java.util.concurrent.CompletableFuture;
36+
import java.util.concurrent.CountDownLatch;
37+
import java.util.concurrent.Executor;
38+
import java.util.concurrent.Executors;
39+
import java.util.concurrent.TimeUnit;
40+
41+
import static org.assertj.core.api.Assertions.assertThat;
42+
import static org.mockito.Mockito.mock;
43+
import static org.mockito.Mockito.spy;
44+
import static org.mockito.Mockito.when;
45+
46+
class LogWatchCallbackTest {
47+
private OperationContext context;
48+
private Executor executor = Executors.newFixedThreadPool(2);
49+
private URL url;
50+
private HttpClient httpClientMock;
51+
52+
@BeforeEach
53+
void setUp() throws MalformedURLException {
54+
BaseClient mock = mock(BaseClient.class, Mockito.RETURNS_SELF);
55+
Mockito.when(mock.adapt(BaseClient.class).getKubernetesSerialization()).thenReturn(new KubernetesSerialization());
56+
final OperationContext operationContext = new OperationContext().withClient(mock);
57+
when(mock.getExecutor()).thenReturn(this.executor);
58+
this.context = operationContext;
59+
60+
this.url = new URL("http://url_called");
61+
this.httpClientMock = spy(HttpClient.class);
62+
var httpRequestMock = mock(HttpRequest.class);
63+
var builderMock = mock(HttpRequest.Builder.class);
64+
65+
Mockito.when(httpClientMock.newHttpRequestBuilder()).thenReturn(builderMock);
66+
Mockito.when(builderMock.url(url)).thenReturn(builderMock);
67+
Mockito.when(builderMock.build()).thenReturn(httpRequestMock);
68+
69+
}
70+
71+
@Test
72+
void withOutputStreamCloseEventTest() throws InterruptedException {
73+
74+
var future = new CompletableFuture<HttpResponse<AsyncBody>>();
75+
var reached = new CountDownLatch(1);
76+
77+
Mockito.when(httpClientMock.consumeBytes(Mockito.any(), Mockito.any())).thenReturn(future);
78+
79+
ByteArrayOutputStream baos = new ByteArrayOutputStream();
80+
LogWatchCallback logWatch = new LogWatchCallback(baos, this.context);
81+
logWatch.callAndWait(httpClientMock, url);
82+
83+
logWatch.onClose().thenAccept((Throwable t) -> {
84+
reached.countDown();
85+
});
86+
future.complete(
87+
new TestHttpResponse<AsyncBody>().withCode(HttpURLConnection.HTTP_GONE).withBody(new TestAsyncBody()));
88+
89+
assertThat(reached.await(1, TimeUnit.SECONDS)).isTrue();
90+
logWatch.close();
91+
}
92+
93+
@Test
94+
void withOutputStreamCloseEventOnFailureTest() throws InterruptedException {
95+
96+
var future = new CompletableFuture<HttpResponse<AsyncBody>>();
97+
var reached = new CountDownLatch(1);
98+
99+
Mockito.when(httpClientMock.consumeBytes(Mockito.any(), Mockito.any())).thenReturn(future);
100+
101+
LogWatchCallback logWatch = new LogWatchCallback(new ByteArrayOutputStream(), this.context);
102+
logWatch.callAndWait(httpClientMock, url);
103+
104+
final Throwable[] tReturned = new Throwable[1];
105+
logWatch.onClose().thenAccept((Throwable t) -> {
106+
tReturned[0] = t;
107+
reached.countDown();
108+
});
109+
110+
var th = new Throwable("any exception");
111+
future.completeExceptionally(th);
112+
113+
assertThat(reached.await(1, TimeUnit.SECONDS)).isTrue();
114+
assertThat(tReturned[0]).isEqualTo(th);
115+
116+
logWatch.close();
117+
}
118+
}

0 commit comments

Comments
 (0)