16 changes: 16 additions & 0 deletions docs/changelog/130325.yaml
@@ -0,0 +1,16 @@
pr: 130325
summary: "Simulate API: Return 400 on invalid processor(s)"
area: Ingest Node
type: bug
issues:
 - 120731
breaking:
  title: Return 400 on invalid processor(s) in Simulate API
  area: Ingest
  details: "In earlier versions of {es}, the Simulate API would return a 500 error\
    \ when encountering invalid processors. Now, it returns a 400 Bad Request error\
    \ instead."
  impact: Callers should expect a 400 Bad Request response when the Simulate API encounters
    invalid processors. This change improves error handling and provides clearer feedback
    on request issues.
  notable: false
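As a sketch of what this breaking change means for clients, the snippet below sends a simulate request containing an invalid processor and reads the status code with the low-level Java REST client. The endpoint path and body shape follow the YAML tests in this PR; the client bootstrapping and class name are illustrative, not part of the change.

```java
import org.apache.http.HttpHost;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.ResponseException;
import org.elasticsearch.client.RestClient;

public class SimulateInvalidProcessorExample {
    public static void main(String[] args) throws Exception {
        try (RestClient client = RestClient.builder(new HttpHost("localhost", 9200, "http")).build()) {
            Request request = new Request("POST", "/_ingest/_simulate");
            // A pipeline substitution naming a processor type that does not exist.
            request.setJsonEntity("""
                {
                  "docs": [ { "_index": "index-1", "_source": { "foo": "bar" } } ],
                  "pipeline_substitutions": {
                    "my-pipeline": { "processors": [ { "non-existent-processor": {} } ] }
                  }
                }""");
            client.performRequest(request);
        } catch (ResponseException e) {
            // Since this change: 400 Bad Request. Before: 500 Internal Server Error.
            System.out.println(e.getResponse().getStatusLine().getStatusCode());
        }
    }
}
```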
2 changes: 1 addition & 1 deletion modules/ingest-common/build.gradle
@@ -28,7 +28,7 @@ dependencies {

restResources {
  restApi {
    include '_common', 'ingest', 'cluster', 'indices', 'index', 'bulk', 'nodes', 'get', 'update', 'cat', 'mget', 'search'
    include '_common', 'ingest', 'cluster', 'indices', 'index', 'bulk', 'nodes', 'get', 'update', 'cat', 'mget', 'search', 'simulate'
  }
}

@@ -154,3 +154,48 @@ teardown:
      ingest.processor_grok: {}
  - length: { patterns: 318 }
  - match: { patterns.PATH: "(?:%{UNIXPATH}|%{WINPATH})" }


---
"Test simulate with invalid GROK pattern":
  - requires:
      cluster_features: [ "simulate.ingest.400_on_failure" ]
      reason: "simulate.ingest returned 500 on failure before"
  - skip:
      features: headers
  - do:
      catch: bad_request
      headers:
        Content-Type: application/json
      simulate.ingest:
        pipeline: "invalid-grok"
        body: >
          {
            "docs": [
              {
                "_index": "index-1",
                "_source": {
                  "foo": "bar"
                }
              }
            ],
            "pipeline_substitutions": {
              "invalid-grok": {
                "description": "invalid grok pattern",
                "processors": [
                  {
                    "grok": {
                      "field": "field",
                      "patterns": [
                        "%{INVALID_PATTERN:field}"
                      ]
                    }
                  }
                ]
              }
            }
          }
  - match: { status: 400 }
  - match: { error.reason: "[patterns] Invalid regex pattern found in: [%{INVALID_PATTERN:field}]. Unable to find pattern [INVALID_PATTERN] in Grok's pattern dictionary" }
  - match: { error.property_name: "patterns" }
  - match: { error.processor_type: "grok" }
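The error.reason asserted above originates from Grok's pattern-bank lookup: every %{NAME:field} reference must resolve against the pattern dictionary before the processor compiles. The sketch below is a hypothetical, self-contained version of that validation step, not the actual Grok implementation; it only mirrors the message format the test asserts.

```java
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class PatternBankSketch {
    // Matches %{PATTERN_NAME:capture_field} references inside a grok expression.
    private static final Pattern REFERENCE = Pattern.compile("%\\{(\\w+)(?::\\w+)?}");

    static void validate(String grokPattern, Map<String, String> bank) {
        Matcher m = REFERENCE.matcher(grokPattern);
        while (m.find()) {
            String name = m.group(1);
            if (bank.containsKey(name) == false) {
                // Mirrors the message asserted in the YAML test above.
                throw new IllegalArgumentException(
                    "Unable to find pattern [" + name + "] in Grok's pattern dictionary");
            }
        }
    }

    public static void main(String[] args) {
        // Throws: the reference does not resolve against the (tiny) bank.
        validate("%{INVALID_PATTERN:field}", Map.of("UNIXPATH", "(/[\\w-]+)+"));
    }
}
```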
@@ -600,7 +600,9 @@ setup:

---
"Test bad pipeline substitution":
  - requires:
      cluster_features: [ "simulate.ingest.400_on_failure" ]
      reason: "simulate.ingest returned 500 on failure before"
  - skip:
      features: [headers, allowed_warnings]

@@ -628,7 +630,7 @@ setup:
        default_pipeline: "my-pipeline"

  - do:
      catch: "request"
      catch: bad_request
      headers:
        Content-Type: application/json
      simulate.ingest:
@@ -661,7 +663,8 @@ setup:
            }
          }
        }
  - match: { status: 500 }
  - match: { status: 400 }
  - match: { error.reason: "No processor type exists with name [non-existent-processor]" }

---
"Test index in path":
@@ -751,6 +754,9 @@ setup:

---
"Test simulate with pipeline with created_date":
  - requires:
      cluster_features: [ "simulate.ingest.400_on_failure" ]
      reason: "simulate.ingest returned 500 on failure before"
  - requires:
      test_runner_features: capabilities
      capabilities:
@@ -763,7 +769,7 @@ setup:
  - skip:
      features: headers
  - do:
      catch: request
      catch: bad_request
      headers:
        Content-Type: application/json
      simulate.ingest:
@@ -785,5 +791,5 @@ setup:
            }
          }
        }
  - match: { status: 500 }
  - match: { status: 400 }
  - contains: { error.reason: "Provided a pipeline property which is managed by the system: created_date." }
@@ -16,6 +16,8 @@
import java.util.Set;

public class IngestFeatures implements FeatureSpecification {
    private static final NodeFeature SIMULATE_INGEST_400_ON_FAILURE = new NodeFeature("simulate.ingest.400_on_failure", true);

    @Override
    public Set<NodeFeature> getFeatures() {
        if (DataStream.LOGS_STREAM_FEATURE_FLAG) {
@@ -24,4 +26,9 @@ public Set<NodeFeature> getFeatures() {
            return Set.of();
        }
    }

    @Override
    public Set<NodeFeature> getTestFeatures() {
        return Set.of(SIMULATE_INGEST_400_ON_FAILURE);
    }
}
@@ -9,6 +9,7 @@

package org.elasticsearch.ingest;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.SimulateBulkRequest;
import org.elasticsearch.cluster.metadata.ProjectId;
@@ -28,6 +29,8 @@ public SimulateIngestService(IngestService ingestService, BulkRequest request) {
        if (request instanceof SimulateBulkRequest simulateBulkRequest) {
            try {
                pipelineSubstitutions = getPipelineSubstitutions(simulateBulkRequest.getPipelineSubstitutions(), ingestService);
            } catch (ElasticsearchException elasticEx) {
                throw elasticEx;
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
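The rethrow-before-wrap ordering above is what changes the status code: ElasticsearchException subclasses such as ElasticsearchParseException carry their own RestStatus (400 Bad Request), while the RuntimeException fallback is reported as a 500. A minimal sketch of that mapping, assuming the usual ExceptionsHelper behavior; the standalone class is illustrative.

```java
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.rest.RestStatus;

public class StatusMappingSketch {
    public static void main(String[] args) {
        // ElasticsearchParseException carries its own REST status: 400 Bad Request.
        // Rethrowing it unchanged lets the REST layer report that status.
        RestStatus parseStatus = new ElasticsearchParseException("invalid processor").status();
        System.out.println(parseStatus); // BAD_REQUEST

        // A plain RuntimeException has no status of its own, so it surfaces as 500.
        RestStatus wrappedStatus = ExceptionsHelper.status(new RuntimeException("wrapped"));
        System.out.println(wrappedStatus); // INTERNAL_SERVER_ERROR
    }
}
```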
@@ -9,6 +9,7 @@

package org.elasticsearch.ingest;

import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.action.bulk.FailureStoreMetrics;
import org.elasticsearch.action.bulk.SimulateBulkRequest;
import org.elasticsearch.client.internal.Client;
@@ -34,6 +35,7 @@
import static org.elasticsearch.test.LambdaMatchers.transformedMatch;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -118,6 +120,32 @@ public void testGetPipeline() {
        }
    }

    public void testRethrowingOfElasticParseExceptionFromProcessors() {
        final PipelineConfiguration pipelineConfiguration = new PipelineConfiguration("pipeline1", new BytesArray("""
            {"processors": []}"""), XContentType.JSON);
        final IngestMetadata ingestMetadata = new IngestMetadata(Map.of("pipeline1", pipelineConfiguration));
        final Processor.Factory factoryThatThrowsElasticParseException = (factory, tag, description, config, projectId) -> {
            throw new ElasticsearchParseException("exception to be caught");
        };
        final Map<String, Processor.Factory> processors = Map.of("parse_exception_processor", factoryThatThrowsElasticParseException);
        final var projectId = randomProjectIdOrDefault();
        IngestService ingestService = createWithProcessors(projectId, processors);
        ingestService.innerUpdatePipelines(projectId, ingestMetadata);
        SimulateBulkRequest simulateBulkRequest = new SimulateBulkRequest(
            newHashMap("pipeline1", newHashMap("processors", List.of(Map.of("parse_exception_processor", new HashMap<>(0))))),
            Map.of(),
            Map.of(),
            Map.of(),
            null
        );

        final ElasticsearchParseException ex = assertThrows(
            ElasticsearchParseException.class,
            () -> new SimulateIngestService(ingestService, simulateBulkRequest)
        );
        assertThat(ex.getMessage(), is("exception to be caught"));
    }

    private static IngestService createWithProcessors(ProjectId projectId, Map<String, Processor.Factory> processors) {
        Client client = mock(Client.class);
        ThreadPool threadPool = mock(ThreadPool.class);