Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/changelog/138624.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 138624
summary: Handle individual doc parsing failure in bulk request with pipeline
area: Ingest Node
type: bug
issues:
- 138445
Original file line number Diff line number Diff line change
Expand Up @@ -378,6 +378,92 @@ public void testPipelineProcessorOnFailure() throws Exception {
assertThat(inserted.get("readme"), equalTo("pipeline with id [3] is a bad pipeline"));
}

public void testBulkRequestWithInvalidJsonAndPipeline() throws Exception {
// Test that when a document with invalid JSON is in a bulk request with a pipeline,
// the invalid document fails gracefully without causing the entire bulk request to fail.
// This tests the fix for https://github.com/elastic/elasticsearch/issues/138445

createIndex("test_index");

// Register a trivial single-processor pipeline that every request below routes through.
putJsonPipeline(
"test-pipeline",
(builder, params) -> builder.field("description", "test pipeline")
.startArray("processors")
.startObject()
.startObject("test")
.endObject()
.endObject()
.endArray()
);

// Create a bulk request with valid and invalid documents
BulkRequest bulkRequest = new BulkRequest();

// Valid document
IndexRequest validRequest = new IndexRequest("test_index").id("valid_doc");
validRequest.source("{\"valid\":\"test\"}", XContentType.JSON);
validRequest.setPipeline("test-pipeline");
bulkRequest.add(validRequest);

// Invalid document with missing closing brace
IndexRequest invalidRequest = new IndexRequest("test_index").id("invalid_doc");
invalidRequest.source("{\"invalid\":\"json\"", XContentType.JSON);
invalidRequest.setPipeline("test-pipeline");
bulkRequest.add(invalidRequest);

// Invalid document with duplicate fields
IndexRequest invalidRequest2 = new IndexRequest("test_index").id("invalid_doc2");
invalidRequest2.source("{\"invalid\":\"json\", \"invalid\":\"json\"}", XContentType.JSON);
invalidRequest2.setPipeline("test-pipeline");
bulkRequest.add(invalidRequest2);

// Another valid document
IndexRequest validRequest2 = new IndexRequest("test_index").id("valid_doc2");
validRequest2.source("{\"valid\":\"test2\"}", XContentType.JSON);
validRequest2.setPipeline("test-pipeline");
bulkRequest.add(validRequest2);

BulkResponse response = client().bulk(bulkRequest).actionGet();

// The bulk request as a whole completes (does not throw); only the two invalid
// items are reported as per-item failures, hence hasFailures() is true.
assertThat(response.hasFailures(), is(true));
assertThat(response.getItems().length, equalTo(4));

// First document (valid JSON) should succeed
BulkItemResponse item0 = response.getItems()[0];
assertThat(item0.isFailed(), is(false));
assertThat(item0.getResponse().getId(), equalTo("valid_doc"));
assertThat(item0.getResponse().getResult(), equalTo(DocWriteResponse.Result.CREATED));

// Second document (missing closing brace) should fail with a client error
BulkItemResponse item1 = response.getItems()[1];
assertThat(item1.isFailed(), is(true));
assertThat(item1.getFailure().getStatus(), equalTo(org.elasticsearch.rest.RestStatus.BAD_REQUEST));
assertThat(item1.getFailure().getCause(), instanceOf(IllegalArgumentException.class));

// Third document (duplicate fields) should fail with a client error
BulkItemResponse item2 = response.getItems()[2];
assertThat(item2.isFailed(), is(true));
assertThat(item2.getFailure().getStatus(), equalTo(org.elasticsearch.rest.RestStatus.BAD_REQUEST));
assertThat(item2.getFailure().getCause(), instanceOf(IllegalArgumentException.class));

// Fourth document (valid JSON) should succeed
BulkItemResponse item3 = response.getItems()[3];
assertThat(item3.isFailed(), is(false));
assertThat(item3.getResponse().getId(), equalTo("valid_doc2"));
assertThat(item3.getResponse().getResult(), equalTo(DocWriteResponse.Result.CREATED));

// Verify that the valid documents were indexed
assertThat(client().prepareGet("test_index", "valid_doc").get().isExists(), is(true));
assertThat(client().prepareGet("test_index", "valid_doc2").get().isExists(), is(true));
// Verify that the invalid documents were not indexed
assertThat(client().prepareGet("test_index", "invalid_doc").get().isExists(), is(false));
assertThat(client().prepareGet("test_index", "invalid_doc2").get().isExists(), is(false));

// cleanup
deletePipeline("test-pipeline");
}

