Skip to content

Commit 9891318

Browse files
authored
Merge pull request quarkusio#50361 from geoand/quarkusio#50336
When an error occurs during a streaming response, call `HttpServerResponse.reset()`
2 parents c0a01a9 + 31f64a5 commit 9891318

File tree

5 files changed

+139
-0
lines changed

5 files changed

+139
-0
lines changed

extensions/resteasy-reactive/rest-servlet/runtime/src/main/java/io/quarkus/resteasy/reactive/server/servlet/runtime/ServletRequestContext.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -724,4 +724,9 @@ public void handle(Void event) {
724724
});
725725
return this;
726726
}
727+
728+
@Override
729+
public void reset() {
730+
context.response().reset();
731+
}
727732
}
Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
package io.quarkus.resteasy.reactive.server.test.stream;
2+
3+
import static org.junit.jupiter.api.Assertions.fail;
4+
5+
import java.util.concurrent.CompletableFuture;
6+
import java.util.concurrent.ExecutionException;
7+
import java.util.concurrent.TimeUnit;
8+
import java.util.concurrent.TimeoutException;
9+
import java.util.concurrent.atomic.AtomicLong;
10+
import java.util.function.Consumer;
11+
import java.util.stream.IntStream;
12+
13+
import jakarta.inject.Inject;
14+
import jakarta.ws.rs.DefaultValue;
15+
import jakarta.ws.rs.GET;
16+
import jakarta.ws.rs.Path;
17+
18+
import org.jboss.resteasy.reactive.RestQuery;
19+
import org.junit.jupiter.api.Assertions;
20+
import org.junit.jupiter.api.Test;
21+
import org.junit.jupiter.api.extension.RegisterExtension;
22+
23+
import io.quarkus.test.QuarkusUnitTest;
24+
import io.restassured.RestAssured;
25+
import io.smallrye.mutiny.Multi;
26+
import io.smallrye.mutiny.subscription.MultiEmitter;
27+
import io.vertx.core.Handler;
28+
import io.vertx.core.Vertx;
29+
import io.vertx.core.buffer.Buffer;
30+
import io.vertx.core.http.HttpClient;
31+
import io.vertx.core.http.HttpClientRequest;
32+
import io.vertx.core.http.HttpClosedException;
33+
import io.vertx.core.http.HttpMethod;
34+
import io.vertx.core.impl.NoStackTraceException;
35+
36+
public class ErrorDuringStreamingTestCase {
37+
38+
@RegisterExtension
39+
static QuarkusUnitTest runner = new QuarkusUnitTest()
40+
.withApplicationRoot(jar -> jar.addClasses(Resource.class));
41+
42+
@Inject
43+
Vertx vertx;
44+
45+
@Test
46+
public void noFailure() throws ExecutionException, InterruptedException, TimeoutException {
47+
HttpClient client = null;
48+
try {
49+
AtomicLong count = new AtomicLong();
50+
CompletableFuture<Object> latch = new CompletableFuture<>();
51+
client = vertx.createHttpClient();
52+
sendRequest(client, "/test", latch, b -> count.getAndIncrement());
53+
latch.get(10, TimeUnit.SECONDS);
54+
Assertions.assertEquals(2 * 100, count.get());
55+
} finally {
56+
if (client != null) {
57+
client.close().toCompletionStage().toCompletableFuture().get();
58+
}
59+
}
60+
}
61+
62+
@Test
63+
public void failure() throws InterruptedException, TimeoutException, ExecutionException {
64+
HttpClient client = null;
65+
AtomicLong count = new AtomicLong();
66+
try {
67+
CompletableFuture<Object> latch = new CompletableFuture<>();
68+
client = vertx.createHttpClient();
69+
sendRequest(client, "/test?fail=true", latch, b -> count.getAndIncrement());
70+
latch.get(10, TimeUnit.SECONDS);
71+
fail("The client should have failed as the server reset the connection");
72+
} catch (ExecutionException e) {
73+
Assertions.assertInstanceOf(HttpClosedException.class, e.getCause());
74+
Assertions.assertEquals(100, count.get());
75+
} finally {
76+
if (client != null) {
77+
client.close().toCompletionStage().toCompletableFuture().get();
78+
}
79+
}
80+
}
81+
82+
private void sendRequest(HttpClient client, String requestURI, CompletableFuture<Object> latch,
83+
Consumer<Buffer> bodyConsumer) {
84+
Handler<Throwable> failure = latch::completeExceptionally;
85+
client.request(HttpMethod.GET, RestAssured.port, "localhost", requestURI)
86+
.onFailure(failure)
87+
.onSuccess(new Handler<>() {
88+
@Override
89+
public void handle(HttpClientRequest event) {
90+
event.connect().onFailure(failure)
91+
.onSuccess(response -> {
92+
response
93+
.handler(bodyConsumer::accept)
94+
.exceptionHandler(latch::completeExceptionally)
95+
.end(latch::complete);
96+
});
97+
98+
}
99+
});
100+
}
101+
102+
@Path("test")
103+
public static class Resource {
104+
105+
@GET
106+
public Multi<String> stream(@RestQuery @DefaultValue("false") boolean fail) {
107+
return Multi.createFrom().emitter(emitter -> {
108+
emit(emitter);
109+
if (fail) {
110+
throw new NoStackTraceException("dummy");
111+
} else {
112+
emit(emitter);
113+
emitter.complete();
114+
}
115+
});
116+
}
117+
118+
private static void emit(MultiEmitter<? super String> emitter) {
119+
IntStream.range(0, 100).forEach(i -> {
120+
emitter.emit(String.valueOf(i));
121+
});
122+
}
123+
}
124+
}

independent-projects/resteasy-reactive/server/runtime/src/main/java/org/jboss/resteasy/reactive/server/handlers/PublisherResponseHandler.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -246,6 +246,9 @@ protected void handleException(ResteasyReactiveRequestContext requestContext, Th
246246
// it will appear to be an SSE value, which is incorrect, so we should only log it and close the connection
247247
if (requestContext.serverResponse().headWritten()) {
248248
log.error("Exception in SSE server handling, impossible to send it to client", t);
249+
// HTTP chunked encoding sends an indeterminate number of chunks, but it has to end with an end chunk of zero size to indicate successful transmission.
250+
// reset() will cause this last chunk to not be sent, even though every other chunk was sent, and so clients can detect the error
251+
requestContext.serverResponse().reset();
249252
} else {
250253
// we can go through the abort chain
251254
requestContext.resume(t, true);

independent-projects/resteasy-reactive/server/runtime/src/main/java/org/jboss/resteasy/reactive/server/spi/ServerHttpResponse.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,4 +50,6 @@ public interface ServerHttpResponse extends StreamingResponse<ServerHttpResponse
5050
boolean isWriteQueueFull();
5151

5252
ServerHttpResponse addDrainHandler(Runnable onDrain);
53+
54+
void reset();
5355
}

independent-projects/resteasy-reactive/server/vertx/src/main/java/org/jboss/resteasy/reactive/server/vertx/VertxResteasyReactiveRequestContext.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -531,6 +531,11 @@ public void handle(Void event) {
531531
return this;
532532
}
533533

534+
@Override
535+
public void reset() {
536+
response.reset();
537+
}
538+
534539
@Override
535540
public boolean isWriteQueueFull() {
536541
return response.writeQueueFull();

0 commit comments

Comments
 (0)