Skip to content

Commit a8540c9

Browse files
authored
[7.9] Handle error conditions when simulating ingest pipelines with verbosity enabled (#63518)
1 parent 34fd573 commit a8540c9

File tree

4 files changed

+100
-13
lines changed

4 files changed

+100
-13
lines changed

modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/90_simulate.yml

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -417,6 +417,58 @@ teardown:
417417
- is_true: docs.1.processor_results.1.doc._ingest.timestamp
418418
- is_true: docs.1.processor_results.1.doc._ingest.pipeline
419419

420+
---
421+
"Test verbose simulate with error in pipeline":
422+
- do:
423+
ingest.put_pipeline:
424+
id: "my_pipeline"
425+
body: >
426+
{
427+
"description": "_description",
428+
"processors": [
429+
{
430+
"rename" : {
431+
"field" : "does_not_exist",
432+
"target_field" : "_value"
433+
}
434+
}
435+
]
436+
}
437+
- match: { acknowledged: true }
438+
439+
- do:
440+
ingest.simulate:
441+
verbose: true
442+
body: >
443+
{
444+
"pipeline": {
445+
"description": "_description",
446+
"processors": [
447+
{
448+
"pipeline" : {
449+
"name" : "my_pipeline"
450+
}
451+
}
452+
]
453+
},
454+
"docs": [
455+
{
456+
"_index": "index",
457+
"_id": "id",
458+
"_source": {
459+
"foo": "bar",
460+
"bar": "hello"
461+
}
462+
}
463+
]
464+
}
465+
- length: { docs: 1 }
466+
- length: { docs.0.processor_results: 1 }
467+
- match: { docs.0.processor_results.0.error.root_cause.0.type: "illegal_argument_exception" }
468+
- match: { docs.0.processor_results.0.error.root_cause.0.reason: "field [does_not_exist] doesn't exist" }
469+
- match: { docs.0.processor_results.0.error.type: "illegal_argument_exception" }
470+
- match: { docs.0.processor_results.0.error.reason: "field [does_not_exist] doesn't exist" }
471+
420472
---
421473
"Test verbose simulate with on_failure":
422474
- do:

server/src/main/java/org/elasticsearch/ingest/IngestDocument.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@
5151
public final class IngestDocument {
5252

5353
public static final String INGEST_KEY = "_ingest";
54+
public static final String PIPELINE_CYCLE_ERROR_MESSAGE = "Cycle detected for pipeline: ";
5455
private static final String INGEST_KEY_PREFIX = INGEST_KEY + ".";
5556
private static final String SOURCE_PREFIX = SourceFieldMapper.NAME + ".";
5657

@@ -685,7 +686,7 @@ public void executePipeline(Pipeline pipeline, BiConsumer<IngestDocument, Except
685686
handler.accept(result, e);
686687
});
687688
} else {
688-
handler.accept(null, new IllegalStateException("Cycle detected for pipeline: " + pipeline.getId()));
689+
handler.accept(null, new IllegalStateException(PIPELINE_CYCLE_ERROR_MESSAGE + pipeline.getId()));
689690
}
690691
}
691692

server/src/main/java/org/elasticsearch/ingest/TrackingResultProcessor.java

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@
2626
import java.util.List;
2727
import java.util.function.BiConsumer;
2828