public static class ExtendedIngestTestPlugin extends IngestTestPlugin {

@Override
Expand Down
15 changes: 14 additions & 1 deletion server/src/main/java/org/elasticsearch/ingest/IngestService.java
Original file line number Diff line number Diff line change
Expand Up @@ -992,7 +992,20 @@ protected void doRun() {
}
final int slot = i;
final Releasable ref = refs.acquire();
final IngestDocument ingestDocument = newIngestDocument(indexRequest);
final IngestDocument ingestDocument;
try {
ingestDocument = newIngestDocument(indexRequest);
} catch (Exception e) {
// Document parsing failed (e.g. invalid JSON). Handle this gracefully
// by marking this document as failed and continuing with other documents.
final long ingestTimeInNanos = System.nanoTime() - startTimeInNanos;
totalMetrics.postIngest(ingestTimeInNanos);
totalMetrics.ingestFailed();
ref.close();
i++;
onFailure.apply(slot, e, IndexDocFailureStoreStatus.NOT_APPLICABLE_OR_UNKNOWN);
continue;
}
final Metadata originalDocumentMetadata = ingestDocument.getMetadata().clone();
// the document listener gives us three-way logic: a document can fail processing (1), or it can
// be successfully processed. a successfully processed document can be kept (2) or dropped (3).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@
import org.elasticsearch.test.MockLog;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.XContentParseException;
import org.elasticsearch.xcontent.XContentType;
import org.elasticsearch.xcontent.cbor.CborXContent;
import org.junit.Before;
Expand Down Expand Up @@ -1784,6 +1785,92 @@ public void testBulkRequestExecutionWithFailures() throws Exception {
verify(listener, times(1)).onResponse(null);
}

public void testBulkRequestExecutionWithInvalidJsonDocument() throws Exception {
    // Test that when a document with invalid JSON (truncated, or containing duplicate keys)
    // is in a bulk request with a pipeline, the invalid document fails gracefully as an
    // individual item without causing the entire bulk request to fail.
    BulkRequest bulkRequest = new BulkRequest();
    String pipelineId = "_id";

    // Valid document that should succeed
    IndexRequest validRequest = new IndexRequest("_index").id("valid").setPipeline(pipelineId).setFinalPipeline("_none");
    validRequest.source(Requests.INDEX_CONTENT_TYPE, "field1", "value1");
    validRequest.setListExecutedPipelines(true);
    bulkRequest.add(validRequest);

    // Invalid document with missing closing brace
    String invalidJson = "{\"invalid\":\"json\"";
    IndexRequest invalidRequest = new IndexRequest("_index").id("invalid").setPipeline(pipelineId).setFinalPipeline("_none");
    invalidRequest.source(new BytesArray(invalidJson), XContentType.JSON);
    bulkRequest.add(invalidRequest);

    // Another valid document that should succeed
    IndexRequest validRequest2 = new IndexRequest("_index").id("valid2").setPipeline(pipelineId).setFinalPipeline("_none");
    validRequest2.source(Requests.INDEX_CONTENT_TYPE, "field2", "value2");
    validRequest2.setListExecutedPipelines(true);
    bulkRequest.add(validRequest2);

    // Invalid document with duplicated keys. Note: this request gets its own distinct
    // id ("invalid2") so the two invalid documents do not collide on the same id.
    String invalidJson2 = "{\"@timestamp\":\"2024-06-01T00:00:00Z\",\"@timestamp\":\"2024-06-01T00:00:00Z\"}";
    IndexRequest invalidRequest2 = new IndexRequest("_index").id("invalid2").setPipeline(pipelineId).setFinalPipeline("_none");
    invalidRequest2.source(new BytesArray(invalidJson2), XContentType.JSON);
    bulkRequest.add(invalidRequest2);

    // A mock processor that passes every document through successfully, so any item
    // failure observed below must come from document parsing, not pipeline execution.
    final Processor processor = mock(Processor.class);
    when(processor.getType()).thenReturn("mock");
    when(processor.getTag()).thenReturn("mockTag");
    doAnswer(args -> {
        @SuppressWarnings("unchecked")
        BiConsumer<IngestDocument, Exception> handler = (BiConsumer<IngestDocument, Exception>) args.getArguments()[1];
        handler.accept(RandomDocumentPicks.randomIngestDocument(random()), null);
        return null;
    }).when(processor).execute(any(), any());

    IngestService ingestService = createWithProcessors(Map.of("mock", (factories, tag, description, config, projectId) -> processor));
    PutPipelineRequest putRequest = putJsonPipelineRequest("_id", "{\"processors\": [{\"mock\" : {}}]}");
    var projectId = randomProjectIdOrDefault();
    ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT)
        .putProjectMetadata(ProjectMetadata.builder(projectId).build())
        .build();
    ClusterState previousClusterState = clusterState;
    clusterState = executePut(projectId, putRequest, clusterState);
    ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState));

    @SuppressWarnings("unchecked")
    TriConsumer<Integer, Exception, IndexDocFailureStoreStatus> requestItemErrorHandler = mock(TriConsumer.class);
    @SuppressWarnings("unchecked")
    final ActionListener<Void> listener = mock(ActionListener.class);

    ingestService.executeBulkRequest(
        projectId,
        4,
        bulkRequest.requests(),
        indexReq -> {},
        (s) -> false,
        (slot, targetIndex, e) -> fail("Should not redirect to failure store"),
        requestItemErrorHandler,
        listener
    );

    // Each invalid document should fail individually with a parsing error
    verify(requestItemErrorHandler).apply(
        eq(1), // slot 1 is the truncated-JSON document
        argThat(e -> e instanceof XContentParseException),
        eq(IndexDocFailureStoreStatus.NOT_APPLICABLE_OR_UNKNOWN)
    );
    verify(requestItemErrorHandler).apply(
        eq(3), // slot 3 is the duplicate-key document
        argThat(e -> e instanceof XContentParseException),
        eq(IndexDocFailureStoreStatus.NOT_APPLICABLE_OR_UNKNOWN)
    );

    // The bulk listener should still be called with success
    verify(listener).onResponse(null);
    // 4 documents processed in total, of which 2 failed ingest
    assertStats(ingestService.stats().totalStats(), 4, 2, 0);
    // Verify that the valid documents were processed (they should have their pipelines executed)
    assertThat(validRequest.getExecutedPipelines(), equalTo(List.of(pipelineId)));
    assertThat(validRequest2.getExecutedPipelines(), equalTo(List.of(pipelineId)));
}

public void testExecuteFailureRedirection() throws Exception {
final CompoundProcessor processor = mockCompoundProcessor();
IngestService ingestService = createWithProcessors(
Expand Down