Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,14 @@
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.action.admin.indices.template.put.PutComponentTemplateAction;
import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequest;
import org.elasticsearch.action.admin.indices.template.put.TransportPutComposableIndexTemplateAction;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.ingest.SimulateIndexResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.cluster.metadata.ComponentTemplate;
import org.elasticsearch.cluster.metadata.ComposableIndexTemplate;
import org.elasticsearch.cluster.metadata.Template;
import org.elasticsearch.common.compress.CompressedXContent;
Expand Down Expand Up @@ -57,7 +59,7 @@ public void testMappingValidationIndexExists() {
}
""";
indicesAdmin().create(new CreateIndexRequest(indexName).mapping(mapping)).actionGet();
BulkRequest bulkRequest = new BulkRequest();
BulkRequest bulkRequest = new SimulateBulkRequest(Map.of(), Map.of());
bulkRequest.add(new IndexRequest(indexName).source("""
{
"foo1": "baz"
Expand Down Expand Up @@ -87,13 +89,125 @@ public void testMappingValidationIndexExists() {
assertThat(fields.size(), equalTo(1));
}

@SuppressWarnings("unchecked")
public void testMappingValidationIndexExistsWithComponentTemplate() throws IOException {
/*
* This test simulates a BulkRequest of two documents into an existing index. Then we make sure the index contains no documents, and
* that the index's mapping in the cluster state has not been updated with the two new field. With the mapping from the template
* that was used to create the index, we would expect the second document to throw an exception because it uses a field that does
* not exist. But we substitute a new version of the component template named "test-component-template" that allows for the new
* field.
*/
String originalComponentTemplateMappingString = """
{
"_doc":{
"dynamic":"strict",
"properties":{
"foo1":{
"type":"text"
}
}
}
}
""";
CompressedXContent mapping = CompressedXContent.fromJSON(originalComponentTemplateMappingString);
Template template = new Template(Settings.EMPTY, mapping, null);
PutComponentTemplateAction.Request componentTemplateActionRequest = new PutComponentTemplateAction.Request(
"test-component-template"
);
ComponentTemplate componentTemplate = new ComponentTemplate(template, null, null);
componentTemplateActionRequest.componentTemplate(componentTemplate);
client().execute(PutComponentTemplateAction.INSTANCE, componentTemplateActionRequest).actionGet();
ComposableIndexTemplate composableIndexTemplate = ComposableIndexTemplate.builder()
.indexPatterns(List.of("my-index-*"))
.componentTemplates(List.of("test-component-template"))
.build();
TransportPutComposableIndexTemplateAction.Request request = new TransportPutComposableIndexTemplateAction.Request("test");
request.indexTemplate(composableIndexTemplate);
client().execute(TransportPutComposableIndexTemplateAction.TYPE, request).actionGet();

String indexName = "my-index-1";
// First, run before the index is created:
assertMappingsUpdatedFromComponentTemplateSubstitutions(indexName);
// Now, create the index and make sure the component template substitutions work the same:
indicesAdmin().create(new CreateIndexRequest(indexName)).actionGet();
assertMappingsUpdatedFromComponentTemplateSubstitutions(indexName);
// Now make sure nothing was actually changed:
indicesAdmin().refresh(new RefreshRequest(indexName)).actionGet();
SearchResponse searchResponse = client().search(new SearchRequest(indexName)).actionGet();
assertThat(searchResponse.getHits().getTotalHits().value, equalTo(0L));
searchResponse.decRef();
ClusterStateResponse clusterStateResponse = admin().cluster().state(new ClusterStateRequest(TEST_REQUEST_TIMEOUT)).actionGet();
Map<String, Object> indexMapping = clusterStateResponse.getState().metadata().index(indexName).mapping().sourceAsMap();
Map<String, Object> fields = (Map<String, Object>) indexMapping.get("properties");
assertThat(fields.size(), equalTo(1));
}

private void assertMappingsUpdatedFromComponentTemplateSubstitutions(String indexName) {
IndexRequest indexRequest1 = new IndexRequest(indexName).source("""
{
"foo1": "baz"
}
""", XContentType.JSON).id(randomUUID());
IndexRequest indexRequest2 = new IndexRequest(indexName).source("""
{
"foo3": "baz"
}
""", XContentType.JSON).id(randomUUID());
{
// First we use the original component template, and expect a failure in the second document:
BulkRequest bulkRequest = new SimulateBulkRequest(Map.of(), Map.of());
bulkRequest.add(indexRequest1);
bulkRequest.add(indexRequest2);
BulkResponse response = client().execute(new ActionType<BulkResponse>(SimulateBulkAction.NAME), bulkRequest).actionGet();
assertThat(response.getItems().length, equalTo(2));
assertThat(response.getItems()[0].getResponse().getResult(), equalTo(DocWriteResponse.Result.CREATED));
assertNull(((SimulateIndexResponse) response.getItems()[0].getResponse()).getException());
assertThat(response.getItems()[1].getResponse().getResult(), equalTo(DocWriteResponse.Result.CREATED));
assertThat(
((SimulateIndexResponse) response.getItems()[1].getResponse()).getException().getMessage(),
containsString("mapping set to strict, dynamic introduction of")
);
}

{
// Now we substitute a "test-component-template" that defines both fields, so we expect no exception:
BulkRequest bulkRequest = new SimulateBulkRequest(
Map.of(),
Map.of(
"test-component-template",
Map.of(
"template",
Map.of(
"mappings",
Map.of(
"dynamic",
"strict",
"properties",
Map.of("foo1", Map.of("type", "text"), "foo3", Map.of("type", "text"))
)
)
)
)
);
bulkRequest.add(indexRequest1);
bulkRequest.add(indexRequest2);
BulkResponse response = client().execute(new ActionType<BulkResponse>(SimulateBulkAction.NAME), bulkRequest).actionGet();
assertThat(response.getItems().length, equalTo(2));
assertThat(response.getItems()[0].getResponse().getResult(), equalTo(DocWriteResponse.Result.CREATED));
assertNull(((SimulateIndexResponse) response.getItems()[0].getResponse()).getException());
assertThat(response.getItems()[1].getResponse().getResult(), equalTo(DocWriteResponse.Result.CREATED));
assertNull(((SimulateIndexResponse) response.getItems()[1].getResponse()).getException());
}
}

public void testMappingValidationIndexDoesNotExistsNoTemplate() {
/*
* This test simulates a BulkRequest of two documents into an index that does not exist. There is no template (other than the
* mapping-less "random-index-template" created by the parent class), so we expect no mapping validation failure.
*/
String indexName = randomAlphaOfLength(20).toLowerCase(Locale.ROOT);
BulkRequest bulkRequest = new BulkRequest();
BulkRequest bulkRequest = new SimulateBulkRequest(Map.of(), Map.of());
bulkRequest.add(new IndexRequest(indexName).source("""
{
"foo1": "baz"
Expand Down Expand Up @@ -140,7 +254,7 @@ public void testMappingValidationIndexDoesNotExistsV2Template() throws IOExcepti
request.indexTemplate(composableIndexTemplate);

client().execute(TransportPutComposableIndexTemplateAction.TYPE, request).actionGet();
BulkRequest bulkRequest = new BulkRequest();
BulkRequest bulkRequest = new SimulateBulkRequest(Map.of(), Map.of());
bulkRequest.add(new IndexRequest(indexName).source("""
{
"foo1": "baz"
Expand Down Expand Up @@ -172,7 +286,7 @@ public void testMappingValidationIndexDoesNotExistsV1Template() {
indicesAdmin().putTemplate(
new PutIndexTemplateRequest("test-template").patterns(List.of("my-index-*")).mapping("foo1", "type=integer")
).actionGet();
BulkRequest bulkRequest = new BulkRequest();
BulkRequest bulkRequest = new SimulateBulkRequest(Map.of(), Map.of());
bulkRequest.add(new IndexRequest(indexName).source("""
{
"foo1": "baz"
Expand Down Expand Up @@ -226,7 +340,7 @@ public void testMappingValidationIndexDoesNotExistsDataStream() throws IOExcepti
client().execute(TransportPutComposableIndexTemplateAction.TYPE, request).actionGet();
{
// First, try with no @timestamp to make sure we're picking up data-stream-specific templates
BulkRequest bulkRequest = new BulkRequest();
BulkRequest bulkRequest = new SimulateBulkRequest(Map.of(), Map.of());
bulkRequest.add(new IndexRequest(indexName).source("""
{
"foo1": "baz"
Expand All @@ -252,7 +366,7 @@ public void testMappingValidationIndexDoesNotExistsDataStream() throws IOExcepti
}
{
// Now with @timestamp
BulkRequest bulkRequest = new BulkRequest();
BulkRequest bulkRequest = new SimulateBulkRequest(Map.of(), Map.of());
bulkRequest.add(new IndexRequest(indexName).source("""
{
"@timestamp": "2024-08-27",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.AliasMetadata;
import org.elasticsearch.cluster.metadata.ComponentTemplate;
import org.elasticsearch.cluster.metadata.ComposableIndexTemplate;
import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.cluster.metadata.DataStreamLifecycle;
Expand Down Expand Up @@ -156,7 +157,8 @@ protected void masterOperation(
xContentRegistry,
indicesService,
systemIndices,
indexSettingProviders
indexSettingProviders,
Map.of()
);

final Map<String, List<String>> overlapping = new HashMap<>();
Expand Down Expand Up @@ -233,7 +235,8 @@ public static Template resolveTemplate(
final NamedXContentRegistry xContentRegistry,
final IndicesService indicesService,
final SystemIndices systemIndices,
Set<IndexSettingProvider> indexSettingProviders
Set<IndexSettingProvider> indexSettingProviders,
Map<String, ComponentTemplate> componentTemplateSubstitutions
) throws Exception {
var metadata = simulatedState.getMetadata();
Settings templateSettings = resolveSettings(simulatedState.metadata(), matchingTemplate);
Expand Down Expand Up @@ -263,6 +266,7 @@ public static Template resolveTemplate(
null, // empty request mapping as the user can't specify any explicit mappings via the simulate api
simulatedState,
matchingTemplate,
componentTemplateSubstitutions,
xContentRegistry,
simulatedIndexName
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,8 @@ protected void masterOperation(
xContentRegistry,
indicesService,
systemIndices,
indexSettingProviders
indexSettingProviders,
Map.of()
);
if (request.includeDefaults()) {
listener.onResponse(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.elasticsearch.cluster.ClusterStateObserver;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.ComponentTemplate;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.io.stream.Writeable;
Expand All @@ -39,6 +40,8 @@
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

import java.io.IOException;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -168,19 +171,21 @@ public void onTimeout(TimeValue timeout) {
private void forkAndExecute(Task task, BulkRequest bulkRequest, Executor executor, ActionListener<BulkResponse> releasingListener) {
executor.execute(new ActionRunnable<>(releasingListener) {
@Override
protected void doRun() {
protected void doRun() throws IOException {
applyPipelinesAndDoInternalExecute(task, bulkRequest, executor, releasingListener);
}
});
}

private boolean applyPipelines(Task task, BulkRequest bulkRequest, Executor executor, ActionListener<BulkResponse> listener) {
private boolean applyPipelines(Task task, BulkRequest bulkRequest, Executor executor, ActionListener<BulkResponse> listener)
throws IOException {
boolean hasIndexRequestsWithPipelines = false;
final Metadata metadata = clusterService.state().getMetadata();
Map<String, ComponentTemplate> templateSubstitutions = bulkRequest.getComponentTemplateSubstitutions();
for (DocWriteRequest<?> actionRequest : bulkRequest.requests) {
IndexRequest indexRequest = getIndexWriteRequest(actionRequest);
if (indexRequest != null) {
IngestService.resolvePipelinesAndUpdateIndexRequest(actionRequest, indexRequest, metadata);
IngestService.resolvePipelinesAndUpdateIndexRequest(actionRequest, indexRequest, metadata, templateSubstitutions);
hasIndexRequestsWithPipelines |= IngestService.hasPipeline(indexRequest);
}

Expand Down Expand Up @@ -250,7 +255,7 @@ private void processBulkIndexIngestRequest(
} else {
ActionRunnable<BulkResponse> runnable = new ActionRunnable<>(actionListener) {
@Override
protected void doRun() {
protected void doRun() throws IOException {
applyPipelinesAndDoInternalExecute(task, bulkRequest, executor, actionListener);
}

Expand Down Expand Up @@ -328,7 +333,7 @@ private void applyPipelinesAndDoInternalExecute(
BulkRequest bulkRequest,
Executor executor,
ActionListener<BulkResponse> listener
) {
) throws IOException {
final long relativeStartTimeNanos = relativeTimeNanos();
if (applyPipelines(task, bulkRequest, executor, listener) == false) {
doInternalExecute(task, bulkRequest, executor, listener, relativeStartTimeNanos);
Expand All @@ -349,6 +354,6 @@ protected abstract void doInternalExecute(
Executor executor,
ActionListener<BulkResponse> listener,
long relativeStartTimeNanos
);
) throws IOException;

}
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
Expand Down Expand Up @@ -208,7 +209,11 @@ protected void doInternalExecute(
Executor executor,
ActionListener<BulkResponse> listener,
long relativeStartTimeNanos
) {
) throws IOException {
assert (bulkRequest instanceof SimulateBulkRequest) == false
: "TransportBulkAction should never be called with a SimulateBulkRequest";
assert bulkRequest.getComponentTemplateSubstitutions().isEmpty()
: "Component template substitutions are not allowed in a non-simulated bulk";
trackIndexRequests(bulkRequest);
Map<String, CreateIndexRequest> indicesToAutoCreate = new HashMap<>();
Set<String> dataStreamsToBeRolledOver = new HashSet<>();
Expand Down
Loading