Skip to content

Commit 9b3083e

Browse files
samxbr and elasticsearchmachine authored
[8.19] Handle individual doc parsing failure gracefully in bulk request with pipeline (#138624) (#138708)
* Handle individual doc parsing failure gracefully in bulk request with pipeline (#138624) (cherry picked from commit 933354b)
* [CI] Auto commit changes from spotless
* Fix missing methods
* fix test
---------
Co-authored-by: elasticsearchmachine <[email protected]>
1 parent aed6d9b commit 9b3083e

File tree

4 files changed

+196
-1
lines changed

4 files changed

+196
-1
lines changed

docs/changelog/138624.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
pr: 138624
summary: Handle individual doc parsing failure in bulk request with pipeline
area: Ingest Node
type: bug
issues:
 - 138445

server/src/internalClusterTest/java/org/elasticsearch/ingest/IngestClientIT.java

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -426,6 +426,96 @@ public void testPipelineProcessorOnFailure() throws Exception {
426426
assertThat(inserted.get("readme"), equalTo("pipeline with id [3] is a bad pipeline"));
427427
}
428428

429+
public void testBulkRequestWithInvalidJsonAndPipeline() throws Exception {
430+
// Test that when a document with invalid JSON is in a bulk request with a pipeline,
431+
// the invalid document fails gracefully without causing the entire bulk request to fail.
432+
// This tests the fix for https://github.com/elastic/elasticsearch/issues/138445
433+
434+
createIndex("test_index");
435+
436+
BytesReference source = BytesReference.bytes(
437+
jsonBuilder().startObject()
438+
.field("description", "test-pipeline")
439+
.startArray("processors")
440+
.startObject()
441+
.startObject("test")
442+
.endObject()
443+
.endObject()
444+
.endArray()
445+
.endObject()
446+
);
447+
PutPipelineRequest putPipelineRequest = new PutPipelineRequest("test-pipeline", source, XContentType.JSON);
448+
clusterAdmin().putPipeline(putPipelineRequest).get();
449+
450+
// Create a bulk request with valid and invalid documents
451+
BulkRequest bulkRequest = new BulkRequest();
452+
453+
// Valid document
454+
IndexRequest validRequest = new IndexRequest("test_index").id("valid_doc");
455+
validRequest.source("{\"valid\":\"test\"}", XContentType.JSON);
456+
validRequest.setPipeline("test-pipeline");
457+
bulkRequest.add(validRequest);
458+
459+
// Invalid document with missing closing brace
460+
IndexRequest invalidRequest = new IndexRequest("test_index").id("invalid_doc");
461+
invalidRequest.source("{\"invalid\":\"json\"", XContentType.JSON);
462+
invalidRequest.setPipeline("test-pipeline");
463+
bulkRequest.add(invalidRequest);
464+
465+
// Invalid document with duplicate fields
466+
IndexRequest invalidRequest2 = new IndexRequest("test_index").id("invalid_doc2");
467+
invalidRequest2.source("{\"invalid\":\"json\", \"invalid\":\"json\"}", XContentType.JSON);
468+
invalidRequest2.setPipeline("test-pipeline");
469+
bulkRequest.add(invalidRequest2);
470+
471+
// Another valid document
472+
IndexRequest validRequest2 = new IndexRequest("test_index").id("valid_doc2");
473+
validRequest2.source("{\"valid\":\"test2\"}", XContentType.JSON);
474+
validRequest2.setPipeline("test-pipeline");
475+
bulkRequest.add(validRequest2);
476+
477+
BulkResponse response = client().bulk(bulkRequest).actionGet();
478+
479+
// The bulk request should succeed
480+
assertThat(response.hasFailures(), is(true));
481+
assertThat(response.getItems().length, equalTo(4));
482+
483+
// First document should succeed
484+
BulkItemResponse item0 = response.getItems()[0];
485+
assertThat(item0.isFailed(), is(false));
486+
assertThat(item0.getResponse().getId(), equalTo("valid_doc"));
487+
assertThat(item0.getResponse().getResult(), equalTo(DocWriteResponse.Result.CREATED));
488+
489+
// Second document should fail
490+
BulkItemResponse item1 = response.getItems()[1];
491+
assertThat(item1.isFailed(), is(true));
492+
assertThat(item1.getFailure().getStatus(), equalTo(org.elasticsearch.rest.RestStatus.BAD_REQUEST));
493+
assertThat(item1.getFailure().getCause(), instanceOf(IllegalArgumentException.class));
494+
495+
// Third document should fail
496+
BulkItemResponse item2 = response.getItems()[2];
497+
assertThat(item2.isFailed(), is(true));
498+
assertThat(item2.getFailure().getStatus(), equalTo(org.elasticsearch.rest.RestStatus.BAD_REQUEST));
499+
assertThat(item2.getFailure().getCause(), instanceOf(IllegalArgumentException.class));
500+
501+
// Fourth document should succeed
502+
BulkItemResponse item3 = response.getItems()[3];
503+
assertThat(item3.isFailed(), is(false));
504+
assertThat(item3.getResponse().getId(), equalTo("valid_doc2"));
505+
assertThat(item3.getResponse().getResult(), equalTo(DocWriteResponse.Result.CREATED));
506+
507+
// Verify that the valid documents were indexed
508+
assertThat(client().prepareGet("test_index", "valid_doc").get().isExists(), is(true));
509+
assertThat(client().prepareGet("test_index", "valid_doc2").get().isExists(), is(true));
510+
// Verify that the invalid documents were not indexed
511+
assertThat(client().prepareGet("test_index", "invalid_doc").get().isExists(), is(false));
512+
assertThat(client().prepareGet("test_index", "invalid_doc2").get().isExists(), is(false));
513+
514+
// cleanup
515+
AcknowledgedResponse deletePipelineResponse = clusterAdmin().prepareDeletePipeline("test-pipeline").get();
516+
assertTrue(deletePipelineResponse.isAcknowledged());
517+
}
518+
429519
public static class ExtendedIngestTestPlugin extends IngestTestPlugin {
430520

431521
@Override

server/src/main/java/org/elasticsearch/ingest/IngestService.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -830,7 +830,20 @@ protected void doRun() {
830830
final XContentMeteringParserDecorator meteringParserDecorator = documentParsingProvider.newMeteringParserDecorator(
831831
indexRequest
832832
);
833-
final IngestDocument ingestDocument = newIngestDocument(indexRequest, meteringParserDecorator);
833+
final IngestDocument ingestDocument;
834+
try {
835+
ingestDocument = newIngestDocument(indexRequest, meteringParserDecorator);
836+
} catch (Exception e) {
837+
// Document parsing failed (e.g. invalid JSON). Handle this gracefully
838+
// by marking this document as failed and continuing with other documents.
839+
final long ingestTimeInNanos = System.nanoTime() - startTimeInNanos;
840+
totalMetrics.postIngest(ingestTimeInNanos);
841+
totalMetrics.ingestFailed();
842+
ref.close();
843+
i++;
844+
onFailure.apply(slot, e, IndexDocFailureStoreStatus.NOT_APPLICABLE_OR_UNKNOWN);
845+
continue;
846+
}
834847
final org.elasticsearch.script.Metadata originalDocumentMetadata = ingestDocument.getMetadata().clone();
835848
// the document listener gives us three-way logic: a document can fail processing (1), or it can
836849
// be successfully processed. a successfully processed document can be kept (2) or dropped (3).

server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@
6969
import org.elasticsearch.test.MockLog;
7070
import org.elasticsearch.threadpool.ThreadPool;
7171
import org.elasticsearch.xcontent.XContentBuilder;
72+
import org.elasticsearch.xcontent.XContentParseException;
7273
import org.elasticsearch.xcontent.XContentType;
7374
import org.elasticsearch.xcontent.cbor.CborXContent;
7475
import org.junit.Before;
@@ -1632,6 +1633,91 @@ public void testBulkRequestExecutionWithFailures() throws Exception {
16321633
verify(completionHandler, times(1)).accept(Thread.currentThread(), null);
16331634
}
16341635

1636+
public void testBulkRequestExecutionWithInvalidJsonDocument() throws Exception {
1637+
// Test that when a document with invalid JSON (e.g., duplicate keys) is in a bulk request with a pipeline,
1638+
// the invalid document fails gracefully without causing the entire bulk request to fail.
1639+
BulkRequest bulkRequest = new BulkRequest();
1640+
String pipelineId = "_id";
1641+
1642+
// Valid document that should succeed
1643+
IndexRequest validRequest = new IndexRequest("_index").id("valid").setPipeline(pipelineId).setFinalPipeline("_none");
1644+
validRequest.source(Requests.INDEX_CONTENT_TYPE, "field1", "value1");
1645+
validRequest.setListExecutedPipelines(true);
1646+
bulkRequest.add(validRequest);
1647+
1648+
// Invalid document with missing closing brace
1649+
String invalidJson = "{\"invalid\":\"json\"";
1650+
IndexRequest invalidRequest = new IndexRequest("_index").id("invalid").setPipeline(pipelineId).setFinalPipeline("_none");
1651+
invalidRequest.source(new BytesArray(invalidJson), XContentType.JSON);
1652+
bulkRequest.add(invalidRequest);
1653+
1654+
// Another valid document that should succeed
1655+
IndexRequest validRequest2 = new IndexRequest("_index").id("valid2").setPipeline(pipelineId).setFinalPipeline("_none");
1656+
validRequest2.source(Requests.INDEX_CONTENT_TYPE, "field2", "value2");
1657+
validRequest2.setListExecutedPipelines(true);
1658+
bulkRequest.add(validRequest2);
1659+
1660+
// Invalid document with duplicated keys
1661+
String invalidJson2 = "{\"@timestamp\":\"2024-06-01T00:00:00Z\",\"@timestamp\":\"2024-06-01T00:00:00Z\"}";
1662+
IndexRequest invalidRequest2 = new IndexRequest("_index").id("invalid").setPipeline(pipelineId).setFinalPipeline("_none");
1663+
invalidRequest2.source(new BytesArray(invalidJson2), XContentType.JSON);
1664+
bulkRequest.add(invalidRequest2);
1665+
1666+
final Processor processor = mock(Processor.class);
1667+
when(processor.getType()).thenReturn("mock");
1668+
when(processor.getTag()).thenReturn("mockTag");
1669+
doAnswer(args -> {
1670+
BiConsumer<IngestDocument, Exception> handler = args.getArgument(1);
1671+
handler.accept(RandomDocumentPicks.randomIngestDocument(random()), null);
1672+
return null;
1673+
}).when(processor).execute(any(), any());
1674+
1675+
IngestService ingestService = createWithProcessors(Map.of("mock", (factories, tag, description, config) -> processor));
1676+
PutPipelineRequest putRequest = new PutPipelineRequest(
1677+
pipelineId,
1678+
new BytesArray("{\"processors\": [{\"mock\" : {}}]}"),
1679+
XContentType.JSON
1680+
);
1681+
1682+
ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT).build();
1683+
ClusterState previousClusterState = clusterState;
1684+
clusterState = executePut(putRequest, clusterState);
1685+
ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState));
1686+
1687+
TriConsumer<Integer, Exception, IndexDocFailureStoreStatus> requestItemErrorHandler = mock();
1688+
final BiConsumer<Thread, Exception> onCompletion = mock();
1689+
1690+
ingestService.executeBulkRequest(
1691+
4,
1692+
bulkRequest.requests(),
1693+
indexReq -> {},
1694+
(s) -> false,
1695+
(slot, targetIndex, e) -> fail("Should not redirect to failure store"),
1696+
requestItemErrorHandler,
1697+
onCompletion,
1698+
EsExecutors.DIRECT_EXECUTOR_SERVICE
1699+
);
1700+
1701+
// The invalid documents should fail with a parsing error
1702+
verify(requestItemErrorHandler).apply(
1703+
eq(1), // slot 1 is the invalid document
1704+
argThat(e -> e instanceof XContentParseException),
1705+
eq(IndexDocFailureStoreStatus.NOT_APPLICABLE_OR_UNKNOWN)
1706+
);
1707+
verify(requestItemErrorHandler).apply(
1708+
eq(3), // slot 3 is the other invalid document
1709+
argThat(e -> e instanceof XContentParseException),
1710+
eq(IndexDocFailureStoreStatus.NOT_APPLICABLE_OR_UNKNOWN)
1711+
);
1712+
1713+
// The bulk listener should still be called with success
1714+
verify(onCompletion).accept(any(), eq(null));
1715+
assertStats(ingestService.stats().totalStats(), 4, 2, 0);
1716+
// Verify that the valid documents were processed (they should have their pipelines executed)
1717+
assertThat(validRequest.getExecutedPipelines(), equalTo(List.of(pipelineId)));
1718+
assertThat(validRequest2.getExecutedPipelines(), equalTo(List.of(pipelineId)));
1719+
}
1720+
16351721
public void testExecuteFailureRedirection() throws Exception {
16361722
final CompoundProcessor processor = mockCompoundProcessor();
16371723
IngestService ingestService = createWithProcessors(

0 commit comments

Comments
 (0)