Skip to content

Commit 938323b

Browse files
kristoffSCswtwsk
andauthored
ESP-171_HttpLookupSource_ErrorCodeSupport - HTTP ErrorCode Status config for Lookup Source (#22)
* ESP-171_HttpLookupSource_ErrorCodeSupport - Adding to Lookup HTTP Source HTTP Error Code configuration from properties. Signed-off-by: Krzysztof Chmielewski <[email protected]> * Update CHANGELOG.md Co-authored-by: Andrzej Swatowski <[email protected]> Signed-off-by: Krzysztof Chmielewski <[email protected]> Co-authored-by: Andrzej Swatowski <[email protected]>
1 parent c2716f0 commit 938323b

File tree

14 files changed

+210
-72
lines changed

14 files changed

+210
-72
lines changed

CHANGELOG.md

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,9 @@
66
- Fix JavaDoc errors.
77

88
### Added
9-
- Add to Http Sink a new properties `gid.connector.http.sink.error.code` and `gid.connector.http.sink.error.code.exclude`
10-
to set HTTP status code that should be interpreted as errors.
9+
- Add new properties `gid.connector.http.sink.error.code`,`gid.connector.http.sink.error.code.exclude`,
10+
`gid.connector.http.source.lookup.error.code` and `gid.connector.http.source.lookup.error.code.exclude`
11+
to set HTTP status codes that should be interpreted as errors both for HTTP Sink and HTTP Lookup Source.
1112
- Use Flink's format support to Http Lookup Source.
1213
- Add HTTP Lookup source client header configuration via properties.
1314

README.md

Lines changed: 8 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -184,17 +184,18 @@ CREATE TABLE http (
184184
)
185185
```
186186

187-
#### HTTP status code handler (currently supported only for HTTP Sink)
188-
Http Sink connector allows defining list of HTTP status codes that should be treated as errors.
187+
## HTTP status code handler
188+
Http Sink and Lookup Source connectors allow defining list of HTTP status codes that should be treated as errors.
189189
By default all 400s and 500s response codes will be interpreted as error code.
190190

191-
This behavior can be changed by using below properties in table definition (DDL) or passing it via
191+
This behavior can be changed by using below properties in table definition (DDL) for Sink and Lookup Source or passing it via
192192
`setProperty' method from Sink's builder. The property names are:
193-
- `gid.connector.http.sink.error.code` used to defined HTTP status code value that should be treated as error for example 404.
193+
- `gid.connector.http.sink.error.code` and `gid.connector.http.source.lookup.error.code` used to defined HTTP status code value that should be treated as error for example 404.
194194
Many status codes can be defined in one value, where each code should be separated with comma, for example:
195195
`401, 402, 403`. User can use this property also to define a type code mask. In that case, all codes from given HTTP response type will be treated as errors.
196196
An example of such a mask would be `3XX, 4XX, 5XX`. In this case, all 300s, 400s and 500s status codes will be treated as errors.
197-
- `gid.connector.http.sink.error.code.exclude` used to exclude a HTTP code from error list. Many status codes can be defined in one value, where each code should be separated with comma, for example:
197+
- `gid.connector.http.sink.error.code.exclude` and `gid.connector.http.source.lookup.error.code.exclude` used to exclude a HTTP code from error list.
198+
Many status codes can be defined in one value, where each code should be separated with comma, for example:
198199
`401, 402, 403`. In this example, codes 401, 402 and 403 would not be interpreted as error codes.
199200

200201

@@ -291,21 +292,13 @@ Issue was discussed on Flink's user mailing list - https://lists.apache.org/thre
291292
Implementation of an HTTP Sink is based on Flink's `AsyncSinkBase` introduced in Flink 1.15 [3, 4].
292293

293294
#### Http Response to Table schema mapping
294-
The mapping from Http Json Response to SQL table schema is done via Json Paths [5].
295-
This is achieved thanks to `com.jayway.jsonpath:json-path` library.
296-
297-
If no `root` or `field.#.path` option is defined, the connector will use the column name as json path and will try to look for Json Node with that name in received Json. If no node with a given name is found, the connector will return `null` as value for this field.
298-
299-
If the `field.#.path` option is defined, connector will use given Json path from option's value in order to find Json data that should be used for this column.
300-
For example `'field.isActive.path' = '$.details.isActive'` - the value for table column `isActive` will be taken from `$.details.isActive` node from received Json.
295+
The mapping from Http Json Response to SQL table schema is done via Flink's Json Format [5].
301296

302297
## TODO
303298

304299
### HTTP TableLookup Source
305300
- Implement caches.
306-
- Add support for other Flink types. Currently, STRING type is only fully supported.
307301
- Think about Retry Policy for Http Request
308-
- Use Flink Format [7] to parse Json response
309302
- Add Configurable Timeout value
310303
- Check other `//TODO`'s.
311304

@@ -321,9 +314,7 @@ For example `'field.isActive.path' = '$.details.isActive'` - the value for table
321314
</br>
322315
[4] https://nightlies.apache.org/flink/flink-docs-release-1.15/api/java/org/apache/flink/connector/base/sink/AsyncSinkBase.html
323316
</br>
324-
[5] https://support.smartbear.com/alertsite/docs/monitors/api/endpoint/jsonpath.html
317+
[5] https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/formats/json/
325318
</br>
326319
[6] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sqlclient/
327320
</br>
328-
[7] https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/formats/json/
329-
</br>

src/main/java/com/getindata/connectors/http/internal/config/HttpConnectorConfigConstants.java

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,10 +26,18 @@ public final class HttpConnectorConfigConstants {
2626
public static final String LOOKUP_SOURCE_HEADER_PREFIX = GID_CONNECTOR_HTTP
2727
+ "source.lookup.header.";
2828

29-
// Error code handling configuration.
30-
public static final String HTTP_ERROR_CODE_WHITE_LIST =
29+
// --------- Error code handling configuration ---------
30+
public static final String HTTP_ERROR_SINK_CODE_WHITE_LIST =
3131
GID_CONNECTOR_HTTP + "sink.error.code.exclude";
3232

33-
public static final String HTTP_ERROR_CODES_LIST = GID_CONNECTOR_HTTP + "sink.error.code";
33+
public static final String HTTP_ERROR_SINK_CODES_LIST = GID_CONNECTOR_HTTP + "sink.error.code";
34+
35+
public static final String HTTP_ERROR_SOURCE_LOOKUP_CODE_WHITE_LIST =
36+
GID_CONNECTOR_HTTP + "source.lookup.error.code.exclude";
37+
38+
public static final String HTTP_ERROR_SOURCE_LOOKUP_CODES_LIST =
39+
GID_CONNECTOR_HTTP + "source.lookup.error.code";
40+
41+
// -----------------------------------------------------
3442

3543
}

src/main/java/com/getindata/connectors/http/internal/sink/httpclient/JavaNetSinkHttpClient.java

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,11 @@
2020

2121
import com.getindata.connectors.http.internal.SinkHttpClient;
2222
import com.getindata.connectors.http.internal.SinkHttpClientResponse;
23+
import com.getindata.connectors.http.internal.config.HttpConnectorConfigConstants;
2324
import com.getindata.connectors.http.internal.sink.HttpSinkRequestEntry;
24-
import com.getindata.connectors.http.internal.sink.httpclient.status.ComposeHttpStatusCodeChecker;
25-
import com.getindata.connectors.http.internal.sink.httpclient.status.HttpStatusCodeChecker;
25+
import com.getindata.connectors.http.internal.status.ComposeHttpStatusCodeChecker;
26+
import com.getindata.connectors.http.internal.status.ComposeHttpStatusCodeChecker.ComposeHttpStatusCodeCheckerConfig;
27+
import com.getindata.connectors.http.internal.status.HttpStatusCodeChecker;
2628
import com.getindata.connectors.http.internal.utils.ConfigUtils;
2729
import static com.getindata.connectors.http.internal.config.HttpConnectorConfigConstants.SINK_HEADER_PREFIX;
2830

@@ -51,7 +53,14 @@ public JavaNetSinkHttpClient(Properties properties) {
5153

5254
// TODO Inject this via constructor when implementing a response processor.
5355
// Processor will be injected and it will wrap statusChecker implementation.
54-
this.statusCodeChecker = new ComposeHttpStatusCodeChecker(properties);
56+
ComposeHttpStatusCodeCheckerConfig checkerConfig =
57+
ComposeHttpStatusCodeCheckerConfig.builder()
58+
.properties(properties)
59+
.whiteListPrefix(HttpConnectorConfigConstants.HTTP_ERROR_SINK_CODE_WHITE_LIST)
60+
.errorCodePrefix(HttpConnectorConfigConstants.HTTP_ERROR_SINK_CODES_LIST)
61+
.build();
62+
63+
this.statusCodeChecker = new ComposeHttpStatusCodeChecker(checkerConfig);
5564
}
5665

5766
@Override
Lines changed: 37 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,15 @@
1-
package com.getindata.connectors.http.internal.sink.httpclient.status;
1+
package com.getindata.connectors.http.internal.status;
22

33
import java.util.Arrays;
44
import java.util.HashSet;
55
import java.util.Properties;
66
import java.util.Set;
77
import java.util.stream.Collectors;
88

9+
import lombok.AccessLevel;
10+
import lombok.Builder;
11+
import lombok.Data;
12+
import lombok.RequiredArgsConstructor;
913
import org.apache.flink.util.Preconditions;
1014
import org.apache.flink.util.StringUtils;
1115

@@ -36,9 +40,9 @@ public class ComposeHttpStatusCodeChecker implements HttpStatusCodeChecker {
3640
*/
3741
private final Set<HttpStatusCodeChecker> errorCodes;
3842

39-
public ComposeHttpStatusCodeChecker(Properties properties) {
40-
excludedCodes = prepareWhiteList(properties);
41-
errorCodes = prepareErrorCodes(properties);
43+
public ComposeHttpStatusCodeChecker(ComposeHttpStatusCodeCheckerConfig config) {
44+
excludedCodes = prepareWhiteList(config);
45+
errorCodes = prepareErrorCodes(config);
4246
}
4347

4448
/**
@@ -68,14 +72,19 @@ public boolean isErrorCode(int statusCode) {
6872
.anyMatch(httpStatusCodeChecker -> httpStatusCodeChecker.isErrorCode(statusCode));
6973
}
7074

71-
private Set<HttpStatusCodeChecker> prepareErrorCodes(Properties properties) {
72-
String sErrorCodes =
73-
properties.getProperty(HttpConnectorConfigConstants.HTTP_ERROR_CODES_LIST, "");
75+
private Set<HttpStatusCodeChecker> prepareErrorCodes(
76+
ComposeHttpStatusCodeCheckerConfig config) {
7477

75-
if (StringUtils.isNullOrWhitespaceOnly(sErrorCodes)) {
78+
Properties properties = config.getProperties();
79+
String errorCodePrefix = config.getErrorCodePrefix();
80+
81+
String errorCodes =
82+
properties.getProperty(errorCodePrefix, "");
83+
84+
if (StringUtils.isNullOrWhitespaceOnly(errorCodes)) {
7685
return DEFAULT_ERROR_CODES;
7786
} else {
78-
String[] splitCodes = sErrorCodes.split(HttpConnectorConfigConstants.ERROR_CODE_DELIM);
87+
String[] splitCodes = errorCodes.split(HttpConnectorConfigConstants.ERROR_CODE_DELIM);
7988
return prepareErrorCodes(splitCodes);
8089
}
8190
}
@@ -110,9 +119,14 @@ private Set<HttpStatusCodeChecker> prepareErrorCodes(String[] statusCodes) {
110119
return (errorCodes.isEmpty()) ? DEFAULT_ERROR_CODES : errorCodes;
111120
}
112121

113-
private Set<WhiteListHttpStatusCodeChecker> prepareWhiteList(Properties properties) {
122+
private Set<WhiteListHttpStatusCodeChecker> prepareWhiteList(
123+
ComposeHttpStatusCodeCheckerConfig config) {
124+
125+
Properties properties = config.getProperties();
126+
String whiteListPrefix = config.getWhiteListPrefix();
127+
114128
return Arrays.stream(
115-
properties.getProperty(HttpConnectorConfigConstants.HTTP_ERROR_CODE_WHITE_LIST, "")
129+
properties.getProperty(whiteListPrefix, "")
116130
.split(HttpConnectorConfigConstants.ERROR_CODE_DELIM))
117131
.filter(sCode -> !StringUtils.isNullOrWhitespaceOnly(sCode))
118132
.map(String::trim)
@@ -132,4 +146,16 @@ private Set<WhiteListHttpStatusCodeChecker> prepareWhiteList(Properties properti
132146
private boolean isTypeCode(final String code) {
133147
return code.charAt(1) == 'X' && code.charAt(2) == 'X';
134148
}
149+
150+
@Data
151+
@Builder
152+
@RequiredArgsConstructor(access = AccessLevel.PRIVATE)
153+
public static class ComposeHttpStatusCodeCheckerConfig {
154+
155+
private final String whiteListPrefix;
156+
157+
private final String errorCodePrefix;
158+
159+
private final Properties properties;
160+
}
135161
}
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package com.getindata.connectors.http.internal.sink.httpclient.status;
1+
package com.getindata.connectors.http.internal.status;
22

33
import java.util.HashMap;
44
import java.util.Map;
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package com.getindata.connectors.http.internal.sink.httpclient.status;
1+
package com.getindata.connectors.http.internal.status;
22

33
/**
44
* Base interface for all classes that would validate HTTP status
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package com.getindata.connectors.http.internal.sink.httpclient.status;
1+
package com.getindata.connectors.http.internal.status;
22

33
import lombok.EqualsAndHashCode;
44
import lombok.RequiredArgsConstructor;
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package com.getindata.connectors.http.internal.sink.httpclient.status;
1+
package com.getindata.connectors.http.internal.status;
22

33
import lombok.EqualsAndHashCode;
44

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package com.getindata.connectors.http.internal.sink.httpclient.status;
1+
package com.getindata.connectors.http.internal.status;
22

33
import lombok.EqualsAndHashCode;
44
import lombok.RequiredArgsConstructor;

0 commit comments

Comments
 (0)