Skip to content

Commit ccb0088

Browse files
committed
Handle individual doc parsing failure gracefully in bulk request with pipeline (elastic#138624)
1 parent 2816ef5 commit ccb0088

File tree

4 files changed

+190
-1
lines changed

4 files changed

+190
-1
lines changed

docs/changelog/138624.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
pr: 138624
2+
summary: Handle individual doc parsing failure in bulk request with pipeline
3+
area: Ingest Node
4+
type: bug
5+
issues:
6+
- 138445

server/src/internalClusterTest/java/org/elasticsearch/ingest/IngestClientIT.java

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -378,6 +378,92 @@ public void testPipelineProcessorOnFailure() throws Exception {
378378
assertThat(inserted.get("readme"), equalTo("pipeline with id [3] is a bad pipeline"));
379379
}
380380

381+
public void testBulkRequestWithInvalidJsonAndPipeline() throws Exception {
382+
// Test that when a document with invalid JSON is in a bulk request with a pipeline,
383+
// the invalid document fails gracefully without causing the entire bulk request to fail.
384+
// This tests the fix for https://github.com/elastic/elasticsearch/issues/138445
385+
386+
createIndex("test_index");
387+
388+
putJsonPipeline(
389+
"test-pipeline",
390+
(builder, params) -> builder.field("description", "test pipeline")
391+
.startArray("processors")
392+
.startObject()
393+
.startObject("test")
394+
.endObject()
395+
.endObject()
396+
.endArray()
397+
);
398+
399+
// Create a bulk request with valid and invalid documents
400+
BulkRequest bulkRequest = new BulkRequest();
401+
402+
// Valid document
403+
IndexRequest validRequest = new IndexRequest("test_index").id("valid_doc");
404+
validRequest.source("{\"valid\":\"test\"}", XContentType.JSON);
405+
validRequest.setPipeline("test-pipeline");
406+
bulkRequest.add(validRequest);
407+
408+
// Invalid document with missing closing brace
409+
IndexRequest invalidRequest = new IndexRequest("test_index").id("invalid_doc");
410+
invalidRequest.source("{\"invalid\":\"json\"", XContentType.JSON);
411+
invalidRequest.setPipeline("test-pipeline");
412+
bulkRequest.add(invalidRequest);
413+
414+
// Invalid document with duplicate fields
415+
IndexRequest invalidRequest2 = new IndexRequest("test_index").id("invalid_doc2");
416+
invalidRequest2.source("{\"invalid\":\"json\", \"invalid\":\"json\"}", XContentType.JSON);
417+
invalidRequest2.setPipeline("test-pipeline");
418+
bulkRequest.add(invalidRequest2);
419+
420+
// Another valid document
421+
IndexRequest validRequest2 = new IndexRequest("test_index").id("valid_doc2");
422+
validRequest2.source("{\"valid\":\"test2\"}", XContentType.JSON);
423+
validRequest2.setPipeline("test-pipeline");
424+
bulkRequest.add(validRequest2);
425+
426+
BulkResponse response = client().bulk(bulkRequest).actionGet();
427+
428+
// The bulk request itself should succeed overall, though it reports individual item failures
429+
assertThat(response.hasFailures(), is(true));
430+
assertThat(response.getItems().length, equalTo(4));
431+
432+
// First document should succeed
433+
BulkItemResponse item0 = response.getItems()[0];
434+
assertThat(item0.isFailed(), is(false));
435+
assertThat(item0.getResponse().getId(), equalTo("valid_doc"));
436+
assertThat(item0.getResponse().getResult(), equalTo(DocWriteResponse.Result.CREATED));
437+
438+
// Second document should fail
439+
BulkItemResponse item1 = response.getItems()[1];
440+
assertThat(item1.isFailed(), is(true));
441+
assertThat(item1.getFailure().getStatus(), equalTo(org.elasticsearch.rest.RestStatus.BAD_REQUEST));
442+
assertThat(item1.getFailure().getCause(), instanceOf(IllegalArgumentException.class));
443+
444+
// Third document should fail
445+
BulkItemResponse item2 = response.getItems()[2];
446+
assertThat(item2.isFailed(), is(true));
447+
assertThat(item2.getFailure().getStatus(), equalTo(org.elasticsearch.rest.RestStatus.BAD_REQUEST));
448+
assertThat(item2.getFailure().getCause(), instanceOf(IllegalArgumentException.class));
449+
450+
// Fourth document should succeed
451+
BulkItemResponse item3 = response.getItems()[3];
452+
assertThat(item3.isFailed(), is(false));
453+
assertThat(item3.getResponse().getId(), equalTo("valid_doc2"));
454+
assertThat(item3.getResponse().getResult(), equalTo(DocWriteResponse.Result.CREATED));
455+
456+
// Verify that the valid documents were indexed
457+
assertThat(client().prepareGet("test_index", "valid_doc").get().isExists(), is(true));
458+
assertThat(client().prepareGet("test_index", "valid_doc2").get().isExists(), is(true));
459+
// Verify that the invalid documents were not indexed
460+
assertThat(client().prepareGet("test_index", "invalid_doc").get().isExists(), is(false));
461+
assertThat(client().prepareGet("test_index", "invalid_doc2").get().isExists(), is(false));
462+
463+
// cleanup
464+
deletePipeline("test-pipeline");
465+
}
466+
381467
public static class ExtendedIngestTestPlugin extends IngestTestPlugin {
382468

383469
@Override

server/src/main/java/org/elasticsearch/ingest/IngestService.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -992,7 +992,20 @@ protected void doRun() {
992992
}
993993
final int slot = i;
994994
final Releasable ref = refs.acquire();
995-
final IngestDocument ingestDocument = newIngestDocument(indexRequest);
995+
final IngestDocument ingestDocument;
996+
try {
997+
ingestDocument = newIngestDocument(indexRequest);
998+
} catch (Exception e) {
999+
// Document parsing failed (e.g. invalid JSON). Handle this gracefully
1000+
// by marking this document as failed and continuing with other documents.
1001+
final long ingestTimeInNanos = System.nanoTime() - startTimeInNanos;
1002+
totalMetrics.postIngest(ingestTimeInNanos);
1003+
totalMetrics.ingestFailed();
1004+
ref.close();
1005+
i++;
1006+
onFailure.apply(slot, e, IndexDocFailureStoreStatus.NOT_APPLICABLE_OR_UNKNOWN);
1007+
continue;
1008+
}
9961009
final Metadata originalDocumentMetadata = ingestDocument.getMetadata().clone();
9971010
// the document listener gives us three-way logic: a document can fail processing (1), or it can
9981011
// be successfully processed. a successfully processed document can be kept (2) or dropped (3).

server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@
7373
import org.elasticsearch.test.MockLog;
7474
import org.elasticsearch.threadpool.ThreadPool;
7575
import org.elasticsearch.xcontent.XContentBuilder;
76+
import org.elasticsearch.xcontent.XContentParseException;
7677
import org.elasticsearch.xcontent.XContentType;
7778
import org.elasticsearch.xcontent.cbor.CborXContent;
7879
import org.junit.Before;
@@ -1784,6 +1785,89 @@ public void testBulkRequestExecutionWithFailures() throws Exception {
17841785
verify(listener, times(1)).onResponse(null);
17851786
}
17861787

1788+
public void testBulkRequestExecutionWithInvalidJsonDocument() throws Exception {
1789+
// Test that when a document whose source fails parsing (e.g., truncated JSON or duplicate keys) is in a bulk request with a pipeline,
1790+
// the invalid document fails gracefully without causing the entire bulk request to fail.
1791+
BulkRequest bulkRequest = new BulkRequest();
1792+
String pipelineId = "_id";
1793+
1794+
// Valid document that should succeed
1795+
IndexRequest validRequest = new IndexRequest("_index").id("valid").setPipeline(pipelineId).setFinalPipeline("_none");
1796+
validRequest.source(Requests.INDEX_CONTENT_TYPE, "field1", "value1");
1797+
validRequest.setListExecutedPipelines(true);
1798+
bulkRequest.add(validRequest);
1799+
1800+
// Invalid document with missing closing brace
1801+
String invalidJson = "{\"invalid\":\"json\"";
1802+
IndexRequest invalidRequest = new IndexRequest("_index").id("invalid").setPipeline(pipelineId).setFinalPipeline("_none");
1803+
invalidRequest.source(new BytesArray(invalidJson), XContentType.JSON);
1804+
bulkRequest.add(invalidRequest);
1805+
1806+
// Another valid document that should succeed
1807+
IndexRequest validRequest2 = new IndexRequest("_index").id("valid2").setPipeline(pipelineId).setFinalPipeline("_none");
1808+
validRequest2.source(Requests.INDEX_CONTENT_TYPE, "field2", "value2");
1809+
validRequest2.setListExecutedPipelines(true);
1810+
bulkRequest.add(validRequest2);
1811+
1812+
// Invalid document with duplicated keys
1813+
String invalidJson2 = "{\"@timestamp\":\"2024-06-01T00:00:00Z\",\"@timestamp\":\"2024-06-01T00:00:00Z\"}";
1814+
IndexRequest invalidRequest2 = new IndexRequest("_index").id("invalid").setPipeline(pipelineId).setFinalPipeline("_none");
1815+
invalidRequest2.source(new BytesArray(invalidJson2), XContentType.JSON);
1816+
bulkRequest.add(invalidRequest2);
1817+
1818+
final Processor processor = mock(Processor.class);
1819+
when(processor.getType()).thenReturn("mock");
1820+
when(processor.getTag()).thenReturn("mockTag");
1821+
doAnswer(args -> {
1822+
BiConsumer<IngestDocument, Exception> handler = args.getArgument(1);
1823+
handler.accept(RandomDocumentPicks.randomIngestDocument(random()), null);
1824+
return null;
1825+
}).when(processor).execute(any(), any());
1826+
1827+
IngestService ingestService = createWithProcessors(Map.of("mock", (factories, tag, description, config, projectId) -> processor));
1828+
PutPipelineRequest putRequest = putJsonPipelineRequest("_id", "{\"processors\": [{\"mock\" : {}}]}");
1829+
var projectId = randomProjectIdOrDefault();
1830+
ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT)
1831+
.putProjectMetadata(ProjectMetadata.builder(projectId).build())
1832+
.build();
1833+
ClusterState previousClusterState = clusterState;
1834+
clusterState = executePut(projectId, putRequest, clusterState);
1835+
ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState));
1836+
1837+
TriConsumer<Integer, Exception, IndexDocFailureStoreStatus> requestItemErrorHandler = mock();
1838+
final ActionListener<Void> listener = mock();
1839+
1840+
ingestService.executeBulkRequest(
1841+
projectId,
1842+
4,
1843+
bulkRequest.requests(),
1844+
indexReq -> {},
1845+
(s) -> false,
1846+
(slot, targetIndex, e) -> fail("Should not redirect to failure store"),
1847+
requestItemErrorHandler,
1848+
listener
1849+
);
1850+
1851+
// The invalid documents should fail with a parsing error
1852+
verify(requestItemErrorHandler).apply(
1853+
eq(1), // slot 1 is the invalid document
1854+
argThat(e -> e instanceof XContentParseException),
1855+
eq(IndexDocFailureStoreStatus.NOT_APPLICABLE_OR_UNKNOWN)
1856+
);
1857+
verify(requestItemErrorHandler).apply(
1858+
eq(3), // slot 3 is the other invalid document
1859+
argThat(e -> e instanceof XContentParseException),
1860+
eq(IndexDocFailureStoreStatus.NOT_APPLICABLE_OR_UNKNOWN)
1861+
);
1862+
1863+
// The bulk listener should still be called with success
1864+
verify(listener).onResponse(null);
1865+
assertStats(ingestService.stats().totalStats(), 4, 2, 0);
1866+
// Verify that the valid documents were processed (they should have their pipelines executed)
1867+
assertThat(validRequest.getExecutedPipelines(), equalTo(List.of(pipelineId)));
1868+
assertThat(validRequest2.getExecutedPipelines(), equalTo(List.of(pipelineId)));
1869+
}
1870+
17871871
public void testExecuteFailureRedirection() throws Exception {
17881872
final CompoundProcessor processor = mockCompoundProcessor();
17891873
IngestService ingestService = createWithProcessors(

0 commit comments

Comments (0)