Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.AliasMetadata;
import org.elasticsearch.cluster.metadata.ComponentTemplate;
import org.elasticsearch.cluster.metadata.ComposableIndexTemplate;
import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.cluster.metadata.DataStreamLifecycle;
Expand Down Expand Up @@ -157,8 +156,7 @@ protected void masterOperation(
xContentRegistry,
indicesService,
systemIndices,
indexSettingProviders,
Map.of()
indexSettingProviders
);

final Map<String, List<String>> overlapping = new HashMap<>();
Expand Down Expand Up @@ -235,8 +233,7 @@ public static Template resolveTemplate(
final NamedXContentRegistry xContentRegistry,
final IndicesService indicesService,
final SystemIndices systemIndices,
Set<IndexSettingProvider> indexSettingProviders,
Map<String, ComponentTemplate> componentTemplateSubstitutions
Set<IndexSettingProvider> indexSettingProviders
) throws Exception {
var metadata = simulatedState.getMetadata();
Settings templateSettings = resolveSettings(simulatedState.metadata(), matchingTemplate);
Expand Down Expand Up @@ -266,7 +263,6 @@ public static Template resolveTemplate(
null, // empty request mapping as the user can't specify any explicit mappings via the simulate api
simulatedState,
matchingTemplate,
componentTemplateSubstitutions,
xContentRegistry,
simulatedIndexName
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,8 +170,7 @@ protected void masterOperation(
xContentRegistry,
indicesService,
systemIndices,
indexSettingProviders,
Map.of()
indexSettingProviders
);
if (request.includeDefaults()) {
listener.onResponse(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.elasticsearch.transport.TransportService;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.Executor;
Expand Down Expand Up @@ -180,12 +181,34 @@ protected void doRun() throws IOException {
private boolean applyPipelines(Task task, BulkRequest bulkRequest, Executor executor, ActionListener<BulkResponse> listener)
throws IOException {
boolean hasIndexRequestsWithPipelines = false;
final Metadata metadata = clusterService.state().getMetadata();
Map<String, ComponentTemplate> templateSubstitutions = bulkRequest.getComponentTemplateSubstitutions();
final Metadata metadata;
Map<String, ComponentTemplate> componentTemplateSubstitutions = bulkRequest.getComponentTemplateSubstitutions();
if (bulkRequest.isSimulated() && componentTemplateSubstitutions.isEmpty() == false) {
/*
* If this is a simulated request, and there are template substitutions, then we want to create and use a new metadata that has
* those templates.
*/
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it will be really helpful to refer here to the methods that require this to work properly.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added a bit here. Let me know if there's something else that would be helpful.

Metadata.Builder simulatedMetadataBuilder = Metadata.builder(clusterService.state().getMetadata());
if (componentTemplateSubstitutions.isEmpty() == false) {
Map<String, ComponentTemplate> updatedComponentTemplates = new HashMap<>();
updatedComponentTemplates.putAll(clusterService.state().metadata().componentTemplates());
updatedComponentTemplates.putAll(componentTemplateSubstitutions);
simulatedMetadataBuilder.componentTemplates(updatedComponentTemplates);
}
// We now remove the index from the simulated metadata to force the templates to be used
for (DocWriteRequest<?> actionRequest : bulkRequest.requests) {
IndexRequest indexRequest = getIndexWriteRequest(actionRequest);
simulatedMetadataBuilder.remove(indexRequest.index());
}
metadata = simulatedMetadataBuilder.build();
} else {
metadata = clusterService.state().getMetadata();
}

for (DocWriteRequest<?> actionRequest : bulkRequest.requests) {
IndexRequest indexRequest = getIndexWriteRequest(actionRequest);
if (indexRequest != null) {
IngestService.resolvePipelinesAndUpdateIndexRequest(actionRequest, indexRequest, metadata, templateSubstitutions);
IngestService.resolvePipelinesAndUpdateIndexRequest(actionRequest, indexRequest, metadata);
hasIndexRequestsWithPipelines |= IngestService.hasPipeline(indexRequest);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import org.elasticsearch.xcontent.NamedXContentRegistry;

import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -197,12 +198,22 @@ private Exception validateMappings(Map<String, ComponentTemplate> componentTempl
* path for when the index does not exist). And it does not deal with system indices since we do not intend for users to
* simulate writing to system indices.
*/
// First, we remove the index from the cluster state if necessary (since we're going to use the templates)
ClusterState simulatedState = indexAbstraction == null
? state
: new ClusterState.Builder(state).metadata(Metadata.builder(state.metadata()).remove(request.index()).build()).build();

String matchingTemplate = findV2Template(state.metadata(), request.index(), false);
ClusterState.Builder simulatedClusterStateBuilder = new ClusterState.Builder(state);
Metadata.Builder simulatedMetadata = Metadata.builder(state.metadata());
if (indexAbstraction != null) {
// We remove the index from the cluster state if necessary (since we're going to use the templates)
simulatedMetadata.remove(request.index());
}
if (componentTemplateSubstitutions.isEmpty() == false) {
Map<String, ComponentTemplate> updatedComponentTemplates = new HashMap<>();
updatedComponentTemplates.putAll(state.metadata().componentTemplates());
updatedComponentTemplates.putAll(componentTemplateSubstitutions);
simulatedMetadata.componentTemplates(updatedComponentTemplates);
}
ClusterState simulatedState = simulatedClusterStateBuilder.metadata(simulatedMetadata).build();

String matchingTemplate = findV2Template(simulatedState.metadata(), request.index(), false);
if (matchingTemplate != null) {
final Template template = TransportSimulateIndexTemplateAction.resolveTemplate(
matchingTemplate,
Expand All @@ -212,8 +223,7 @@ private Exception validateMappings(Map<String, ComponentTemplate> componentTempl
xContentRegistry,
indicesService,
systemIndices,
indexSettingProviders,
componentTemplateSubstitutions
indexSettingProviders
);
CompressedXContent mappings = template.mappings();
if (mappings != null) {
Expand Down Expand Up @@ -247,7 +257,7 @@ private Exception validateMappings(Map<String, ComponentTemplate> componentTempl
});
}
} else {
List<IndexTemplateMetadata> matchingTemplates = findV1Templates(state.metadata(), request.index(), false);
List<IndexTemplateMetadata> matchingTemplates = findV1Templates(simulatedState.metadata(), request.index(), false);
final Map<String, Object> mappingsMap = MetadataCreateIndexService.parseV1Mappings(
"{}",
matchingTemplates.stream().map(IndexTemplateMetadata::getMappings).collect(toList()),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -665,7 +665,6 @@ private ClusterState applyCreateIndexRequestWithV2Template(
request.mappings(),
currentState,
templateName,
Map.of(),
xContentRegistry,
request.index()
);
Expand Down Expand Up @@ -824,7 +823,6 @@ private static List<CompressedXContent> collectSystemV2Mappings(
List<CompressedXContent> templateMappings = MetadataIndexTemplateService.collectMappings(
composableIndexTemplate,
componentTemplates,
Map.of(),
indexName
);
return collectV2Mappings(null, templateMappings, xContentRegistry);
Expand All @@ -834,16 +832,10 @@ public static List<CompressedXContent> collectV2Mappings(
@Nullable final String requestMappings,
final ClusterState currentState,
final String templateName,
Map<String, ComponentTemplate> componentTemplateSubstitutions,
final NamedXContentRegistry xContentRegistry,
final String indexName
) throws Exception {
List<CompressedXContent> templateMappings = MetadataIndexTemplateService.collectMappings(
currentState,
templateName,
componentTemplateSubstitutions,
indexName
);
List<CompressedXContent> templateMappings = MetadataIndexTemplateService.collectMappings(currentState, templateName, indexName);
return collectV2Mappings(requestMappings, templateMappings, xContentRegistry);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -698,7 +698,7 @@ private void validateIndexTemplateV2(String name, ComposableIndexTemplate indexT
final var now = Instant.now();
final var metadata = currentState.getMetadata();

final var combinedMappings = collectMappings(indexTemplate, metadata.componentTemplates(), Map.of(), "tmp_idx");
final var combinedMappings = collectMappings(indexTemplate, metadata.componentTemplates(), "tmp_idx");
final var combinedSettings = resolveSettings(indexTemplate, metadata.componentTemplates());
// First apply settings sourced from index setting providers:
for (var provider : indexSettingProviders) {
Expand Down Expand Up @@ -1341,12 +1341,7 @@ private static boolean isGlobalAndHasIndexHiddenSetting(Metadata metadata, Compo
/**
* Collect the given v2 template into an ordered list of mappings.
*/
public static List<CompressedXContent> collectMappings(
final ClusterState state,
final String templateName,
Map<String, ComponentTemplate> componentTemplateSubstitutions,
final String indexName
) {
public static List<CompressedXContent> collectMappings(final ClusterState state, final String templateName, final String indexName) {
final ComposableIndexTemplate template = state.metadata().templatesV2().get(templateName);
assert template != null
: "attempted to resolve mappings for a template [" + templateName + "] that did not exist in the cluster state";
Expand All @@ -1355,7 +1350,7 @@ public static List<CompressedXContent> collectMappings(
}

final Map<String, ComponentTemplate> componentTemplates = state.metadata().componentTemplates();
return collectMappings(template, componentTemplates, componentTemplateSubstitutions, indexName);
return collectMappings(template, componentTemplates, indexName);
}

/**
Expand All @@ -1364,7 +1359,6 @@ public static List<CompressedXContent> collectMappings(
public static List<CompressedXContent> collectMappings(
final ComposableIndexTemplate template,
final Map<String, ComponentTemplate> componentTemplates,
final Map<String, ComponentTemplate> componentTemplateSubstitutions,
final String indexName
) {
Objects.requireNonNull(template, "Composable index template must be provided");
Expand All @@ -1375,12 +1369,9 @@ public static List<CompressedXContent> collectMappings(
ComposableIndexTemplate.DataStreamTemplate.DATA_STREAM_MAPPING_SNIPPET
);
}
final Map<String, ComponentTemplate> combinedComponentTemplates = new HashMap<>();
combinedComponentTemplates.putAll(componentTemplates);
combinedComponentTemplates.putAll(componentTemplateSubstitutions);
List<CompressedXContent> mappings = template.composedOf()
.stream()
.map(combinedComponentTemplates::get)
.map(componentTemplates::get)
.filter(Objects::nonNull)
.map(ComponentTemplate::template)
.map(Template::mappings)
Expand Down Expand Up @@ -1716,7 +1707,7 @@ private static void validateCompositeTemplate(
String indexName = DataStream.BACKING_INDEX_PREFIX + temporaryIndexName;
// Parse mappings to ensure they are valid after being composed

List<CompressedXContent> mappings = collectMappings(stateWithIndex, templateName, Map.of(), indexName);
List<CompressedXContent> mappings = collectMappings(stateWithIndex, templateName, indexName);
try {
MapperService mapperService = tempIndexService.mapperService();
mapperService.merge(MapperService.SINGLE_MAPPING_NAME, mappings, MapperService.MergeReason.INDEX_TEMPLATE);
Expand Down
42 changes: 7 additions & 35 deletions server/src/main/java/org/elasticsearch/ingest/IngestService.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import org.elasticsearch.cluster.ClusterStateApplier;
import org.elasticsearch.cluster.ClusterStateTaskExecutor;
import org.elasticsearch.cluster.ClusterStateTaskListener;
import org.elasticsearch.cluster.metadata.ComponentTemplate;
import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.cluster.metadata.IndexAbstraction;
import org.elasticsearch.cluster.metadata.IndexMetadata;
Expand Down Expand Up @@ -271,30 +270,14 @@ public static void resolvePipelinesAndUpdateIndexRequest(
final IndexRequest indexRequest,
final Metadata metadata
) {
resolvePipelinesAndUpdateIndexRequest(originalRequest, indexRequest, metadata, Map.of());
}

public static void resolvePipelinesAndUpdateIndexRequest(
final DocWriteRequest<?> originalRequest,
final IndexRequest indexRequest,
final Metadata metadata,
Map<String, ComponentTemplate> componentTemplateSubstitutions
) {
resolvePipelinesAndUpdateIndexRequest(
originalRequest,
indexRequest,
metadata,
System.currentTimeMillis(),
componentTemplateSubstitutions
);
resolvePipelinesAndUpdateIndexRequest(originalRequest, indexRequest, metadata, System.currentTimeMillis());
}

static void resolvePipelinesAndUpdateIndexRequest(
final DocWriteRequest<?> originalRequest,
final IndexRequest indexRequest,
final Metadata metadata,
final long epochMillis,
final Map<String, ComponentTemplate> componentTemplateSubstitutions
final long epochMillis
) {
if (indexRequest.isPipelineResolved()) {
return;
Expand All @@ -307,16 +290,9 @@ static void resolvePipelinesAndUpdateIndexRequest(
* we don't fall back to the existing index if we don't find any because it is possible the user has intentionally removed the
* pipeline.
*/
final Pipelines pipelines;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to update this comment and mention that in order of the simulation to work we would need to remove any existing index/data stream. I am thinking that now this can easily be missed.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh good point! I had forgotten about this comment.

if (componentTemplateSubstitutions.isEmpty()) {
pipelines = resolvePipelinesFromMetadata(originalRequest, indexRequest, metadata, epochMillis) //
.or(() -> resolvePipelinesFromIndexTemplates(indexRequest, metadata, Map.of()))
.orElse(Pipelines.NO_PIPELINES_DEFINED);
} else {
pipelines = resolvePipelinesFromIndexTemplates(indexRequest, metadata, componentTemplateSubstitutions).orElse(
Pipelines.NO_PIPELINES_DEFINED
);
}
final Pipelines pipelines = resolvePipelinesFromMetadata(originalRequest, indexRequest, metadata, epochMillis).or(
() -> resolvePipelinesFromIndexTemplates(indexRequest, metadata)
).orElse(Pipelines.NO_PIPELINES_DEFINED);

// The pipeline coming as part of the request always has priority over the resolved one from metadata or templates
String requestPipeline = indexRequest.getPipeline();
Expand Down Expand Up @@ -1466,11 +1442,7 @@ private static Optional<Pipelines> resolvePipelinesFromMetadata(
return Optional.of(new Pipelines(IndexSettings.DEFAULT_PIPELINE.get(settings), IndexSettings.FINAL_PIPELINE.get(settings)));
}

private static Optional<Pipelines> resolvePipelinesFromIndexTemplates(
IndexRequest indexRequest,
Metadata metadata,
Map<String, ComponentTemplate> componentTemplateSubstitutions
) {
private static Optional<Pipelines> resolvePipelinesFromIndexTemplates(IndexRequest indexRequest, Metadata metadata) {
if (indexRequest.index() == null) {
return Optional.empty();
}
Expand All @@ -1480,7 +1452,7 @@ private static Optional<Pipelines> resolvePipelinesFromIndexTemplates(
// precedence), or if a V2 template does not match, any V1 templates
String v2Template = MetadataIndexTemplateService.findV2Template(metadata, indexRequest.index(), false);
if (v2Template != null) {
final Settings settings = MetadataIndexTemplateService.resolveSettings(metadata, v2Template, componentTemplateSubstitutions);
final Settings settings = MetadataIndexTemplateService.resolveSettings(metadata, v2Template);
return Optional.of(new Pipelines(IndexSettings.DEFAULT_PIPELINE.get(settings), IndexSettings.FINAL_PIPELINE.get(settings)));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,7 @@ public Settings getAdditionalIndexSettings(
xContentRegistry(),
indicesService,
systemIndices,
indexSettingsProviders,
Map.of()
indexSettingsProviders
);

assertThat(resolvedTemplate.settings().getAsInt("test-setting", -1), is(1));
Expand Down
Loading