Skip to content

Commit 74dd40a

Browse files
committed
async retry actually async
1 parent cf02e28 commit 74dd40a

File tree

3 files changed

+57
-39
lines changed

3 files changed

+57
-39
lines changed
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
package co.elastic.clients.transport.rest_client;
2+
3+
import org.elasticsearch.client.Cancellable;
4+
5+
import java.util.concurrent.CompletableFuture;
6+
7+
/**
8+
* The {@code Future} implementation returned by async requests.
9+
* It wraps the RestClient's cancellable and propagates cancellation.
10+
*/
11+
public class RequestFuture<T> extends CompletableFuture<T> {
12+
private volatile Cancellable cancellable;
13+
14+
@Override
15+
public boolean cancel(boolean mayInterruptIfRunning) {
16+
boolean cancelled = super.cancel(mayInterruptIfRunning);
17+
if (cancelled && cancellable != null) {
18+
cancellable.cancel();
19+
}
20+
return cancelled;
21+
}
22+
23+
public void setCancellable(Cancellable cancellable) {
24+
this.cancellable = cancellable;
25+
}
26+
}

java-client/src/main/java/co/elastic/clients/transport/rest_client/RestClientHttpClient.java

Lines changed: 2 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -49,23 +49,6 @@ public class RestClientHttpClient implements TransportHttpClient {
4949

5050
private static final ConcurrentHashMap<String, ContentType> ContentTypeCache = new ConcurrentHashMap<>();
5151

52-
/**
53-
* The {@code Future} implementation returned by async requests.
54-
* It wraps the RestClient's cancellable and propagates cancellation.
55-
*/
56-
private static class RequestFuture<T> extends CompletableFuture<T> {
57-
private volatile Cancellable cancellable;
58-
59-
@Override
60-
public boolean cancel(boolean mayInterruptIfRunning) {
61-
boolean cancelled = super.cancel(mayInterruptIfRunning);
62-
if (cancelled && cancellable != null) {
63-
cancellable.cancel();
64-
}
65-
return cancelled;
66-
}
67-
}
68-
6952
private final RestClient restClient;
7053

7154
public RestClientHttpClient(RestClient restClient) {
@@ -110,7 +93,7 @@ public CompletableFuture<Response> performRequestAsync(
11093
return future;
11194
}
11295

113-
future.cancellable = restClient.performRequestAsync(restRequest, new ResponseListener() {
96+
future.setCancellable(restClient.performRequestAsync(restRequest, new ResponseListener() {
11497
@Override
11598
public void onSuccess(org.elasticsearch.client.Response response) {
11699
future.complete(new RestResponse(response));
@@ -120,7 +103,7 @@ public void onSuccess(org.elasticsearch.client.Response response) {
120103
public void onFailure(Exception exception) {
121104
future.completeExceptionally(exception);
122105
}
123-
});
106+
}));
124107

125108
return future;
126109
}

java-client/src/main/java/co/elastic/clients/transport/rest_client/RetryRestClientHttpClient.java

Lines changed: 29 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -31,12 +31,13 @@ public Response performRequestRetry(String endpointId, @Nullable Node node, Requ
3131
try {
3232
return delegate.performRequest(endpointId, node, request, options);
3333
} catch (ResponseException e) {
34-
if (e.getResponse().getStatusLine().getStatusCode() == 503) { // TODO list of statuses
34+
if (e.getResponse().getStatusLine().getStatusCode() == 503) { // TODO list of statuses, configurable or hardcoded?
3535
// synchronous retry
3636
if (backoffIter.hasNext()) {
3737
try {
38-
Thread.sleep(backoffIter.next()); // TODO ... no?
38+
Thread.sleep(backoffIter.next());
3939
} catch (InterruptedException ie) {
40+
throw e; // TODO okay with masking IE and just returning original exception?
4041
}
4142
System.out.println("Retrying");
4243
return performRequestRetry(endpointId, node, request, options, backoffIter);
@@ -50,33 +51,41 @@ public Response performRequestRetry(String endpointId, @Nullable Node node, Requ
5051
@Override
5152
public CompletableFuture<Response> performRequestAsync(String endpointId, @Nullable Node node,
5253
Request request, TransportOptions options) {
53-
return performRequestAsyncRetry(endpointId, node, request, options, backoffPolicy.iterator());
54+
RequestFuture<Response> futureResult = new RequestFuture<>();
55+
return performRequestAsyncRetry(endpointId, node, request, options, backoffPolicy.iterator(),
56+
futureResult);
5457
}
5558

5659
public CompletableFuture<Response> performRequestAsyncRetry(String endpointId, @Nullable Node node,
5760
Request request,
5861
TransportOptions options,
59-
Iterator<Long> backoffIter) {
60-
CompletableFuture<Response> fut = delegate.performRequestAsync(endpointId, node, request, options);
61-
try {
62-
fut.get(); // TODO is this problematic?
63-
return fut;
64-
} catch (Exception e) {
65-
if (e.getCause() instanceof ResponseException) {
66-
if (((ResponseException) e.getCause()).getResponse().getStatusLine().getStatusCode() == 503) { // TODO list of statuses
67-
if (backoffIter.hasNext()) {
68-
try {
69-
Thread.sleep(backoffIter.next()); // TODO ... no?
70-
} catch (InterruptedException ie) {
71-
fut.completeExceptionally(e); // TODO masking internal errors and just returning original error okay?
62+
Iterator<Long> backoffIter,
63+
CompletableFuture<Response> futureResult) {
64+
CompletableFuture<Response> res = delegate.performRequestAsync(endpointId, node, request, options);
65+
66+
res.whenComplete((resp, e) -> {
67+
if (e != null) {
68+
if (e instanceof ResponseException) {
69+
if (((ResponseException) e).getResponse().getStatusLine().getStatusCode() == 503) { // TODO list of statuses, configurable or hardcoded?
70+
if (backoffIter.hasNext()) {
71+
try {
72+
Thread.sleep(backoffIter.next());
73+
} catch (InterruptedException ie) {
74+
// TODO okay with masking IE and just returning original exception?
75+
futureResult.completeExceptionally(e);
76+
}
77+
System.out.println("Retrying");
78+
performRequestAsyncRetry(endpointId, node, request, options, backoffIter,futureResult);
7279
}
73-
System.out.println("Retrying");
74-
return performRequestAsyncRetry(endpointId, node, request, options, backoffIter);
7580
}
7681
}
7782
}
78-
return fut;
79-
}
83+
else {
84+
futureResult.complete(resp);
85+
}
86+
});
87+
88+
return futureResult;
8089
}
8190

8291
@Override

0 commit comments

Comments
 (0)