From 05c810f1225c53ae3a4ed0726e94ad1435f1e2b4 Mon Sep 17 00:00:00 2001 From: Steve Hawkins Date: Wed, 10 May 2023 16:19:06 -0400 Subject: [PATCH 1/2] fix #4154: adding a callback for stream consumption --- .../kubernetes/client/StreamConsumer.java | 66 ++++++++++++++++ .../kubernetes/client/dsl/Loggable.java | 8 +- .../client/dsl/TtyExecErrorable.java | 8 +- .../client/dsl/TtyExecOutputErrorable.java | 8 +- .../kubernetes/client/http/BufferUtil.java | 4 +- .../kubernetes/client/http/WebSocket.java | 2 + .../dsl/internal/ExecWebSocketListener.java | 54 +++++++------ .../client/dsl/internal/LogWatchCallback.java | 79 +++++++++---------- .../dsl/internal/PodOperationContext.java | 10 ++- .../apps/v1/DeploymentOperationsImpl.java | 52 +----------- .../apps/v1/ReplicaSetOperationsImpl.java | 36 +-------- .../v1/RollableScalableResourceOperation.java | 26 ++++++ .../apps/v1/StatefulSetOperationsImpl.java | 36 +-------- .../internal/batch/v1/JobOperationsImpl.java | 6 +- .../internal/core/v1/PodOperationsImpl.java | 30 +++---- .../ReplicationControllerOperationsImpl.java | 37 +-------- .../v1beta1/DeploymentOperationsImpl.java | 52 +----------- .../v1beta1/ReplicaSetOperationsImpl.java | 41 +--------- .../utils/internal/PodOperationUtil.java | 16 ++-- .../utils/internal/PodOperationUtilTest.java | 20 ++--- .../apps/DeploymentConfigOperationsImpl.java | 6 +- .../internal/build/BuildOperationsImpl.java | 6 +- 22 files changed, 234 insertions(+), 369 deletions(-) create mode 100644 kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/StreamConsumer.java diff --git a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/StreamConsumer.java b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/StreamConsumer.java new file mode 100644 index 00000000000..8a8691324b7 --- /dev/null +++ b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/StreamConsumer.java @@ -0,0 +1,66 @@ +/** + * Copyright (C) 2015 Red Hat, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.fabric8.kubernetes.client; + +import java.io.OutputStream; +import java.io.PipedInputStream; +import java.io.PipedOutputStream; +import java.nio.ByteBuffer; +import java.nio.channels.Channels; +import java.nio.channels.WritableByteChannel; +import java.util.concurrent.CompletionStage; + +public interface StreamConsumer { + + public static StreamConsumer newStreamConsumer(OutputStream os) { + if (os == null) { + return null; + } + checkForPiped(os); + return newBlockingStreamConsumer(Channels.newChannel(os)); + } + + public static StreamConsumer newBlockingStreamConsumer(WritableByteChannel channel) { + if (channel == null) { + return null; + } + return buffer -> { + int remaining = buffer.remaining(); + if (channel.write(buffer) != remaining) { + throw new KubernetesClientException("Unsucessful blocking write"); + } + return null; + }; + } + + public static void checkForPiped(Object object) { + if (object instanceof PipedOutputStream || object instanceof PipedInputStream) { + throw new KubernetesClientException("Piped streams should not be used"); + } + } + + /** + * A callback for consuming a stream as a series of {@link ByteBuffer}s + * + * @param buffer + * @return a {@link CompletionStage} that is completed when the buffer has been fully consumed, + * or null if it was already consumed + * @throws Exception + */ + CompletionStage consume(ByteBuffer buffer) throws Exception; + +} diff --git a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/dsl/Loggable.java b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/dsl/Loggable.java index 92fd9b1d64c..d66f2ab526d 100644 --- a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/dsl/Loggable.java +++ b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/dsl/Loggable.java @@ -16,6 +16,8 @@ package io.fabric8.kubernetes.client.dsl; +import io.fabric8.kubernetes.client.StreamConsumer; + import java.io.InputStream; import java.io.OutputStream; import java.io.PipedOutputStream; @@ -70,7 +72,11 @@ public interface Loggable { * @param out {@link OutputStream} for storing logs * @return returns a Closeable interface for log watch */ - LogWatch watchLog(OutputStream out); + default LogWatch watchLog(OutputStream out) { + return watchLog(StreamConsumer.newStreamConsumer(out), true); + } + + LogWatch watchLog(StreamConsumer consumer, boolean blocking); /** * While waiting for Pod logs, how long shall we wait until a Pod diff --git a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/dsl/TtyExecErrorable.java b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/dsl/TtyExecErrorable.java index bab2f620937..b447a2f461f 100644 --- a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/dsl/TtyExecErrorable.java +++ b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/dsl/TtyExecErrorable.java @@ -15,6 +15,8 @@ */ package io.fabric8.kubernetes.client.dsl; +import io.fabric8.kubernetes.client.StreamConsumer; + import java.io.InputStream; import java.io.OutputStream; import java.io.PipedOutputStream; @@ -27,7 +29,11 @@ public interface TtyExecErrorable extends *

