Skip to content

Commit beefad1

Browse files
author
Grzegorz Kołakowski
committed
Revert Sink changes
1 parent ece37a7 commit beefad1

File tree

4 files changed

+23
-45
lines changed

4 files changed

+23
-45
lines changed
Lines changed: 7 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,13 @@
11
package com.getindata.connectors.http.internal;
22

33
import java.util.List;
4-
import java.util.stream.Collectors;
54

65
import lombok.Data;
76
import lombok.NonNull;
87
import lombok.ToString;
98

109
import com.getindata.connectors.http.internal.sink.HttpSinkRequestEntry;
1110
import com.getindata.connectors.http.internal.sink.httpclient.HttpRequest;
12-
import com.getindata.connectors.http.internal.status.HttpResponseStatus;
1311

1412
/**
1513
* Data class holding {@link HttpSinkRequestEntry} instances that {@link SinkHttpClient} attempted
@@ -20,36 +18,14 @@
2018
public class SinkHttpClientResponse {
2119

2220
/**
23-
* A list of requests along with write status.
21+
* A list of successfully written requests.
2422
*/
2523
@NonNull
26-
private final List<ResponseItem> requests;
24+
private final List<HttpRequest> successfulRequests;
2725

28-
public List<HttpRequest> getSuccessfulRequests() {
29-
return requests.stream()
30-
.filter(i -> i.getStatus() == HttpResponseStatus.SUCCESS)
31-
.map(ResponseItem::getRequest)
32-
.collect(Collectors.toList());
33-
}
34-
35-
public List<HttpRequest> getFailedRetryableRequests() {
36-
return requests.stream()
37-
.filter(i -> i.getStatus() == HttpResponseStatus.FAILURE_RETRYABLE)
38-
.map(ResponseItem::getRequest)
39-
.collect(Collectors.toList());
40-
}
41-
42-
public List<HttpRequest> getFailedNotRetryableRequests() {
43-
return requests.stream()
44-
.filter(i -> i.getStatus() == HttpResponseStatus.FAILURE_NOT_RETRYABLE)
45-
.map(ResponseItem::getRequest)
46-
.collect(Collectors.toList());
47-
}
48-
49-
@Data
50-
@ToString
51-
public static class ResponseItem {
52-
private final HttpRequest request;
53-
private final HttpResponseStatus status;
54-
}
26+
/**
27+
* A list of requests that {@link SinkHttpClient} failed to write.
28+
*/
29+
@NonNull
30+
private final List<HttpRequest> failedRequests;
5531
}

src/main/java/com/getindata/connectors/http/internal/sink/HttpSinkWriter.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -98,10 +98,8 @@ protected void submitRequestEntries(
9898
// to the `numRecordsSendErrors` metric. It is due to the fact we do not have
9999
// a clear image how we want to do it, so it would be both efficient and correct.
100100
//requestResult.accept(requestEntries);
101-
} else if (response.getFailedNotRetryableRequests().size()
102-
+ response.getFailedRetryableRequests().size() > 0) {
103-
int failedRequestsNumber = response.getFailedNotRetryableRequests().size()
104-
+ response.getFailedRetryableRequests().size();
101+
} else if (response.getFailedRequests().size() > 0) {
102+
int failedRequestsNumber = response.getFailedRequests().size();
105103
log.error("Http Sink failed to write and will retry {} requests",
106104
failedRequestsNumber);
107105
numRecordsSendErrorsCounter.inc(failedRequestsNumber);

src/main/java/com/getindata/connectors/http/internal/sink/httpclient/JavaNetSinkHttpClient.java

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
import com.getindata.connectors.http.internal.HeaderPreprocessor;
1717
import com.getindata.connectors.http.internal.SinkHttpClient;
1818
import com.getindata.connectors.http.internal.SinkHttpClientResponse;
19-
import com.getindata.connectors.http.internal.SinkHttpClientResponse.ResponseItem;
2019
import com.getindata.connectors.http.internal.config.HttpConnectorConfigConstants;
2120
import com.getindata.connectors.http.internal.sink.HttpSinkRequestEntry;
2221
import com.getindata.connectors.http.internal.status.ComposeHttpStatusCodeChecker;
@@ -98,7 +97,8 @@ private CompletableFuture<List<JavaNetHttpResponseWrapper>> submitRequests(
9897
private SinkHttpClientResponse prepareSinkHttpClientResponse(
9998
List<JavaNetHttpResponseWrapper> responses,
10099
String endpointUrl) {
101-
var responseItems = new ArrayList<ResponseItem>();
100+
var successfulResponses = new ArrayList<HttpRequest>();
101+
var failedResponses = new ArrayList<HttpRequest>();
102102

103103
for (var response : responses) {
104104
var sinkRequestEntry = response.getHttpRequest();
@@ -107,14 +107,19 @@ private SinkHttpClientResponse prepareSinkHttpClientResponse(
107107
httpPostRequestCallback.call(
108108
optResponse.orElse(null), sinkRequestEntry, endpointUrl, headerMap);
109109

110-
// Empty indicates that HttpClient.sendAsync() failed with exception.
111-
HttpResponseStatus status = optResponse.isEmpty()
112-
? HttpResponseStatus.FAILURE_RETRYABLE
113-
: statusCodeChecker.checkStatus(optResponse.get().statusCode());
114-
responseItems.add(new ResponseItem(sinkRequestEntry, status));
110+
// TODO Add response processor here and orchestrate it with statusCodeChecker.
111+
if (optResponse.isEmpty() ||
112+
statusCodeChecker.checkStatus(optResponse.get().statusCode()).equals(
113+
HttpResponseStatus.FAILURE_RETRYABLE) ||
114+
statusCodeChecker.checkStatus(optResponse.get().statusCode()).equals(
115+
HttpResponseStatus.FAILURE_NOT_RETRYABLE)) {
116+
failedResponses.add(sinkRequestEntry);
117+
} else {
118+
successfulResponses.add(sinkRequestEntry);
119+
}
115120
}
116121

117-
return new SinkHttpClientResponse(responseItems);
122+
return new SinkHttpClientResponse(successfulResponses, failedResponses);
118123
}
119124

120125
@VisibleForTesting

src/test/java/com/getindata/connectors/http/internal/sink/httpclient/JavaNetSinkHttpClientConnectionTest.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -341,8 +341,7 @@ private void testSinkClientForConnection(
341341
).get();
342342

343343
assertThat(response.getSuccessfulRequests()).isNotEmpty();
344-
assertThat(response.getFailedNotRetryableRequests()).isEmpty();
345-
assertThat(response.getFailedRetryableRequests()).isEmpty();
344+
assertThat(response.getFailedRequests()).isEmpty();
346345
} catch (Exception e) {
347346
throw new RuntimeException(e);
348347
}

0 commit comments

Comments
 (0)