29+
import static org.elasticsearch.ingest.IngestDocument.PIPELINE_CYCLE_ERROR_MESSAGE;
30+
2931
/**
3032
* Processor to be used within Simulate API to keep track of processors executed in pipeline.
3133
*/
@@ -59,19 +61,17 @@ public void execute(IngestDocument ingestDocument, BiConsumer<IngestDocument, Ex
5961
//runtime check for cycles against a copy of the document. This is needed to properly handle conditionals around pipelines
6062
IngestDocument ingestDocumentCopy = new IngestDocument(ingestDocument);
6163
ingestDocumentCopy.executePipeline(pipelineProcessor.getPipeline(ingestDocument), (result, e) -> {
62-
// do nothing, let the tracking processors throw the exception while recording the path up to the failure
63-
if (e instanceof ElasticsearchException) {
64-
ElasticsearchException elasticsearchException = (ElasticsearchException) e;
65-
//else do nothing, let the tracking processors throw the exception while recording the path up to the failure
66-
if (elasticsearchException.getCause() instanceof IllegalStateException) {
67-
if (ignoreFailure) {
68-
processorResultList.add(new SimulateProcessorResult(pipelineProcessor.getTag(),
69-
new IngestDocument(ingestDocument), e));
70-
} else {
71-
processorResultList.add(new SimulateProcessorResult(pipelineProcessor.getTag(), e));
72-
}
73-
handler.accept(null, elasticsearchException);
64+
// special handling for pipeline cycle errors
65+
if (e instanceof ElasticsearchException &&
66+
e.getCause() instanceof IllegalStateException &&
67+
e.getCause().getMessage().startsWith(PIPELINE_CYCLE_ERROR_MESSAGE)) {
68+
if (ignoreFailure) {
69+
processorResultList.add(new SimulateProcessorResult(pipelineProcessor.getTag(),
70+
new IngestDocument(ingestDocument), e));
71+
} else {
72+
processorResultList.add(new SimulateProcessorResult(pipelineProcessor.getTag(), e));
7473
}
74+
handler.accept(null, e);
7575
} else {
7676
//now that we know that there are no cycles between pipelines, decorate the processors for this pipeline and execute it
7777
CompoundProcessor verbosePipelineProcessor = decorate(pipeline.getCompoundProcessor(), null, processorResultList);

server/src/test/java/org/elasticsearch/ingest/TrackingResultProcessorTests.java

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -476,6 +476,40 @@ pipelineId, null, null, new CompoundProcessor(
476476
assertThat(resultList.get(3).getProcessorTag(), nullValue());
477477
}
478478

479+
public void testActualPipelineProcessorWithUnhandledFailure() throws Exception {
480+
String pipelineId = "pipeline1";
481+
IngestService ingestService = createIngestService();
482+
Map<String, Object> pipelineConfig = new HashMap<>();
483+
pipelineConfig.put("name", pipelineId);
484+
PipelineProcessor.Factory factory = new PipelineProcessor.Factory(ingestService);
485+
486+
String key1 = randomAlphaOfLength(10);
487+
IllegalStateException exception = new IllegalStateException("Not a pipeline cycle error");
488+
489+
Pipeline pipeline = new Pipeline(
490+
pipelineId, null, null, new CompoundProcessor(
491+
new TestProcessor(ingestDocument -> ingestDocument.setFieldValue(key1, randomInt())),
492+
new TestProcessor(ingestDocument -> { throw exception; }))
493+
);
494+
when(ingestService.getPipeline(pipelineId)).thenReturn(pipeline);
495+
496+
PipelineProcessor pipelineProcessor = factory.create(Collections.emptyMap(), null, null, pipelineConfig);
497+
CompoundProcessor actualProcessor = new CompoundProcessor(pipelineProcessor);
498+
499+
CompoundProcessor trackingProcessor = decorate(actualProcessor, null, resultList);
500+
501+
trackingProcessor.execute(ingestDocument, (result, e) -> {});
502+
503+
SimulateProcessorResult expectedResult = new SimulateProcessorResult(actualProcessor.getTag(), ingestDocument);
504+
expectedResult.getIngestDocument().getIngestMetadata().put("pipeline", pipelineId);
505+
506+
verify(ingestService, Mockito.atLeast(1)).getPipeline(pipelineId);
507+
508+
assertThat(resultList.size(), equalTo(2));
509+
assertTrue(resultList.get(0).getIngestDocument().hasField(key1));
510+
assertThat(resultList.get(1).getFailure(), equalTo(exception));
511+
}
512+
479513
public void testActualPipelineProcessorWithCycle() throws Exception {
480514
String pipelineId1 = "pipeline1";
481515
String pipelineId2 = "pipeline2";

0 commit comments

Comments
 (0)