Skip to content

Commit 8bda5f5

Browse files
authored
Merge pull request #364 from youngsofun/retry
fix: fix and unify retry logic
2 parents 9afff6f + b2f2855 commit 8bda5f5

File tree

8 files changed

+229
-266
lines changed

8 files changed

+229
-266
lines changed

databend-client/src/main/java/com/databend/client/DatabendClientV1.java

Lines changed: 39 additions & 148 deletions
Original file line numberDiff line numberDiff line change
@@ -14,29 +14,24 @@
1414

1515
package com.databend.client;
1616

17-
import com.databend.client.errors.CloudErrors;
17+
import com.databend.client.errors.QueryErrors;
1818
import okhttp3.*;
1919

2020
import javax.annotation.concurrent.ThreadSafe;
2121
import java.io.IOException;
22-
import java.net.ConnectException;
2322
import java.net.URI;
24-
import java.time.Duration;
23+
import java.sql.SQLException;
2524
import java.util.List;
2625
import java.util.Map;
2726
import java.util.concurrent.atomic.AtomicReference;
2827
import java.util.function.Consumer;
29-
import java.util.logging.Level;
30-
import java.util.logging.Logger;
3128

3229
import static com.databend.client.JsonCodec.jsonCodec;
3330
import static com.databend.client.constant.DatabendConstant.BOOLEAN_TRUE_STR;
3431
import static com.google.common.base.MoreObjects.firstNonNull;
35-
import static java.lang.String.format;
3632
import static java.net.HttpURLConnection.HTTP_NOT_FOUND;
3733
import static java.net.HttpURLConnection.HTTP_OK;
3834
import static java.util.Objects.requireNonNull;
39-
import static java.util.concurrent.TimeUnit.MILLISECONDS;
4035

4136
@ThreadSafe
4237
public class DatabendClientV1
@@ -51,27 +46,20 @@ public class DatabendClientV1
5146

5247
public static final String QUERY_PATH = "/v1/query";
5348
public static final String DISCOVERY_PATH = "/v1/discovery_nodes";
54-
private static final Long MAX_MATERIALIZED_JSON_RESPONSE_SIZE = 128 * 1024L;
5549
private final OkHttpClient httpClient;
5650
private final String query;
5751
private final String host;
5852

59-
private final int maxRetryAttempts;
60-
private final PaginationOptions paginationOptions;
61-
// request with retry timeout
62-
private final Integer requestTimeoutSecs;
6353
private final Map<String, String> additionalHeaders;
6454
// client session
6555
private final AtomicReference<DatabendSession> databendSession;
6656
private String nodeID;
6757
private final AtomicReference<QueryResults> currentResults = new AtomicReference<>(null);
68-
private static final Logger logger = Logger.getLogger(DatabendClientV1.class.getPackage().getName());
69-
7058
private final Consumer<DatabendSession> on_session_state_update;
7159

