Skip to content

Commit 5cafd15

Browse files
committed
Support cancelling pending REST requests
Closes: #48620
1 parent 3e0f5b4 commit 5cafd15

File tree

4 files changed

+116
-0
lines changed

4 files changed

+116
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
package io.quarkus.rest.client.reactive;
2+
3+
import static io.restassured.RestAssured.when;
4+
import static org.hamcrest.Matchers.is;
5+
6+
import java.net.URI;
7+
import java.util.concurrent.CountDownLatch;
8+
import java.util.concurrent.TimeUnit;
9+
import java.util.concurrent.atomic.AtomicLong;
10+
import java.util.stream.IntStream;
11+
12+
import jakarta.ws.rs.GET;
13+
import jakarta.ws.rs.Path;
14+
15+
import org.eclipse.microprofile.rest.client.RestClientBuilder;
16+
import org.jboss.resteasy.reactive.client.api.QuarkusRestClientProperties;
17+
import org.junit.jupiter.api.Test;
18+
import org.junit.jupiter.api.extension.RegisterExtension;
19+
20+
import io.quarkus.test.QuarkusUnitTest;
21+
import io.quarkus.test.common.http.TestHTTPResource;
22+
import io.smallrye.mutiny.Uni;
23+
24+
public class RequestCancellationTest {
25+
26+
@RegisterExtension
27+
static final QuarkusUnitTest config = new QuarkusUnitTest()
28+
.withApplicationRoot((jar) -> jar.addClasses(Client.class, Resource.class));
29+
30+
@TestHTTPResource
31+
URI uri;
32+
33+
@Test
34+
public void test() throws InterruptedException {
35+
Client client = RestClientBuilder.newBuilder()
36+
.baseUri(uri)
37+
.property(QuarkusRestClientProperties.CONNECTION_POOL_SIZE, 1) // make sure client requests are serialized
38+
.build(Client.class);
39+
40+
when().get("resource/count")
41+
.then()
42+
.statusCode(200)
43+
.body(is("0"));
44+
45+
CountDownLatch latch = new CountDownLatch(1);
46+
client.get().subscribe().with(res -> {
47+
latch.countDown();
48+
});
49+
50+
// create a bunch of requests that we test won't end up hitting the server
51+
Uni.join().all(IntStream.range(0, 100).mapToObj(i -> client.get()).toList())
52+
.andCollectFailures()
53+
.subscribe() // actually initiate the requests
54+
.with(res -> {
55+
})
56+
.cancel(); // cancel all the requests
57+
58+
latch.await(5, TimeUnit.SECONDS);
59+
60+
// ensure that only the first request was made
61+
when().get("resource/count")
62+
.then()
63+
.statusCode(200)
64+
.body(is("1"));
65+
}
66+
67+
@Path("resource")
68+
public interface Client {
69+
70+
@GET
71+
Uni<String> get();
72+
}
73+
74+
@Path("resource")
75+
public static class Resource {
76+
77+
private static final AtomicLong COUNTER = new AtomicLong();
78+
79+
@GET
80+
public String get() throws InterruptedException {
81+
COUNTER.incrementAndGet();
82+
// ensure that each request takes a long time to complete to we don't end up with a race
83+
// condition where the client requests that were to be canceled, had time to execute because
84+
// the previous requests in the queue completed too fast
85+
Thread.sleep(2000);
86+
return "foo";
87+
}
88+
89+
@Path("count")
90+
@GET
91+
public long count() {
92+
return COUNTER.get();
93+
}
94+
}
95+
}

independent-projects/resteasy-reactive/client/runtime/src/main/java/org/jboss/resteasy/reactive/client/handlers/ClientSendRequestHandler.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,11 @@ public void handle(RestClientRequestContext requestContext) {
101101
future.subscribe().with(new Consumer<>() {
102102
@Override
103103
public void accept(HttpClientRequest httpClientRequest) {
104+
if (requestContext.isUserCanceled()) {
105+
// in this case the user aborted before the request was even created
106+
return;
107+
}
108+
104109
requestContext.setHttpClientRequest(httpClientRequest);
105110

106111
// adapt headers to HTTP/2 depending on the underlying HTTP connection

independent-projects/resteasy-reactive/client/runtime/src/main/java/org/jboss/resteasy/reactive/client/impl/RestClientRequestContext.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import java.util.Map;
1616
import java.util.concurrent.CompletableFuture;
1717
import java.util.concurrent.Executor;
18+
import java.util.concurrent.atomic.AtomicBoolean;
1819

1920
import jakarta.ws.rs.RuntimeType;
2021
import jakarta.ws.rs.WebApplicationException;
@@ -107,6 +108,8 @@ public class RestClientRequestContext extends AbstractResteasyReactiveContext<Re
107108
private Map<Class<?>, MultipartResponseData> multipartResponsesData;
108109
private StackTraceElement[] callerStackTrace;
109110

111+
private final AtomicBoolean userCanceled = new AtomicBoolean();
112+
110113
public RestClientRequestContext(ClientImpl restClient,
111114
HttpClient httpClient, String httpMethod, URI uri,
112115
ConfigurationImpl configuration, ClientRequestHeaders requestHeaders,
@@ -601,4 +604,12 @@ private Boolean getBooleanProperty(String name, Boolean defaultValue) {
601604
protected boolean isRequestScopeManagementRequired() {
602605
return false;
603606
}
607+
608+
public void setUserCanceled() {
609+
userCanceled.set(true);
610+
}
611+
612+
public boolean isUserCanceled() {
613+
return userCanceled.get();
614+
}
604615
}

independent-projects/resteasy-reactive/client/runtime/src/main/java/org/jboss/resteasy/reactive/client/impl/UniInvoker.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,9 +50,14 @@ public void run() {
5050
// be very defensive here as things could have been nulled out when the application is being torn down
5151
RestClientRequestContext restClientRequestContext = restClientRequestContextRef.get();
5252
if (restClientRequestContext != null) {
53+
restClientRequestContext.setUserCanceled();
54+
5355
HttpClientRequest httpClientRequest = restClientRequestContext.getHttpClientRequest();
5456
if (httpClientRequest != null) {
57+
// if there is already an HTTP request in flight, cancel it
5558
httpClientRequest.reset();
59+
} else {
60+
// by having already done setUserCanceled, Quarkus knows to reset the request when it finally gets created
5661
}
5762
}
5863
}

0 commit comments

Comments
 (0)