Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -606,3 +606,204 @@ setup:
- match: { docs.0.doc._source.foo: "FOO" }
- match: { docs.0.doc.executed_pipelines: [] }
- not_exists: docs.0.doc.error

---
"Test ingest simulate with component template substitutions for data streams":
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

<3

# In this test, we make sure that when the index template is a data stream template, simulte ingest works the same whether the data stream
# has been created or not -- either way, we expect it to use the template rather than the data stream / index mappings and settings.

- skip:
features:
- headers
- allowed_warnings

- requires:
cluster_features: ["simulate.component.template.substitutions"]
reason: "ingest simulate component template substitutions added in 8.16"

- do:
headers:
Content-Type: application/json
ingest.put_pipeline:
id: "foo-pipeline"
body: >
{
"processors": [
{
"set": {
"field": "foo",
"value": true
}
}
]
}
- match: { acknowledged: true }

- do:
cluster.put_component_template:
name: mappings_template
body:
template:
mappings:
dynamic: strict
properties:
foo:
type: keyword

- do:
cluster.put_component_template:
name: settings_template
body:
template:
settings:
index:
default_pipeline: "foo-pipeline"

- do:
allowed_warnings:
- "index template [test-composable-1] has index patterns [foo*] matching patterns from existing older templates [global] with patterns (global => [*]); this template [test-composable-1] will take precedence during new index creation"
indices.put_index_template:
name: test-composable-1
body:
index_patterns:
- foo*
composed_of:
- mappings_template
- settings_template

- do:
allowed_warnings:
- "index template [my-template1] has index patterns [simple-data-stream1] matching patterns from existing older templates [global] with patterns (global => [*]); this template [my-template1] will take precedence during new index creation"
indices.put_index_template:
name: my-template1
body:
index_patterns: [simple-data-stream1]
composed_of:
- mappings_template
- settings_template
data_stream: {}

- do:
headers:
Content-Type: application/json
simulate.ingest:
index: simple-data-stream1
body: >
{
"docs": [
{
"_id": "asdf",
"_source": {
"@timestamp": 1234,
"foo": false
}
}
],
"pipeline_substitutions": {
"foo-pipeline-2": {
"processors": [
{
"set": {
"field": "foo",
"value": "FOO"
}
}
]
}
},
"component_template_substitutions": {
"settings_template": {
"template": {
"settings": {
"index": {
"default_pipeline": "foo-pipeline-2"
}
}
}
},
"mappings_template": {
"template": {
"mappings": {
"dynamic": "strict",
"properties": {
"foo": {
"type": "keyword"
}
}
}
}
}
}
}
- length: { docs: 1 }
- match: { docs.0.doc._index: "simple-data-stream1" }
- match: { docs.0.doc._source.foo: "FOO" }
- match: { docs.0.doc.executed_pipelines: ["foo-pipeline-2"] }
- not_exists: docs.0.doc.error

- do:
indices.create_data_stream:
name: simple-data-stream1
- is_true: acknowledged

- do:
cluster.health:
wait_for_status: yellow

- do:
headers:
Content-Type: application/json
simulate.ingest:
index: simple-data-stream1
body: >
{
"docs": [
{
"_id": "asdf",
"_source": {
"@timestamp": 1234,
"foo": false
}
}
],
"pipeline_substitutions": {
"foo-pipeline-2": {
"processors": [
{
"set": {
"field": "foo",
"value": "FOO"
}
}
]
}
},
"component_template_substitutions": {
"settings_template": {
"template": {
"settings": {
"index": {
"default_pipeline": "foo-pipeline-2"
}
}
}
},
"mappings_template": {
"template": {
"mappings": {
"dynamic": "strict",
"properties": {
"foo": {
"type": "keyword"
}
}
}
}
}
}
}
- length: { docs: 1 }
- match: { docs.0.doc._index: "simple-data-stream1" }
- match: { docs.0.doc._source.foo: "FOO" }
- match: { docs.0.doc.executed_pipelines: ["foo-pipeline-2"] }
- not_exists: docs.0.doc.error
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.AliasMetadata;
import org.elasticsearch.cluster.metadata.ComponentTemplate;
import org.elasticsearch.cluster.metadata.ComposableIndexTemplate;
import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.cluster.metadata.DataStreamLifecycle;
Expand Down Expand Up @@ -157,8 +156,7 @@ protected void masterOperation(
xContentRegistry,
indicesService,
systemIndices,
indexSettingProviders,
Map.of()
indexSettingProviders
);

