Skip to content

Commit 9c9be76

Browse files
author
Egbert van der Wal
committed
fix: handle ResponseException correctly, honor throwWriteExceptions when using RetryConfiguration
1 parent cc8dc2f commit 9c9be76

File tree

5 files changed

+119
-5
lines changed

5 files changed

+119
-5
lines changed

sdks/java/io/elasticsearch-tests/elasticsearch-tests-8/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -316,4 +316,10 @@ public void testWriteWithClientResponseException() throws Exception {
316316
elasticsearchIOTestCommon.setPipeline(pipeline);
317317
elasticsearchIOTestCommon.testWriteWithElasticClientResponseException();
318318
}
319+
320+
@Test
321+
public void testWriteWithClientResponseExceptionIsRetried() throws Exception {
322+
elasticsearchIOTestCommon.setPipeline(pipeline);
323+
elasticsearchIOTestCommon.testWriteWithElasticClientResponseExceptionIsRetried();
324+
}
319325
}

sdks/java/io/elasticsearch-tests/elasticsearch-tests-9/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -316,4 +316,10 @@ public void testWriteWithClientResponseException() throws Exception {
316316
elasticsearchIOTestCommon.setPipeline(pipeline);
317317
elasticsearchIOTestCommon.testWriteWithElasticClientResponseException();
318318
}
319+
320+
@Test
321+
public void testWriteWithClientResponseExceptionIsRetried() throws Exception {
322+
elasticsearchIOTestCommon.setPipeline(pipeline);
323+
elasticsearchIOTestCommon.testWriteWithElasticClientResponseExceptionIsRetried();
324+
}
319325
}

sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTestCommon.java

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -506,6 +506,55 @@ void testWriteWithElasticClientResponseException() throws Exception {
506506
pipeline.run();
507507
}
508508

