From 60955901dcd24bee599f27b0b634757d14f14cee Mon Sep 17 00:00:00 2001 From: jansupol Date: Mon, 28 Oct 2024 10:26:58 +0100 Subject: [PATCH 1/4] Eliminate unnecessary flushes Signed-off-by: jansupol --- .../glassfish/jersey/apache/connector/StreamingTest.java | 4 +++- .../jersey/apache5/connector/StreamingTest.java | 4 +++- .../jersey/message/internal/CommittingOutputStream.java | 4 ++++ .../jersey/message/internal/OutboundMessageContext.java | 9 +++++++++ .../glassfish/jersey/message/internal/ReaderWriter.java | 8 +++----- .../tests/integration/jersey4321/StreamingTest.java | 4 +++- 6 files changed, 25 insertions(+), 8 deletions(-) diff --git a/connectors/apache-connector/src/test/java/org/glassfish/jersey/apache/connector/StreamingTest.java b/connectors/apache-connector/src/test/java/org/glassfish/jersey/apache/connector/StreamingTest.java index a8521d723a3..14ba1f2aa03 100644 --- a/connectors/apache-connector/src/test/java/org/glassfish/jersey/apache/connector/StreamingTest.java +++ b/connectors/apache-connector/src/test/java/org/glassfish/jersey/apache/connector/StreamingTest.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2015, 2022 Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2015, 2024 Oracle and/or its affiliates. All rights reserved. * * This program and the accompanying materials are made available under the * terms of the Eclipse Public License v. 2.0, which is available at @@ -130,6 +130,8 @@ private void clientCloseTest(int readTimeout) throws IOException { inputStream.close(); // trigger sending another 'A' to the stream; it should fail // (indicating that the streaming has been terminated on the server) + // But only the second flush causes the Exception + assertEquals("OK", sendTarget.request().get().readEntity(String.class)); assertEquals("NOK", sendTarget.request().get().readEntity(String.class)); assertEquals(0, counter.get()); } diff --git a/connectors/apache5-connector/src/test/java/org/glassfish/jersey/apache5/connector/StreamingTest.java b/connectors/apache5-connector/src/test/java/org/glassfish/jersey/apache5/connector/StreamingTest.java index 4a38280f63f..8f7888ae39d 100644 --- a/connectors/apache5-connector/src/test/java/org/glassfish/jersey/apache5/connector/StreamingTest.java +++ b/connectors/apache5-connector/src/test/java/org/glassfish/jersey/apache5/connector/StreamingTest.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2022 Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2022, 2024 Oracle and/or its affiliates. All rights reserved. * * This program and the accompanying materials are made available under the * terms of the Eclipse Public License v. 2.0, which is available at @@ -129,6 +129,8 @@ private void clientCloseTest(int readTimeout) throws IOException { inputStream.close(); // trigger sending another 'A' to the stream; it should fail // (indicating that the streaming has been terminated on the server) + // But only the second flush causes the Exception + assertEquals("OK", sendTarget.request().get().readEntity(String.class)); assertEquals("NOK", sendTarget.request().get().readEntity(String.class)); assertEquals(0, counter.get()); } diff --git a/core-common/src/main/java/org/glassfish/jersey/message/internal/CommittingOutputStream.java b/core-common/src/main/java/org/glassfish/jersey/message/internal/CommittingOutputStream.java index e9f1bc20374..02ddd8f3894 100644 --- a/core-common/src/main/java/org/glassfish/jersey/message/internal/CommittingOutputStream.java +++ b/core-common/src/main/java/org/glassfish/jersey/message/internal/CommittingOutputStream.java @@ -128,6 +128,10 @@ public void setStreamProvider(OutboundMessageContext.StreamProvider streamProvid this.streamProvider = streamProvider; } + /* package */ boolean hasStreamProvider() { + return streamProvider != null; + } + /** * Enable buffering of the serialized entity. * diff --git a/core-common/src/main/java/org/glassfish/jersey/message/internal/OutboundMessageContext.java b/core-common/src/main/java/org/glassfish/jersey/message/internal/OutboundMessageContext.java index b1b7745bbd1..2915e693249 100644 --- a/core-common/src/main/java/org/glassfish/jersey/message/internal/OutboundMessageContext.java +++ b/core-common/src/main/java/org/glassfish/jersey/message/internal/OutboundMessageContext.java @@ -561,6 +561,15 @@ public boolean isCommitted() { public void close() { if (hasEntity()) { try { + if (CommittingOutputStream.class.isInstance(getEntityStream())) { + // This invokes the ContainerResponseWriter#writeResponseStatusAndHeaders + // which allows to set the proper entityStream + CommittingOutputStream cos = (CommittingOutputStream) getEntityStream(); + if (cos.hasStreamProvider()) { + ((CommittingOutputStream) getEntityStream()).commit(); + } + } + final OutputStream es = getEntityStream(); if (!FlushedCloseable.class.isInstance(es)) { es.flush(); diff --git a/core-common/src/main/java/org/glassfish/jersey/message/internal/ReaderWriter.java b/core-common/src/main/java/org/glassfish/jersey/message/internal/ReaderWriter.java index ae8e7d86027..14a668e8350 100644 --- a/core-common/src/main/java/org/glassfish/jersey/message/internal/ReaderWriter.java +++ b/core-common/src/main/java/org/glassfish/jersey/message/internal/ReaderWriter.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2010, 2023 Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2010, 2024 Oracle and/or its affiliates. All rights reserved. * * This program and the accompanying materials are made available under the * terms of the Eclipse Public License v. 2.0, which is available at @@ -65,7 +65,7 @@ public final class ReaderWriter { public static final int BUFFER_SIZE = getBufferSize(); /** - * Whether {@linkplain BUFFER_SIZE} is to be ignored in favor of JRE's own decision. + * Whether {@linkplain #BUFFER_SIZE} is to be ignored in favor of JRE's own decision. */ public static final boolean AUTOSIZE_BUFFER = getAutosizeBuffer(); @@ -269,9 +269,7 @@ private static byte[] readAllBytes(InputStream inputStream) throws IOException { * @throws IOException in case of a write failure. */ public static void writeToAsString(String s, OutputStream out, MediaType type) throws IOException { - Writer osw = new OutputStreamWriter(out, getCharset(type)); - osw.write(s); - osw.flush(); + out.write(s.getBytes(getCharset(type))); } /** diff --git a/tests/integration/jersey-4321/src/test/java/org/glassfish/jersey/tests/integration/jersey4321/StreamingTest.java b/tests/integration/jersey-4321/src/test/java/org/glassfish/jersey/tests/integration/jersey4321/StreamingTest.java index c3f2993ea3a..579937ddc63 100644 --- a/tests/integration/jersey-4321/src/test/java/org/glassfish/jersey/tests/integration/jersey4321/StreamingTest.java +++ b/tests/integration/jersey-4321/src/test/java/org/glassfish/jersey/tests/integration/jersey4321/StreamingTest.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019, 2022 Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2019, 2024 Oracle and/or its affiliates. All rights reserved. * * This program and the accompanying materials are made available under the * terms of the Eclipse Public License v. 2.0, which is available at @@ -60,6 +60,8 @@ public void clientCloseCloseResponseFirstTest() throws IOException { inputStream.close(); // trigger sending another 'A' to the stream; it should fail // (indicating that the streaming has been terminated on the server) + // But only the second flush causes the Exception + assertEquals("OK", sendTarget.request().get().readEntity(String.class)); assertEquals("NOK", sendTarget.request().get().readEntity(String.class)); } From e66a63571ec724b82aa9cd24124fbbb4390873c8 Mon Sep 17 00:00:00 2001 From: jansupol Date: Tue, 5 Nov 2024 22:18:14 +0100 Subject: [PATCH 2/4] Added test for bufferring and chunked options Signed-off-by: jansupol --- .../internal/CommittingOutputStream.java | 7 +- .../internal/OutboundMessageContext.java | 15 +- .../ContainerResponseWriterNoFlushTest.java | 151 ++++++++++++++++++ 3 files changed, 161 insertions(+), 12 deletions(-) create mode 100644 core-server/src/test/java/org/glassfish/jersey/server/ContainerResponseWriterNoFlushTest.java diff --git a/core-common/src/main/java/org/glassfish/jersey/message/internal/CommittingOutputStream.java b/core-common/src/main/java/org/glassfish/jersey/message/internal/CommittingOutputStream.java index 02ddd8f3894..446dbcec13b 100644 --- a/core-common/src/main/java/org/glassfish/jersey/message/internal/CommittingOutputStream.java +++ b/core-common/src/main/java/org/glassfish/jersey/message/internal/CommittingOutputStream.java @@ -26,6 +26,7 @@ import org.glassfish.jersey.innate.VirtualThreadSupport; import org.glassfish.jersey.internal.LocalizationMessages; import org.glassfish.jersey.internal.guava.Preconditions; +import org.glassfish.jersey.io.spi.FlushedCloseable; /** * A committing output stream with optional serialized entity buffering functionality @@ -128,8 +129,10 @@ public void setStreamProvider(OutboundMessageContext.StreamProvider streamProvid this.streamProvider = streamProvider; } - /* package */ boolean hasStreamProvider() { - return streamProvider != null; + /* package */ void flushOnClose() throws IOException { + if (!FlushedCloseable.class.isInstance(adaptedOutput)) { + flush(); + } } /** diff --git a/core-common/src/main/java/org/glassfish/jersey/message/internal/OutboundMessageContext.java b/core-common/src/main/java/org/glassfish/jersey/message/internal/OutboundMessageContext.java index 2915e693249..d119ff56cab 100644 --- a/core-common/src/main/java/org/glassfish/jersey/message/internal/OutboundMessageContext.java +++ b/core-common/src/main/java/org/glassfish/jersey/message/internal/OutboundMessageContext.java @@ -561,18 +561,13 @@ public boolean isCommitted() { public void close() { if (hasEntity()) { try { - if (CommittingOutputStream.class.isInstance(getEntityStream())) { - // This invokes the ContainerResponseWriter#writeResponseStatusAndHeaders - // which allows to set the proper entityStream - CommittingOutputStream cos = (CommittingOutputStream) getEntityStream(); - if (cos.hasStreamProvider()) { - ((CommittingOutputStream) getEntityStream()).commit(); - } - } - final OutputStream es = getEntityStream(); if (!FlushedCloseable.class.isInstance(es)) { - es.flush(); + if (CommittingOutputStream.class.isInstance(es)) { + ((CommittingOutputStream) es).flushOnClose(); + } else { + es.flush(); + } } es.close(); } catch (IOException e) { diff --git a/core-server/src/test/java/org/glassfish/jersey/server/ContainerResponseWriterNoFlushTest.java b/core-server/src/test/java/org/glassfish/jersey/server/ContainerResponseWriterNoFlushTest.java new file mode 100644 index 00000000000..bccaafff16b --- /dev/null +++ b/core-server/src/test/java/org/glassfish/jersey/server/ContainerResponseWriterNoFlushTest.java @@ -0,0 +1,151 @@ +/* + * Copyright (c) 2024 Oracle and/or its affiliates. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v. 2.0, which is available at + * http://www.eclipse.org/legal/epl-2.0. + * + * This Source Code may also be made available under the following Secondary + * Licenses when the conditions for such availability set forth in the + * Eclipse Public License v. 2.0 are satisfied: GNU General Public License, + * version 2 with the GNU Classpath Exception, which is available at + * https://www.gnu.org/software/classpath/license.html. + * + * SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0 + */ + +package org.glassfish.jersey.server; + + +import jakarta.ws.rs.GET; +import jakarta.ws.rs.Path; +import jakarta.ws.rs.Produces; +import jakarta.ws.rs.core.MediaType; +import jakarta.ws.rs.core.Response; +import jakarta.ws.rs.core.StreamingOutput; +import org.glassfish.jersey.internal.MapPropertiesDelegate; +import org.glassfish.jersey.io.spi.FlushedCloseable; +import org.glassfish.jersey.message.MessageBodyWorkers; +import org.glassfish.jersey.server.RequestContextBuilder.TestContainerRequest; +import org.glassfish.jersey.server.spi.ContainerResponseWriter; +import org.hamcrest.MatcherAssert; +import org.hamcrest.Matchers; +import org.junit.jupiter.api.Test; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.net.URI; +import java.nio.charset.StandardCharsets; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +public class ContainerResponseWriterNoFlushTest { + private static final String RESPONSE = "RESPONSE"; + private static AtomicInteger flushCounter = new AtomicInteger(0); + private static class TestResponseOutputStream extends ByteArrayOutputStream implements FlushedCloseable { + private boolean closed = false; + @Override + public void close() throws IOException { + if (!closed) { + closed = true; + flush(); + super.close(); + } + } + + @Override + public void flush() throws IOException { + flushCounter.incrementAndGet(); + } + } + + private static class TestContainerWriter implements ContainerResponseWriter { + TestResponseOutputStream outputStream; + private final boolean buffering; + + private TestContainerWriter(boolean buffering) { + this.buffering = buffering; + } + + @Override + public OutputStream writeResponseStatusAndHeaders(long contentLength, ContainerResponse responseContext) + throws ContainerException { + outputStream = new TestResponseOutputStream(); + responseContext.setEntityStream(outputStream); + return outputStream; + } + + @Override + public boolean suspend(long timeOut, TimeUnit timeUnit, TimeoutHandler timeoutHandler) { + return false; + } + + @Override + public void setSuspendTimeout(long timeOut, TimeUnit timeUnit) throws IllegalStateException { + } + + @Override + public void commit() { + } + + @Override + public void failure(Throwable error) { + throw new RuntimeException(error); + } + + @Override + public boolean enableResponseBuffering() { + return buffering; + } + } + + @Path("/test") + public static class StreamResource { + + @GET + @Path(value = "/stream") + @Produces(MediaType.TEXT_PLAIN) + public Response stream() { + + StreamingOutput stream = output -> { + output.write(RESPONSE.getBytes(StandardCharsets.UTF_8)); + }; + return Response.ok(stream).build(); + } + } + + @Test + public void testWriterBuffering() { + TestContainerWriter writer = new TestContainerWriter(true); + testWriter(writer); + } + + @Test + public void testWriterNoBuffering() { + TestContainerWriter writer = new TestContainerWriter(false); + testWriter(writer); + } + + private void testWriter(TestContainerWriter writer) { + flushCounter.set(0); + RequestContextBuilder rcb = RequestContextBuilder.from("/test/stream", "GET"); + + TestContainerRequest request = rcb.new TestContainerRequest( + null, URI.create("/test/stream"), "GET", null, new MapPropertiesDelegate()) { + @Override + public void setWorkers(MessageBodyWorkers workers) { + if (workers != null) { + setWriter(writer); + } + super.setWorkers(workers); + } + }; + + ApplicationHandler applicationHandler = new ApplicationHandler(new ResourceConfig(StreamResource.class)); + Future future = applicationHandler.apply(request); + MatcherAssert.assertThat(writer.outputStream.toString(), Matchers.is(RESPONSE)); + MatcherAssert.assertThat(flushCounter.get(), Matchers.is(1)); + } +} From b481675040604278732e7896f37afc999a5b55c7 Mon Sep 17 00:00:00 2001 From: jansupol Date: Fri, 15 Nov 2024 12:21:56 +0100 Subject: [PATCH 3/4] cleanup in test Signed-off-by: jansupol --- .../jersey/server/ContainerResponseWriterNoFlushTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core-server/src/test/java/org/glassfish/jersey/server/ContainerResponseWriterNoFlushTest.java b/core-server/src/test/java/org/glassfish/jersey/server/ContainerResponseWriterNoFlushTest.java index bccaafff16b..067d9e2bb5f 100644 --- a/core-server/src/test/java/org/glassfish/jersey/server/ContainerResponseWriterNoFlushTest.java +++ b/core-server/src/test/java/org/glassfish/jersey/server/ContainerResponseWriterNoFlushTest.java @@ -73,7 +73,7 @@ private TestContainerWriter(boolean buffering) { public OutputStream writeResponseStatusAndHeaders(long contentLength, ContainerResponse responseContext) throws ContainerException { outputStream = new TestResponseOutputStream(); - responseContext.setEntityStream(outputStream); +// responseContext.setEntityStream(outputStream); return outputStream; } From 2da4412bcc9fa8750780c4ca40b49c336bf1de61 Mon Sep 17 00:00:00 2001 From: jansupol Date: Wed, 4 Dec 2024 14:36:54 +0100 Subject: [PATCH 4/4] Make FilteringOutputStream FlushedClosable by default Signed-off-by: jansupol --- .../glassfish/jersey/io/spi/FlushedCloseable.java | 14 ++++++++++++-- .../message/internal/CommittingOutputStream.java | 2 +- .../message/internal/OutboundMessageContext.java | 2 +- 3 files changed, 14 insertions(+), 4 deletions(-) diff --git a/core-common/src/main/java/org/glassfish/jersey/io/spi/FlushedCloseable.java b/core-common/src/main/java/org/glassfish/jersey/io/spi/FlushedCloseable.java index 12aa7144d87..6b9f3009d33 100644 --- a/core-common/src/main/java/org/glassfish/jersey/io/spi/FlushedCloseable.java +++ b/core-common/src/main/java/org/glassfish/jersey/io/spi/FlushedCloseable.java @@ -17,6 +17,7 @@ package org.glassfish.jersey.io.spi; import java.io.Closeable; +import java.io.FilterOutputStream; import java.io.Flushable; import java.io.IOException; import java.io.OutputStream; @@ -27,8 +28,8 @@ * That way, {@link #flush()} method is not called twice. * *

- * Usable by {@link javax.ws.rs.client.ClientRequestContext#setEntityStream(OutputStream)}. - * Usable by {@link javax.ws.rs.container.ContainerResponseContext#setEntityStream(OutputStream)}. + * Usable by {@link jakarta.ws.rs.client.ClientRequestContext#setEntityStream(OutputStream)}. + * Usable by {@link jakarta.ws.rs.container.ContainerResponseContext#setEntityStream(OutputStream)}. *

* *

@@ -52,4 +53,13 @@ public interface FlushedCloseable extends Flushable, Closeable { * @throws IOException if an I/O error occurs */ public void close() throws IOException; + + /** + * Determine if the stream {@link OutputStream#flush() flushes} on {@link OutputStream#close()}. + * @param stream the provided {@link OutputStream} + * @return {@code true} if the stream ensures to call {@link OutputStream#flush()} on {@link OutputStream#close()}. + */ + public static boolean flushOnClose(OutputStream stream) { + return FilterOutputStream.class.isInstance(stream) || FlushedCloseable.class.isInstance(stream); + } } diff --git a/core-common/src/main/java/org/glassfish/jersey/message/internal/CommittingOutputStream.java b/core-common/src/main/java/org/glassfish/jersey/message/internal/CommittingOutputStream.java index 446dbcec13b..f87c80da872 100644 --- a/core-common/src/main/java/org/glassfish/jersey/message/internal/CommittingOutputStream.java +++ b/core-common/src/main/java/org/glassfish/jersey/message/internal/CommittingOutputStream.java @@ -130,7 +130,7 @@ public void setStreamProvider(OutboundMessageContext.StreamProvider streamProvid } /* package */ void flushOnClose() throws IOException { - if (!FlushedCloseable.class.isInstance(adaptedOutput)) { + if (!FlushedCloseable.flushOnClose(adaptedOutput)) { flush(); } } diff --git a/core-common/src/main/java/org/glassfish/jersey/message/internal/OutboundMessageContext.java b/core-common/src/main/java/org/glassfish/jersey/message/internal/OutboundMessageContext.java index d119ff56cab..db496827ba5 100644 --- a/core-common/src/main/java/org/glassfish/jersey/message/internal/OutboundMessageContext.java +++ b/core-common/src/main/java/org/glassfish/jersey/message/internal/OutboundMessageContext.java @@ -562,7 +562,7 @@ public void close() { if (hasEntity()) { try { final OutputStream es = getEntityStream(); - if (!FlushedCloseable.class.isInstance(es)) { + if (!FlushedCloseable.flushOnClose(es)) { if (CommittingOutputStream.class.isInstance(es)) { ((CommittingOutputStream) es).flushOnClose(); } else {