Skip to content

Commit 24852cc

Browse files
committed
add test
1 parent 32afe08 commit 24852cc

File tree

2 files changed

+48
-36
lines changed

2 files changed

+48
-36
lines changed

src/main/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClient.java

Lines changed: 7 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -96,43 +96,14 @@ public Optional<RowData> pull(RowData lookupRow) {
9696
}
9797
}
9898

99-
private Optional<RowData> queryAndProcess(RowData lookupData) {
99+
private Optional<RowData> queryAndProcess(RowData lookupData) throws Exception {
100+
100101
HttpLookupSourceRequestEntry request = requestFactory.buildLookupRequest(lookupData);
101-
HttpResponse<String> response = null;
102-
103-
int retryCount = 0;
104-
105-
while (retryCount < this.httpRequestMaxRetries) {
106-
try {
107-
response = httpClient.send(
108-
request.getHttpRequest(),
109-
BodyHandlers.ofString()
110-
);
111-
break;
112-
} catch (IOException e) {
113-
log.error("IOException during HTTP request. Retrying...", e);
114-
retryCount++;
115-
if (retryCount == this.httpRequestMaxRetries) {
116-
log.error("Maximum retries reached. Aborting...");
117-
return Optional.empty();
118-
}
119-
try {
120-
Thread.sleep(this.httpRequestRetryTimeoutMs);
121-
} catch (InterruptedException ie) {
122-
Thread.currentThread().interrupt();
123-
}
124-
} catch (InterruptedException e) {
125-
Thread.currentThread().interrupt();
126-
log.error("HTTP request interrupted. Aborting...", e);
127-
return Optional.empty();
128-
}
129-
}
130-
try {
131-
return processHttpResponse(response, request);
132-
} catch (IOException e) {
133-
log.error("IOException during HTTP response processing.", e);
134-
return Optional.empty();
135-
}
102+
HttpResponse<String> response = httpClient.send(
103+
request.getHttpRequest(),
104+
BodyHandlers.ofString()
105+
);
106+
return processHttpResponse(response, request);
136107
}
137108

138109
private Optional<RowData> processHttpResponse(

src/test/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClientConnectionTest.java

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,9 @@
77

88
import com.github.tomakehurst.wiremock.WireMockServer;
99
import com.github.tomakehurst.wiremock.client.MappingBuilder;
10+
import com.github.tomakehurst.wiremock.http.Fault;
1011
import com.github.tomakehurst.wiremock.matching.RequestPatternBuilder;
12+
import com.github.tomakehurst.wiremock.stubbing.Scenario;
1113
import com.github.tomakehurst.wiremock.stubbing.StubMapping;
1214
import org.apache.flink.api.common.serialization.DeserializationSchema;
1315
import org.apache.flink.api.common.serialization.SerializationSchema;
@@ -290,6 +292,45 @@ public void shouldConnectWithBasicAuth(String authorizationHeaderValue,
290292
assertThat(nestedDetailsRow.getString(0).toString()).isEqualTo("$1,729.34");
291293
}
292294

295+
@Test
296+
void shouldRetryOnIOExceptionAndSucceedOnSecondAttempt() {
297+
// GIVEN
298+
this.stubMapping = setUpServerStubForIOExceptionOnFirstAttempt();
299+
Properties properties = new Properties();
300+
properties.setProperty(
301+
HttpConnectorConfigConstants.LOOKUP_HTTP_MAX_RETRIES,
302+
"3"
303+
);
304+
JavaNetHttpPollingClient pollingClient = setUpPollingClient(
305+
getBaseUrl(), properties, setUpGetRequestFactory(properties));
306+
307+
// WHEN
308+
Optional<RowData> poll = pollingClient.pull(lookupRowData);
309+
310+
// THEN
311+
wireMockServer.verify(2, RequestPatternBuilder.forCustomMatcher(stubMapping.getRequest()));
312+
313+
assertThat(poll.isPresent()).isTrue();
314+
}
315+
316+
private StubMapping setUpServerStubForIOExceptionOnFirstAttempt() {
317+
wireMockServer.stubFor(
318+
get(urlEqualTo(ENDPOINT + "?id=1&uuid=2"))
319+
.inScenario("Retry Scenario")
320+
.whenScenarioStateIs(Scenario.STARTED) // Initial state
321+
.willReturn(aResponse()
322+
.withFault(Fault.CONNECTION_RESET_BY_PEER)) // Fail the first request
323+
.willSetStateTo("Second Attempt")); // Set the next state
324+
325+
return wireMockServer.stubFor(
326+
get(urlEqualTo(ENDPOINT + "?id=1&uuid=2"))
327+
.inScenario("Retry Scenario")
328+
.whenScenarioStateIs("Second Attempt") // When the state is "Second Attempt"
329+
.willReturn(aResponse()
330+
.withStatus(200)
331+
.withBody(readTestFile(SAMPLES_FOLDER + "HttpResult.json"))));
332+
}
333+
293334
private String getBaseUrl() {
294335
return wireMockServer.baseUrl() + ENDPOINT;
295336
}

0 commit comments

Comments
 (0)