Skip to content

Commit 31f64a5

Browse files
committed
Add test that verifies Quarkus REST closes connection on streaming error
1 parent 37c4e28 commit 31f64a5

File tree

1 file changed

+124
-0
lines changed

1 file changed

+124
-0
lines changed
Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
package io.quarkus.resteasy.reactive.server.test.stream;
2+
3+
import static org.junit.jupiter.api.Assertions.fail;
4+
5+
import java.util.concurrent.CompletableFuture;
6+
import java.util.concurrent.ExecutionException;
7+
import java.util.concurrent.TimeUnit;
8+
import java.util.concurrent.TimeoutException;
9+
import java.util.concurrent.atomic.AtomicLong;
10+
import java.util.function.Consumer;
11+
import java.util.stream.IntStream;
12+
13+
import jakarta.inject.Inject;
14+
import jakarta.ws.rs.DefaultValue;
15+
import jakarta.ws.rs.GET;
16+
import jakarta.ws.rs.Path;
17+
18+
import org.jboss.resteasy.reactive.RestQuery;
19+
import org.junit.jupiter.api.Assertions;
20+
import org.junit.jupiter.api.Test;
21+
import org.junit.jupiter.api.extension.RegisterExtension;
22+
23+
import io.quarkus.test.QuarkusUnitTest;
24+
import io.restassured.RestAssured;
25+
import io.smallrye.mutiny.Multi;
26+
import io.smallrye.mutiny.subscription.MultiEmitter;
27+
import io.vertx.core.Handler;
28+
import io.vertx.core.Vertx;
29+
import io.vertx.core.buffer.Buffer;
30+
import io.vertx.core.http.HttpClient;
31+
import io.vertx.core.http.HttpClientRequest;
32+
import io.vertx.core.http.HttpClosedException;
33+
import io.vertx.core.http.HttpMethod;
34+
import io.vertx.core.impl.NoStackTraceException;
35+
36+
public class ErrorDuringStreamingTestCase {
37+
38+
@RegisterExtension
39+
static QuarkusUnitTest runner = new QuarkusUnitTest()
40+
.withApplicationRoot(jar -> jar.addClasses(Resource.class));
41+
42+
@Inject
43+
Vertx vertx;
44+
45+
@Test
46+
public void noFailure() throws ExecutionException, InterruptedException, TimeoutException {
47+
HttpClient client = null;
48+
try {
49+
AtomicLong count = new AtomicLong();
50+
CompletableFuture<Object> latch = new CompletableFuture<>();
51+
client = vertx.createHttpClient();
52+
sendRequest(client, "/test", latch, b -> count.getAndIncrement());
53+
latch.get(10, TimeUnit.SECONDS);
54+
Assertions.assertEquals(2 * 100, count.get());
55+
} finally {
56+
if (client != null) {
57+
client.close().toCompletionStage().toCompletableFuture().get();
58+
}
59+
}
60+
}
61+
62+
@Test
63+
public void failure() throws InterruptedException, TimeoutException, ExecutionException {
64+
HttpClient client = null;
65+
AtomicLong count = new AtomicLong();
66+
try {
67+
CompletableFuture<Object> latch = new CompletableFuture<>();
68+
client = vertx.createHttpClient();
69+
sendRequest(client, "/test?fail=true", latch, b -> count.getAndIncrement());
70+
latch.get(10, TimeUnit.SECONDS);
71+
fail("The client should have failed as the server reset the connection");
72+
} catch (ExecutionException e) {
73+
Assertions.assertInstanceOf(HttpClosedException.class, e.getCause());
74+
Assertions.assertEquals(100, count.get());
75+
} finally {
76+
if (client != null) {
77+
client.close().toCompletionStage().toCompletableFuture().get();
78+
}
79+
}
80+
}
81+
82+
private void sendRequest(HttpClient client, String requestURI, CompletableFuture<Object> latch,
83+
Consumer<Buffer> bodyConsumer) {
84+
Handler<Throwable> failure = latch::completeExceptionally;
85+
client.request(HttpMethod.GET, RestAssured.port, "localhost", requestURI)
86+
.onFailure(failure)
87+
.onSuccess(new Handler<>() {
88+
@Override
89+
public void handle(HttpClientRequest event) {
90+
event.connect().onFailure(failure)
91+
.onSuccess(response -> {
92+
response
93+
.handler(bodyConsumer::accept)
94+
.exceptionHandler(latch::completeExceptionally)
95+
.end(latch::complete);
96+
});
97+
98+
}
99+
});
100+
}
101+
102+
@Path("test")
103+
public static class Resource {
104+
105+
@GET
106+
public Multi<String> stream(@RestQuery @DefaultValue("false") boolean fail) {
107+
return Multi.createFrom().emitter(emitter -> {
108+
emit(emitter);
109+
if (fail) {
110+
throw new NoStackTraceException("dummy");
111+
} else {
112+
emit(emitter);
113+
emitter.complete();
114+
}
115+
});
116+
}
117+
118+
private static void emit(MultiEmitter<? super String> emitter) {
119+
IntStream.range(0, 100).forEach(i -> {
120+
emitter.emit(String.valueOf(i));
121+
});
122+
}
123+
}
124+
}

0 commit comments

Comments
 (0)