7260
public DatabendClientV1(OkHttpClient httpClient, String sql, ClientSettings settings,
73-
Consumer<DatabendSession> on_session_state_update,
74-
AtomicReference<String> last_node_id) {
61+
Consumer<DatabendSession> on_session_state_update,
62+
AtomicReference<String> last_node_id) {
7563
requireNonNull(httpClient, "httpClient is null");
7664
requireNonNull(sql, "sql is null");
7765
requireNonNull(settings, "settings is null");
@@ -80,10 +68,7 @@ public DatabendClientV1(OkHttpClient httpClient, String sql, ClientSettings sett
8068
this.query = sql;
8169
this.on_session_state_update = on_session_state_update;
8270
this.host = settings.getHost();
83-
this.paginationOptions = settings.getPaginationOptions();
84-
this.requestTimeoutSecs = settings.getQueryTimeoutSecs();
8571
this.additionalHeaders = settings.getAdditionalHeaders();
86-
this.maxRetryAttempts = settings.getRetryAttempts();
8772
this.databendSession = new AtomicReference<>(settings.getSession());
8873
this.nodeID = last_node_id.get();
8974

@@ -102,7 +87,7 @@ public static List<DiscoveryNode> discoverNodes(OkHttpClient httpClient, ClientS
10287
requireNonNull(settings, "settings is null");
10388
requireNonNull(settings.getHost(), "settings.host is null");
10489
Request request = buildDiscoveryRequest(settings);
105-
DiscoveryResponseCodec.DiscoveryResponse response = getDiscoveryResponse(httpClient, request, null, settings.getQueryTimeoutSecs());
90+
DiscoveryResponseCodec.DiscoveryResponse response = getDiscoveryResponse(httpClient, request);
10691
return response.getNodes();
10792
}
10893

@@ -158,153 +143,59 @@ public String getQuery() {
158143
return query;
159144
}
160145

161-
private static DiscoveryResponseCodec.DiscoveryResponse getDiscoveryResponse(OkHttpClient httpClient, Request request, Long materializedJsonSizeLimit, int requestTimeoutSecs) {
146+
private static DiscoveryResponseCodec.DiscoveryResponse getDiscoveryResponse(OkHttpClient httpClient, Request request) {
162147
requireNonNull(request, "request is null");
148+
JsonResponse<DiscoveryResponseCodec.DiscoveryResponse> response;
163149

164-
long start = System.nanoTime();
165-
int attempts = 0;
166-
Exception lastException = null;
167-
168-
while (true) {
169-
if (attempts > 0) {
170-
Duration sinceStart = Duration.ofNanos(System.nanoTime() - start);
171-
if (sinceStart.compareTo(Duration.ofSeconds(requestTimeoutSecs)) > 0) {
172-
throw new RuntimeException(format("Error fetching discovery nodes (attempts: %s, duration: %s)", attempts, sinceStart.getSeconds()), lastException);
173-
}
174-
175-
try {
176-
// Exponential backoff
177-
MILLISECONDS.sleep(attempts * 100L);
178-
} catch (InterruptedException e) {
179-
Thread.currentThread().interrupt();
180-
throw new RuntimeException("Interrupted while fetching discovery nodes", e);
181-
}
182-
}
183-
attempts++;
184-
185-
JsonResponse<DiscoveryResponseCodec.DiscoveryResponse> response;
186-
try {
187-
response = JsonResponse.execute(
188-
DISCOVERY_RESULT_CODEC,
189-
httpClient,
190-
request,
191-
materializedJsonSizeLimit);
192-
} catch (RuntimeException e) {
193-
lastException = e;
194-
if (e.getCause() instanceof ConnectException) {
195-
// Retry on connection refused errors
196-
continue;
197-
}
198-
throw new RuntimeException("Failed to fetch discovery nodes: " + e.getMessage(), e);
199-
}
200-
201-
if (response.getStatusCode() == HTTP_OK && response.hasValue()) {
150+
try {
151+
RetryPolicy retryPolicy = new RetryPolicy(true, true);
152+
RetryPolicy.ResponseWithBody resp = retryPolicy.sendRequestWithRetry(httpClient, request);
153+
long code = resp.response.code();
154+
if (code == HTTP_OK) {
155+
response = JsonResponse.decode(DISCOVERY_RESULT_CODEC, resp);
202156
DiscoveryResponseCodec.DiscoveryResponse discoveryResponse = response.getValue();
203-
if (discoveryResponse.getError() == null) {
204-
// Successful response
157+
QueryErrors errors = discoveryResponse.getError();
158+
if (errors == null) {
205159
return discoveryResponse;
160+
} else {
161+
throw new RuntimeException("Discovery request failed: " + discoveryResponse.getError());
206162
}
207-
if (discoveryResponse.getError().notFound()) {
208-
throw new UnsupportedOperationException("Discovery request feature not supported: " + discoveryResponse.getError());
209-
}
210-
throw new RuntimeException("Discovery request failed: " + discoveryResponse.getError());
211-
} else if (response.getStatusCode() == HTTP_NOT_FOUND) {
163+
} else if (code == HTTP_NOT_FOUND) {
212164
throw new UnsupportedOperationException("Discovery request feature not supported");
213165
}
214-
215-
// Handle other HTTP error codes and response body parsing for errors
216-
if (response.getResponseBody().isPresent()) {
217-
CloudErrors errors = CloudErrors.tryParse(response.getResponseBody().get());
218-
if (errors != null && errors.tryGetErrorKind().canRetry()) {
219-
continue;
220-
}
221-
}
222-
223-
if (response.getStatusCode() != 520) {
224-
throw new RuntimeException("Discovery request failed with status code: " + response.getStatusCode());
225-
}
166+
throw new RuntimeException("Discovery request failed, code = " + code + " :" + resp.body);
167+
} catch (SQLException e) {
168+
throw new RuntimeException(e);
226169
}
227170
}
228171

229-
private boolean executeInternal(Request request, Long materializedJsonSizeLimit) {
172+
private boolean executeInternal(Request request) {
230173
requireNonNull(request, "request is null");
174+
JsonResponse<QueryResults> response;
231175

232-
long start = System.nanoTime();
233-
int attempts = 0;
234-
// Exception cause = null;
235-
236-
while (true) {
237-
if (attempts > 0) {
238-
Duration sinceStart = Duration.ofNanos(System.nanoTime() - start);
239-
if (sinceStart.compareTo(Duration.ofSeconds(requestTimeoutSecs)) > 0) {
240-
throw new RuntimeException(format("Error fetching next (attempts: %s, duration: %s)",
241-
attempts, sinceStart.getSeconds()), null);
242-
}
243-
244-
try {
245-
logger.log(Level.FINE, "Executing query attempt #" + attempts);
246-
// Apply exponential backoff with a cap
247-
// Max 5 seconds
248-
long sleepTime = Math.min(100 * (1 << Math.min(attempts - 1, 10)), 5000);
249-
MILLISECONDS.sleep(sleepTime);
250-
} catch (InterruptedException e) {
251-
try {
252-
close();
253-
} finally {
254-
Thread.currentThread().interrupt();
255-
}
256-
throw new RuntimeException("StatementClient thread was interrupted");
257-
}
258-
}
259-
260-
attempts++;
261-
JsonResponse<QueryResults> response;
262-
263-
try {
264-
response = JsonResponse.execute(QUERY_RESULTS_CODEC, httpClient, request, materializedJsonSizeLimit);
265-
} catch (RuntimeException e) {
266-
if (e.getCause() instanceof ConnectException) {
267-
// Log the connection exception but rethrow it to match original behavior
268-
logger.log(Level.WARNING, "Connection exception on attempt " + attempts + ": " + e.getMessage());
269-
// This will be caught by the caller's retry mechanism
270-
throw e;
271-
}
272-
throw new RuntimeException("Query failed: " + e.getMessage(), e);
273-
}
176+
try {
177+
RetryPolicy retryPolicy = new RetryPolicy(false, true);
178+
RetryPolicy.ResponseWithBody resp = retryPolicy.sendRequestWithRetry(httpClient, request);
179+
response = JsonResponse.decode(QUERY_RESULTS_CODEC, resp);
180+
} catch (SQLException e) {
181+
throw new RuntimeException(e);
182+
}
274183

275-
// Success case
276-
if ((response.getStatusCode() == HTTP_OK) &&
277-
response.hasValue() &&
278-
(response.getValue().getError() == null)) {
184+
if ((response.getStatusCode() == HTTP_OK) && response.hasValue() ) {
185+
QueryErrors e = response.getValue().getError();
186+
if (e == null) {
279187
processResponse(response.getHeaders(), response.getValue());
280-
return true;
281-
}
282-
283-
// Try to parse error response
284-
if (response.getResponseBody().isPresent()) {
285-
CloudErrors errors = CloudErrors.tryParse(response.getResponseBody().get());
286-
if (errors != null) {
287-
if (errors.tryGetErrorKind().canRetry()) {
288-
logger.log(Level.WARNING, "Retryable error on attempt " + attempts + ": " + errors.getMessage());
289-
continue;
290-
} else {
291-
throw new RuntimeException(String.valueOf(response.getValue().getError()));
292-
}
293-
}
294-
}
295-
296-
// Handle status code 520
297-
if (response.getStatusCode() == 520) {
298-
return false;
188+
} else {
189+
throw new RuntimeException("Query Failed: " + e);
299190
}
300-
301-
throw new RuntimeException("Query failed: " + response.getValue().getError());
191+
return true;
302192
}
193+
return false;
303194
}
304195

305196
@Override
306197
public boolean execute(Request request) {
307-
return executeInternal(request, null);
198+
return executeInternal(request);
308199
}
309200

310201
private void processResponse(Headers headers, QueryResults results) {
@@ -347,7 +238,7 @@ public boolean advance() {
347238
Request.Builder builder = prepareRequest(url, this.additionalHeaders);
348239
builder.addHeader(ClientSettings.X_DATABEND_STICKY_NODE, this.nodeID);
349240
Request request = builder.get().build();
350-
return executeInternal(request, MAX_MATERIALIZED_JSON_RESPONSE_SIZE);
241+
return executeInternal(request);
351242
}
352243

353244
@Override

databend-client/src/main/java/com/databend/client/JsonResponse.java

Lines changed: 9 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,8 @@
1818
import okhttp3.*;
1919

2020
import javax.annotation.Nullable;
21-
import java.io.IOException;
22-
import java.io.UncheckedIOException;
23-
import java.util.Objects;
24-
import java.util.Optional;
2521

2622
import static com.google.common.base.MoreObjects.toStringHelper;
27-
import static com.google.common.net.HttpHeaders.LOCATION;
2823
import static java.lang.String.format;
2924
import static java.util.Objects.requireNonNull;
3025

@@ -59,32 +54,14 @@ private JsonResponse(int statusCode, String statusMessage, Headers headers, @Nul
5954
this.hasValue = (exception == null);
6055
}
6156

62-
public static <T> JsonResponse<T> execute(JsonCodec<T> codec, OkHttpClient client, Request request, Long materializedJsonSizeLimit) throws RuntimeException {
63-
try (Response response = client.newCall(request).execute()) {
64-
// TODO: fix in OkHttp: https://github.com/square/okhttp/issues/3111
65-
if ((response.code() == 307) || (response.code() == 308)) {
66-
String location = response.header(LOCATION);
67-
if (location != null) {
68-
request = request.newBuilder().url(location).build();
69-
return execute(codec, client, request, materializedJsonSizeLimit);
70-
}
71-
}
72-
73-
ResponseBody responseBody = requireNonNull(response.body());
74-
if (isJson(responseBody.contentType())) {
75-
String body = null;
76-
T value = null;
77-
IllegalArgumentException exception = null;
57+
public static <T> JsonResponse<T> decode(JsonCodec<T> codec, RetryPolicy.ResponseWithBody responseWithBody) throws RuntimeException {
58+
Response response = responseWithBody.response;
59+
String body = responseWithBody.body;
60+
if (isJson(response.body().contentType())) {
61+
T value;
62+
IllegalArgumentException exception;
7863
try {
79-
if (!Objects.isNull(materializedJsonSizeLimit) && (responseBody.contentLength() < 0 || responseBody.contentLength() > materializedJsonSizeLimit)) {
80-
// Parse from input stream, response is either of unknown size or too large to materialize. Raw response body
81-
// will not be available if parsing fails
82-
value = codec.fromJson(responseBody.byteStream());
83-
} else {
84-
// parse from materialized response body string
85-
body = responseBody.string();
86-
value = codec.fromJson(body);
87-
}
64+
value = codec.fromJson(body);
8865
} catch (JsonProcessingException e) {
8966
String message;
9067
if (body != null) {
@@ -95,12 +72,9 @@ public static <T> JsonResponse<T> execute(JsonCodec<T> codec, OkHttpClient clien
9572
exception = new IllegalArgumentException(message, e);
9673
throw exception;
9774
}
98-
return new JsonResponse<>(response.code(), response.message(), response.headers(), body, value, exception);
75+
return new JsonResponse<>(response.code(), response.message(), response.headers(), body, value, null);
9976
}
100-
return new JsonResponse<>(response.code(), response.message(), response.headers(), responseBody.string());
101-
} catch (IOException e) {
102-
throw new UncheckedIOException(e);
103-
}
77+
return new JsonResponse<>(response.code(), response.message(), response.headers(), body);
10478
}
10579

10680
private static boolean isJson(MediaType type) {
@@ -111,10 +85,6 @@ public int getStatusCode() {
11185
return statusCode;
11286
}
11387

114-
public String getStatusMessage() {
115-
return statusMessage;
116-
}
117-
11888
public Headers getHeaders() {
11989
return headers;
12090
}
@@ -130,9 +100,6 @@ public T getValue() {
130100
return value;
131101
}
132102

133-
public Optional<String> getResponseBody() {
134-
return Optional.ofNullable(responseBody);
135-
}
136103

137104
@Nullable
138105
public IllegalArgumentException getException() {

0 commit comments

Comments
 (0)