Skip to content

Commit ece37a7

Browse files
author
Grzegorz Kołakowski
committed
HTTP-122 Add lookup retries
1 parent 99ffa43 commit ece37a7

19 files changed

+683
-434
lines changed

README.md

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -449,16 +449,21 @@ be requested if the current time is later than the cached token expiry time minu
449449
| lookup.partial-cache.expire-after-write | optional | The max time to live for each row in the lookup cache after it is written to the cache. Specify as a [Duration](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/deployment/config/#duration). `lookup.cache` must be set to `PARTIAL` to use this option. See the following <a href="#lookup-cache">Lookup Cache</a> section for more details. |
450450
| lookup.partial-cache.expire-after-access | optional | The max time to live for each row in the lookup cache after the entry is accessed. Specify as a [Duration](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/deployment/config/#duration). `lookup.cache` must be set to `PARTIAL` to use this option. See the following <a href="#lookup-cache">Lookup Cache</a> section for more details. |
451451
| lookup.partial-cache.cache-missing-key | optional | This is a boolean that defaults to true. Whether to store an empty value into the cache if the lookup key doesn't match any rows in the table. `lookup.cache` must be set to `PARTIAL` to use this option. See the following <a href="#lookup-cache">Lookup Cache</a> section for more details. |
452-
| lookup.max-retries | optional | The max retry times if the lookup failed; default is 3. See the following <a href="#lookup-cache">Lookup Cache</a> section for more details. |
452+
| lookup.retry-strategy.type | optional | Defines the retry strategy to use in case of lookup failures. Accepted values are: `none` (default), `fixed-delay` and `exponential-delay`. |
453+
| lookup.retry-strategy.fixed-delay.attempts | optional | The number of times the connector retries a lookup before it returns an empty result. |
454+
| lookup.retry-strategy.fixed-delay.delay | optional | Delay between two consecutive retry attempts. |
455+
| lookup.retry-strategy.exponential-delay.attempts | optional | The number of times the connector retries a lookup before it returns an empty result. |
456+
| lookup.retry-strategy.exponential-delay.initial-delay | optional | Initial delay between two consecutive retry attempts. |
457+
| lookup.retry-strategy.exponential-delay.max-delay | optional | The highest possible duration between two consecutive retry attempts. |
453458
| gid.connector.http.lookup.error.code | optional | List of HTTP status codes that should be treated as errors by HTTP Source, separated with comma. |
454459
| gid.connector.http.lookup.error.code.exclude | optional | List of HTTP status codes that should be excluded from the `gid.connector.http.lookup.error.code` list, separated with comma. |
455460
| gid.connector.http.security.cert.server | optional | Path to a trusted HTTP server certificate that should be added to the connector's key store. More than one path can be specified using `,` as path delimiter. |
456461
| gid.connector.http.security.cert.client | optional | Path to trusted certificate that should be used by connector's HTTP client for mTLS communication. |
457462
| gid.connector.http.security.key.client | optional | Path to trusted private key that should be used by connector's HTTP client for mTLS communication. |
458463
| gid.connector.http.security.cert.server.allowSelfSigned | optional | Accept untrusted certificates for TLS communication. |
459-
| gid.connector.http.security.oidc.token.request | optional | OIDC `Token Request` body in `application/x-www-form-urlencoded` encoding |
460-
| gid.connector.http.security.oidc.token.endpoint.url | optional | OIDC `Token Endpoint` url, to which the token request will be issued |
461-
| gid.connector.http.security.oidc.token.expiry.reduction | optional | OIDC tokens will be requested if the current time is later than the cached token expiry time minus this value. |
464+
| gid.connector.http.security.oidc.token.request | optional | OIDC `Token Request` body in `application/x-www-form-urlencoded` encoding |
465+
| gid.connector.http.security.oidc.token.endpoint.url | optional | OIDC `Token Endpoint` url, to which the token request will be issued |
466+
| gid.connector.http.security.oidc.token.expiry.reduction | optional | OIDC tokens will be requested if the current time is later than the cached token expiry time minus this value. |
462467
| gid.connector.http.source.lookup.request.timeout | optional | Sets HTTP request timeout in seconds. If not specified, the default value of 30 seconds will be used. |
463468
| gid.connector.http.source.lookup.request.thread-pool.size | optional | Sets the size of the thread pool for HTTP lookup request processing. Increasing this value means that more concurrent requests can be processed at the same time. If not specified, the default value of 8 threads will be used. |
464469
| gid.connector.http.source.lookup.response.thread-pool.size | optional | Sets the size of the thread pool for HTTP lookup response processing. Increasing this value means that more concurrent requests can be processed at the same time. If not specified, the default value of 4 threads will be used. |
Lines changed: 31 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,15 @@
11
package com.getindata.connectors.http.internal;
22

33
import java.util.List;
4+
import java.util.stream.Collectors;
45

56
import lombok.Data;
67
import lombok.NonNull;
78
import lombok.ToString;
89

910
import com.getindata.connectors.http.internal.sink.HttpSinkRequestEntry;
1011
import com.getindata.connectors.http.internal.sink.httpclient.HttpRequest;
12+
import com.getindata.connectors.http.internal.status.HttpResponseStatus;
1113

1214
/**
1315
* Data class holding {@link HttpSinkRequestEntry} instances that {@link SinkHttpClient} attempted
@@ -18,14 +20,36 @@
1820
public class SinkHttpClientResponse {
1921

2022
/**
21-
* A list of successfully written requests.
23+
* A list of requests along with write status.
2224
*/
2325
@NonNull
24-
private final List<HttpRequest> successfulRequests;
26+
private final List<ResponseItem> requests;
2527

26-
/**
27-
* A list of requests that {@link SinkHttpClient} failed to write.
28-
*/
29-
@NonNull
30-
private final List<HttpRequest> failedRequests;
28+
public List<HttpRequest> getSuccessfulRequests() {
29+
return requests.stream()
30+
.filter(i -> i.getStatus() == HttpResponseStatus.SUCCESS)
31+
.map(ResponseItem::getRequest)
32+
.collect(Collectors.toList());
33+
}
34+
35+
public List<HttpRequest> getFailedRetryableRequests() {
36+
return requests.stream()
37+
.filter(i -> i.getStatus() == HttpResponseStatus.FAILURE_RETRYABLE)
38+
.map(ResponseItem::getRequest)
39+
.collect(Collectors.toList());
40+
}
41+
42+
public List<HttpRequest> getFailedNotRetryableRequests() {
43+
return requests.stream()
44+
.filter(i -> i.getStatus() == HttpResponseStatus.FAILURE_NOT_RETRYABLE)
45+
.map(ResponseItem::getRequest)
46+
.collect(Collectors.toList());
47+
}
48+
49+
@Data
50+
@ToString
51+
public static class ResponseItem {
52+
private final HttpRequest request;
53+
private final HttpResponseStatus status;
54+
}
3155
}

src/main/java/com/getindata/connectors/http/internal/config/HttpConnectorConfigConstants.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,13 +28,13 @@ public final class HttpConnectorConfigConstants {
2828
+ "source.lookup.header.";
2929

3030
public static final String OIDC_AUTH_TOKEN_REQUEST = GID_CONNECTOR_HTTP
31-
+ "security.oidc.token.request";
31+
+ "security.oidc.token.request";
3232

3333
public static final String OIDC_AUTH_TOKEN_ENDPOINT_URL = GID_CONNECTOR_HTTP
34-
+ "security.oidc.token.endpoint.url";
34+
+ "security.oidc.token.endpoint.url";
3535

3636
public static final String OIDC_AUTH_TOKEN_EXPIRY_REDUCTION = GID_CONNECTOR_HTTP
37-
+ "security.oidc.token.expiry.reduction";
37+
+ "security.oidc.token.expiry.reduction";
3838
/**
3939
* Whether to use the raw value of the Authorization header. If set, it prevents
4040
* the special treatment of the header for Basic Authentication, thus preserving the passed
@@ -54,6 +54,12 @@ public final class HttpConnectorConfigConstants {
5454

5555
public static final String HTTP_ERROR_SOURCE_LOOKUP_CODES_LIST =
5656
GID_CONNECTOR_HTTP + "source.lookup.error.code";
57+
58+
public static final String HTTP_ERROR_RETRYABLE_SOURCE_LOOKUP_CODE_WHITE_LIST =
59+
GID_CONNECTOR_HTTP + "source.lookup.error-retryable.code.exclude";
60+
61+
public static final String HTTP_ERROR_RETRYABLE_SOURCE_LOOKUP_CODES_LIST =
62+
GID_CONNECTOR_HTTP + "source.lookup.error-retryable.code";
5763
// -----------------------------------------------------
5864

5965
public static final String SOURCE_LOOKUP_REQUEST_CALLBACK_IDENTIFIER =
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
package com.getindata.connectors.http.internal.config;
2+
3+
import org.apache.flink.configuration.DescribedEnum;
4+
import org.apache.flink.configuration.description.InlineElement;
5+
import static org.apache.flink.configuration.description.TextElement.text;
6+
7+
public enum RetryStrategyType implements DescribedEnum {
8+
9+
NONE("none", text("None")),
10+
FIXED_DELAY("fixed-delay", text("Fixed delay strategy")),
11+
EXPONENTIAL_DELAY("exponential-delay", text("Exponential delay strategy"));
12+
13+
private final String value;
14+
private final InlineElement inlineElement;
15+
16+
RetryStrategyType(String value, InlineElement inlineElement) {
17+
this.value = value;
18+
this.inlineElement = inlineElement;
19+
}
20+
21+
@Override
22+
public String toString() {
23+
return value;
24+
}
25+
26+
@Override
27+
public InlineElement getDescription() {
28+
return inlineElement;
29+
}
30+
31+
}

src/main/java/com/getindata/connectors/http/internal/sink/HttpSinkWriter.java

Lines changed: 18 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -46,18 +46,18 @@ public class HttpSinkWriter<InputT> extends AsyncSinkWriter<InputT, HttpSinkRequ
4646
private final Counter numRecordsSendErrorsCounter;
4747

4848
public HttpSinkWriter(
49-
ElementConverter<InputT, HttpSinkRequestEntry> elementConverter,
50-
Sink.InitContext context,
51-
int maxBatchSize,
52-
int maxInFlightRequests,
53-
int maxBufferedRequests,
54-
long maxBatchSizeInBytes,
55-
long maxTimeInBufferMS,
56-
long maxRecordSizeInBytes,
57-
String endpointUrl,
58-
SinkHttpClient sinkHttpClient,
59-
Collection<BufferedRequestState<HttpSinkRequestEntry>> bufferedRequestStates,
60-
Properties properties) {
49+
ElementConverter<InputT, HttpSinkRequestEntry> elementConverter,
50+
Sink.InitContext context,
51+
int maxBatchSize,
52+
int maxInFlightRequests,
53+
int maxBufferedRequests,
54+
long maxBatchSizeInBytes,
55+
long maxTimeInBufferMS,
56+
long maxRecordSizeInBytes,
57+
String endpointUrl,
58+
SinkHttpClient sinkHttpClient,
59+
Collection<BufferedRequestState<HttpSinkRequestEntry>> bufferedRequestStates,
60+
Properties properties) {
6161

6262
super(elementConverter, context, maxBatchSize, maxInFlightRequests, maxBufferedRequests,
6363
maxBatchSizeInBytes, maxTimeInBufferMS, maxRecordSizeInBytes, bufferedRequestStates);
@@ -82,8 +82,8 @@ public HttpSinkWriter(
8282
// TODO: Reintroduce retries by adding backoff policy
8383
@Override
8484
protected void submitRequestEntries(
85-
List<HttpSinkRequestEntry> requestEntries,
86-
Consumer<List<HttpSinkRequestEntry>> requestResult) {
85+
List<HttpSinkRequestEntry> requestEntries,
86+
Consumer<List<HttpSinkRequestEntry>> requestResult) {
8787
var future = sinkHttpClient.putRequests(requestEntries, endpointUrl);
8888
future.whenCompleteAsync((response, err) -> {
8989
if (err != null) {
@@ -98,8 +98,10 @@ protected void submitRequestEntries(
9898
// to the `numRecordsSendErrors` metric. It is due to the fact we do not have
9999
// a clear image how we want to do it, so it would be both efficient and correct.
100100
//requestResult.accept(requestEntries);
101-
} else if (response.getFailedRequests().size() > 0) {
102-
int failedRequestsNumber = response.getFailedRequests().size();
101+
} else if (response.getFailedNotRetryableRequests().size()
102+
+ response.getFailedRetryableRequests().size() > 0) {
103+
int failedRequestsNumber = response.getFailedNotRetryableRequests().size()
104+
+ response.getFailedRetryableRequests().size();
103105
log.error("Http Sink failed to write and will retry {} requests",
104106
failedRequestsNumber);
105107
numRecordsSendErrorsCounter.inc(failedRequestsNumber);

0 commit comments

Comments
 (0)