Skip to content
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions docs/changelog/130325.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
pr: 130325
summary: "Simulate API: Return 400 on invalid processor(s)"
area: Ingest Node
type: bug
issues:
- 120731
breaking:
title: Return 400 on invalid processor(s) in Simulate API
area: Ingest
details: "In earlier versions of {es}, the Simulate API would return a 500 error\
\ when encountering invalid processors. Now, it returns a 400 Bad Request error\
\ instead."
impact: Callers should expect a 400 Bad Request response when the Simulate API encounters
invalid processors. This change improves error handling and provides clearer feedback
on request issues.
notable: false
2 changes: 1 addition & 1 deletion modules/ingest-common/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ dependencies {

restResources {
restApi {
include '_common', 'ingest', 'cluster', 'indices', 'index', 'bulk', 'nodes', 'get', 'update', 'cat', 'mget', 'search'
include '_common', 'ingest', 'cluster', 'indices', 'index', 'bulk', 'nodes', 'get', 'update', 'cat', 'mget', 'search', 'simulate'
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,3 +154,45 @@ teardown:
ingest.processor_grok: {}
- length: { patterns: 318 }
- match: { patterns.PATH: "(?:%{UNIXPATH}|%{WINPATH})" }


---
"Test simulate with invalid GROK pattern":
- skip:
features: headers
- do:
catch: bad_request
headers:
Content-Type: application/json
simulate.ingest:
pipeline: "invalid-grok"
body: >
{
"docs": [
{
"_index": "index-1",
"_source": {
"foo": "bar"
}
}
],
"pipeline_substitutions": {
"invalid-grok": {
"description": "invalid grok pattern",
"processors": [
{
"grok": {
"field": "field",
"patterns": [
"%{INVALID_PATTERN:field}"
]
}
}
]
}
}
}
- match: { status: 400 }
- match: { error.reason: "[patterns] Invalid regex pattern found in: [%{INVALID_PATTERN:field}]. Unable to find pattern [INVALID_PATTERN] in Grok's pattern dictionary" }
- match: { error.property_name: "patterns" }
- match: { error.processor_type: "grok" }
Original file line number Diff line number Diff line change
Expand Up @@ -600,7 +600,6 @@ setup:

---
"Test bad pipeline substitution":

- skip:
features: [headers, allowed_warnings]

Expand Down Expand Up @@ -628,7 +627,7 @@ setup:
default_pipeline: "my-pipeline"

- do:
catch: "request"
catch: bad_request
headers:
Content-Type: application/json
simulate.ingest:
Expand Down Expand Up @@ -661,7 +660,8 @@ setup:
}
}
}
- match: { status: 500 }
- match: { status: 400 }
- match: { error.reason: "No processor type exists with name [non-existent-processor]" }

---
"Test index in path":
Expand Down Expand Up @@ -763,7 +763,7 @@ setup:
- skip:
features: headers
- do:
catch: request
catch: bad_request
headers:
Content-Type: application/json
simulate.ingest:
Expand All @@ -785,5 +785,5 @@ setup:
}
}
}
- match: { status: 500 }
- match: { status: 400 }
- contains: { error.reason: "Provided a pipeline property which is managed by the system: created_date." }
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

package org.elasticsearch.ingest;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.SimulateBulkRequest;
import org.elasticsearch.cluster.metadata.ProjectId;
Expand All @@ -28,6 +29,8 @@ public SimulateIngestService(IngestService ingestService, BulkRequest request) {
if (request instanceof SimulateBulkRequest simulateBulkRequest) {
try {
pipelineSubstitutions = getPipelineSubstitutions(simulateBulkRequest.getPipelineSubstitutions(), ingestService);
} catch (ElasticsearchException elasticEx) {
throw elasticEx;
} catch (Exception e) {
throw new RuntimeException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

package org.elasticsearch.ingest;

import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.action.bulk.FailureStoreMetrics;
import org.elasticsearch.action.bulk.SimulateBulkRequest;
import org.elasticsearch.client.internal.Client;
Expand All @@ -34,6 +35,7 @@
import static org.elasticsearch.test.LambdaMatchers.transformedMatch;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
Expand Down Expand Up @@ -118,6 +120,32 @@ public void testGetPipeline() {
}
}

public void testRethrowingOfElasticParseExceptionFromProcessors() {
final PipelineConfiguration pipelineConfiguration = new PipelineConfiguration("pipeline1", new BytesArray("""
{"processors": []}"""), XContentType.JSON);
final IngestMetadata ingestMetadata = new IngestMetadata(Map.of("pipeline1", pipelineConfiguration));
final Processor.Factory factoryThatThrowsElasticParseException = (factory, tag, description, config, projectId) -> {
throw new ElasticsearchParseException("exception to be caught");
};
final Map<String, Processor.Factory> processors = Map.of("parse_exception_processor", factoryThatThrowsElasticParseException);
final var projectId = randomProjectIdOrDefault();
IngestService ingestService = createWithProcessors(projectId, processors);
ingestService.innerUpdatePipelines(projectId, ingestMetadata);
SimulateBulkRequest simulateBulkRequest = new SimulateBulkRequest(
newHashMap("pipeline1", newHashMap("processors", List.of(Map.of("parse_exception_processor", new HashMap<>(0))))),
Map.of(),
Map.of(),
Map.of(),
null
);

final ElasticsearchParseException ex = assertThrows(
ElasticsearchParseException.class,
() -> new SimulateIngestService(ingestService, simulateBulkRequest)
);
assertThat(ex.getMessage(), is("exception to be caught"));
}

private static IngestService createWithProcessors(ProjectId projectId, Map<String, Processor.Factory> processors) {
Client client = mock(Client.class);
ThreadPool threadPool = mock(ThreadPool.class);
Expand Down