Skip to content

Commit 4eb7ed0

Browse files
committed
sync retry draft + test
1 parent 7843f17 commit 4eb7ed0

File tree

3 files changed

+130
-19
lines changed

3 files changed

+130
-19
lines changed

java-client/src/main/java/co/elastic/clients/transport/ElasticsearchTransportBase.java

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import co.elastic.clients.transport.instrumentation.Instrumentation;
3535
import co.elastic.clients.transport.instrumentation.NoopInstrumentation;
3636
import co.elastic.clients.transport.instrumentation.OpenTelemetryForElasticsearch;
37+
import co.elastic.clients.transport.rest_client.RetryRestClientHttpClient;
3738
import co.elastic.clients.util.ApiTypeHelper;
3839
import co.elastic.clients.util.BinaryData;
3940
import co.elastic.clients.util.ByteArrayBinaryData;
@@ -100,9 +101,15 @@ public ElasticsearchTransportBase(
100101
@Nullable Instrumentation instrumentation
101102
) {
102103
this.mapper = jsonpMapper;
103-
this.httpClient = httpClient;
104104
this.transportOptions = httpClient.createOptions(options);
105105

106+
if (this.transportOptions.backoffPolicy()!=BackoffPolicy.noBackoff()){
107+
this.httpClient = new RetryRestClientHttpClient(httpClient,this.transportOptions.backoffPolicy());
108+
}
109+
else {
110+
this.httpClient = httpClient;
111+
}
112+
106113
// If no instrumentation is provided, fallback to OpenTelemetry and ultimately noop
107114
if (instrumentation == null) {
108115
instrumentation = OpenTelemetryForElasticsearch.getDefault();
@@ -111,10 +118,6 @@ public ElasticsearchTransportBase(
111118
instrumentation = NoopInstrumentation.INSTANCE;
112119
}
113120
this.instrumentation = instrumentation;
114-
115-
if (this.transportOptions.backoffPolicy()!=BackoffPolicy.noBackoff()){
116-
117-
}
118121
}
119122

120123
@Override

java-client/src/main/java/co/elastic/clients/transport/rest_client/RetryRestClientHttpClient.java

Lines changed: 40 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,60 @@
11
package co.elastic.clients.transport.rest_client;
22

3+
import co.elastic.clients.transport.BackoffPolicy;
34
import co.elastic.clients.transport.TransportOptions;
45
import co.elastic.clients.transport.http.TransportHttpClient;
6+
import org.elasticsearch.client.ResponseException;
57

68
import javax.annotation.Nullable;
79
import java.io.IOException;
10+
import java.util.Iterator;
811
import java.util.concurrent.CompletableFuture;
912

1013
public class RetryRestClientHttpClient implements TransportHttpClient {
1114

1215
private TransportHttpClient delegate;
16+
private BackoffPolicy backoffPolicy;
17+
18+
public RetryRestClientHttpClient(TransportHttpClient delegate, BackoffPolicy backoffPolicy) {
19+
this.delegate = delegate;
20+
this.backoffPolicy = backoffPolicy;
21+
}
1322

1423
@Override
15-
public Response performRequest(String endpointId, @Nullable Node node, Request request, TransportOptions options) throws IOException {
16-
return delegate.performRequest(endpointId, node, request, options);
24+
public Response performRequest(String endpointId, @Nullable Node node, Request request,
25+
TransportOptions options) throws IOException {
26+
return performRequestRetry(endpointId, node, request, options, backoffPolicy.iterator());
27+
}
28+
29+
public Response performRequestRetry(String endpointId, @Nullable Node node, Request request,
30+
TransportOptions options, Iterator<Long> backoffIter) throws IOException {
31+
try {
32+
return delegate.performRequest(endpointId, node, request, options);
33+
} catch (ResponseException e) {
34+
if (e.getResponse().getStatusLine().getStatusCode() == 503) { // TODO list of statuses
35+
// synchronous retry
36+
if (backoffIter.hasNext()) {
37+
try {
38+
Thread.sleep(backoffIter.next()); // TODO ... no
39+
} catch (InterruptedException ie) {
40+
}
41+
System.out.println("Retrying");
42+
return performRequestRetry(endpointId, node, request, options, backoffIter);
43+
}
44+
else {
45+
// retries finished
46+
throw e;
47+
}
48+
} else {
49+
// error not retryable
50+
throw e;
51+
}
52+
}
1753
}
1854

1955
@Override
20-
public CompletableFuture<Response> performRequestAsync(String endpointId, @Nullable Node node, Request request, TransportOptions options) {
56+
public CompletableFuture<Response> performRequestAsync(String endpointId, @Nullable Node node,
57+
Request request, TransportOptions options) {
2158
return delegate.performRequestAsync(endpointId, node, request, options);
2259
}
2360

java-client/src/test/java/co/elastic/clients/transport/TransportTest.java

Lines changed: 82 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -20,15 +20,18 @@
2020
package co.elastic.clients.transport;
2121

2222
import co.elastic.clients.elasticsearch.ElasticsearchClient;
23+
import co.elastic.clients.elasticsearch.cat.IndicesResponse;
2324
import co.elastic.clients.json.jackson.JacksonJsonpMapper;
2425
import co.elastic.clients.transport.http.RepeatableBodyResponse;
2526
import co.elastic.clients.transport.rest_client.RestClientOptions;
2627
import co.elastic.clients.transport.rest_client.RestClientTransport;
28+
import co.elastic.clients.transport.rest_client.RetryRestClientHttpClient;
2729
import co.elastic.clients.util.BinaryData;
2830
import com.sun.net.httpserver.HttpServer;
2931
import org.apache.http.HttpHost;
3032
import org.elasticsearch.client.RequestOptions;
3133
import org.elasticsearch.client.Response;
34+
import org.elasticsearch.client.ResponseException;
3235
import org.elasticsearch.client.RestClient;
3336
import org.junit.jupiter.api.Assertions;
3437
import org.junit.jupiter.api.Test;
@@ -39,7 +42,9 @@
3942
import java.net.InetAddress;
4043
import java.net.InetSocketAddress;
4144
import java.nio.charset.StandardCharsets;
45+
import java.util.Arrays;
4246
import java.util.Collections;
47+
import java.util.concurrent.atomic.AtomicInteger;
4348

4449
import static co.elastic.clients.util.ContentType.APPLICATION_JSON;
4550

@@ -122,7 +127,8 @@ public void testOriginalJsonBodyRetrievalException() throws Exception {
122127
assertNotEquals(RepeatableBodyResponse.class, ex.response().getClass());
123128

124129
// setting transport option
125-
RestClientOptions options = new RestClientOptions(RequestOptions.DEFAULT, true,BackoffPolicy.noBackoff());
130+
RestClientOptions options = new RestClientOptions(RequestOptions.DEFAULT, true,
131+
BackoffPolicy.noBackoff());
126132

127133
ElasticsearchTransport transport = new RestClientTransport(
128134
restClient, new JacksonJsonpMapper(), options);
@@ -139,17 +145,82 @@ public void testOriginalJsonBodyRetrievalException() throws Exception {
139145
assertEquals(200, ex.statusCode());
140146
assertEquals(RepeatableBodyResponse.class, ex.response().getClass());
141147

142-
try (RepeatableBodyResponse repeatableResponse = (RepeatableBodyResponse) ex.response()){
148+
try (RepeatableBodyResponse repeatableResponse = (RepeatableBodyResponse) ex.response()) {
143149
BinaryData body = repeatableResponse.body();
144-
StringBuilder sb = new StringBuilder();
145-
BufferedReader br = new BufferedReader(new InputStreamReader(body.asInputStream()));
146-
String read;
147-
148-
while ((read = br.readLine()) != null) {
149-
sb.append(read);
150-
}
151-
br.close();
152-
assertEquals("definitely not json",sb.toString());
150+
StringBuilder sb = new StringBuilder();
151+
BufferedReader br = new BufferedReader(new InputStreamReader(body.asInputStream()));
152+
String read;
153+
154+
while ((read = br.readLine()) != null) {
155+
sb.append(read);
156+
}
157+
br.close();
158+
assertEquals("definitely not json", sb.toString());
153159
}
154160
}
161+
162+
163+
@Test
164+
public void testRetryClientSync() throws Exception {
165+
HttpServer httpServer = HttpServer.create(new InetSocketAddress(InetAddress.getLoopbackAddress(),
166+
0), 0);
167+
168+
// server will return success after 7 retries
169+
AtomicInteger errorCounter = new AtomicInteger();
170+
171+
httpServer.createContext("/_cat/indices", exchange -> {
172+
exchange.getResponseHeaders().put("Content-Type", Collections.singletonList(APPLICATION_JSON));
173+
exchange.getResponseHeaders().put("X-Elastic-Product", Collections.singletonList("Elasticsearch"
174+
));
175+
if (errorCounter.get()>6){
176+
exchange.sendResponseHeaders(200, 0);
177+
OutputStream out = exchange.getResponseBody();
178+
String jsonRes = " [{\n" +
179+
" \"health\": \"green\",\n" +
180+
" \"status\": \"open\",\n" +
181+
" \"index\": \"test\",\n" +
182+
" \"uuid\": \"3iSkOlZAQVq2ir1hOtaVlw\",\n" +
183+
" \"pri\": \"1\",\n" +
184+
" \"rep\": \"1\",\n" +
185+
" \"docs.count\": \"5\",\n" +
186+
" \"docs.deleted\": \"0\",\n" +
187+
" \"store.size\": \"8.8kb\",\n" +
188+
" \"pri.store.size\": \"4.4kb\",\n" +
189+
" \"dataset.size\": \"4.4kb\"\n" +
190+
" }]";
191+
out.write(jsonRes.getBytes(StandardCharsets.UTF_8));
192+
out.close();
193+
}
194+
else {
195+
exchange.sendResponseHeaders(503, 0);
196+
OutputStream out = exchange.getResponseBody();
197+
out.write("{}".getBytes(StandardCharsets.UTF_8));
198+
out.close();
199+
errorCounter.incrementAndGet();
200+
}
201+
});
202+
203+
httpServer.start();
204+
InetSocketAddress address = httpServer.getAddress();
205+
206+
RestClient restClient = RestClient
207+
.builder(new HttpHost(address.getHostString(), address.getPort(), "http"))
208+
.build();
209+
210+
// setting transport option
211+
RestClientOptions options = new RestClientOptions(RequestOptions.DEFAULT, false,
212+
BackoffPolicy.constantBackoff(50L,8));
213+
214+
ElasticsearchTransport transport = new RestClientTransport(
215+
restClient, new JacksonJsonpMapper(), options);
216+
217+
ElasticsearchClient esClient = new ElasticsearchClient(transport);
218+
219+
IndicesResponse res = esClient.cat().indices();
220+
221+
httpServer.stop(0);
222+
223+
assertTrue(errorCounter.get() == 7);
224+
assertEquals("test", res.valueBody().get(0).index());
225+
}
155226
}

0 commit comments

Comments
 (0)