Skip to content

Commit f254291

Browse files
jcarranzangeoand
authored andcommitted
Fix StreamingOutput connection reset on mid-stream errors
Fixes: #50754
1 parent 55a8eeb commit f254291

File tree

3 files changed

+184
-2
lines changed

3 files changed

+184
-2
lines changed

independent-projects/resteasy-reactive/server/runtime/src/main/java/org/jboss/resteasy/reactive/server/providers/serialisers/StreamingOutputMessageBodyWriter.java

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,13 @@
1010
import jakarta.ws.rs.core.MultivaluedMap;
1111
import jakarta.ws.rs.core.StreamingOutput;
1212

13+
import org.jboss.resteasy.reactive.server.core.ResteasyReactiveRequestContext;
1314
import org.jboss.resteasy.reactive.server.spi.ResteasyReactiveResourceInfo;
1415
import org.jboss.resteasy.reactive.server.spi.ServerMessageBodyWriter;
1516
import org.jboss.resteasy.reactive.server.spi.ServerRequestContext;
1617

1718
public class StreamingOutputMessageBodyWriter implements ServerMessageBodyWriter<StreamingOutput> {
19+
1820
@Override
1921
public boolean isWriteable(Class<?> type, Type genericType, Annotation[] annotations, MediaType mediaType) {
2022
return doIsWriteable(type);
@@ -45,7 +47,25 @@ public void writeTo(StreamingOutput streamingOutput, Class<?> type, Type generic
4547

4648
@Override
4749
public void writeResponse(StreamingOutput o, Type genericType, ServerRequestContext context)
48-
throws WebApplicationException, IOException {
49-
o.write(context.getOrCreateOutputStream());
50+
throws WebApplicationException {
51+
ResteasyReactiveRequestContext rrContext = (ResteasyReactiveRequestContext) context;
52+
try {
53+
o.write(context.getOrCreateOutputStream());
54+
} catch (Throwable t) {
55+
if (context.serverResponse().headWritten()) {
56+
context.serverResponse().reset();
57+
rrContext.resume(t);
58+
} else {
59+
if (t instanceof WebApplicationException) {
60+
throw (WebApplicationException) t;
61+
} else if (t instanceof IOException) {
62+
throw new WebApplicationException(t);
63+
} else if (t instanceof RuntimeException) {
64+
throw new WebApplicationException(t);
65+
} else {
66+
throw new WebApplicationException(t);
67+
}
68+
}
69+
}
5070
}
5171
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
package io.quarkus.it.resteasy.reactive.elytron;
2+
3+
import java.io.IOException;
4+
import java.io.OutputStream;
5+
import java.nio.charset.StandardCharsets;
6+
7+
import jakarta.ws.rs.DefaultValue;
8+
import jakarta.ws.rs.GET;
9+
import jakarta.ws.rs.Path;
10+
import jakarta.ws.rs.Produces;
11+
import jakarta.ws.rs.QueryParam;
12+
import jakarta.ws.rs.core.MediaType;
13+
import jakarta.ws.rs.core.StreamingOutput;
14+
15+
import io.smallrye.common.annotation.Blocking;
16+
17+
@Path("/streaming-output-error")
18+
public class StreamingOutputResource {
19+
private static final int ITEMS_PER_EMIT = 100;
20+
21+
private static final byte[] CHUNK = "This is one chunk of data.\n".getBytes(StandardCharsets.UTF_8);
22+
23+
@GET
24+
@Path("/output")
25+
@Produces(MediaType.TEXT_PLAIN)
26+
@Blocking
27+
public StreamingOutput streamOutput(@QueryParam("fail") @DefaultValue("false") boolean fail) {
28+
return outputStream -> {
29+
try {
30+
writeData(outputStream);
31+
if (fail) {
32+
throw new IOException("dummy failure");
33+
}
34+
writeData(outputStream);
35+
} catch (IOException e) {
36+
throw new RuntimeException(e);
37+
}
38+
};
39+
}
40+
41+
private void writeData(OutputStream out) throws IOException {
42+
for (int i = 0; i < ITEMS_PER_EMIT; i++) {
43+
out.write(CHUNK);
44+
out.flush();
45+
}
46+
}
47+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
package io.quarkus.it.resteasy.reactive.elytron;
2+
3+
import static io.restassured.RestAssured.port;
4+
5+
import java.nio.charset.StandardCharsets;
6+
import java.time.Duration;
7+
import java.util.concurrent.CompletableFuture;
8+
import java.util.concurrent.ExecutionException;
9+
import java.util.concurrent.TimeUnit;
10+
import java.util.concurrent.atomic.AtomicLong;
11+
import java.util.function.Consumer;
12+
13+
import org.junit.jupiter.api.AfterEach;
14+
import org.junit.jupiter.api.Assertions;
15+
import org.junit.jupiter.api.BeforeEach;
16+
import org.junit.jupiter.api.Test;
17+
18+
import io.quarkus.test.junit.QuarkusTest;
19+
import io.vertx.core.Handler;
20+
import io.vertx.core.Vertx;
21+
import io.vertx.core.buffer.Buffer;
22+
import io.vertx.core.http.HttpClient;
23+
import io.vertx.core.http.HttpClosedException;
24+
import io.vertx.core.http.HttpMethod;
25+
26+
@QuarkusTest
27+
public class StreamingOutputErrorHandlingTest {
28+
private static final Duration TIMEOUT = Duration.ofSeconds(10);
29+
private static final int ITEMS_PER_BATCH = 100;
30+
private static final int BYTES_PER_CHUNK = "This is one chunk of data.\n"
31+
.getBytes(StandardCharsets.UTF_8).length;
32+
private static final long EXPECTED_BYTES_FIRST_BATCH = (long) ITEMS_PER_BATCH * BYTES_PER_CHUNK;
33+
private static final long EXPECTED_BYTES_COMPLETE = (long) ITEMS_PER_BATCH * 2 * BYTES_PER_CHUNK;
34+
35+
private Vertx vertx;
36+
private HttpClient client;
37+
38+
@BeforeEach
39+
public void setup() {
40+
vertx = Vertx.vertx();
41+
client = vertx.createHttpClient();
42+
}
43+
44+
@AfterEach
45+
public void cleanup() throws Exception {
46+
if (client != null) {
47+
client.close().toCompletionStage().toCompletableFuture().get(5, TimeUnit.SECONDS);
48+
}
49+
if (vertx != null) {
50+
vertx.close().toCompletionStage().toCompletableFuture().get(5, TimeUnit.SECONDS);
51+
}
52+
}
53+
54+
@Test
55+
public void testStreamingOutputFailureMidStream() {
56+
AtomicLong byteCount = new AtomicLong();
57+
CompletableFuture<Void> latch = new CompletableFuture<>();
58+
59+
sendRequest("/streaming-output-error/output?fail=true", latch,
60+
b -> byteCount.addAndGet(b.length()));
61+
62+
Assertions.assertTimeoutPreemptively(TIMEOUT, () -> {
63+
ExecutionException ex = Assertions.assertThrows(ExecutionException.class,
64+
latch::get,
65+
"Client should have detected that the server reset the connection");
66+
67+
Assertions.assertInstanceOf(HttpClosedException.class, ex.getCause(),
68+
"Expected HttpClosedException when connection is reset mid-stream");
69+
});
70+
71+
Assertions.assertEquals(EXPECTED_BYTES_FIRST_BATCH, byteCount.get(),
72+
"Should have received only the first batch of data before failure");
73+
}
74+
75+
@Test
76+
public void testStreamingOutputSuccess() {
77+
AtomicLong byteCount = new AtomicLong();
78+
CompletableFuture<Void> latch = new CompletableFuture<>();
79+
80+
sendRequest("/streaming-output-error/output?fail=false", latch,
81+
b -> byteCount.addAndGet(b.length()));
82+
83+
Assertions.assertTimeoutPreemptively(TIMEOUT,
84+
() -> latch.get(),
85+
"StreamingOutput should complete successfully without errors");
86+
87+
Assertions.assertEquals(EXPECTED_BYTES_COMPLETE, byteCount.get(),
88+
"Should have received all data when no errors occur");
89+
}
90+
91+
private void sendRequest(String uri, CompletableFuture<Void> latch, Consumer<Buffer> bodyConsumer) {
92+
Handler<Throwable> failureHandler = latch::completeExceptionally;
93+
94+
client.request(HttpMethod.GET, port, "localhost", uri)
95+
.onFailure(failureHandler)
96+
.onSuccess(request -> {
97+
request.end();
98+
request.connect()
99+
.onFailure(failureHandler)
100+
.onSuccess(response -> {
101+
response.request().connection().closeHandler(v -> {
102+
failureHandler.handle(new HttpClosedException("Connection was closed"));
103+
});
104+
105+
response.handler(buffer -> {
106+
if (buffer.length() > 0) {
107+
bodyConsumer.accept(buffer);
108+
}
109+
});
110+
response.exceptionHandler(failureHandler);
111+
response.endHandler(v -> latch.complete(null));
112+
});
113+
});
114+
}
115+
}

0 commit comments

Comments
 (0)