final Map<String, List<String>> overlapping = new HashMap<>();
Expand Down Expand Up @@ -235,8 +233,7 @@ public static Template resolveTemplate(
final NamedXContentRegistry xContentRegistry,
final IndicesService indicesService,
final SystemIndices systemIndices,
Set<IndexSettingProvider> indexSettingProviders,
Map<String, ComponentTemplate> componentTemplateSubstitutions
Set<IndexSettingProvider> indexSettingProviders
) throws Exception {
var metadata = simulatedState.getMetadata();
Settings templateSettings = resolveSettings(simulatedState.metadata(), matchingTemplate);
Expand Down Expand Up @@ -266,7 +263,6 @@ public static Template resolveTemplate(
null, // empty request mapping as the user can't specify any explicit mappings via the simulate api
simulatedState,
matchingTemplate,
componentTemplateSubstitutions,
xContentRegistry,
simulatedIndexName
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,8 +170,7 @@ protected void masterOperation(
xContentRegistry,
indicesService,
systemIndices,
indexSettingProviders,
Map.of()
indexSettingProviders
);
if (request.includeDefaults()) {
listener.onResponse(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.elasticsearch.transport.TransportService;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.Executor;
Expand Down Expand Up @@ -180,12 +181,48 @@ protected void doRun() throws IOException {
private boolean applyPipelines(Task task, BulkRequest bulkRequest, Executor executor, ActionListener<BulkResponse> listener)
throws IOException {
boolean hasIndexRequestsWithPipelines = false;
final Metadata metadata = clusterService.state().getMetadata();
Map<String, ComponentTemplate> templateSubstitutions = bulkRequest.getComponentTemplateSubstitutions();
final Metadata metadata;
Map<String, ComponentTemplate> componentTemplateSubstitutions = bulkRequest.getComponentTemplateSubstitutions();
if (bulkRequest.isSimulated() && componentTemplateSubstitutions.isEmpty() == false) {
/*
* If this is a simulated request, and there are template substitutions, then we want to create and use a new metadata that has
* those templates. That is, we want to add the new templates (which will replace any that already existed with the same name),
* and remove the indices and data streams that are referred to from the bulkRequest so that we get settings from the templates
* rather than from the indices/data streams.
*/
Metadata.Builder simulatedMetadataBuilder = Metadata.builder(clusterService.state().getMetadata());
if (componentTemplateSubstitutions.isEmpty() == false) {
Map<String, ComponentTemplate> updatedComponentTemplates = new HashMap<>();
updatedComponentTemplates.putAll(clusterService.state().metadata().componentTemplates());
updatedComponentTemplates.putAll(componentTemplateSubstitutions);
simulatedMetadataBuilder.componentTemplates(updatedComponentTemplates);
}
/*
* We now remove the index from the simulated metadata to force the templates to be used. Note that simulated requests are
* always index requests -- no other type of request is supported.
*/
for (DocWriteRequest<?> actionRequest : bulkRequest.requests) {
assert actionRequest != null : "Requests cannot be null in simulate mode";
assert actionRequest instanceof IndexRequest
: "Only IndexRequests are supported in simulate mode, but got " + actionRequest.getClass();
if (actionRequest != null) {
IndexRequest indexRequest = (IndexRequest) actionRequest;
String indexName = indexRequest.index();
if (indexName != null) {
simulatedMetadataBuilder.remove(indexName);
simulatedMetadataBuilder.removeDataStream(indexName);
}
}
}
metadata = simulatedMetadataBuilder.build();
} else {
metadata = clusterService.state().getMetadata();
}

for (DocWriteRequest<?> actionRequest : bulkRequest.requests) {
IndexRequest indexRequest = getIndexWriteRequest(actionRequest);
if (indexRequest != null) {
IngestService.resolvePipelinesAndUpdateIndexRequest(actionRequest, indexRequest, metadata, templateSubstitutions);
IngestService.resolvePipelinesAndUpdateIndexRequest(actionRequest, indexRequest, metadata);
hasIndexRequestsWithPipelines |= IngestService.hasPipeline(indexRequest);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import org.elasticsearch.xcontent.NamedXContentRegistry;

import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -197,12 +198,33 @@ private Exception validateMappings(Map<String, ComponentTemplate> componentTempl
* path for when the index does not exist). And it does not deal with system indices since we do not intend for users to
* simulate writing to system indices.
*/
// First, we remove the index from the cluster state if necessary (since we're going to use the templates)
ClusterState simulatedState = indexAbstraction == null
? state
: new ClusterState.Builder(state).metadata(Metadata.builder(state.metadata()).remove(request.index()).build()).build();
ClusterState.Builder simulatedClusterStateBuilder = new ClusterState.Builder(state);
Metadata.Builder simulatedMetadata = Metadata.builder(state.metadata());
if (indexAbstraction != null) {
/*
* We remove the index or data stream from the cluster state so that we are forced to fall back to the templates to get
* mappings.
*/
String indexRequest = request.index();
assert indexRequest != null : "Index requests cannot be null in a simulate bulk call";
if (indexRequest != null) {
simulatedMetadata.remove(indexRequest);
simulatedMetadata.removeDataStream(indexRequest);
}
}
if (componentTemplateSubstitutions.isEmpty() == false) {
/*
* We put the template substitutions into the cluster state. If they have the same name as an existing one, the
* existing one is replaced.
*/
Map<String, ComponentTemplate> updatedComponentTemplates = new HashMap<>();
updatedComponentTemplates.putAll(state.metadata().componentTemplates());
updatedComponentTemplates.putAll(componentTemplateSubstitutions);
simulatedMetadata.componentTemplates(updatedComponentTemplates);
}
ClusterState simulatedState = simulatedClusterStateBuilder.metadata(simulatedMetadata).build();

String matchingTemplate = findV2Template(state.metadata(), request.index(), false);
String matchingTemplate = findV2Template(simulatedState.metadata(), request.index(), false);
if (matchingTemplate != null) {
final Template template = TransportSimulateIndexTemplateAction.resolveTemplate(
matchingTemplate,
Expand All @@ -212,8 +234,7 @@ private Exception validateMappings(Map<String, ComponentTemplate> componentTempl
xContentRegistry,
indicesService,
systemIndices,
indexSettingProviders,
componentTemplateSubstitutions
indexSettingProviders
);
CompressedXContent mappings = template.mappings();
if (mappings != null) {
Expand Down Expand Up @@ -247,7 +268,7 @@ private Exception validateMappings(Map<String, ComponentTemplate> componentTempl
});
}
} else {
List<IndexTemplateMetadata> matchingTemplates = findV1Templates(state.metadata(), request.index(), false);
List<IndexTemplateMetadata> matchingTemplates = findV1Templates(simulatedState.metadata(), request.index(), false);
final Map<String, Object> mappingsMap = MetadataCreateIndexService.parseV1Mappings(
"{}",
matchingTemplates.stream().map(IndexTemplateMetadata::getMappings).collect(toList()),
Expand Down
Loading