509+
void testWriteWithElasticClientResponseExceptionIsRetried() throws Exception {
510+
try (ElasticsearchIOTestUtils.AlwaysFailServer srv =
511+
new ElasticsearchIOTestUtils.AlwaysFailServer(0, 500)) {
512+
int port = srv.getPort();
513+
String[] hosts = {String.format("http://localhost:%d", port)};
514+
ConnectionConfiguration clientConfig = ConnectionConfiguration.create(hosts);
515+
516+
Write write =
517+
ElasticsearchIO.write()
518+
.withConnectionConfiguration(clientConfig)
519+
.withBackendVersion(8) // Mock server does not return proper version
520+
.withMaxBatchSize(numDocs + 1)
521+
.withMaxBatchSizeBytes(
522+
Long.MAX_VALUE) // Max long number to make sure all docs are flushed in one batch.
523+
.withThrowWriteErrors(false)
524+
.withRetryConfiguration(
525+
ElasticsearchIO.RetryConfiguration.create(MAX_ATTEMPTS, Duration.millis(35000))
526+
.withRetryPredicate(CUSTOM_RETRY_PREDICATE))
527+
.withIdFn(new ExtractValueFn("id"))
528+
.withUseStatefulBatches(true);
529+
530+
List<String> data =
531+
ElasticsearchIOTestUtils.createDocuments(1, InjectionMode.DO_NOT_INJECT_INVALID_DOCS);
532+
533+
PCollectionTuple outputs = pipeline.apply(Create.of(data)).apply(write);
534+
535+
// The whole batch should fail and direct to tag FAILED_WRITES because of one invalid doc.
536+
PCollection<String> success =
537+
outputs
538+
.get(Write.SUCCESSFUL_WRITES)
539+
.apply("Convert success to input ID", MapElements.via(mapToInputIdString));
540+
541+
PCollection<String> fail =
542+
outputs
543+
.get(Write.FAILED_WRITES)
544+
.apply("Convert fails to input ID", MapElements.via(mapToInputIdString));
545+
546+
PAssert.that(success).empty();
547+
PAssert.that(fail).containsInAnyOrder("0"); // First and only document
548+
549+
// Verify response item contains the corresponding error message.
550+
String expectedError =
551+
String.format(ElasticsearchIO.BulkIO.RETRY_FAILED_LOG, EXPECTED_RETRIES);
552+
PAssert.that(outputs.get(Write.FAILED_WRITES))
553+
.satisfies(responseItemJsonSubstringValidator(expectedError));
554+
pipeline.run();
555+
}
556+
}
557+
509558
void testWriteWithAllowedErrors() throws Exception {
510559
Set<String> allowedErrors = new HashSet<>();
511560
allowedErrors.add("json_parse_exception");

sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTestUtils.java

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,12 @@
2727
import com.fasterxml.jackson.databind.JsonNode;
2828
import com.fasterxml.jackson.databind.ObjectMapper;
2929
import com.fasterxml.jackson.databind.node.ObjectNode;
30+
import com.sun.net.httpserver.HttpExchange;
31+
import com.sun.net.httpserver.HttpServer;
3032
import java.io.IOException;
33+
import java.io.OutputStream;
34+
import java.net.InetSocketAddress;
35+
import java.nio.charset.StandardCharsets;
3136
import java.time.Duration;
3237
import java.time.LocalDateTime;
3338
import java.util.ArrayList;
@@ -546,6 +551,7 @@ public Integer apply(Document document) {
546551
new SimpleFunction<Document, String>() {
547552
@Override
548553
public String apply(Document document) {
554+
System.err.println("INPUT DOC: " + document.getResponseItemJson());
549555
try {
550556
// Account for intentionally invalid input json docs
551557
String fixedJson = document.getInputDoc().replaceAll(";", ":");
@@ -555,4 +561,41 @@ public String apply(Document document) {
555561
}
556562
}
557563
};
564+
565+
/**
566+
* Small server that always returns a specified HTTP error code. This is useful to simulate server
567+
* errors in tests.
568+
*/
569+
static class AlwaysFailServer implements AutoCloseable {
570+
private final HttpServer server;
571+
private final int port;
572+
573+
AlwaysFailServer(int port, int status) throws IOException {
574+
HttpServer server = HttpServer.create(new InetSocketAddress(port), 0);
575+
this.port = server.getAddress().getPort();
576+
server.createContext("/", exchange -> handle(exchange, status));
577+
server.start();
578+
579+
this.server = server;
580+
}
581+
582+
int getPort() {
583+
return port;
584+
}
585+
586+
private static void handle(HttpExchange exchange, int status) throws IOException {
587+
byte[] response = "Internal Server Error".getBytes(StandardCharsets.UTF_8);
588+
exchange.sendResponseHeaders(status, response.length);
589+
try (OutputStream os = exchange.getResponseBody()) {
590+
os.write(response);
591+
}
592+
}
593+
594+
@Override
595+
public void close() throws Exception {
596+
if (server != null) {
597+
server.stop(0);
598+
}
599+
}
600+
}
558601
}

sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2811,13 +2811,14 @@ protected void addAndMaybeFlush(Document doc, ProcessContext context)
28112811
}
28122812

28132813
private boolean isRetryableClientException(Throwable t) {
2814-
// RestClient#performRequest only throws wrapped IOException so we must inspect the
2814+
// RestClient#performRequest mainly throws wrapped IOException so we must inspect the
28152815
// exception cause to determine if the exception is likely transient i.e. retryable or
2816-
// not.
2816+
// not. One exception is the ResponseException which is thrown when attempting to parse the
2817+
// response. This exception is not wrapped.
28172818

28182819
// Retry for 500-range response code except for 501.
2819-
if (t.getCause() instanceof ResponseException) {
2820-
ResponseException ex = (ResponseException) t.getCause();
2820+
if (t instanceof ResponseException) {
2821+
ResponseException ex = (ResponseException) t;
28212822
int statusCode = ex.getResponse().getStatusLine().getStatusCode();
28222823
return statusCode >= 500 && statusCode != 501;
28232824
}
@@ -2893,7 +2894,16 @@ private List<Document> flushBatch() throws IOException, InterruptedException {
28932894
&& spec.getRetryConfiguration().getRetryPredicate().test(responseEntity)) {
28942895
LOG.warn("ES Cluster is responding with HTP 429 - TOO_MANY_REQUESTS.");
28952896
}
2896-
responseEntity = handleRetry("POST", endPoint, Collections.emptyMap(), requestBody);
2897+
try {
2898+
responseEntity = handleRetry("POST", endPoint, Collections.emptyMap(), requestBody);
2899+
} catch (java.io.IOException ex) {
2900+
// No more retry attempts, determine what to do using throwWriteErrors
2901+
if (spec.getThrowWriteErrors()) {
2902+
throw ex;
2903+
} else {
2904+
elasticResponseExceptionMessage = ex.getMessage();
2905+
}
2906+
}
28972907
}
28982908

28992909
List<Document> responses;

0 commit comments

Comments
 (0)