Skip to content

Commit 9a56a00

Browse files
committed
fix: handle ResponseException correctly, honor throwWriteErrors when using RetryConfiguration
1 parent cc8dc2f commit 9a56a00

File tree

5 files changed

+127
-6
lines changed

5 files changed

+127
-6
lines changed

sdks/java/io/elasticsearch-tests/elasticsearch-tests-8/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -316,4 +316,10 @@ public void testWriteWithClientResponseException() throws Exception {
316316
elasticsearchIOTestCommon.setPipeline(pipeline);
317317
elasticsearchIOTestCommon.testWriteWithElasticClientResponseException();
318318
}
319+
320+
@Test
321+
public void testWriteWithClientResponseExceptionIsRetried() throws Exception {
322+
elasticsearchIOTestCommon.setPipeline(pipeline);
323+
elasticsearchIOTestCommon.testWriteWithElasticClientResponseExceptionIsRetried();
324+
}
319325
}

sdks/java/io/elasticsearch-tests/elasticsearch-tests-9/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -316,4 +316,10 @@ public void testWriteWithClientResponseException() throws Exception {
316316
elasticsearchIOTestCommon.setPipeline(pipeline);
317317
elasticsearchIOTestCommon.testWriteWithElasticClientResponseException();
318318
}
319+
320+
@Test
321+
public void testWriteWithClientResponseExceptionIsRetried() throws Exception {
322+
elasticsearchIOTestCommon.setPipeline(pipeline);
323+
elasticsearchIOTestCommon.testWriteWithElasticClientResponseExceptionIsRetried();
324+
}
319325
}

sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTestCommon.java

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -506,6 +506,55 @@ void testWriteWithElasticClientResponseException() throws Exception {
506506
pipeline.run();
507507
}
508508

509+
void testWriteWithElasticClientResponseExceptionIsRetried() throws Exception {
510+
try (ElasticsearchIOTestUtils.AlwaysFailServer srv =
511+
new ElasticsearchIOTestUtils.AlwaysFailServer(0, 500)) {
512+
int port = srv.getPort();
513+
String[] hosts = {String.format("http://localhost:%d", port)};
514+
ConnectionConfiguration clientConfig = ConnectionConfiguration.create(hosts);
515+
516+
Write write =
517+
ElasticsearchIO.write()
518+
.withConnectionConfiguration(clientConfig)
519+
.withBackendVersion(8) // Mock server does not return proper version
520+
.withMaxBatchSize(numDocs + 1)
521+
.withMaxBatchSizeBytes(
522+
Long.MAX_VALUE) // Max long number to make sure all docs are flushed in one batch.
523+
.withThrowWriteErrors(false)
524+
.withRetryConfiguration(
525+
ElasticsearchIO.RetryConfiguration.create(MAX_ATTEMPTS, Duration.millis(35000))
526+
.withRetryPredicate(CUSTOM_RETRY_PREDICATE))
527+
.withIdFn(new ExtractValueFn("id"))
528+
.withUseStatefulBatches(true);
529+
530+
List<String> data =
531+
ElasticsearchIOTestUtils.createDocuments(1, InjectionMode.DO_NOT_INJECT_INVALID_DOCS);
532+
533+
PCollectionTuple outputs = pipeline.apply(Create.of(data)).apply(write);
534+
535+
// The whole batch should fail and be directed to tag FAILED_WRITES because the mock server always responds with HTTP 500.
536+
PCollection<String> success =
537+
outputs
538+
.get(Write.SUCCESSFUL_WRITES)
539+
.apply("Convert success to input ID", MapElements.via(mapToInputIdString));
540+
541+
PCollection<String> fail =
542+
outputs
543+
.get(Write.FAILED_WRITES)
544+
.apply("Convert fails to input ID", MapElements.via(mapToInputIdString));
545+
546+
PAssert.that(success).empty();
547+
PAssert.that(fail).containsInAnyOrder("0"); // First and only document
548+
549+
// Verify response item contains the corresponding error message.
550+
String expectedError =
551+
String.format(ElasticsearchIO.BulkIO.RETRY_FAILED_LOG, EXPECTED_RETRIES);
552+
PAssert.that(outputs.get(Write.FAILED_WRITES))
553+
.satisfies(responseItemJsonSubstringValidator(expectedError));
554+
pipeline.run();
555+
}
556+
}
557+
509558
void testWriteWithAllowedErrors() throws Exception {
510559
Set<String> allowedErrors = new HashSet<>();
511560
allowedErrors.add("json_parse_exception");

sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTestUtils.java

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,12 @@
2727
import com.fasterxml.jackson.databind.JsonNode;
2828
import com.fasterxml.jackson.databind.ObjectMapper;
2929
import com.fasterxml.jackson.databind.node.ObjectNode;
30+
import com.sun.net.httpserver.HttpExchange;
31+
import com.sun.net.httpserver.HttpServer;
3032
import java.io.IOException;
33+
import java.io.OutputStream;
34+
import java.net.InetSocketAddress;
35+
import java.nio.charset.StandardCharsets;
3136
import java.time.Duration;
3237
import java.time.LocalDateTime;
3338
import java.util.ArrayList;
@@ -555,4 +560,41 @@ public String apply(Document document) {
555560
}
556561
}
557562
};
563+
564+
/**
565+
* Small server that always returns a specified HTTP error code. This is useful to simulate server
566+
* errors in tests.
567+
*/
568+
static class AlwaysFailServer implements AutoCloseable {
569+
private final HttpServer server;
570+
private final int port;
571+
572+
AlwaysFailServer(int port, int status) throws IOException {
573+
HttpServer server = HttpServer.create(new InetSocketAddress(port), 0);
574+
this.port = server.getAddress().getPort();
575+
server.createContext("/", exchange -> handle(exchange, status));
576+
server.start();
577+
578+
this.server = server;
579+
}
580+
581+
int getPort() {
582+
return port;
583+
}
584+
585+
private static void handle(HttpExchange exchange, int status) throws IOException {
586+
byte[] response = "Internal Server Error".getBytes(StandardCharsets.UTF_8);
587+
exchange.sendResponseHeaders(status, response.length);
588+
try (OutputStream os = exchange.getResponseBody()) {
589+
os.write(response);
590+
}
591+
}
592+
593+
@Override
594+
public void close() throws Exception {
595+
if (server != null) {
596+
server.stop(0);
597+
}
598+
}
599+
}
558600
}

sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java

Lines changed: 24 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2811,14 +2811,23 @@ protected void addAndMaybeFlush(Document doc, ProcessContext context)
28112811
}
28122812

28132813
private boolean isRetryableClientException(Throwable t) {
2814-
// RestClient#performRequest only throws wrapped IOException so we must inspect the
2814+
// RestClient#performRequest mainly throws wrapped IOException so we must inspect the
28152815
// exception cause to determine if the exception is likely transient i.e. retryable or
2816-
// not.
2816+
// not. One exception is the ResponseException that is thrown when attempting to parse the
2817+
// response. This exception is not wrapped.
2818+
2819+
// ResponseException should not be wrapped, but check the cause to be safe for future
2820+
// changes
2821+
ResponseException re = null;
2822+
if (t instanceof ResponseException) {
2823+
re = (ResponseException) t;
2824+
} else if (t.getCause() instanceof ResponseException) {
2825+
re = (ResponseException) t.getCause();
2826+
}
28172827

28182828
// Retry for 500-range response code except for 501.
2819-
if (t.getCause() instanceof ResponseException) {
2820-
ResponseException ex = (ResponseException) t.getCause();
2821-
int statusCode = ex.getResponse().getStatusLine().getStatusCode();
2829+
if (re != null) {
2830+
int statusCode = re.getResponse().getStatusLine().getStatusCode();
28222831
return statusCode >= 500 && statusCode != 501;
28232832
}
28242833
return t.getCause() instanceof ConnectTimeoutException
@@ -2893,7 +2902,16 @@ private List<Document> flushBatch() throws IOException, InterruptedException {
28932902
&& spec.getRetryConfiguration().getRetryPredicate().test(responseEntity)) {
28942903
LOG.warn("ES Cluster is responding with HTP 429 - TOO_MANY_REQUESTS.");
28952904
}
2896-
responseEntity = handleRetry("POST", endPoint, Collections.emptyMap(), requestBody);
2905+
try {
2906+
responseEntity = handleRetry("POST", endPoint, Collections.emptyMap(), requestBody);
2907+
} catch (java.io.IOException ex) {
2908+
// No more retry attempts, determine what to do using throwWriteErrors
2909+
if (spec.getThrowWriteErrors()) {
2910+
throw ex;
2911+
} else {
2912+
elasticResponseExceptionMessage = ex.getMessage();
2913+
}
2914+
}
28972915
}
28982916

28992917
List<Document> responses;

0 commit comments

Comments
 (0)