diff --git a/connectors/apache-connector/src/test/java/org/glassfish/jersey/apache/connector/StreamingTest.java b/connectors/apache-connector/src/test/java/org/glassfish/jersey/apache/connector/StreamingTest.java index a8521d723a3..14ba1f2aa03 100644 --- a/connectors/apache-connector/src/test/java/org/glassfish/jersey/apache/connector/StreamingTest.java +++ b/connectors/apache-connector/src/test/java/org/glassfish/jersey/apache/connector/StreamingTest.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2015, 2022 Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2015, 2024 Oracle and/or its affiliates. All rights reserved. * * This program and the accompanying materials are made available under the * terms of the Eclipse Public License v. 2.0, which is available at @@ -130,6 +130,8 @@ private void clientCloseTest(int readTimeout) throws IOException { inputStream.close(); // trigger sending another 'A' to the stream; it should fail // (indicating that the streaming has been terminated on the server) + // But only the second flush causes the Exception + assertEquals("OK", sendTarget.request().get().readEntity(String.class)); assertEquals("NOK", sendTarget.request().get().readEntity(String.class)); assertEquals(0, counter.get()); } diff --git a/connectors/apache5-connector/src/test/java/org/glassfish/jersey/apache5/connector/StreamingTest.java b/connectors/apache5-connector/src/test/java/org/glassfish/jersey/apache5/connector/StreamingTest.java index 4a38280f63f..8f7888ae39d 100644 --- a/connectors/apache5-connector/src/test/java/org/glassfish/jersey/apache5/connector/StreamingTest.java +++ b/connectors/apache5-connector/src/test/java/org/glassfish/jersey/apache5/connector/StreamingTest.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2022 Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2022, 2024 Oracle and/or its affiliates. All rights reserved. * * This program and the accompanying materials are made available under the * terms of the Eclipse Public License v. 2.0, which is available at @@ -129,6 +129,8 @@ private void clientCloseTest(int readTimeout) throws IOException { inputStream.close(); // trigger sending another 'A' to the stream; it should fail // (indicating that the streaming has been terminated on the server) + // But only the second flush causes the Exception + assertEquals("OK", sendTarget.request().get().readEntity(String.class)); assertEquals("NOK", sendTarget.request().get().readEntity(String.class)); assertEquals(0, counter.get()); } diff --git a/core-common/src/main/java/org/glassfish/jersey/io/spi/FlushedCloseable.java b/core-common/src/main/java/org/glassfish/jersey/io/spi/FlushedCloseable.java index 12aa7144d87..6b9f3009d33 100644 --- a/core-common/src/main/java/org/glassfish/jersey/io/spi/FlushedCloseable.java +++ b/core-common/src/main/java/org/glassfish/jersey/io/spi/FlushedCloseable.java @@ -17,6 +17,7 @@ package org.glassfish.jersey.io.spi; import java.io.Closeable; +import java.io.FilterOutputStream; import java.io.Flushable; import java.io.IOException; import java.io.OutputStream; @@ -27,8 +28,8 @@ * That way, {@link #flush()} method is not called twice. * *
- * Usable by {@link javax.ws.rs.client.ClientRequestContext#setEntityStream(OutputStream)}. - * Usable by {@link javax.ws.rs.container.ContainerResponseContext#setEntityStream(OutputStream)}. + * Usable by {@link jakarta.ws.rs.client.ClientRequestContext#setEntityStream(OutputStream)}. + * Usable by {@link jakarta.ws.rs.container.ContainerResponseContext#setEntityStream(OutputStream)}. *
* *
@@ -52,4 +53,13 @@ public interface FlushedCloseable extends Flushable, Closeable {
      * @throws IOException if an I/O error occurs
      */
     public void close() throws IOException;
+
+    /**
+     * Determine if the stream {@link OutputStream#flush() flushes} on {@link OutputStream#close()}.
+     * @param stream the provided {@link OutputStream}
+     * @return {@code true} if the stream ensures to call {@link OutputStream#flush()} on {@link OutputStream#close()}.
+     */
+    public static boolean flushOnClose(OutputStream stream) {
+        return FilterOutputStream.class.isInstance(stream) || FlushedCloseable.class.isInstance(stream);
+    }
 }
diff --git a/core-common/src/main/java/org/glassfish/jersey/message/internal/CommittingOutputStream.java b/core-common/src/main/java/org/glassfish/jersey/message/internal/CommittingOutputStream.java
index e9f1bc20374..f87c80da872 100644
--- a/core-common/src/main/java/org/glassfish/jersey/message/internal/CommittingOutputStream.java
+++ b/core-common/src/main/java/org/glassfish/jersey/message/internal/CommittingOutputStream.java
@@ -26,6 +26,7 @@
 import org.glassfish.jersey.innate.VirtualThreadSupport;
 import org.glassfish.jersey.internal.LocalizationMessages;
 import org.glassfish.jersey.internal.guava.Preconditions;
+import org.glassfish.jersey.io.spi.FlushedCloseable;
 
 /**
  * A committing output stream with optional serialized entity buffering functionality
@@ -128,6 +129,12 @@ public void setStreamProvider(OutboundMessageContext.StreamProvider streamProvid
         this.streamProvider = streamProvider;
     }
 
+    /* package */ void flushOnClose() throws IOException {
+        if (!FlushedCloseable.flushOnClose(adaptedOutput)) {
+            flush();
+        }
+    }
+
     /**
      * Enable buffering of the serialized entity.
      *
diff --git a/core-common/src/main/java/org/glassfish/jersey/message/internal/OutboundMessageContext.java b/core-common/src/main/java/org/glassfish/jersey/message/internal/OutboundMessageContext.java
index b1b7745bbd1..db496827ba5 100644
--- a/core-common/src/main/java/org/glassfish/jersey/message/internal/OutboundMessageContext.java
+++ b/core-common/src/main/java/org/glassfish/jersey/message/internal/OutboundMessageContext.java
@@ -562,8 +562,12 @@ public void close() {
         if (hasEntity()) {
             try {
                 final OutputStream es = getEntityStream();
-                if (!FlushedCloseable.class.isInstance(es)) {
-                    es.flush();
+                if (!FlushedCloseable.flushOnClose(es)) {
+                    if (CommittingOutputStream.class.isInstance(es)) {
+                        ((CommittingOutputStream) es).flushOnClose();
+                    } else {
+                        es.flush();
+                    }
                 }
                 es.close();
             } catch (IOException e) {
diff --git a/core-common/src/main/java/org/glassfish/jersey/message/internal/ReaderWriter.java b/core-common/src/main/java/org/glassfish/jersey/message/internal/ReaderWriter.java
index ae8e7d86027..14a668e8350 100644
--- a/core-common/src/main/java/org/glassfish/jersey/message/internal/ReaderWriter.java
+++ b/core-common/src/main/java/org/glassfish/jersey/message/internal/ReaderWriter.java
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2010, 2023 Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2010, 2024 Oracle and/or its affiliates. All rights reserved.
  *
  * This program and the accompanying materials are made available under the
  * terms of the Eclipse Public License v. 2.0, which is available at
@@ -65,7 +65,7 @@ public final class ReaderWriter {
     public static final int BUFFER_SIZE = getBufferSize();
 
     /**
-     * Whether {@linkplain BUFFER_SIZE} is to be ignored in favor of JRE's own decision.
+     * Whether {@linkplain #BUFFER_SIZE} is to be ignored in favor of JRE's own decision.
      */
     public static final boolean AUTOSIZE_BUFFER = getAutosizeBuffer();
 
@@ -269,9 +269,7 @@ private static byte[] readAllBytes(InputStream inputStream) throws IOException {
      * @throws IOException in case of a write failure.
      */
     public static void writeToAsString(String s, OutputStream out, MediaType type) throws IOException {
-        Writer osw = new OutputStreamWriter(out, getCharset(type));
-        osw.write(s);
-        osw.flush();
+        out.write(s.getBytes(getCharset(type)));
     }
 
     /**
diff --git a/core-server/src/test/java/org/glassfish/jersey/server/ContainerResponseWriterNoFlushTest.java b/core-server/src/test/java/org/glassfish/jersey/server/ContainerResponseWriterNoFlushTest.java
new file mode 100644
index 00000000000..067d9e2bb5f
--- /dev/null
+++ b/core-server/src/test/java/org/glassfish/jersey/server/ContainerResponseWriterNoFlushTest.java
@@ -0,0 +1,151 @@
+/*
+ * Copyright (c) 2024 Oracle and/or its affiliates. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v. 2.0, which is available at
+ * http://www.eclipse.org/legal/epl-2.0.
+ *
+ * This Source Code may also be made available under the following Secondary
+ * Licenses when the conditions for such availability set forth in the
+ * Eclipse Public License v. 2.0 are satisfied: GNU General Public License,
+ * version 2 with the GNU Classpath Exception, which is available at
+ * https://www.gnu.org/software/classpath/license.html.
+ *
+ * SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
+ */
+
+package org.glassfish.jersey.server;
+
+
+import jakarta.ws.rs.GET;
+import jakarta.ws.rs.Path;
+import jakarta.ws.rs.Produces;
+import jakarta.ws.rs.core.MediaType;
+import jakarta.ws.rs.core.Response;
+import jakarta.ws.rs.core.StreamingOutput;
+import org.glassfish.jersey.internal.MapPropertiesDelegate;
+import org.glassfish.jersey.io.spi.FlushedCloseable;
+import org.glassfish.jersey.message.MessageBodyWorkers;
+import org.glassfish.jersey.server.RequestContextBuilder.TestContainerRequest;
+import org.glassfish.jersey.server.spi.ContainerResponseWriter;
+import org.hamcrest.MatcherAssert;
+import org.hamcrest.Matchers;
+import org.junit.jupiter.api.Test;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.URI;
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class ContainerResponseWriterNoFlushTest {
+    private static final String RESPONSE = "RESPONSE";
+    private static AtomicInteger flushCounter = new AtomicInteger(0);
+    private static class TestResponseOutputStream extends ByteArrayOutputStream implements FlushedCloseable {
+        private boolean closed = false;
+        @Override
+        public void close() throws IOException {
+            if (!closed) {
+                closed = true;
+                flush();
+                super.close();
+            }
+        }
+
+        @Override
+        public void flush() throws IOException {
+            flushCounter.incrementAndGet();
+        }
+    }
+
+    private static class TestContainerWriter implements ContainerResponseWriter {
+        TestResponseOutputStream outputStream;
+        private final boolean buffering;
+
+        private TestContainerWriter(boolean buffering) {
+            this.buffering = buffering;
+        }
+
+        @Override
+        public OutputStream writeResponseStatusAndHeaders(long contentLength, ContainerResponse responseContext)
+                throws ContainerException {
+            outputStream = new TestResponseOutputStream();
+//            responseContext.setEntityStream(outputStream);
+            return outputStream;
+        }
+
+        @Override
+        public boolean suspend(long timeOut, TimeUnit timeUnit, TimeoutHandler timeoutHandler) {
+            return false;
+        }
+
+        @Override
+        public void setSuspendTimeout(long timeOut, TimeUnit timeUnit) throws IllegalStateException {
+        }
+
+        @Override
+        public void commit() {
+        }
+
+        @Override
+        public void failure(Throwable error) {
+            throw new RuntimeException(error);
+        }
+
+        @Override
+        public boolean enableResponseBuffering() {
+            return buffering;
+        }
+    }
+
+    @Path("/test")
+    public static class StreamResource {
+
+        @GET
+        @Path(value = "/stream")
+        @Produces(MediaType.TEXT_PLAIN)
+        public Response stream() {
+
+            StreamingOutput stream = output -> {
+                output.write(RESPONSE.getBytes(StandardCharsets.UTF_8));
+            };
+            return Response.ok(stream).build();
+        }
+    }
+
+    @Test
+    public void testWriterBuffering() {
+        TestContainerWriter writer = new TestContainerWriter(true);
+        testWriter(writer);
+    }
+
+    @Test
+    public void testWriterNoBuffering() {
+        TestContainerWriter writer = new TestContainerWriter(false);
+        testWriter(writer);
+    }
+
+    private void testWriter(TestContainerWriter writer) {
+        flushCounter.set(0);
+        RequestContextBuilder rcb = RequestContextBuilder.from("/test/stream", "GET");
+
+        TestContainerRequest request = rcb.new TestContainerRequest(
+                null, URI.create("/test/stream"), "GET", null, new MapPropertiesDelegate()) {
+            @Override
+            public void setWorkers(MessageBodyWorkers workers) {
+                if (workers != null) {
+                    setWriter(writer);
+                }
+                super.setWorkers(workers);
+            }
+        };
+
+        ApplicationHandler applicationHandler = new ApplicationHandler(new ResourceConfig(StreamResource.class));
+        Future