From d7f945c0f833d5a4d040b258d3d02c614321dbd9 Mon Sep 17 00:00:00 2001 From: Sam Xiao Date: Tue, 25 Nov 2025 15:31:38 -0500 Subject: [PATCH 1/5] Handle individual doc parsing failure in bulk request --- .../elasticsearch/ingest/IngestClientIT.java | 87 +++++++++++++++++++ .../elasticsearch/ingest/IngestService.java | 15 +++- .../ingest/IngestServiceTests.java | 87 +++++++++++++++++++ 3 files changed, 188 insertions(+), 1 deletion(-) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/ingest/IngestClientIT.java b/server/src/internalClusterTest/java/org/elasticsearch/ingest/IngestClientIT.java index af82907ce5641..89464adac238a 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/ingest/IngestClientIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/ingest/IngestClientIT.java @@ -30,6 +30,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.test.ESIntegTestCase; +import org.elasticsearch.xcontent.XContentParseException; import org.elasticsearch.xcontent.XContentType; import java.util.Collection; @@ -378,6 +379,92 @@ public void testPipelineProcessorOnFailure() throws Exception { assertThat(inserted.get("readme"), equalTo("pipeline with id [3] is a bad pipeline")); } + public void testBulkRequestWithInvalidJsonAndPipeline() throws Exception { + // Test that when a document with invalid JSON is in a bulk request with a pipeline, + // the invalid document fails gracefully without causing the entire bulk request to fail. + // This tests the fix for https://github.com/elastic/elasticsearch/issues/138445 + + createIndex("test_index"); + + putJsonPipeline( + "test-pipeline", + (builder, params) -> builder.field("description", "test pipeline") + .startArray("processors") + .startObject() + .startObject("test") + .endObject() + .endObject() + .endArray() + ); + + // Create a bulk request with valid and invalid documents + BulkRequest bulkRequest = new BulkRequest(); + + // Valid document + IndexRequest validRequest = new IndexRequest("test_index").id("valid_doc"); + validRequest.source("{\"valid\":\"test\"}", XContentType.JSON); + validRequest.setPipeline("test-pipeline"); + bulkRequest.add(validRequest); + + // Invalid document with missing closing brace + IndexRequest invalidRequest = new IndexRequest("test_index").id("invalid_doc"); + invalidRequest.source("{\"invalid\":\"json\"", XContentType.JSON); + invalidRequest.setPipeline("test-pipeline"); + bulkRequest.add(invalidRequest); + + // Invalid document with duplicate fields + IndexRequest invalidRequest2 = new IndexRequest("test_index").id("invalid_doc2"); + invalidRequest2.source("{\"invalid\":\"json\", \"invalid\":\"json\"}", XContentType.JSON); + invalidRequest2.setPipeline("test-pipeline"); + bulkRequest.add(invalidRequest2); + + // Another valid document + IndexRequest validRequest2 = new IndexRequest("test_index").id("valid_doc2"); + validRequest2.source("{\"valid\":\"test2\"}", XContentType.JSON); + validRequest2.setPipeline("test-pipeline"); + bulkRequest.add(validRequest2); + + BulkResponse response = client().bulk(bulkRequest).actionGet(); + + // The bulk request should succeed + assertThat(response.hasFailures(), is(true)); + assertThat(response.getItems().length, equalTo(4)); + + // First document should succeed + BulkItemResponse item0 = response.getItems()[0]; + assertThat(item0.isFailed(), is(false)); + assertThat(item0.getResponse().getId(), equalTo("valid_doc")); + assertThat(item0.getResponse().getResult(), equalTo(DocWriteResponse.Result.CREATED)); + + // Second document should fail + BulkItemResponse item1 = response.getItems()[1]; + assertThat(item1.isFailed(), is(true)); + assertThat(item1.getFailure().getStatus(), equalTo(org.elasticsearch.rest.RestStatus.BAD_REQUEST)); + assertThat(item1.getFailure().getCause(), instanceOf(IllegalArgumentException.class)); + + // Second document should fail + BulkItemResponse item2 = response.getItems()[2]; + assertThat(item2.isFailed(), is(true)); + assertThat(item2.getFailure().getStatus(), equalTo(org.elasticsearch.rest.RestStatus.BAD_REQUEST)); + assertThat(item2.getFailure().getCause(), instanceOf(IllegalArgumentException.class)); + + // Fourth document should succeed + BulkItemResponse item3 = response.getItems()[3]; + assertThat(item3.isFailed(), is(false)); + assertThat(item3.getResponse().getId(), equalTo("valid_doc2")); + assertThat(item3.getResponse().getResult(), equalTo(DocWriteResponse.Result.CREATED)); + + // Verify that the valid documents were indexed + assertThat(client().prepareGet("test_index", "valid_doc").get().isExists(), is(true)); + assertThat(client().prepareGet("test_index", "valid_doc2").get().isExists(), is(true)); + // Verify that the invalid documents were not indexed + assertThat(client().prepareGet("test_index", "invalid_doc").get().isExists(), is(false)); + assertThat(client().prepareGet("test_index", "invalid_doc2").get().isExists(), is(false)); + + // cleanup + deletePipeline("test-pipeline"); + } + public static class ExtendedIngestTestPlugin extends IngestTestPlugin { @Override diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestService.java b/server/src/main/java/org/elasticsearch/ingest/IngestService.java index 685f249aa06d6..5833651c93f83 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestService.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestService.java @@ -992,7 +992,20 @@ protected void doRun() { } final int slot = i; final Releasable ref = refs.acquire(); - final IngestDocument ingestDocument = newIngestDocument(indexRequest); + final IngestDocument ingestDocument; + try { + ingestDocument = newIngestDocument(indexRequest); + } catch (Exception e) { + // Document parsing failed (e.g. invalid JSON). Handle this gracefully + // by marking this document as failed and continuing with other documents. + final long ingestTimeInNanos = System.nanoTime() - startTimeInNanos; + totalMetrics.postIngest(ingestTimeInNanos); + totalMetrics.ingestFailed(); + ref.close(); + i++; + onFailure.apply(slot, e, IndexDocFailureStoreStatus.NOT_APPLICABLE_OR_UNKNOWN); + continue; + } final Metadata originalDocumentMetadata = ingestDocument.getMetadata().clone(); // the document listener gives us three-way logic: a document can fail processing (1), or it can // be successfully processed. a successfully processed document can be kept (2) or dropped (3). diff --git a/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java b/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java index bae950119685f..d46b2e0558fd2 100644 --- a/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java @@ -73,6 +73,7 @@ import org.elasticsearch.test.MockLog; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xcontent.XContentBuilder; +import org.elasticsearch.xcontent.XContentParseException; import org.elasticsearch.xcontent.XContentType; import org.elasticsearch.xcontent.cbor.CborXContent; import org.junit.Before; @@ -1784,6 +1785,92 @@ public void testBulkRequestExecutionWithFailures() throws Exception { verify(listener, times(1)).onResponse(null); } + public void testBulkRequestExecutionWithInvalidJsonDocument() throws Exception { + // Test that when a document with invalid JSON (e.g., duplicate keys) is in a bulk request with a pipeline, + // the invalid document fails gracefully without causing the entire bulk request to fail. + BulkRequest bulkRequest = new BulkRequest(); + String pipelineId = "_id"; + + // Valid document that should succeed + IndexRequest validRequest = new IndexRequest("_index").id("valid").setPipeline(pipelineId).setFinalPipeline("_none"); + validRequest.source(Requests.INDEX_CONTENT_TYPE, "field1", "value1"); + validRequest.setListExecutedPipelines(true); + bulkRequest.add(validRequest); + + // Invalid document with missing closing brace + String invalidJson = "{\"invalid\":\"json\""; + IndexRequest invalidRequest = new IndexRequest("_index").id("invalid").setPipeline(pipelineId).setFinalPipeline("_none"); + invalidRequest.source(new BytesArray(invalidJson), XContentType.JSON); + bulkRequest.add(invalidRequest); + + // Another valid document that should succeed + IndexRequest validRequest2 = new IndexRequest("_index").id("valid2").setPipeline(pipelineId).setFinalPipeline("_none"); + validRequest2.source(Requests.INDEX_CONTENT_TYPE, "field2", "value2"); + validRequest2.setListExecutedPipelines(true); + bulkRequest.add(validRequest2); + + // Invalid document with duplicated keys + String invalidJson2 = "{\"@timestamp\":\"2024-06-01T00:00:00Z\",\"@timestamp\":\"2024-06-01T00:00:00Z\"}"; + IndexRequest invalidRequest2 = new IndexRequest("_index").id("invalid").setPipeline(pipelineId).setFinalPipeline("_none"); + invalidRequest2.source(new BytesArray(invalidJson2), XContentType.JSON); + bulkRequest.add(invalidRequest2); + + final Processor processor = mock(Processor.class); + when(processor.getType()).thenReturn("mock"); + when(processor.getTag()).thenReturn("mockTag"); + doAnswer(args -> { + @SuppressWarnings("unchecked") + BiConsumer handler = (BiConsumer) args.getArguments()[1]; + handler.accept(RandomDocumentPicks.randomIngestDocument(random()), null); + return null; + }).when(processor).execute(any(), any()); + + IngestService ingestService = createWithProcessors(Map.of("mock", (factories, tag, description, config, projectId) -> processor)); + PutPipelineRequest putRequest = putJsonPipelineRequest("_id", "{\"processors\": [{\"mock\" : {}}]}"); + var projectId = randomProjectIdOrDefault(); + ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT) + .putProjectMetadata(ProjectMetadata.builder(projectId).build()) + .build(); + ClusterState previousClusterState = clusterState; + clusterState = executePut(projectId, putRequest, clusterState); + ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); + + @SuppressWarnings("unchecked") + TriConsumer requestItemErrorHandler = mock(TriConsumer.class); + @SuppressWarnings("unchecked") + final ActionListener listener = mock(ActionListener.class); + + ingestService.executeBulkRequest( + projectId, + 4, + bulkRequest.requests(), + indexReq -> {}, + (s) -> false, + (slot, targetIndex, e) -> fail("Should not redirect to failure store"), + requestItemErrorHandler, + listener + ); + + // The invalid documents should fail with a parsing error + verify(requestItemErrorHandler).apply( + eq(1), // slot 1 is the invalid document + argThat(e -> e instanceof XContentParseException), + eq(IndexDocFailureStoreStatus.NOT_APPLICABLE_OR_UNKNOWN) + ); + verify(requestItemErrorHandler).apply( + eq(3), // slot 3 is the other invalid document + argThat(e -> e instanceof XContentParseException), + eq(IndexDocFailureStoreStatus.NOT_APPLICABLE_OR_UNKNOWN) + ); + + // The bulk listener should still be called with success + verify(listener).onResponse(null); + assertStats(ingestService.stats().totalStats(), 4,2,0); + // Verify that the valid documents were processed (they should have their pipelines executed) + assertThat(validRequest.getExecutedPipelines(), equalTo(List.of(pipelineId))); + assertThat(validRequest2.getExecutedPipelines(), equalTo(List.of(pipelineId))); + } + public void testExecuteFailureRedirection() throws Exception { final CompoundProcessor processor = mockCompoundProcessor(); IngestService ingestService = createWithProcessors( From 1d25bc30996d32169d49bbd83dd77b5524a7e5dc Mon Sep 17 00:00:00 2001 From: elasticsearchmachine Date: Tue, 25 Nov 2025 20:41:19 +0000 Subject: [PATCH 2/5] [CI] Auto commit changes from spotless --- .../java/org/elasticsearch/ingest/IngestClientIT.java | 1 - .../test/java/org/elasticsearch/ingest/IngestServiceTests.java | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/ingest/IngestClientIT.java b/server/src/internalClusterTest/java/org/elasticsearch/ingest/IngestClientIT.java index 89464adac238a..c7c5bce4a4c35 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/ingest/IngestClientIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/ingest/IngestClientIT.java @@ -30,7 +30,6 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.test.ESIntegTestCase; -import org.elasticsearch.xcontent.XContentParseException; import org.elasticsearch.xcontent.XContentType; import java.util.Collection; diff --git a/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java b/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java index d46b2e0558fd2..d15665eec4593 100644 --- a/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java @@ -1865,7 +1865,7 @@ public void testBulkRequestExecutionWithInvalidJsonDocument() throws Exception { // The bulk listener should still be called with success verify(listener).onResponse(null); - assertStats(ingestService.stats().totalStats(), 4,2,0); + assertStats(ingestService.stats().totalStats(), 4, 2, 0); // Verify that the valid documents were processed (they should have their pipelines executed) assertThat(validRequest.getExecutedPipelines(), equalTo(List.of(pipelineId))); assertThat(validRequest2.getExecutedPipelines(), equalTo(List.of(pipelineId))); From b3310500bebd620279ee354b7a694137b56422c4 Mon Sep 17 00:00:00 2001 From: Sam Xiao Date: Tue, 25 Nov 2025 16:02:06 -0500 Subject: [PATCH 3/5] Update docs/changelog/138624.yaml --- docs/changelog/138624.yaml | 6 ++++++ 1 file changed, 6 insertions(+) create mode 100644 docs/changelog/138624.yaml diff --git a/docs/changelog/138624.yaml b/docs/changelog/138624.yaml new file mode 100644 index 0000000000000..679a4ca8a4525 --- /dev/null +++ b/docs/changelog/138624.yaml @@ -0,0 +1,6 @@ +pr: 138624 +summary: Handle individual doc parsing failure in bulk request with pipeline +area: Ingest Node +type: bug +issues: + - 138445 From e0ee15fb641a1af57fed79b3cf422494bfd51e8a Mon Sep 17 00:00:00 2001 From: Sam Xiao Date: Wed, 26 Nov 2025 12:57:51 -0500 Subject: [PATCH 4/5] remove unchecked --- .../org/elasticsearch/ingest/IngestServiceTests.java | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java b/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java index d15665eec4593..ff8416863be0c 100644 --- a/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java @@ -1819,8 +1819,7 @@ public void testBulkRequestExecutionWithInvalidJsonDocument() throws Exception { when(processor.getType()).thenReturn("mock"); when(processor.getTag()).thenReturn("mockTag"); doAnswer(args -> { - @SuppressWarnings("unchecked") - BiConsumer handler = (BiConsumer) args.getArguments()[1]; + BiConsumer handler = args.getArgument(1); handler.accept(RandomDocumentPicks.randomIngestDocument(random()), null); return null; }).when(processor).execute(any(), any()); @@ -1835,10 +1834,8 @@ public void testBulkRequestExecutionWithInvalidJsonDocument() throws Exception { clusterState = executePut(projectId, putRequest, clusterState); ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); - @SuppressWarnings("unchecked") - TriConsumer requestItemErrorHandler = mock(TriConsumer.class); - @SuppressWarnings("unchecked") - final ActionListener listener = mock(ActionListener.class); + TriConsumer requestItemErrorHandler = mock(); + final ActionListener listener = mock(); ingestService.executeBulkRequest( projectId, From 0207eb727db8e9c6152c1ce07a4a886472975b78 Mon Sep 17 00:00:00 2001 From: Sam Xiao Date: Wed, 26 Nov 2025 13:04:33 -0500 Subject: [PATCH 5/5] minor typo --- .../java/org/elasticsearch/ingest/IngestClientIT.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/ingest/IngestClientIT.java b/server/src/internalClusterTest/java/org/elasticsearch/ingest/IngestClientIT.java index c7c5bce4a4c35..b95589cf0563c 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/ingest/IngestClientIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/ingest/IngestClientIT.java @@ -441,7 +441,7 @@ public void testBulkRequestWithInvalidJsonAndPipeline() throws Exception { assertThat(item1.getFailure().getStatus(), equalTo(org.elasticsearch.rest.RestStatus.BAD_REQUEST)); assertThat(item1.getFailure().getCause(), instanceOf(IllegalArgumentException.class)); - // Second document should fail + // Third document should fail BulkItemResponse item2 = response.getItems()[2]; assertThat(item2.isFailed(), is(true)); assertThat(item2.getFailure().getStatus(), equalTo(org.elasticsearch.rest.RestStatus.BAD_REQUEST));