* In particular do no use a {@link PipedOutputStream} - use {@link #redirectingError()} instead */ - TtyExecErrorChannelable writingError(OutputStream in); + default TtyExecErrorChannelable writingError(OutputStream in) { + return writingError(StreamConsumer.newStreamConsumer(in), true); + } + + TtyExecErrorChannelable writingError(StreamConsumer consumer, boolean blocking); /** * If the {@link ExecWatch} should terminate when a stdErr message is received. diff --git a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/dsl/TtyExecOutputErrorable.java b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/dsl/TtyExecOutputErrorable.java index cdf9d0cd3ad..50e9b896078 100644 --- a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/dsl/TtyExecOutputErrorable.java +++ b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/dsl/TtyExecOutputErrorable.java @@ -15,6 +15,8 @@ */ package io.fabric8.kubernetes.client.dsl; +import io.fabric8.kubernetes.client.StreamConsumer; + import java.io.InputStream; import java.io.OutputStream; import java.io.PipedOutputStream; @@ -27,7 +29,11 @@ public interface TtyExecOutputErrorable extends *

* In particular do no use a {@link PipedOutputStream} - use {@link #redirectingOutput()} instead */ - TtyExecErrorable writingOutput(OutputStream in); + default TtyExecErrorable writingOutput(OutputStream in) { + return writingOutput(StreamConsumer.newStreamConsumer(in), true); + } + + TtyExecErrorable writingOutput(StreamConsumer consumer, boolean blocking); /** * Will provide an {@link InputStream} via {@link ExecWatch#getOutput()} diff --git a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/BufferUtil.java b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/BufferUtil.java index ca68d180c9e..58ea0b555ac 100644 --- a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/BufferUtil.java +++ b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/BufferUtil.java @@ -76,14 +76,14 @@ public static ByteBuffer copy(ByteBuffer buffer) { /** * Very rudimentary method to check if the provided ByteBuffer contains text. - * + * * @return true if the buffer contains text, false otherwise. */ public static boolean isPlainText(ByteBuffer originalBuffer) { if (originalBuffer == null) { return false; } - final ByteBuffer buffer = copy(originalBuffer); + final ByteBuffer buffer = originalBuffer.asReadOnlyBuffer(); final CharsetDecoder decoder = StandardCharsets.UTF_8.newDecoder(); try { decoder.decode(buffer); diff --git a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/WebSocket.java b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/WebSocket.java index f57fc82ecdf..98412f3f5b5 100644 --- a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/WebSocket.java +++ b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/WebSocket.java @@ -42,6 +42,8 @@ default void onMessage(WebSocket webSocket, String text) { /** * Called once the full binary message has been built. {@link WebSocket#request()} must * be called to receive more messages. + * + * @param bytes which will not further used nor modified by the {@link HttpClient} */ default void onMessage(WebSocket webSocket, ByteBuffer bytes) { webSocket.request(); diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/ExecWebSocketListener.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/ExecWebSocketListener.java index 6ddcad7934f..90c61dc26e5 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/ExecWebSocketListener.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/ExecWebSocketListener.java @@ -20,6 +20,7 @@ import io.fabric8.kubernetes.api.model.Status; import io.fabric8.kubernetes.api.model.StatusCause; import io.fabric8.kubernetes.client.KubernetesClientException; +import io.fabric8.kubernetes.client.StreamConsumer; import io.fabric8.kubernetes.client.dsl.ExecListener; import io.fabric8.kubernetes.client.dsl.ExecListener.Response; import io.fabric8.kubernetes.client.dsl.ExecWatch; @@ -37,8 +38,6 @@ import java.io.InputStream; import java.io.OutputStream; import java.nio.ByteBuffer; -import java.nio.channels.Channels; -import java.nio.channels.WritableByteChannel; import java.nio.charset.StandardCharsets; import java.util.Arrays; import java.util.HashMap; @@ -51,6 +50,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; /** * A {@link WebSocket.Listener} for exec operations. @@ -106,7 +106,7 @@ public ListenerStream(String name) { this.name = name; } - private void handle(ByteBuffer byteString, WebSocket webSocket) throws IOException { + private void handle(ByteBuffer byteString, WebSocket webSocket) throws Exception { if (handler != null) { if (LOGGER.isDebugEnabled()) { LOGGER.debug("exec message received {} bytes on channel {}", byteString.remaining(), name); @@ -178,37 +178,46 @@ private ListenerStream createStream(String name, StreamContext streamContext) { if (streamContext == null) { return stream; } - OutputStream os = streamContext.getOutputStream(); - if (os == null) { + StreamConsumer consumer = streamContext.getConsumer(); + if (consumer == null) { // redirecting stream.inputStream = new ExecWatchInputStream(() -> this.webSocketRef.get().request()); this.exitCode.whenComplete(stream.inputStream::onExit); stream.handler = b -> stream.inputStream.consume(Arrays.asList(b)); } else { - WritableByteChannel channel = Channels.newChannel(os); - stream.handler = b -> asyncWrite(channel, b); + stream.handler = b -> consume(consumer, b, streamContext.isBlocking() ? serialExecutor : Runnable::run, + this::postConsume); } return stream; } - private void asyncWrite(WritableByteChannel channel, ByteBuffer b) { - CompletableFuture.runAsync(() -> { + public static void consume(StreamConsumer consumer, ByteBuffer bytes, Executor executor, Consumer postConsume) { + CompletableFuture.supplyAsync(() -> { try { - channel.write(b); - } catch (IOException e) { + return consumer.consume(bytes); + } catch (Exception e) { throw KubernetesClientException.launderThrowable(e); } - }, serialExecutor).whenComplete((v, t) -> { - webSocketRef.get().request(); - if (t != null) { - if (closed.get()) { - LOGGER.debug("Stream write failed after close", t); - } else { - // This could happen if the user simply closes their stream prior to completion - LOGGER.warn("Stream write failed", t); - } + }, executor) + .whenComplete((cs, t) -> { + if (cs != null) { + cs.whenComplete((v, t1) -> postConsume.accept(t1)); + } else { + postConsume.accept(t); + } + }); + } + + private void postConsume(Throwable t) { + webSocketRef.get().request(); + if (t != null) { + if (closed.get()) { + LOGGER.debug("Stream write failed after close", t); + } else { + // This could happen if the user simply closes their stream prior to completion + LOGGER.warn("Stream write failed", t); } - }); + } } @Override @@ -299,6 +308,7 @@ public void onError(WebSocket webSocket, Throwable t, boolean connectionError) { @Override public void onMessage(WebSocket webSocket, String text) { LOGGER.debug("Exec Web Socket: onMessage(String)"); + // this is unexpected and will likely just result in an exception onMessage(webSocket, ByteBuffer.wrap(text.getBytes(StandardCharsets.UTF_8))); } @@ -337,7 +347,7 @@ public void onMessage(WebSocket webSocket, ByteBuffer bytes) { default: throw new IOException("Unknown stream ID " + streamID); } - } catch (IOException e) { + } catch (Exception e) { throw KubernetesClientException.launderThrowable(e); } finally { if (close) { diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/LogWatchCallback.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/LogWatchCallback.java index 2ca97a9289f..0657b747c30 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/LogWatchCallback.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/LogWatchCallback.java @@ -17,17 +17,17 @@ import io.fabric8.kubernetes.client.KubernetesClientException; import io.fabric8.kubernetes.client.RequestConfigBuilder; +import io.fabric8.kubernetes.client.StreamConsumer; import io.fabric8.kubernetes.client.dsl.LogWatch; import io.fabric8.kubernetes.client.http.AsyncBody; +import io.fabric8.kubernetes.client.http.BufferUtil; import io.fabric8.kubernetes.client.http.HttpClient; import io.fabric8.kubernetes.client.http.HttpRequest; import io.fabric8.kubernetes.client.utils.internal.SerialExecutor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; import java.io.InputStream; -import java.io.OutputStream; import java.net.URL; import java.nio.ByteBuffer; import java.nio.channels.Channels; @@ -41,8 +41,8 @@ public class LogWatchCallback implements LogWatch, AutoCloseable { private static final Logger LOGGER = LoggerFactory.getLogger(LogWatchCallback.class); - private final OutputStream out; - private WritableByteChannel outChannel; + private final StreamConsumer consumer; + private final boolean blocking; private final OperationContext context; private volatile InputStream output; @@ -50,11 +50,9 @@ public class LogWatchCallback implements LogWatch, AutoCloseable { private final CompletableFuture asyncBody = new CompletableFuture<>(); private final SerialExecutor serialExecutor; - public LogWatchCallback(OutputStream out, OperationContext context) { - this.out = out; - if (out != null) { - outChannel = Channels.newChannel(out); - } + public LogWatchCallback(StreamConsumer consumer, boolean blocking, OperationContext context) { + this.consumer = consumer; + this.blocking = blocking; this.context = context; this.serialExecutor = new SerialExecutor(context.getExecutor()); } @@ -79,7 +77,7 @@ public LogWatchCallback callAndWait(HttpClient client, URL url) { .withRequestTimeout(0).build()) .readTimeout(0, TimeUnit.MILLISECONDS).build(); - if (out == null) { + if (consumer == null) { // we can pass the input stream directly to the consumer clone.sendAsync(request, InputStream.class).whenComplete((r, e) -> { if (e != null) { @@ -91,45 +89,40 @@ public LogWatchCallback callAndWait(HttpClient client, URL url) { }).join(); } else { // we need to write the bytes to the given output - // we don't know if the write will be blocking, so hand it off to another thread - clone.consumeBytes(request, (buffers, a) -> CompletableFuture.runAsync(() -> { - for (ByteBuffer byteBuffer : buffers) { - try { - outChannel.write(byteBuffer); - } catch (IOException e1) { - throw KubernetesClientException.launderThrowable(e1); - } - } - }, serialExecutor).whenComplete((v, t) -> { - if (t != null) { - a.cancel(); - onFailure(t); - } else if (!closed.get()) { - a.consume(); - } else { - a.cancel(); - } - })).whenComplete((a, e) -> { - if (e != null) { - onFailure(e); - } - if (a != null) { - asyncBody.complete(a.body()); - a.body().consume(); - a.body().done().whenComplete((v, t) -> CompletableFuture.runAsync(() -> { - if (t != null) { - onFailure(t); - } else { - cleanUp(); + clone.consumeBytes(request, (buffers, a) -> ExecWebSocketListener.consume(consumer, + ByteBuffer.wrap(BufferUtil.toArray(buffers)), blocking ? serialExecutor : Runnable::run, (t) -> postConsume(a, t))) + .whenComplete((a, e) -> { + if (e != null) { + onFailure(e); } - }, serialExecutor)); - } - }); + if (a != null) { + asyncBody.complete(a.body()); + a.body().consume(); + a.body().done().whenComplete((v, t) -> CompletableFuture.runAsync(() -> { + if (t != null) { + onFailure(t); + } else { + cleanUp(); + } + }, serialExecutor)); + } + }); } return this; } + private void postConsume(AsyncBody a, Throwable t) { + if (t != null) { + a.cancel(); + onFailure(t); + } else if (!closed.get()) { + a.consume(); + } else { + a.cancel(); + } + } + @Override public InputStream getOutput() { return output; diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/PodOperationContext.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/PodOperationContext.java index 2c619155c48..13374da00ff 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/PodOperationContext.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/PodOperationContext.java @@ -16,6 +16,7 @@ package io.fabric8.kubernetes.client.dsl.internal; import io.fabric8.kubernetes.client.KubernetesClientException; +import io.fabric8.kubernetes.client.StreamConsumer; import io.fabric8.kubernetes.client.dsl.ExecListener; import io.fabric8.kubernetes.client.utils.URLUtils.URLBuilder; import lombok.AllArgsConstructor; @@ -24,7 +25,6 @@ import lombok.NoArgsConstructor; import java.io.InputStream; -import java.io.OutputStream; @Builder(toBuilder = true) @NoArgsConstructor @@ -34,10 +34,12 @@ public class PodOperationContext { @Getter public static final class StreamContext { - private OutputStream outputStream; + private StreamConsumer consumer; + private boolean blocking; - public StreamContext(OutputStream outputStream) { - this.outputStream = outputStream; + public StreamContext(StreamConsumer consumer, boolean blocking) { + this.consumer = consumer; + this.blocking = blocking; } public StreamContext() { diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/apps/v1/DeploymentOperationsImpl.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/apps/v1/DeploymentOperationsImpl.java index 5f88b9efd52..cbc0e238f2a 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/apps/v1/DeploymentOperationsImpl.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/apps/v1/DeploymentOperationsImpl.java @@ -25,7 +25,6 @@ import io.fabric8.kubernetes.api.model.extensions.DeploymentRollback; import io.fabric8.kubernetes.client.Client; import io.fabric8.kubernetes.client.dsl.BytesLimitTerminateTimeTailPrettyLoggable; -import io.fabric8.kubernetes.client.dsl.LogWatch; import io.fabric8.kubernetes.client.dsl.Loggable; import io.fabric8.kubernetes.client.dsl.PrettyLoggable; import io.fabric8.kubernetes.client.dsl.RollableScalableResource; @@ -39,14 +38,10 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.InputStream; -import java.io.OutputStream; -import java.io.Reader; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Optional; import java.util.concurrent.TimeUnit; public class DeploymentOperationsImpl @@ -123,16 +118,7 @@ public Deployment undo() { } @Override - public String getLog(boolean isPretty) { - StringBuilder stringBuilder = new StringBuilder(); - List> rcList = doGetLog(); - for (RollableScalableResource rcOperation : rcList) { - stringBuilder.append(rcOperation.getLog(isPretty)); - } - return stringBuilder.toString(); - } - - private List> doGetLog() { + protected List> doGetLog() { List> rcs = new ArrayList<>(); Deployment deployment = requireFromServer(); String rcUid = deployment.getMetadata().getUid(); @@ -150,42 +136,6 @@ private List> doGetLog() { return rcs; } - /** - * Returns an unclosed Reader. It's the caller responsibility to close it. - * - * @return Reader - */ - @Override - public Reader getLogReader() { - return findFirstPodResource().map(Loggable::getLogReader).orElse(null); - } - - /** - * Returns an unclosed InputStream. It's the caller responsibility to close it. - * - * @return InputStream - */ - @Override - public InputStream getLogInputStream() { - return findFirstPodResource().map(Loggable::getLogInputStream).orElse(null); - } - - @Override - public LogWatch watchLog(OutputStream out) { - return findFirstPodResource().map(it -> it.watchLog(out)).orElse(null); - } - - private Optional> findFirstPodResource() { - List> podResources = doGetLog(); - if (!podResources.isEmpty()) { - if (podResources.size() > 1) { - LOG.debug("Found {} pods, Using first one to get log", podResources.size()); - } - return Optional.of(podResources.get(0)); - } - return Optional.empty(); - } - private ReplicaSetList getReplicaSetListForDeployment(Deployment deployment) { return new ReplicaSetOperationsImpl(context.getClient()).inNamespace(getNamespace()) .withLabels(deployment.getSpec().getSelector().getMatchLabels()).list(); diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/apps/v1/ReplicaSetOperationsImpl.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/apps/v1/ReplicaSetOperationsImpl.java index 47a951f47c5..a3e7df86982 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/apps/v1/ReplicaSetOperationsImpl.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/apps/v1/ReplicaSetOperationsImpl.java @@ -23,7 +23,6 @@ import io.fabric8.kubernetes.client.Client; import io.fabric8.kubernetes.client.KubernetesClientException; import io.fabric8.kubernetes.client.dsl.BytesLimitTerminateTimeTailPrettyLoggable; -import io.fabric8.kubernetes.client.dsl.LogWatch; import io.fabric8.kubernetes.client.dsl.Loggable; import io.fabric8.kubernetes.client.dsl.PodResource; import io.fabric8.kubernetes.client.dsl.PrettyLoggable; @@ -36,9 +35,6 @@ import io.fabric8.kubernetes.client.dsl.internal.PodOperationContext; import io.fabric8.kubernetes.client.utils.internal.PodOperationUtil; -import java.io.InputStream; -import java.io.OutputStream; -import java.io.Reader; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -81,43 +77,13 @@ public Status rollback(DeploymentRollback deploymentRollback) { } @Override - public String getLog(boolean isPretty) { - return PodOperationUtil - .getLog(new ReplicaSetOperationsImpl(rollingOperationContext.withPrettyOutput(isPretty), context).doGetLog(), isPretty); - } - - private List doGetLog() { + public List doGetLog() { ReplicaSet replicaSet = requireFromServer(); return PodOperationUtil.getPodOperationsForController(context, rollingOperationContext, replicaSet.getMetadata().getUid(), getReplicaSetSelectorLabels(replicaSet)); } - /** - * Returns an unclosed Reader. It's the caller responsibility to close it. - * - * @return Reader - */ - @Override - public Reader getLogReader() { - return PodOperationUtil.getLogReader(doGetLog()); - } - - /** - * Returns an unclosed InputStream. It's the caller responsibility to close it. - * - * @return InputStream - */ - @Override - public InputStream getLogInputStream() { - return PodOperationUtil.getLogInputStream(doGetLog()); - } - - @Override - public LogWatch watchLog(OutputStream out) { - return PodOperationUtil.watchLog(doGetLog(), out); - } - static Map getReplicaSetSelectorLabels(ReplicaSet replicaSet) { Map labels = new HashMap<>(); diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/apps/v1/RollableScalableResourceOperation.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/apps/v1/RollableScalableResourceOperation.java index fcd08739465..f497a1c0129 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/apps/v1/RollableScalableResourceOperation.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/apps/v1/RollableScalableResourceOperation.java @@ -19,6 +19,7 @@ import io.fabric8.kubernetes.api.model.HasMetadata; import io.fabric8.kubernetes.api.model.KubernetesResourceList; import io.fabric8.kubernetes.client.KubernetesClientException; +import io.fabric8.kubernetes.client.StreamConsumer; import io.fabric8.kubernetes.client.dsl.LogWatch; import io.fabric8.kubernetes.client.dsl.Loggable; import io.fabric8.kubernetes.client.dsl.Resource; @@ -28,7 +29,10 @@ import io.fabric8.kubernetes.client.dsl.internal.OperationContext; import io.fabric8.kubernetes.client.dsl.internal.PodOperationContext; import io.fabric8.kubernetes.client.utils.Serialization; +import io.fabric8.kubernetes.client.utils.internal.PodOperationUtil; +import java.io.InputStream; +import java.io.Reader; import java.util.Collections; import java.util.List; import java.util.Map; @@ -176,4 +180,26 @@ public T undo() { throw new KubernetesClientException(context.getPlural() + " undo is not supported"); } + protected abstract List doGetLog(); + + @Override + public String getLog(boolean isPretty) { + return PodOperationUtil.getLog(doGetLog(), isPretty); + } + + @Override + public Reader getLogReader() { + return PodOperationUtil.getLogReader(doGetLog()); + } + + @Override + public InputStream getLogInputStream() { + return PodOperationUtil.getLogInputStream(doGetLog()); + } + + @Override + public LogWatch watchLog(StreamConsumer consumer, boolean blocking) { + return PodOperationUtil.watchLog(doGetLog(), consumer, blocking); + } + } diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/apps/v1/StatefulSetOperationsImpl.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/apps/v1/StatefulSetOperationsImpl.java index 5d82ebba514..6099c0000d5 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/apps/v1/StatefulSetOperationsImpl.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/apps/v1/StatefulSetOperationsImpl.java @@ -25,7 +25,6 @@ import io.fabric8.kubernetes.client.Client; import io.fabric8.kubernetes.client.KubernetesClientException; import io.fabric8.kubernetes.client.dsl.BytesLimitTerminateTimeTailPrettyLoggable; -import io.fabric8.kubernetes.client.dsl.LogWatch; import io.fabric8.kubernetes.client.dsl.Loggable; import io.fabric8.kubernetes.client.dsl.PodResource; import io.fabric8.kubernetes.client.dsl.PrettyLoggable; @@ -41,9 +40,6 @@ import io.fabric8.kubernetes.client.utils.Serialization; import io.fabric8.kubernetes.client.utils.internal.PodOperationUtil; -import java.io.InputStream; -import java.io.OutputStream; -import java.io.Reader; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -84,12 +80,7 @@ public Status rollback(DeploymentRollback deploymentRollback) { } @Override - public String getLog(boolean isPretty) { - return PodOperationUtil.getLog( - new StatefulSetOperationsImpl(rollingOperationContext.withPrettyOutput(isPretty), context).doGetLog(), isPretty); - } - - private List doGetLog() { + protected List doGetLog() { StatefulSet statefulSet = requireFromServer(); return PodOperationUtil.getPodOperationsForController(context, @@ -97,31 +88,6 @@ private List doGetLog() { getStatefulSetSelectorLabels(statefulSet)); } - /** - * Returns an unclosed Reader. It's the caller responsibility to close it. - * - * @return Reader - */ - @Override - public Reader getLogReader() { - return PodOperationUtil.getLogReader(doGetLog()); - } - - /** - * Returns an unclosed InputStream. It's the caller responsibility to close it. - * - * @return InputStream - */ - @Override - public InputStream getLogInputStream() { - return PodOperationUtil.getLogInputStream(doGetLog()); - } - - @Override - public LogWatch watchLog(OutputStream out) { - return PodOperationUtil.watchLog(doGetLog(), out); - } - @Override public StatefulSet restart() { return RollingUpdater.restart(this); diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/batch/v1/JobOperationsImpl.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/batch/v1/JobOperationsImpl.java index e178c55bfeb..7970c05a1ff 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/batch/v1/JobOperationsImpl.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/batch/v1/JobOperationsImpl.java @@ -18,6 +18,7 @@ import io.fabric8.kubernetes.api.model.batch.v1.Job; import io.fabric8.kubernetes.api.model.batch.v1.JobList; import io.fabric8.kubernetes.client.Client; +import io.fabric8.kubernetes.client.StreamConsumer; import io.fabric8.kubernetes.client.dsl.BytesLimitTerminateTimeTailPrettyLoggable; import io.fabric8.kubernetes.client.dsl.LogWatch; import io.fabric8.kubernetes.client.dsl.Loggable; @@ -35,7 +36,6 @@ import org.slf4j.LoggerFactory; import java.io.InputStream; -import java.io.OutputStream; import java.io.Reader; import java.util.HashMap; import java.util.List; @@ -152,8 +152,8 @@ public LogWatch watchLog() { } @Override - public LogWatch watchLog(OutputStream out) { - return PodOperationUtil.watchLog(doGetLog(), out); + public LogWatch watchLog(StreamConsumer consumer, boolean blocking) { + return PodOperationUtil.watchLog(doGetLog(), consumer, blocking); } @Override diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/core/v1/PodOperationsImpl.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/core/v1/PodOperationsImpl.java index f090cff2c5b..89569aa0104 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/core/v1/PodOperationsImpl.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/core/v1/PodOperationsImpl.java @@ -29,6 +29,7 @@ import io.fabric8.kubernetes.client.LocalPortForward; import io.fabric8.kubernetes.client.PortForward; import io.fabric8.kubernetes.client.RequestConfigBuilder; +import io.fabric8.kubernetes.client.StreamConsumer; import io.fabric8.kubernetes.client.dsl.BytesLimitTerminateTimeTailPrettyLoggable; import io.fabric8.kubernetes.client.dsl.CopyOrReadable; import io.fabric8.kubernetes.client.dsl.EphemeralContainersResource; @@ -72,8 +73,6 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; -import java.io.PipedInputStream; -import java.io.PipedOutputStream; import java.io.Reader; import java.net.InetAddress; import java.net.MalformedURLException; @@ -168,21 +167,14 @@ public LogWatch watchLog() { return watchLog(null); } - private void checkForPiped(Object object) { - if (object instanceof PipedOutputStream || object instanceof PipedInputStream) { - throw new KubernetesClientException("Piped streams should not be used"); - } - } - @Override - public LogWatch watchLog(OutputStream out) { - checkForPiped(out); + public LogWatch watchLog(StreamConsumer consumer, boolean blocking) { try { PodOperationUtil.waitUntilReadyOrSucceded(this, getContext().getReadyWaitTimeout() != null ? getContext().getReadyWaitTimeout() : DEFAULT_POD_READY_WAIT_TIMEOUT); // Issue Pod Logs HTTP request URL url = new URL(URLUtils.join(getResourceUrl().toString(), getContext().getLogParameters() + "&follow=true")); - final LogWatchCallback callback = new LogWatchCallback(out, context); + final LogWatchCallback callback = new LogWatchCallback(consumer, blocking, context); return callback.callAndWait(httpClient, url); } catch (IOException ioException) { throw KubernetesClientException.launderThrowable(forOperationType("watchLog"), ioException); @@ -551,7 +543,7 @@ public void run() { @Override public TtyExecOutputErrorable readingInput(InputStream in) { - checkForPiped(in); + StreamConsumer.checkForPiped(in); return new PodOperationsImpl(getContext().withIn(in), context); } @@ -566,9 +558,8 @@ public PodOperationsImpl redirectingInput(Integer bufferSize) { } @Override - public TtyExecErrorable writingOutput(OutputStream out) { - checkForPiped(out); - return new PodOperationsImpl(getContext().toBuilder().output(new StreamContext(out)).build(), context); + public TtyExecErrorable writingOutput(StreamConsumer consumer, boolean blocking) { + return new PodOperationsImpl(getContext().toBuilder().output(new StreamContext(consumer, blocking)).build(), context); } @Override @@ -577,9 +568,8 @@ public TtyExecErrorable redirectingOutput() { } @Override - public TtyExecErrorChannelable writingError(OutputStream err) { - checkForPiped(err); - return new PodOperationsImpl(getContext().toBuilder().error(new StreamContext(err)).build(), context); + public TtyExecErrorChannelable writingError(StreamConsumer consumer, boolean blocking) { + return new PodOperationsImpl(getContext().toBuilder().error(new StreamContext(consumer, blocking)).build(), context); } @Override @@ -589,8 +579,8 @@ public TtyExecErrorChannelable redirectingError() { @Override public TtyExecable writingErrorChannel(OutputStream errChannel) { - checkForPiped(errChannel); - return new PodOperationsImpl(getContext().toBuilder().errorChannel(new StreamContext(errChannel)).build(), context); + return new PodOperationsImpl(getContext().toBuilder() + .errorChannel(new StreamContext(StreamConsumer.newStreamConsumer(errChannel), true)).build(), context); } @Override diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/core/v1/ReplicationControllerOperationsImpl.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/core/v1/ReplicationControllerOperationsImpl.java index 10dbc0fa3d8..e20832067f1 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/core/v1/ReplicationControllerOperationsImpl.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/core/v1/ReplicationControllerOperationsImpl.java @@ -23,7 +23,6 @@ import io.fabric8.kubernetes.client.Client; import io.fabric8.kubernetes.client.KubernetesClientException; import io.fabric8.kubernetes.client.dsl.BytesLimitTerminateTimeTailPrettyLoggable; -import io.fabric8.kubernetes.client.dsl.LogWatch; import io.fabric8.kubernetes.client.dsl.Loggable; import io.fabric8.kubernetes.client.dsl.PodResource; import io.fabric8.kubernetes.client.dsl.PrettyLoggable; @@ -38,9 +37,6 @@ import io.fabric8.kubernetes.client.dsl.internal.apps.v1.RollingUpdater; import io.fabric8.kubernetes.client.utils.internal.PodOperationUtil; -import java.io.InputStream; -import java.io.OutputStream; -import java.io.Reader; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -83,13 +79,7 @@ public Status rollback(DeploymentRollback deploymentRollback) { } @Override - public String getLog(boolean isPretty) { - return PodOperationUtil.getLog( - new ReplicationControllerOperationsImpl(rollingOperationContext.withPrettyOutput(isPretty), context).doGetLog(), - isPretty); - } - - private List doGetLog() { + protected List doGetLog() { ReplicationController rc = requireFromServer(); return PodOperationUtil.getPodOperationsForController(context, @@ -97,31 +87,6 @@ private List doGetLog() { getReplicationControllerPodLabels(rc)); } - /** - * Returns an unclosed Reader. It's the caller responsibility to close it. - * - * @return Reader - */ - @Override - public Reader getLogReader() { - return PodOperationUtil.getLogReader(doGetLog()); - } - - /** - * Returns an unclosed InputStream. It's the caller responsibility to close it. - * - * @return InputStream - */ - @Override - public InputStream getLogInputStream() { - return PodOperationUtil.getLogInputStream(doGetLog()); - } - - @Override - public LogWatch watchLog(OutputStream out) { - return PodOperationUtil.watchLog(doGetLog(), out); - } - static Map getReplicationControllerPodLabels(ReplicationController replicationController) { Map labels = new HashMap<>(); if (replicationController != null && replicationController.getSpec() != null diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/extensions/v1beta1/DeploymentOperationsImpl.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/extensions/v1beta1/DeploymentOperationsImpl.java index 76aabc89e23..cdbd35dc2c5 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/extensions/v1beta1/DeploymentOperationsImpl.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/extensions/v1beta1/DeploymentOperationsImpl.java @@ -25,7 +25,6 @@ import io.fabric8.kubernetes.api.model.extensions.ReplicaSetList; import io.fabric8.kubernetes.client.Client; import io.fabric8.kubernetes.client.dsl.BytesLimitTerminateTimeTailPrettyLoggable; -import io.fabric8.kubernetes.client.dsl.LogWatch; import io.fabric8.kubernetes.client.dsl.Loggable; import io.fabric8.kubernetes.client.dsl.PrettyLoggable; import io.fabric8.kubernetes.client.dsl.RollableScalableResource; @@ -40,14 +39,10 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.InputStream; -import java.io.OutputStream; -import java.io.Reader; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Optional; import java.util.concurrent.TimeUnit; public class DeploymentOperationsImpl @@ -124,16 +119,7 @@ public Deployment undo() { } @Override - public String getLog(boolean isPretty) { - StringBuilder stringBuilder = new StringBuilder(); - List> rcList = doGetLog(); - for (RollableScalableResource rcOperation : rcList) { - stringBuilder.append(rcOperation.getLog(isPretty)); - } - return stringBuilder.toString(); - } - - private List> doGetLog() { + protected List> doGetLog() { List> rcs = new ArrayList<>(); Deployment deployment = requireFromServer(); String rcUid = deployment.getMetadata().getUid(); @@ -151,42 +137,6 @@ private List> doGetLog() { return rcs; } - /** - * Returns an unclosed Reader. It's the caller responsibility to close it. - * - * @return Reader - */ - @Override - public Reader getLogReader() { - return findFirstPodResource().map(Loggable::getLogReader).orElse(null); - } - - /** - * Returns an unclosed InputStream. It's the caller responsibility to close it. - * - * @return InputStream - */ - @Override - public InputStream getLogInputStream() { - return findFirstPodResource().map(Loggable::getLogInputStream).orElse(null); - } - - @Override - public LogWatch watchLog(OutputStream out) { - return findFirstPodResource().map(it -> it.watchLog(out)).orElse(null); - } - - private Optional> findFirstPodResource() { - List> podResources = doGetLog(); - if (!podResources.isEmpty()) { - if (podResources.size() > 1) { - LOG.debug("Found {} pods, Using first one to get log", podResources.size()); - } - return Optional.of(podResources.get(0)); - } - return Optional.empty(); - } - private ReplicaSetList getReplicaSetListForDeployment(Deployment deployment) { return new ReplicaSetOperationsImpl(context.getClient()).inNamespace(getNamespace()) .withLabels(deployment.getSpec().getSelector().getMatchLabels()).list(); diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/extensions/v1beta1/ReplicaSetOperationsImpl.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/extensions/v1beta1/ReplicaSetOperationsImpl.java index 5f0b6ad2b3b..d7bc51d8f79 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/extensions/v1beta1/ReplicaSetOperationsImpl.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/extensions/v1beta1/ReplicaSetOperationsImpl.java @@ -23,7 +23,6 @@ import io.fabric8.kubernetes.client.Client; import io.fabric8.kubernetes.client.KubernetesClientException; import io.fabric8.kubernetes.client.dsl.BytesLimitTerminateTimeTailPrettyLoggable; -import io.fabric8.kubernetes.client.dsl.LogWatch; import io.fabric8.kubernetes.client.dsl.Loggable; import io.fabric8.kubernetes.client.dsl.PodResource; import io.fabric8.kubernetes.client.dsl.PrettyLoggable; @@ -37,9 +36,6 @@ import io.fabric8.kubernetes.client.dsl.internal.apps.v1.RollingUpdater; import io.fabric8.kubernetes.client.utils.internal.PodOperationUtil; -import java.io.InputStream; -import java.io.OutputStream; -import java.io.Reader; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -82,48 +78,13 @@ public Status rollback(DeploymentRollback deploymentRollback) { } @Override - public String getLog(boolean isPretty) { - StringBuilder stringBuilder = new StringBuilder(); - List podOperationList = new ReplicaSetOperationsImpl(rollingOperationContext.withPrettyOutput(isPretty), - context).doGetLog(); - for (PodResource podOperation : podOperationList) { - stringBuilder.append(podOperation.getLog(isPretty)); - } - return stringBuilder.toString(); - } - - private List doGetLog() { + protected List doGetLog() { ReplicaSet replicaSet = requireFromServer(); return PodOperationUtil.getPodOperationsForController(context, rollingOperationContext, replicaSet.getMetadata().getUid(), getReplicaSetSelectorLabels(replicaSet)); } - /** - * Returns an unclosed Reader. It's the caller responsibility to close it. - * - * @return Reader - */ - @Override - public Reader getLogReader() { - return PodOperationUtil.getLogReader(doGetLog()); - } - - /** - * Returns an unclosed InputStream. It's the caller responsibility to close it. - * - * @return InputStream - */ - @Override - public InputStream getLogInputStream() { - return PodOperationUtil.getLogInputStream(doGetLog()); - } - - @Override - public LogWatch watchLog(OutputStream out) { - return PodOperationUtil.watchLog(doGetLog(), out); - } - static Map getReplicaSetSelectorLabels(ReplicaSet replicaSet) { Map labels = new HashMap<>(); diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/utils/internal/PodOperationUtil.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/utils/internal/PodOperationUtil.java index 218b08ff0db..90647a8ffb7 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/utils/internal/PodOperationUtil.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/utils/internal/PodOperationUtil.java @@ -19,6 +19,7 @@ import io.fabric8.kubernetes.api.model.Pod; import io.fabric8.kubernetes.api.model.PodList; import io.fabric8.kubernetes.client.KubernetesClientTimeoutException; +import io.fabric8.kubernetes.client.StreamConsumer; import io.fabric8.kubernetes.client.dsl.LogWatch; import io.fabric8.kubernetes.client.dsl.Loggable; import io.fabric8.kubernetes.client.dsl.PodResource; @@ -31,7 +32,6 @@ import org.slf4j.LoggerFactory; import java.io.InputStream; -import java.io.OutputStream; import java.io.Reader; import java.util.ArrayList; import java.util.List; @@ -79,19 +79,19 @@ public static List getPodOperationsForController(OperationContext c selectorLabels); } - public static LogWatch watchLog(List podResources, OutputStream out) { - return findFirstPodResource(podResources).map(it -> it.watchLog(out)).orElse(null); + public static LogWatch watchLog(List podResources, StreamConsumer consumer, boolean blocking) { + return findFirstPodResource(podResources).map(it -> it.watchLog(consumer, blocking)).orElse(null); } - public static Reader getLogReader(List podResources) { + public static Reader getLogReader(List podResources) { return findFirstPodResource(podResources).map(Loggable::getLogReader).orElse(null); } - public static InputStream getLogInputStream(List podResources) { + public static InputStream getLogInputStream(List podResources) { return findFirstPodResource(podResources).map(Loggable::getLogInputStream).orElse(null); } - private static Optional findFirstPodResource(List podResources) { + private static Optional findFirstPodResource(List podResources) { if (!podResources.isEmpty()) { if (podResources.size() > 1) { LOG.debug("Found {} pods, Using first one to get log", podResources.size()); @@ -101,9 +101,9 @@ private static Optional findFirstPodResource(List podR return Optional.empty(); } - public static String getLog(List podOperationList, Boolean isPretty) { + public static String getLog(List podOperationList, boolean isPretty) { StringBuilder stringBuilder = new StringBuilder(); - for (PodResource podOperation : podOperationList) { + for (Loggable podOperation : podOperationList) { stringBuilder.append(podOperation.getLog(isPretty)); } return stringBuilder.toString(); diff --git a/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/utils/internal/PodOperationUtilTest.java b/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/utils/internal/PodOperationUtilTest.java index 1aae54e1569..cb1b794ed9b 100644 --- a/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/utils/internal/PodOperationUtilTest.java +++ b/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/utils/internal/PodOperationUtilTest.java @@ -19,6 +19,7 @@ import io.fabric8.kubernetes.api.model.PodBuilder; import io.fabric8.kubernetes.api.model.PodList; import io.fabric8.kubernetes.api.model.PodListBuilder; +import io.fabric8.kubernetes.client.StreamConsumer; import io.fabric8.kubernetes.client.dsl.FilterWatchListDeletable; import io.fabric8.kubernetes.client.dsl.LogWatch; import io.fabric8.kubernetes.client.dsl.PodResource; @@ -31,7 +32,6 @@ import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; -import java.io.ByteArrayOutputStream; import java.io.InputStream; import java.io.Reader; import java.util.ArrayList; @@ -117,14 +117,13 @@ void testGetPodOperationsForController() { void testWatchLogSinglePod() { // Given PodResource podResource = mock(PodResource.class, Mockito.RETURNS_DEEP_STUBS); - ByteArrayOutputStream byteArrayOutputStream = mock(ByteArrayOutputStream.class, Mockito.RETURNS_DEEP_STUBS); - + StreamConsumer consumer = Mockito.mock(StreamConsumer.class); // When - LogWatch logWatch = PodOperationUtil.watchLog(createMockPodResourceList(podResource), byteArrayOutputStream); + LogWatch logWatch = PodOperationUtil.watchLog(createMockPodResourceList(podResource), consumer, true); // Then assertThat(logWatch).isNotNull(); - verify(podResource, times(1)).watchLog(byteArrayOutputStream); + verify(podResource, times(1)).watchLog(consumer, true); } @Test @@ -132,20 +131,21 @@ void testWatchLogMultiplePodReplicasPicksFirstPod() { // Given PodResource p1 = mock(PodResource.class, Mockito.RETURNS_DEEP_STUBS); PodResource p2 = mock(PodResource.class, Mockito.RETURNS_DEEP_STUBS); - ByteArrayOutputStream byteArrayOutputStream = mock(ByteArrayOutputStream.class, Mockito.RETURNS_DEEP_STUBS); + StreamConsumer consumer = Mockito.mock(StreamConsumer.class); // When - LogWatch logWatch = PodOperationUtil.watchLog(createMockPodResourceList(p1, p2), byteArrayOutputStream); + LogWatch logWatch = PodOperationUtil.watchLog(createMockPodResourceList(p1, p2), + consumer, false); // Then assertThat(logWatch).isNotNull(); - verify(p1, times(1)).watchLog(byteArrayOutputStream); - verify(p2, times(0)).watchLog(byteArrayOutputStream); + verify(p1, times(1)).watchLog(consumer, false); + verify(p2, times(0)).watchLog(consumer, false); } @Test void testWatchLogEmptyPodResourceList() { - assertThat(PodOperationUtil.watchLog(Collections.emptyList(), null)).isNull(); + assertThat(PodOperationUtil.watchLog(Collections.emptyList(), null, false)).isNull(); } @Test diff --git a/openshift-client/src/main/java/io/fabric8/openshift/client/dsl/internal/apps/DeploymentConfigOperationsImpl.java b/openshift-client/src/main/java/io/fabric8/openshift/client/dsl/internal/apps/DeploymentConfigOperationsImpl.java index acf001796da..ad653616778 100644 --- a/openshift-client/src/main/java/io/fabric8/openshift/client/dsl/internal/apps/DeploymentConfigOperationsImpl.java +++ b/openshift-client/src/main/java/io/fabric8/openshift/client/dsl/internal/apps/DeploymentConfigOperationsImpl.java @@ -17,6 +17,7 @@ import io.fabric8.kubernetes.client.Client; import io.fabric8.kubernetes.client.KubernetesClientException; +import io.fabric8.kubernetes.client.StreamConsumer; import io.fabric8.kubernetes.client.dsl.BytesLimitTerminateTimeTailPrettyLoggable; import io.fabric8.kubernetes.client.dsl.LogWatch; import io.fabric8.kubernetes.client.dsl.Loggable; @@ -36,7 +37,6 @@ import io.fabric8.openshift.client.dsl.DeployableScalableResource; import java.io.InputStream; -import java.io.OutputStream; import java.io.Reader; import java.net.MalformedURLException; import java.net.URL; @@ -135,12 +135,12 @@ public LogWatch watchLog() { } @Override - public LogWatch watchLog(OutputStream out) { + public LogWatch watchLog(StreamConsumer consumer, boolean blocking) { try { // In case of DeploymentConfig we directly get logs at DeploymentConfig Url, but we need to wait for Pods waitUntilDeploymentConfigPodBecomesReady(get()); URL url = getResourceLogUrl(true); - final LogWatchCallback callback = new LogWatchCallback(out, context); + final LogWatchCallback callback = new LogWatchCallback(consumer, blocking, context); return callback.callAndWait(this.httpClient, url); } catch (Throwable t) { throw KubernetesClientException.launderThrowable(forOperationType("watchLog"), t); diff --git a/openshift-client/src/main/java/io/fabric8/openshift/client/dsl/internal/build/BuildOperationsImpl.java b/openshift-client/src/main/java/io/fabric8/openshift/client/dsl/internal/build/BuildOperationsImpl.java index 40e5dfaaf96..16d97b5b073 100644 --- a/openshift-client/src/main/java/io/fabric8/openshift/client/dsl/internal/build/BuildOperationsImpl.java +++ b/openshift-client/src/main/java/io/fabric8/openshift/client/dsl/internal/build/BuildOperationsImpl.java @@ -17,6 +17,7 @@ import io.fabric8.kubernetes.client.Client; import io.fabric8.kubernetes.client.KubernetesClientException; +import io.fabric8.kubernetes.client.StreamConsumer; import io.fabric8.kubernetes.client.dsl.BytesLimitTerminateTimeTailPrettyLoggable; import io.fabric8.kubernetes.client.dsl.LogWatch; import io.fabric8.kubernetes.client.dsl.Loggable; @@ -38,7 +39,6 @@ import java.io.IOException; import java.io.InputStream; -import java.io.OutputStream; import java.io.Reader; import java.net.URL; import java.util.HashMap; @@ -129,12 +129,12 @@ public LogWatch watchLog() { } @Override - public LogWatch watchLog(OutputStream out) { + public LogWatch watchLog(StreamConsumer consumer, boolean blocking) { try { // In case of Build we directly get logs at Build Url, but we need to wait for Pods waitUntilBuildPodBecomesReady(get()); URL url = new URL(URLUtils.join(getResourceUrl().toString(), getLogParameters() + "&follow=true")); - final LogWatchCallback callback = new LogWatchCallback(out, context); + final LogWatchCallback callback = new LogWatchCallback(consumer, blocking, context); return callback.callAndWait(this.httpClient, url); } catch (IOException t) { throw KubernetesClientException.launderThrowable(forOperationType("watchLog"), t); From a7b1d2859b0340f30b44a3c64ed7834154656d17 Mon Sep 17 00:00:00 2001 From: Steve Hawkins Date: Thu, 18 May 2023 10:52:06 -0400 Subject: [PATCH 2/2] fix #4154: adding method to detect blocking and implementing exec stream available --- .../client/jdkhttp/JdkWebSocketImpl.java | 10 ++++++- .../client/jetty/JettyWebSocket.java | 6 +++++ .../client/vertx/VertxWebSocket.java | 11 +++++++- .../kubernetes/client/dsl/ExecWatch.java | 27 ++++++++++++++----- .../dsl/internal/ExecWatchInputStream.java | 11 ++++++-- .../dsl/internal/ExecWebSocketListener.java | 18 ++++++++----- .../client/dsl/internal/LogWatchCallback.java | 3 --- .../kubernetes/client/mock/PodTest.java | 3 +++ 8 files changed, 69 insertions(+), 20 deletions(-) diff --git a/httpclient-jdk/src/main/java/io/fabric8/kubernetes/client/jdkhttp/JdkWebSocketImpl.java b/httpclient-jdk/src/main/java/io/fabric8/kubernetes/client/jdkhttp/JdkWebSocketImpl.java index 663ef8e54a1..712b61b44a8 100644 --- a/httpclient-jdk/src/main/java/io/fabric8/kubernetes/client/jdkhttp/JdkWebSocketImpl.java +++ b/httpclient-jdk/src/main/java/io/fabric8/kubernetes/client/jdkhttp/JdkWebSocketImpl.java @@ -18,6 +18,8 @@ import io.fabric8.kubernetes.client.http.BufferUtil; import io.fabric8.kubernetes.client.http.WebSocket; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.ByteArrayOutputStream; import java.io.IOException; @@ -92,6 +94,7 @@ public void onOpen(java.net.http.WebSocket webSocket) { } } + private static final Logger LOG = LoggerFactory.getLogger(JdkWebSocketImpl.class); private java.net.http.WebSocket webSocket; private AtomicLong queueSize; @@ -106,7 +109,12 @@ public boolean send(ByteBuffer buffer) { final int size = buffer.remaining(); queueSize.addAndGet(size); CompletableFuture cf = webSocket.sendBinary(buffer, true); - cf.whenComplete((b, t) -> queueSize.addAndGet(-size)); + cf.whenComplete((b, t) -> { + if (t != null) { + LOG.debug("Queued write did not succeed", t); + } + queueSize.addAndGet(-size); + }); return asBoolean(cf); } diff --git a/httpclient-jetty/src/main/java/io/fabric8/kubernetes/client/jetty/JettyWebSocket.java b/httpclient-jetty/src/main/java/io/fabric8/kubernetes/client/jetty/JettyWebSocket.java index 538e092208b..a904e32e1de 100644 --- a/httpclient-jetty/src/main/java/io/fabric8/kubernetes/client/jetty/JettyWebSocket.java +++ b/httpclient-jetty/src/main/java/io/fabric8/kubernetes/client/jetty/JettyWebSocket.java @@ -26,6 +26,8 @@ import org.eclipse.jetty.websocket.api.WebSocketListener; import org.eclipse.jetty.websocket.api.WriteCallback; import org.eclipse.jetty.websocket.api.exceptions.UpgradeException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; import java.nio.ByteBuffer; @@ -38,6 +40,9 @@ import java.util.concurrent.locks.ReentrantLock; public class JettyWebSocket implements WebSocket, WebSocketListener { + + private static final Logger LOG = LoggerFactory.getLogger(JettyWebSocket.class); + private final WebSocket.Listener listener; private final AtomicLong sendQueue; private final Lock lock; @@ -66,6 +71,7 @@ public boolean send(ByteBuffer buffer) { webSocketSession.getRemote().sendBytes(buffer, new WriteCallback() { @Override public void writeFailed(Throwable x) { + LOG.debug("Queued write did not succeed", x); sendQueue.addAndGet(-size); } diff --git a/httpclient-vertx/src/main/java/io/fabric8/kubernetes/client/vertx/VertxWebSocket.java b/httpclient-vertx/src/main/java/io/fabric8/kubernetes/client/vertx/VertxWebSocket.java index c7c8300f564..19aaa939d33 100644 --- a/httpclient-vertx/src/main/java/io/fabric8/kubernetes/client/vertx/VertxWebSocket.java +++ b/httpclient-vertx/src/main/java/io/fabric8/kubernetes/client/vertx/VertxWebSocket.java @@ -21,12 +21,16 @@ import io.vertx.core.Future; import io.vertx.core.buffer.Buffer; import io.vertx.core.http.HttpClosedException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.nio.ByteBuffer; import java.util.concurrent.atomic.AtomicInteger; class VertxWebSocket implements WebSocket { + private static final Logger LOG = LoggerFactory.getLogger(VertxWebSocket.class); + private final io.vertx.core.http.WebSocket ws; private final AtomicInteger pending = new AtomicInteger(); private final Listener listener; @@ -72,7 +76,12 @@ public boolean send(ByteBuffer buffer) { int len = vertxBuffer.length(); pending.addAndGet(len); Future res = ws.writeBinaryMessage(vertxBuffer); - res.onComplete(ignore -> pending.addAndGet(-len)); + res.onComplete(result -> { + if (result.cause() != null) { + LOG.debug("Queued write did not succeed", result.cause()); + } + pending.addAndGet(-len); + }); return true; } diff --git a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/dsl/ExecWatch.java b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/dsl/ExecWatch.java index feed1084019..5ca991af091 100644 --- a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/dsl/ExecWatch.java +++ b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/dsl/ExecWatch.java @@ -25,23 +25,35 @@ public interface ExecWatch extends Closeable { /** * Gets the {@link OutputStream} for stdIn if {@link ContainerResource#redirectingInput()} has been called. *

- * Closing this stream does not immediately force sending. You will typically call {@link #close()} after - * you are finished writing - the close message will not be sent until all pending messages have been sent. - * + * This is a standard blocking {@link OutputStream} with the exception that close/flush do not immediately force sending. + * Instead these operations force any pending data into the write queue. + *

+ * Until Kubernetes supports half-closing a stream https://github.com/kubernetes/kubernetes/issues/89899 you will typically + * call {@link #close()} after closing this stream - that close message will not be sent until all pending messages have been + * sent. + *

+ * If you want non-blocking like behavior, your logic may poll the {@link #willWriteBlock(int)} method + * prior to calling a write method on the stream. + * * @return the stdIn stream */ OutputStream getInput(); + /** + * @return true if writing the given number of bytes will block on the write queue + */ + boolean willWriteBlock(int bytes); + /** * Gets the {@link InputStream} for stdOut if {@link TtyExecOutputErrorable#redirectingOutput()} has been called. - * + * * @return the stdOut stream */ InputStream getOutput(); /** * Gets the {@link InputStream} for stdErr if {@link TtyExecErrorable#redirectingError()} has been called. - * + * * @return the stdErr stream */ InputStream getError(); @@ -52,7 +64,7 @@ public interface ExecWatch extends Closeable { * could indicate abnormal termination. *

* See also {@link #exitCode()} - * + * * @return the channel 3 stream */ InputStream getErrorChannel(); @@ -63,6 +75,9 @@ public interface ExecWatch extends Closeable { @Override void close(); + /** + * This operation may block if the write queue is nearly full + */ void resize(int cols, int rows); /** diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/ExecWatchInputStream.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/ExecWatchInputStream.java index 06e8a14d8da..b1e640392a2 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/ExecWatchInputStream.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/ExecWatchInputStream.java @@ -77,13 +77,20 @@ void consume(List value) { assert !complete || failed == null; buffers.addAll(value); buffers.notifyAll(); - if ((currentBuffer != null ? currentBuffer.remaining() : 0) - + buffers.stream().mapToInt(ByteBuffer::remaining).sum() < bufferSize) { + if (available() < bufferSize) { request.run(); } } } + @Override + public int available() { + synchronized (buffers) { + return (currentBuffer != null ? currentBuffer.remaining() : 0) + + buffers.stream().mapToInt(ByteBuffer::remaining).sum(); + } + } + private ByteBuffer current() throws IOException { synchronized (buffers) { while (currentBuffer == null || !currentBuffer.hasRemaining()) { diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/ExecWebSocketListener.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/ExecWebSocketListener.java index 90c61dc26e5..29706726041 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/ExecWebSocketListener.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/ExecWebSocketListener.java @@ -43,6 +43,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.concurrent.Executor; @@ -70,6 +71,8 @@ public class ExecWebSocketListener implements ExecWatch, AutoCloseable, WebSocke static final String REASON_NON_ZERO_EXIT_CODE = "NonZeroExitCode"; static final String STATUS_SUCCESS = "Success"; + // taken from the okhttp implementation, but generally protects memory + // and the other implementations as well which may impose frame limits private static final long MAX_QUEUE_SIZE = 16 * 1024 * 1024L; private final class SimpleResponse implements Response { @@ -145,6 +148,7 @@ private void handle(ByteBuffer byteString, WebSocket webSocket) throws Exception private final AtomicBoolean closed = new AtomicBoolean(false); private final CompletableFuture exitCode = new CompletableFuture<>(); private ObjectMapper objectMapper = new ObjectMapper(); + private int bufferSize; public static String toString(ByteBuffer buffer) { return StandardCharsets.UTF_8.decode(buffer).toString(); @@ -157,7 +161,7 @@ public ExecWebSocketListener(PodOperationContext context) { public ExecWebSocketListener(PodOperationContext context, Executor executor) { this.listener = context.getExecListener(); - Integer bufferSize = context.getBufferSize(); + bufferSize = Optional.ofNullable(context.getBufferSize()).orElse(8192); if (context.isRedirectingIn()) { this.input = InputStreamPumper.writableOutputStream(this::sendWithErrorChecking, bufferSize); this.in = null; @@ -260,7 +264,7 @@ public void onOpen(WebSocket webSocket) { if (in != null && !executorService.isShutdown()) { // the task will be cancelled via shutdownNow // TODO: this does not work if the inputstream does not support available - InputStreamPumper.pump(InputStreamPumper.asInterruptible(in), this::send, executorService); + InputStreamPumper.pump(InputStreamPumper.asInterruptible(in), this::sendWithErrorChecking, executorService); } } finally { if (listener != null) { @@ -420,6 +424,10 @@ public InputStream getErrorChannel() { return errorChannel.inputStream; } + public boolean willWriteBlock(int bytes) { + return MAX_QUEUE_SIZE - this.webSocketRef.get().queueSize() < bytes + bytes / bufferSize + 1; + } + @Override public void resize(int cols, int rows) { if (cols < 0 || rows < 0) { @@ -449,13 +457,9 @@ private void send(byte[] bytes, int offset, int length, byte flag) { } } - private void send(byte[] bytes, int offset, int length) { - send(bytes, offset, length, (byte) 0); - } - void sendWithErrorChecking(byte[] bytes, int offset, int length) { checkError(); - send(bytes, offset, length); + send(bytes, offset, length, (byte) 0); checkError(); } diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/LogWatchCallback.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/LogWatchCallback.java index 0657b747c30..d79af113cb5 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/LogWatchCallback.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/LogWatchCallback.java @@ -15,7 +15,6 @@ */ package io.fabric8.kubernetes.client.dsl.internal; -import io.fabric8.kubernetes.client.KubernetesClientException; import io.fabric8.kubernetes.client.RequestConfigBuilder; import io.fabric8.kubernetes.client.StreamConsumer; import io.fabric8.kubernetes.client.dsl.LogWatch; @@ -30,8 +29,6 @@ import java.io.InputStream; import java.net.URL; import java.nio.ByteBuffer; -import java.nio.channels.Channels; -import java.nio.channels.WritableByteChannel; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; diff --git a/kubernetes-tests/src/test/java/io/fabric8/kubernetes/client/mock/PodTest.java b/kubernetes-tests/src/test/java/io/fabric8/kubernetes/client/mock/PodTest.java index 88f39ac2b83..45d94df31e4 100644 --- a/kubernetes-tests/src/test/java/io/fabric8/kubernetes/client/mock/PodTest.java +++ b/kubernetes-tests/src/test/java/io/fabric8/kubernetes/client/mock/PodTest.java @@ -416,6 +416,9 @@ void testExec() throws InterruptedException { .usingListener(createCountDownLatchListener(execLatch)) .exec("ls"); + assertTrue(watch.willWriteBlock(1 << 25)); + assertFalse(watch.willWriteBlock((1 << 24) - 8000)); + execLatch.await(10, TimeUnit.MINUTES); assertNotNull(watch); assertEquals(expectedOutput, baos.toString());