diff --git a/docs/changelog/126344.yaml b/docs/changelog/126344.yaml new file mode 100644 index 0000000000000..de6ddfea94bf2 --- /dev/null +++ b/docs/changelog/126344.yaml @@ -0,0 +1,5 @@ +pr: 126344 +summary: Experiment setting settings on data stream +area: Data streams +type: enhancement +issues: [] diff --git a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/DataStreamsPlugin.java b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/DataStreamsPlugin.java index 1a2b68ef3b59a..915eeb08035e6 100644 --- a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/DataStreamsPlugin.java +++ b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/DataStreamsPlugin.java @@ -16,6 +16,7 @@ import org.elasticsearch.action.datastreams.GetDataStreamAction; import org.elasticsearch.action.datastreams.MigrateToDataStreamAction; import org.elasticsearch.action.datastreams.ModifyDataStreamsAction; +import org.elasticsearch.action.datastreams.PostDataStreamAction; import org.elasticsearch.action.datastreams.PromoteDataStreamAction; import org.elasticsearch.action.datastreams.lifecycle.ExplainDataStreamLifecycleAction; import org.elasticsearch.action.datastreams.lifecycle.GetDataStreamLifecycleAction; @@ -37,6 +38,7 @@ import org.elasticsearch.datastreams.action.DeleteDataStreamTransportAction; import org.elasticsearch.datastreams.action.MigrateToDataStreamTransportAction; import org.elasticsearch.datastreams.action.ModifyDataStreamsTransportAction; +import org.elasticsearch.datastreams.action.PostDataStreamTransportAction; import org.elasticsearch.datastreams.action.PromoteDataStreamTransportAction; import org.elasticsearch.datastreams.action.TransportGetDataStreamsAction; import org.elasticsearch.datastreams.lifecycle.DataStreamLifecycleErrorStore; @@ -70,6 +72,7 @@ import org.elasticsearch.datastreams.rest.RestGetDataStreamsAction; import org.elasticsearch.datastreams.rest.RestMigrateToDataStreamAction; import org.elasticsearch.datastreams.rest.RestModifyDataStreamsAction; +import org.elasticsearch.datastreams.rest.RestPostDataStreamAction; import org.elasticsearch.datastreams.rest.RestPromoteDataStreamAction; import org.elasticsearch.features.NodeFeature; import org.elasticsearch.health.HealthIndicatorService; @@ -226,6 +229,7 @@ public Collection createComponents(PluginServices services) { public List getActions() { List actions = new ArrayList<>(); actions.add(new ActionHandler(CreateDataStreamAction.INSTANCE, CreateDataStreamTransportAction.class)); + actions.add(new ActionHandler(PostDataStreamAction.INSTANCE, PostDataStreamTransportAction.class)); actions.add(new ActionHandler(DeleteDataStreamAction.INSTANCE, DeleteDataStreamTransportAction.class)); actions.add(new ActionHandler(GetDataStreamAction.INSTANCE, TransportGetDataStreamsAction.class)); actions.add(new ActionHandler(DataStreamsStatsAction.INSTANCE, DataStreamsStatsTransportAction.class)); @@ -263,6 +267,7 @@ public List getRestHandlers( }); List handlers = new ArrayList<>(); handlers.add(new RestCreateDataStreamAction()); + handlers.add(new RestPostDataStreamAction()); handlers.add(new RestDeleteDataStreamAction()); handlers.add(new RestGetDataStreamsAction()); handlers.add(new RestDataStreamsStatsAction()); diff --git a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/action/PostDataStreamTransportAction.java b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/action/PostDataStreamTransportAction.java new file mode 100644 index 0000000000000..d8c4a12f46d30 --- /dev/null +++ b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/action/PostDataStreamTransportAction.java @@ -0,0 +1,333 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ +package org.elasticsearch.datastreams.action; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsClusterStateUpdateRequest; +import org.elasticsearch.action.datastreams.PostDataStreamAction; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.CountDownActionListener; +import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.action.support.master.TransportMasterNodeAction; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.block.ClusterBlockException; +import org.elasticsearch.cluster.block.ClusterBlockLevel; +import org.elasticsearch.cluster.metadata.ComposableIndexTemplate; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.metadata.MetadataDataStreamsService; +import org.elasticsearch.cluster.metadata.MetadataUpdateSettingsService; +import org.elasticsearch.cluster.project.ProjectResolver; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.EsExecutors; +import org.elasticsearch.core.TimeValue; +import org.elasticsearch.index.Index; +import org.elasticsearch.indices.SystemIndices; +import org.elasticsearch.injection.guice.Inject; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Set; + +public class PostDataStreamTransportAction extends TransportMasterNodeAction { + private static final Logger logger = LogManager.getLogger(PostDataStreamTransportAction.class); + private static final Set APPLY_TO_BACKING_INDICES = Set.of("index.number_of_replicas", "index.lifecycle.name"); + private final MetadataDataStreamsService metadataDataStreamsService; + private final MetadataUpdateSettingsService updateSettingsService; + private final IndexNameExpressionResolver indexNameExpressionResolver; + private final SystemIndices systemIndices; + private final ProjectResolver projectResolver; + + @Inject + public PostDataStreamTransportAction( + TransportService transportService, + ClusterService clusterService, + ThreadPool threadPool, + ActionFilters actionFilters, + ProjectResolver projectResolver, + MetadataDataStreamsService metadataDataStreamsService, + MetadataUpdateSettingsService updateSettingsService, + IndexNameExpressionResolver indexNameExpressionResolver, + SystemIndices systemIndices + ) { + super( + PostDataStreamAction.NAME, + transportService, + clusterService, + threadPool, + actionFilters, + PostDataStreamAction.Request::new, + PostDataStreamAction.Response::new, + EsExecutors.DIRECT_EXECUTOR_SERVICE + ); + this.projectResolver = projectResolver; + this.metadataDataStreamsService = metadataDataStreamsService; + this.updateSettingsService = updateSettingsService; + this.indexNameExpressionResolver = indexNameExpressionResolver; + this.systemIndices = systemIndices; + } + + @Override + protected void masterOperation( + Task task, + PostDataStreamAction.Request request, + ClusterState state, + ActionListener listener + ) throws Exception { + String dataStreamNamePattern = request.getName(); + List dataStreamNames = indexNameExpressionResolver.dataStreamNames( + clusterService.state(), + IndicesOptions.DEFAULT, + dataStreamNamePattern + ); + List dataStreamResponses = new ArrayList<>(); + CountDownActionListener countDownListener = new CountDownActionListener(dataStreamNames.size(), new ActionListener<>() { + @Override + public void onResponse(Void unused) { + listener.onResponse(new PostDataStreamAction.Response(dataStreamResponses)); + } + + @Override + public void onFailure(Exception e) { + listener.onFailure(e); // TODO ? + } + }); + // TODO there's a much better way + for (String dataStreamName : dataStreamNames) { + updateSingleDataStream( + dataStreamName, + request.getTemplateOverrides(), + request.masterNodeTimeout(), + request.ackTimeout(), + new ActionListener<>() { + @Override + public void onResponse(PostDataStreamAction.DataStreamResponse dataStreamResponse) { + dataStreamResponses.add(dataStreamResponse); + countDownListener.onResponse(null); + } + + @Override + public void onFailure(Exception e) { + // TODO this is not actually correct at all: + dataStreamResponses.add( + new PostDataStreamAction.DataStreamResponse(dataStreamName, false, e.getMessage(), null, List.of()) + ); + countDownListener.onResponse(null); + } + } + ); + } + } + + private void updateSingleDataStream( + String dataStreamName, + ComposableIndexTemplate templateOverrides, + TimeValue masterNodeTimeout, + TimeValue ackTimeout, + ActionListener listener + ) { + if (systemIndices.isSystemDataStream(dataStreamName)) { + listener.onResponse( + new PostDataStreamAction.DataStreamResponse(dataStreamName, false, "Cannot update a system data stream", null, List.of()) + ); + return; + } + metadataDataStreamsService.updateTemplateOverrides( + projectResolver, + masterNodeTimeout, + ackTimeout, + dataStreamName, + templateOverrides, + new ActionListener<>() { + @Override + public void onResponse(AcknowledgedResponse acknowledgedResponse) { + if (acknowledgedResponse.isAcknowledged()) { + updateSettingsOnIndices(dataStreamName, templateOverrides, masterNodeTimeout, ackTimeout, listener); + } else { + listener.onResponse( + new PostDataStreamAction.DataStreamResponse( + dataStreamName, + false, + "Updating template overrides not accepted for unknown reasons", + null, + List.of() + ) + ); + } + } + + @Override + public void onFailure(Exception e) { + listener.onFailure(e); + } + } + ); + } + + private void updateSettingsOnIndices( + String dataStreamName, + ComposableIndexTemplate templateOverrides, + TimeValue masterNodeTimeout, + TimeValue ackTimeout, + ActionListener listener + ) { + final List concreteIndices = clusterService.state() + .projectState(projectResolver.getProjectId()) + .metadata() + .dataStreams() + .get(dataStreamName) + .getIndices(); + final Settings requestSettings = templateOverrides.template().settings(); + final List indexSettingResults = new ArrayList<>(); + CountDownActionListener settingCountDownListener = new CountDownActionListener(requestSettings.size() + 1, new ActionListener<>() { + // Called once all settings are completed for all indices + @Override + public void onResponse(Void unused) { + ComposableIndexTemplate effectiveIndexTemplate = clusterService.state() + .projectState(projectResolver.getProjectId()) + .metadata() + .dataStreams() + .get(dataStreamName) + .getEffectiveIndexTemplate(clusterService.state().projectState(projectResolver.getProjectId()).metadata()); + listener.onResponse( + new PostDataStreamAction.DataStreamResponse(dataStreamName, true, null, effectiveIndexTemplate, indexSettingResults) + ); + } + + @Override + public void onFailure(Exception e) { + listener.onFailure(e); + } + }); + settingCountDownListener.onResponse(null); // handles the case when there were zero settings + for (String setting : requestSettings.keySet()) { + updateSingleSettingOnIndices( + setting, + requestSettings.get(setting), + concreteIndices, + masterNodeTimeout, + ackTimeout, + new ActionListener() { + @Override + public void onResponse(PostDataStreamAction.DataStreamResponse.IndexSettingResult indexSettingResult) { + indexSettingResults.add(indexSettingResult); + settingCountDownListener.onResponse(null); + } + + @Override + public void onFailure(Exception e) { + settingCountDownListener.onFailure(e); + } + } + ); + } + } + + private void updateSingleSettingOnIndices( + String setting, + Object settingValue, + List concreteIndices, + TimeValue masterNodeTimeout, + TimeValue ackTimeout, + ActionListener listener + ) { + if (APPLY_TO_BACKING_INDICES.contains(setting)) { + final List errors = new ArrayList<>(); + CountDownActionListener indexCountDownListener = new CountDownActionListener( + concreteIndices.size() + 1, + new ActionListener<>() { + // Called when all indices for a single setting are complete + @Override + public void onResponse(Void unused) { + listener.onResponse(new PostDataStreamAction.DataStreamResponse.IndexSettingResult(setting, true, errors)); + } + + @Override + public void onFailure(Exception e) { + listener.onFailure(e); + } + } + ); + indexCountDownListener.onResponse(null); // handles the case where there were zero indices + for (Index index : concreteIndices) { + updateSingleSettingForSingleIndex(setting, settingValue, index, masterNodeTimeout, ackTimeout, new ActionListener<>() { + // Called when a single setting for a single index is complete + @Override + public void onResponse(PostDataStreamAction.DataStreamResponse.IndexSettingError response) { + if (response != null) { + errors.add(response); + } + indexCountDownListener.onResponse(null); + } + + @Override + public void onFailure(Exception e) { + indexCountDownListener.onFailure(e); + } + }); + } + } else { + // This is not a setting that we will apply to backing indices + listener.onResponse(new PostDataStreamAction.DataStreamResponse.IndexSettingResult(setting, false, List.of())); + } + } + + private void updateSingleSettingForSingleIndex( + String settingName, + Object settingValue, + Index index, + TimeValue masterNodeTimeout, + TimeValue ackTimeout, + ActionListener listener + ) { + updateSettingsService.updateSettings( + new UpdateSettingsClusterStateUpdateRequest( + projectResolver.getProjectId(), + masterNodeTimeout, + ackTimeout, + Settings.builder().loadFromMap(Map.of(settingName, settingValue)).build(), + UpdateSettingsClusterStateUpdateRequest.OnExisting.OVERWRITE, + UpdateSettingsClusterStateUpdateRequest.OnStaticSetting.REOPEN_INDICES, + index + ), + new ActionListener<>() { + @Override + public void onResponse(AcknowledgedResponse response) { + PostDataStreamAction.DataStreamResponse.IndexSettingError error; + if (response.isAcknowledged() == false) { + error = new PostDataStreamAction.DataStreamResponse.IndexSettingError( + index.getName(), + "Updating setting not acknowledged for unknown reason" + ); + } else { + error = null; + } + listener.onResponse(error); + } + + @Override + public void onFailure(Exception e) { + listener.onResponse(new PostDataStreamAction.DataStreamResponse.IndexSettingError(index.getName(), e.getMessage())); + } + } + ); + } + + @Override + protected ClusterBlockException checkBlock(PostDataStreamAction.Request request, ClusterState state) { + return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE); + } +} diff --git a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/action/TransportGetDataStreamsAction.java b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/action/TransportGetDataStreamsAction.java index 1ffc82a263e42..a4eb3c42c5f9c 100644 --- a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/action/TransportGetDataStreamsAction.java +++ b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/action/TransportGetDataStreamsAction.java @@ -258,19 +258,21 @@ static GetDataStreamAction.Response innerOperation( } } else { indexTemplate = MetadataIndexTemplateService.findV2Template(state.metadata(), dataStream.getName(), false); - if (indexTemplate != null) { - Settings settings = MetadataIndexTemplateService.resolveSettings(state.metadata(), indexTemplate); - ilmPolicyName = settings.get(IndexMetadata.LIFECYCLE_NAME); - if (indexMode == null && state.metadata().templatesV2().get(indexTemplate) != null) { - indexMode = resolveMode( - state, - indexSettingProviders, - dataStream, - settings, - state.metadata().templatesV2().get(indexTemplate) - ); + if (indexTemplate != null) { // TODO hack + ComposableIndexTemplate effectiveTemplate = dataStream.getEffectiveIndexTemplate(state.metadata()); + if (effectiveTemplate != null && effectiveTemplate.template() != null) { + Settings settings; + if (effectiveTemplate.template() != null && effectiveTemplate.template().settings() != null) { + settings = effectiveTemplate.template().settings(); + } else { + settings = Settings.EMPTY; + } + ilmPolicyName = settings.get(IndexMetadata.LIFECYCLE_NAME); + if (indexMode == null) { + indexMode = resolveMode(state, indexSettingProviders, dataStream, settings, effectiveTemplate); + } + indexTemplatePreferIlmValue = PREFER_ILM_SETTING.get(settings); } - indexTemplatePreferIlmValue = PREFER_ILM_SETTING.get(settings); } else { LOGGER.warn( "couldn't find any matching template for data stream [{}]. has it been restored (and possibly renamed)" diff --git a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/rest/RestPostDataStreamAction.java b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/rest/RestPostDataStreamAction.java new file mode 100644 index 0000000000000..29dab44219abe --- /dev/null +++ b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/rest/RestPostDataStreamAction.java @@ -0,0 +1,58 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ +package org.elasticsearch.datastreams.rest; + +import org.elasticsearch.action.datastreams.PostDataStreamAction; +import org.elasticsearch.client.internal.node.NodeClient; +import org.elasticsearch.cluster.metadata.ComposableIndexTemplate; +import org.elasticsearch.rest.BaseRestHandler; +import org.elasticsearch.rest.RestRequest; +import org.elasticsearch.rest.RestUtils; +import org.elasticsearch.rest.Scope; +import org.elasticsearch.rest.ServerlessScope; +import org.elasticsearch.rest.action.RestToXContentListener; +import org.elasticsearch.xcontent.XContentParser; + +import java.io.IOException; +import java.util.List; + +import static org.elasticsearch.rest.RestRequest.Method.POST; + +@ServerlessScope(Scope.PUBLIC) +public class RestPostDataStreamAction extends BaseRestHandler { + + @Override + public String getName() { + return "post_data_stream_action"; + } + + @Override + public List routes() { + return List.of(new Route(POST, "/_data_stream/{name}")); + } + + @Override + protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException { + ComposableIndexTemplate indexTemplate; + if (request.hasContent()) { + try (XContentParser parser = request.contentOrSourceParamParser()) { + indexTemplate = ComposableIndexTemplate.parseNoIndexPatterns(parser); + } + } else { + indexTemplate = null; + } + PostDataStreamAction.Request putDataStreamRequest = new PostDataStreamAction.Request( + RestUtils.getMasterNodeTimeout(request), + RestUtils.getAckTimeout(request), + request.param("name"), + indexTemplate + ); + return channel -> client.execute(PostDataStreamAction.INSTANCE, putDataStreamRequest, new RestToXContentListener<>(channel)); + } +} diff --git a/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/240_data_stream_overrides.yml b/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/240_data_stream_overrides.yml new file mode 100644 index 0000000000000..783de4380395b --- /dev/null +++ b/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/240_data_stream_overrides.yml @@ -0,0 +1,90 @@ +--- +setup: + - requires: + reason: "Data stream failure stores config in templates was added in 8.16+" + test_runner_features: [ capabilities, allowed_warnings ] + +--- +"Test single data stream": + - do: + allowed_warnings: + - "index template [my-template] has index patterns [data-*] matching patterns from existing older templates [global] with patterns (global => [*]); this template [my-template] will take precedence during new index creation" + indices.put_index_template: + name: my-template + body: + index_patterns: [ my-data-stream-* ] + data_stream: { } + template: + settings: + number_of_replicas: 0 + lifecycle.name: my-policy + + - do: + indices.create_data_stream: + name: my-data-stream-1 + + - do: + indices.rollover: + alias: "my-data-stream-1" + + - do: + cluster.health: + index: "my-data-stream-1" + wait_for_status: green + + - do: + indices.post_data_stream: + name: my-data-stream-1 + body: + template: + settings: + index: + number_of_shards: 2 + - match: { data_streams.0.name: my-data-stream-1 } + - match: { data_streams.0.applied_to_data_stream: true } + - match: { data_streams.0.index_settings_results.0.name: index.number_of_shards } + - match: { data_streams.0.index_settings_results.0.applied_to_backing_indices: false } + - match: { data_streams.0.effective_template.index_patterns.0: "my-data-stream-*" } + - match: { data_streams.0.effective_template.template.settings.index.number_of_shards: "2" } + - match: { data_streams.0.effective_template.template.settings.index.number_of_replicas: "0" } + - match: { data_streams.0.effective_template.template.settings.index.lifecycle.name: "my-policy" } + +--- +"Test invalid settings": + - do: + allowed_warnings: + - "index template [my-template] has index patterns [data-*] matching patterns from existing older templates [global] with patterns (global => [*]); this template [my-template] will take precedence during new index creation" + indices.put_index_template: + name: my-template + body: + index_patterns: [ my-data-stream-* ] + data_stream: { } + template: + settings: + number_of_replicas: 0 + lifecycle.name: my-policy + + - do: + indices.create_data_stream: + name: my-data-stream-1 + + - do: + indices.rollover: + alias: "my-data-stream-1" + + - do: + cluster.health: + index: "my-data-stream-1" + wait_for_status: green + + - do: + indices.post_data_stream: + name: my-data-stream-1 + body: + template: + settings: + index: + fake_setting: 1234 + - match: { data_streams.0.name: my-data-stream-1 } + - match: { data_streams.0.applied_to_data_stream: false } + - match: { data_streams.0.error: "unknown setting [index.fake_setting] please check that any required plugins are installed, or check the breaking changes documentation for removed settings" } diff --git a/rest-api-spec/src/main/resources/rest-api-spec/api/indices.post_data_stream.json b/rest-api-spec/src/main/resources/rest-api-spec/api/indices.post_data_stream.json new file mode 100644 index 0000000000000..b3046dda90b30 --- /dev/null +++ b/rest-api-spec/src/main/resources/rest-api-spec/api/indices.post_data_stream.json @@ -0,0 +1,42 @@ +{ + "indices.post_data_stream":{ + "documentation":{ + "url":"https://www.elastic.co/guide/en/elasticsearch/reference/master/data-streams.html", + "description":"Updates a data stream" + }, + "stability":"stable", + "visibility":"public", + "headers":{ + "accept": [ "application/json"] + }, + "url":{ + "paths":[ + { + "path":"/_data_stream/{name}", + "methods":[ + "POST" + ], + "parts":{ + "name":{ + "type":"string", + "description":"The name of the data stream or data stream pattern" + } + } + } + ] + }, + "params":{ + "timeout":{ + "type":"time", + "description":"Specify timeout for acknowledging the cluster state update" + }, + "master_timeout":{ + "type":"time", + "description":"Specify timeout for connection to master" + } + }, + "body":{ + "description":"The index template overrides" + } + } +} diff --git a/server/src/main/java/org/elasticsearch/TransportVersions.java b/server/src/main/java/org/elasticsearch/TransportVersions.java index 00943d04275dd..dfda986a3e323 100644 --- a/server/src/main/java/org/elasticsearch/TransportVersions.java +++ b/server/src/main/java/org/elasticsearch/TransportVersions.java @@ -214,6 +214,7 @@ static TransportVersion def(int id) { public static final TransportVersion ESQL_REMOVE_AGGREGATE_TYPE = def(9_045_0_00); public static final TransportVersion ADD_PROJECT_ID_TO_DSL_ERROR_INFO = def(9_046_0_00); public static final TransportVersion SEMANTIC_TEXT_CHUNKING_CONFIG = def(9_047_00_0); + public static final TransportVersion TEMPLATES_IN_DATA_STREAMS = def(9_048_0_00); /* * STOP! READ THIS FIRST! No, really, diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/rollover/MetadataRolloverService.java b/server/src/main/java/org/elasticsearch/action/admin/indices/rollover/MetadataRolloverService.java index f4eee1b0ecb65..3d47c8753fb24 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/rollover/MetadataRolloverService.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/rollover/MetadataRolloverService.java @@ -66,7 +66,6 @@ import static org.elasticsearch.cluster.metadata.IndexAbstraction.Type.ALIAS; import static org.elasticsearch.cluster.metadata.IndexAbstraction.Type.DATA_STREAM; -import static org.elasticsearch.cluster.metadata.MetadataCreateDataStreamService.lookupTemplateForDataStream; import static org.elasticsearch.cluster.metadata.MetadataIndexTemplateService.findV1Templates; import static org.elasticsearch.cluster.metadata.MetadataIndexTemplateService.findV2Template; import static org.elasticsearch.cluster.routing.allocation.allocator.AllocationActionListener.rerouteCompletionIsNotRequired; @@ -325,7 +324,7 @@ private RolloverResult rolloverDataStream( final SystemDataStreamDescriptor systemDataStreamDescriptor; if (dataStream.isSystem() == false) { systemDataStreamDescriptor = null; - templateV2 = lookupTemplateForDataStream(dataStreamName, metadata); + templateV2 = dataStream.getEffectiveIndexTemplate(projectState.metadata()); } else { systemDataStreamDescriptor = systemIndices.findMatchingDataStreamDescriptor(dataStreamName); if (systemDataStreamDescriptor == null) { diff --git a/server/src/main/java/org/elasticsearch/action/datastreams/GetDataStreamAction.java b/server/src/main/java/org/elasticsearch/action/datastreams/GetDataStreamAction.java index 147c34222138e..9f78ded73486c 100644 --- a/server/src/main/java/org/elasticsearch/action/datastreams/GetDataStreamAction.java +++ b/server/src/main/java/org/elasticsearch/action/datastreams/GetDataStreamAction.java @@ -213,6 +213,8 @@ public static class DataStreamInfo implements SimpleDiffable, To public static final ParseField STATUS_FIELD = new ParseField("status"); public static final ParseField INDEX_TEMPLATE_FIELD = new ParseField("template"); + public static final ParseField INDEX_TEMPLATE_OVERRIDES_FIELD = new ParseField("index_template_overrides"); + public static final ParseField EFFECTIVE_INDEX_TEMPLATE_FIELD = new ParseField("effective_index_template"); public static final ParseField PREFER_ILM = new ParseField("prefer_ilm"); public static final ParseField MANAGED_BY = new ParseField("managed_by"); public static final ParseField NEXT_GENERATION_INDEX_MANAGED_BY = new ParseField("next_generation_managed_by"); @@ -427,6 +429,15 @@ public XContentBuilder toXContent( addAutoShardingEvent(builder, params, dataStream.getFailureComponent().getAutoShardingEvent()); builder.endObject(); } + if (dataStream.getIndexTemplateOverrides() != null) { + builder.field(INDEX_TEMPLATE_OVERRIDES_FIELD.getPreferredName()); + dataStream.getIndexTemplateOverrides().toXContent(builder, params); + } + // ComposableIndexTemplate effectiveTemplate = dataStream.getEffectiveIndexTemplate(pr); + // if (effectiveTemplate != null) { + // builder.field(EFFECTIVE_INDEX_TEMPLATE_FIELD.getPreferredName()); + // effectiveTemplate.toXContent(builder, params); + // } builder.endObject(); return builder; } diff --git a/server/src/main/java/org/elasticsearch/action/datastreams/PostDataStreamAction.java b/server/src/main/java/org/elasticsearch/action/datastreams/PostDataStreamAction.java new file mode 100644 index 0000000000000..7b1f569eca475 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/action/datastreams/PostDataStreamAction.java @@ -0,0 +1,232 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ +package org.elasticsearch.action.datastreams; + +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.ActionType; +import org.elasticsearch.action.IndicesRequest; +import org.elasticsearch.action.ValidateActions; +import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.action.support.master.AcknowledgedRequest; +import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.cluster.metadata.ComposableIndexTemplate; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.core.TimeValue; +import org.elasticsearch.xcontent.ToXContent; +import org.elasticsearch.xcontent.ToXContentObject; +import org.elasticsearch.xcontent.XContentBuilder; + +import java.io.IOException; +import java.util.List; +import java.util.Objects; + +public class PostDataStreamAction extends ActionType { + + public static final PostDataStreamAction INSTANCE = new PostDataStreamAction(); + public static final String NAME = "indices:admin/data_stream/update"; + + private PostDataStreamAction() { + super(NAME); + } + + public static class Request extends AcknowledgedRequest implements IndicesRequest { + + private final String name; + private final ComposableIndexTemplate templateOverrides; + + public Request(TimeValue masterNodeTimeout, TimeValue ackTimeout, String name, ComposableIndexTemplate templateOverrides) { + super(masterNodeTimeout, ackTimeout); + this.name = name; + this.templateOverrides = templateOverrides; + } + + @Override + public boolean includeDataStreams() { + return true; + } + + public String getName() { + return name; + } + + public ComposableIndexTemplate getTemplateOverrides() { + return templateOverrides; + } + + @Override + public ActionRequestValidationException validate() { + ActionRequestValidationException validationException = null; + if (Strings.hasText(name) == false) { + validationException = ValidateActions.addValidationError("name is missing", validationException); + } + return validationException; + } + + public Request(StreamInput in) throws IOException { + super(in); + this.name = in.readString(); + this.templateOverrides = in.readOptionalWriteable(ComposableIndexTemplate::new); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeString(name); + out.writeOptionalWriteable(templateOverrides); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + Request request = (Request) o; + return name.equals(request.name) && templateOverrides.equals(request.templateOverrides); + } + + @Override + public int hashCode() { + return Objects.hash(name, templateOverrides); + } + + @Override + public String[] indices() { + return new String[] { name }; + } + + @Override + public IndicesOptions indicesOptions() { + return IndicesOptions.LENIENT_EXPAND_OPEN_CLOSED; + } + } + + public static class Response extends ActionResponse implements ToXContentObject { + private final List dataStreamResponses; + + public Response(List dataStreamResponses) { + this.dataStreamResponses = dataStreamResponses; + } + + public Response(StreamInput in) throws IOException { + this(in.readCollectionAsList(DataStreamResponse::new)); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeCollection(dataStreamResponses, (out1, value) -> value.writeTo(out1)); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.startArray("data_streams"); + for (DataStreamResponse dataStreamResponse : dataStreamResponses) { + dataStreamResponse.toXContent(builder, params); + } + builder.endArray(); + builder.endObject(); + return builder; + } + } + + public record DataStreamResponse( + String dataStreamName, + boolean dataStreamSucceeded, + String dataStreamErrorMessage, + ComposableIndexTemplate effectiveTemplate, + List indexSettingResults + ) implements ToXContent, Writeable { + + public DataStreamResponse(StreamInput in) throws IOException { + this( + in.readString(), + in.readBoolean(), + in.readOptionalString(), + in.readOptionalWriteable(ComposableIndexTemplate::new), + in.readCollectionAsList(IndexSettingResult::new) + ); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeString(dataStreamName); + out.writeBoolean(dataStreamSucceeded); + out.writeOptionalString(dataStreamErrorMessage); + out.writeOptionalWriteable(effectiveTemplate); + out.writeCollection(indexSettingResults, (out1, value) -> value.writeTo(out1)); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.field("name", dataStreamName); + builder.field("applied_to_data_stream", dataStreamSucceeded); + if (dataStreamErrorMessage != null) { + builder.field("error", dataStreamErrorMessage); + } + builder.field("effective_template", effectiveTemplate); + builder.field("index_settings_results", indexSettingResults); + builder.endObject(); + return builder; + } + + public record IndexSettingResult(String name, boolean appliedToBackingIndices, List indexSettingErrors) + implements + ToXContent, + Writeable { + + public IndexSettingResult(StreamInput in) throws IOException { + this(in.readString(), in.readBoolean(), in.readCollectionAsList(IndexSettingError::new)); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.field("name", name); + builder.field("applied_to_backing_indices", appliedToBackingIndices); + if (indexSettingErrors.isEmpty() == false) { + builder.field("errors", indexSettingErrors); + } + builder.endObject(); + return builder; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeString(name); + out.writeBoolean(appliedToBackingIndices); + out.writeCollection(indexSettingErrors, (out1, value) -> value.writeTo(out1)); + } + } + + public record IndexSettingError(String indexName, String errorMessage) implements ToXContent, Writeable { + public IndexSettingError(StreamInput in) throws IOException { + this(in.readString(), in.readString()); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeString(indexName); + out.writeString(errorMessage); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.field("index", indexName); + builder.field("error", errorMessage); + builder.endObject(); + return builder; + } + } + } +} diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/ComposableIndexTemplate.java b/server/src/main/java/org/elasticsearch/cluster/metadata/ComposableIndexTemplate.java index 923ce3df1e510..483ab3604f2b9 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/ComposableIndexTemplate.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/ComposableIndexTemplate.java @@ -56,7 +56,7 @@ public class ComposableIndexTemplate implements SimpleDiffable new ComposableIndexTemplate( - (List) a[0], + a[0] == null ? List.of() : (List) a[0], (Template) a[1], (List) a[2], (Long) a[3], @@ -70,7 +70,7 @@ public class ComposableIndexTemplate implements SimpleDiffable readITV2DiffFrom(StreamInput in) throws IOE } public static ComposableIndexTemplate parse(XContentParser parser) throws IOException { + ComposableIndexTemplate composableIndexTemplate = PARSER.parse(parser, null); + if (composableIndexTemplate.indexPatterns().isEmpty()) { + throw new IllegalArgumentException("Required [index_patterns]"); + } + return composableIndexTemplate; + } + + public static ComposableIndexTemplate parseNoIndexPatterns(XContentParser parser) throws IOException { return PARSER.parse(parser, null); } diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java b/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java index c09f1d48f1583..7191e227c1ca1 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java @@ -27,8 +27,10 @@ import org.elasticsearch.common.ParsingException; import org.elasticsearch.common.Strings; import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.compress.CompressedXContent; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.time.DateFormatter; import org.elasticsearch.common.time.DateFormatters; import org.elasticsearch.common.util.FeatureFlag; @@ -45,6 +47,7 @@ import org.elasticsearch.xcontent.ParseField; import org.elasticsearch.xcontent.ToXContentObject; import org.elasticsearch.xcontent.XContentBuilder; +import org.elasticsearch.xcontent.XContentFactory; import org.elasticsearch.xcontent.XContentParser; import org.elasticsearch.xcontent.XContentParserConfiguration; import org.elasticsearch.xcontent.XContentType; @@ -55,6 +58,7 @@ import java.util.ArrayList; import java.util.Comparator; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Locale; import java.util.Map; @@ -65,6 +69,7 @@ import java.util.function.Predicate; import java.util.stream.Collectors; +import static org.elasticsearch.cluster.metadata.MetadataCreateDataStreamService.lookupTemplateForDataStream; import static org.elasticsearch.common.xcontent.XContentParserUtils.ensureExpectedToken; import static org.elasticsearch.index.IndexSettings.LIFECYCLE_ORIGINATION_DATE; import static org.elasticsearch.index.IndexSettings.PREFER_ILM_SETTING; @@ -114,6 +119,8 @@ public static boolean isFailureStoreFeatureFlagEnabled() { private final long generation; @Nullable private final Map metadata; + @Nullable + private final ComposableIndexTemplate indexTemplateOverrides; private final boolean hidden; private final boolean replicated; private final boolean system; @@ -133,6 +140,7 @@ public DataStream( List indices, long generation, Map metadata, + @Nullable ComposableIndexTemplate indexTemplateOverrides, boolean hidden, boolean replicated, boolean system, @@ -148,6 +156,7 @@ public DataStream( name, generation, metadata, + null, hidden, replicated, system, @@ -165,6 +174,7 @@ public DataStream( String name, long generation, Map metadata, + @Nullable ComposableIndexTemplate indexTemplateOverrides, boolean hidden, boolean replicated, boolean system, @@ -179,6 +189,7 @@ public DataStream( this.name = name; this.generation = generation; this.metadata = metadata; + this.indexTemplateOverrides = indexTemplateOverrides; assert system == false || hidden; // system indices must be hidden this.hidden = hidden; this.replicated = replicated; @@ -193,6 +204,9 @@ public DataStream( : "replicated data streams cannot be marked for lazy rollover"; this.backingIndices = backingIndices; this.failureIndices = failureIndices; + if (indexTemplateOverrides != null && indexTemplateOverrides.indexPatterns() == null) { + throw new RuntimeException("Here i am"); + } } public static DataStream read(StreamInput in) throws IOException { @@ -231,10 +245,17 @@ public static DataStream read(StreamInput in) throws IOException { // is still behind a feature flag in previous version we use the default value instead of explicitly disabling it. dataStreamOptions = failureStoreEnabled ? DataStreamOptions.FAILURE_STORE_ENABLED : null; } + final ComposableIndexTemplate indexTemplateOverrides; + if (in.getTransportVersion().onOrAfter(TransportVersions.TEMPLATES_IN_DATA_STREAMS)) { + indexTemplateOverrides = in.readOptionalWriteable(ComposableIndexTemplate::new); + } else { + indexTemplateOverrides = null; + } return new DataStream( name, generation, metadata, + indexTemplateOverrides, hidden, replicated, system, @@ -325,6 +346,106 @@ public boolean rolloverOnWrite() { return backingIndices.rolloverOnWrite; } + public ComposableIndexTemplate getIndexTemplateOverrides() { + return indexTemplateOverrides; + } + + public ComposableIndexTemplate getEffectiveIndexTemplate(ProjectMetadata projectMetadata) { + return mergeTemplates(getMatchingIndexTemplate(projectMetadata), indexTemplateOverrides); + } + + private ComposableIndexTemplate getMatchingIndexTemplate(ProjectMetadata projectMetadata) { + return lookupTemplateForDataStream(name, projectMetadata); + } + + public static ComposableIndexTemplate mergeTemplates( + ComposableIndexTemplate originalTemplate, + ComposableIndexTemplate templateOverrides + ) { + if (originalTemplate == null) { + return templateOverrides; + } + if (templateOverrides == null) { + return originalTemplate; + } + ComposableIndexTemplate.Builder mergedIndexTemplateBuilder = originalTemplate.toBuilder(); + Template.Builder mergedTemplateBuilder; + if (originalTemplate.template() == null) { + if (templateOverrides.template() == null) { + return originalTemplate; // no merging can be done + } + mergedTemplateBuilder = Template.builder(templateOverrides.template()); + } else { + mergedTemplateBuilder = Template.builder(originalTemplate.template()); + } + if (originalTemplate.template() != null && templateOverrides.template() != null) { + CompressedXContent originalMappings = originalTemplate.template().mappings(); + CompressedXContent overriddenMappings = templateOverrides.template().mappings(); + CompressedXContent mergedMappings = null; + try { + mergedMappings = mergeMappings(originalMappings, overriddenMappings); + } catch (IOException e) { + throw new RuntimeException(e); + } + mergedTemplateBuilder.mappings(mergedMappings); + + Settings originalSettings = originalTemplate.template().settings(); + Settings overriddenSettings = templateOverrides.template().settings(); + Settings.Builder settingsBuilder = Settings.builder().put(originalSettings).put(overriddenSettings); + for (String settingName : new HashSet<>(settingsBuilder.keys())) { + if (settingsBuilder.get(settingName) == null) { + settingsBuilder.remove(settingName); + } + } + mergedTemplateBuilder.settings(settingsBuilder); + mergedIndexTemplateBuilder.template(mergedTemplateBuilder); + } + return mergedIndexTemplateBuilder.build(); + } + + private static CompressedXContent mergeMappings(@Nullable CompressedXContent originalMapping, CompressedXContent mappingAddition) + throws IOException { + if (mappingAddition == null) { + return originalMapping; + } + if (originalMapping == null) { + return mappingAddition; + } + Map combinedMappingMap = new HashMap<>(); + combinedMappingMap.putAll(XContentHelper.convertToMap(originalMapping.uncompressed(), true, XContentType.JSON).v2()); + XContentHelper.update( + combinedMappingMap, + XContentHelper.convertToMap(mappingAddition.uncompressed(), true, XContentType.JSON).v2(), + true + ); + if (combinedMappingMap.isEmpty()) { + return null; + } else { + return convertMappingMapToXContent(combinedMappingMap); + } + } + + private static CompressedXContent convertMappingMapToXContent(Map rawAdditionalMapping) throws IOException { + CompressedXContent compressedXContent; + if (rawAdditionalMapping == null || rawAdditionalMapping.isEmpty()) { + compressedXContent = null; + } else { + try (var parser = XContentHelper.mapToXContentParser(XContentParserConfiguration.EMPTY, rawAdditionalMapping)) { + compressedXContent = mappingFromXContent(parser); + } + } + return compressedXContent; + } + + private static CompressedXContent mappingFromXContent(XContentParser parser) throws IOException { + XContentParser.Token token = parser.nextToken(); + if (token == XContentParser.Token.START_OBJECT) { + return new CompressedXContent(Strings.toString(XContentFactory.jsonBuilder().map(parser.mapOrdered()))); + } else { + throw new IllegalArgumentException("Unexpected token: " + token); + } + } + /** * We define that a data stream is considered internal either if it is a system index or if * its name starts with a dot. @@ -1258,6 +1379,9 @@ public void writeTo(StreamOutput out) throws IOException { if (out.getTransportVersion().onOrAfter(DataStream.ADD_DATA_STREAM_OPTIONS_VERSION)) { out.writeOptionalWriteable(dataStreamOptions.isEmpty() ? null : dataStreamOptions); } + if (out.getTransportVersion().onOrAfter(TransportVersions.TEMPLATES_IN_DATA_STREAMS)) { + out.writeOptionalWriteable(indexTemplateOverrides); + } } public static final ParseField NAME_FIELD = new ParseField("name"); @@ -1278,6 +1402,7 @@ public void writeTo(StreamOutput out) throws IOException { public static final ParseField FAILURE_ROLLOVER_ON_WRITE_FIELD = new ParseField("failure_rollover_on_write"); public static final ParseField FAILURE_AUTO_SHARDING_FIELD = new ParseField("failure_auto_sharding"); public static final ParseField DATA_STREAM_OPTIONS_FIELD = new ParseField("options"); + public static final ParseField INDEX_TEMPLATE_OVERRIDES_FIELD = new ParseField("index_template_overrides"); @SuppressWarnings("unchecked") private static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>("data_stream", args -> { @@ -1302,10 +1427,17 @@ public void writeTo(StreamOutput out) throws IOException { dataStreamOptions = DataStreamOptions.FAILURE_STORE_ENABLED; } } + ComposableIndexTemplate indexTemplateOverrides; + if (args[17] != null) { + indexTemplateOverrides = (ComposableIndexTemplate) args[17]; + } else { + indexTemplateOverrides = null; + } return new DataStream( (String) args[0], (Long) args[2], (Map) args[3], + indexTemplateOverrides, args[4] != null && (boolean) args[4], args[5] != null && (boolean) args[5], args[6] != null && (boolean) args[6], @@ -1370,6 +1502,11 @@ public void writeTo(StreamOutput out) throws IOException { DATA_STREAM_OPTIONS_FIELD ); } + PARSER.declareObject( + ConstructingObjectParser.optionalConstructorArg(), + (p, c) -> ComposableIndexTemplate.parseNoIndexPatterns(p), + INDEX_TEMPLATE_OVERRIDES_FIELD + ); } public static DataStream fromXContent(XContentParser parser) throws IOException { @@ -1419,6 +1556,10 @@ public XContentBuilder toXContent( builder.field(DATA_STREAM_OPTIONS_FIELD.getPreferredName()); dataStreamOptions.toXContent(builder, params); } + if (indexTemplateOverrides != null) { + builder.field(INDEX_TEMPLATE_OVERRIDES_FIELD.getPreferredName()); + indexTemplateOverrides.toXContent(builder, params); + } } if (indexMode != null) { builder.field(INDEX_MODE.getPreferredName(), indexMode); @@ -1774,6 +1915,7 @@ public static class Builder { private long generation = 1; @Nullable private Map metadata = null; + private ComposableIndexTemplate indexTemplateOverrides = null; private boolean hidden = false; private boolean replicated = false; private boolean system = false; @@ -1801,6 +1943,7 @@ private Builder(DataStream dataStream) { name = dataStream.name; generation = dataStream.generation; metadata = dataStream.metadata; + indexTemplateOverrides = dataStream.indexTemplateOverrides; hidden = dataStream.hidden; replicated = dataStream.replicated; system = dataStream.system; @@ -1832,6 +1975,11 @@ public Builder setMetadata(Map metadata) { return this; } + public Builder setIndexTemplateOverrides(ComposableIndexTemplate indexTemplateOverrides) { + this.indexTemplateOverrides = indexTemplateOverrides; + return this; + } + public Builder setHidden(boolean hidden) { this.hidden = hidden; return this; @@ -1892,6 +2040,7 @@ public DataStream build() { name, generation, metadata, + indexTemplateOverrides, hidden, replicated, system, diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataCreateDataStreamService.java b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataCreateDataStreamService.java index 2e02245aeca4d..ba2a7ad9fae97 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataCreateDataStreamService.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataCreateDataStreamService.java @@ -333,6 +333,7 @@ static ClusterState createDataStream( dataStreamName, initialGeneration, template.metadata() != null ? Map.copyOf(template.metadata()) : null, + null, hidden, false, isSystem, diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataCreateIndexService.java b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataCreateIndexService.java index 6355fdc8387f9..62d82e6297729 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataCreateIndexService.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataCreateIndexService.java @@ -441,6 +441,18 @@ public ClusterState applyCreateIndexRequest( ? IndexMetadata.INDEX_HIDDEN_SETTING.get(request.settings()) : null; + ComposableIndexTemplate templateFromRequest = request.matchingTemplate(); + if (templateFromRequest != null) { + return applyCreateIndexRequestWithV2Template( + currentState, + request, + silent, + templateFromRequest, + metadataTransformer, + rerouteListener + ); + } + // Check to see if a v2 template matched final String v2Template = MetadataIndexTemplateService.findV2Template( projectMetadata, @@ -770,6 +782,71 @@ private ClusterState applyCreateIndexRequestWithV2Template( ); } + private ClusterState applyCreateIndexRequestWithV2Template( + final ClusterState currentState, + final CreateIndexClusterStateUpdateRequest request, + final boolean silent, + final ComposableIndexTemplate template, + final BiConsumer projectMetadataTransformer, + final ActionListener rerouteListener + ) throws Exception { + + final Metadata metadata = currentState.getMetadata(); + final ProjectMetadata projectMetadata = metadata.getProject(request.projectId()); + final RoutingTable routingTable = currentState.routingTable(request.projectId()); + + final boolean isDataStream = template.getDataStreamTemplate() != null; + + final List mappings = collectV2Mappings( + request.mappings(), + projectMetadata, + template, + xContentRegistry, + request.index() + ); + final Settings aggregatedIndexSettings = aggregateIndexSettings( + metadata, + projectMetadata, + currentState.nodes(), + currentState.blocks(), + routingTable, + request, + resolveSettings(template, projectMetadata.componentTemplates()), + mappings, + null, + settings, + indexScopedSettings, + shardLimitValidator, + indexSettingProviders + ); + int routingNumShards = getIndexNumberOfRoutingShards(aggregatedIndexSettings, null); + IndexMetadata tmpImd = buildAndValidateTemporaryIndexMetadata(aggregatedIndexSettings, request, routingNumShards); + + return applyCreateIndexWithTemporaryService( + currentState, + request, + silent, + null, + tmpImd, + mappings, + indexService -> resolveAndValidateAliases( + request.index(), + // data stream aliases are created separately in MetadataCreateDataStreamService::createDataStream + isDataStream ? Set.of() : request.aliases(), + isDataStream ? List.of() : MetadataIndexTemplateService.resolveAliases(projectMetadata, template), + projectMetadata, + xContentRegistry, + // the context is used ony for validation so it's fine to pass fake values for the shard id and the current timestamp + indexService.newSearchExecutionContext(0, 0, null, () -> 0L, null, emptyMap()), + IndexService.dateMathExpressionResolverAt(request.getNameResolvedAt()), + systemIndices::isSystemName + ), + Collections.singletonList("provided in request"), + projectMetadataTransformer, + rerouteListener + ); + } + private ClusterState applyCreateIndexRequestForSystemIndex( final ClusterState currentState, final CreateIndexClusterStateUpdateRequest request, @@ -933,6 +1010,21 @@ private static List collectV2Mappings( return result; } + public static List collectV2Mappings( + @Nullable final String requestMappings, + final ProjectMetadata projectMetadata, + final ComposableIndexTemplate template, + final NamedXContentRegistry xContentRegistry, + final String indexName + ) throws Exception { + List templateMappings = MetadataIndexTemplateService.collectMappings( + template, + projectMetadata.componentTemplates(), + indexName + ); + return collectV2Mappings(requestMappings, templateMappings, xContentRegistry); + } + private ClusterState applyCreateIndexRequestWithExistingMetadata( final ClusterState currentState, final CreateIndexClusterStateUpdateRequest request, diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataDataStreamsService.java b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataDataStreamsService.java index 62417cbdd5863..d9a67dd381ad5 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataDataStreamsService.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataDataStreamsService.java @@ -21,6 +21,7 @@ import org.elasticsearch.cluster.ClusterStateUpdateTask; import org.elasticsearch.cluster.ProjectState; import org.elasticsearch.cluster.SimpleBatchedAckListenerTaskExecutor; +import org.elasticsearch.cluster.project.ProjectResolver; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.cluster.service.MasterServiceTaskQueue; import org.elasticsearch.common.Priority; @@ -40,10 +41,13 @@ import java.io.IOException; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.function.Function; import java.util.stream.Collectors; +import static org.elasticsearch.cluster.metadata.MetadataCreateDataStreamService.lookupTemplateForDataStream; + /** * Handles data stream modification requests. */ @@ -496,6 +500,69 @@ public static ClusterState deleteDataStreams(ProjectState projectState, Set listener + ) { + submitUnbatchedTask( + "updating settings on data stream [" + dataStreamName + "]", + new AckedClusterStateUpdateTask(Priority.HIGH, masterNodeTimeout, ackTimeout, listener) { + @Override + public ClusterState execute(ClusterState currentState) throws Exception { + ProjectMetadata projectMetadata = currentState.projectState(projectResolver.getProjectId()).metadata(); + ProjectMetadata.Builder projectMetadataBuilder = ProjectMetadata.builder(projectMetadata); + Map dataStreamMap = projectMetadata.dataStreams(); + DataStream dataStream = dataStreamMap.get(dataStreamName); + Settings existingSettings = Settings.EMPTY; + ComposableIndexTemplate existingIndexTemplateOverrides = dataStream.getIndexTemplateOverrides(); + if (existingIndexTemplateOverrides != null) { + Template existingTemplate = dataStream.getIndexTemplateOverrides().template(); + if (existingTemplate != null) { + Settings settings = existingTemplate.settings(); + if (settings != null) { + existingSettings = settings; + } + } + } + + ProjectMetadata currentProject = currentState.projectState(projectResolver.getProjectId()).metadata(); + final ComposableIndexTemplate template = lookupTemplateForDataStream(dataStreamName, currentProject); + ComposableIndexTemplate mergedTemplate = DataStream.mergeTemplates(template, templateOverrides); + if (templateOverrides.template() != null) { + MetadataIndexTemplateService.validateTemplate( + mergedTemplate.template().settings(), + mergedTemplate.template().mappings(), + indicesService + ); + } + + Template.Builder templateBuilder = Template.builder(); + Settings.Builder mergedSettingsBuilder = Settings.builder().put(existingSettings); + if (templateOverrides.template() != null && templateOverrides.template().settings() != null) { + mergedSettingsBuilder.put(templateOverrides.template().settings()); + } + templateBuilder.settings(mergedSettingsBuilder); + if (templateOverrides.template() != null && templateOverrides.template().mappings() != null) { + templateBuilder.mappings(templateOverrides.template().mappings()); + } + // Currently, the only thing we support having in template overrides is settings and mappings: + ComposableIndexTemplate indexTemplateOverrides = ComposableIndexTemplate.builder() + .template(templateBuilder) + .indexPatterns(List.of()) + .build(); + DataStream.Builder dataStreamBuilder = dataStream.copy().setIndexTemplateOverrides(indexTemplateOverrides); + projectMetadataBuilder.removeDataStream(dataStreamName); + projectMetadataBuilder.put(dataStreamBuilder.build()); + return ClusterState.builder(currentState).putProjectMetadata(projectMetadataBuilder).build(); + } + } + ); + } + /** * A cluster state update task that consists of the cluster state request and the listeners that need to be notified upon completion. */ diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataIndexTemplateService.java b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataIndexTemplateService.java index 492d057c80af3..e8f8bd974984c 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataIndexTemplateService.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataIndexTemplateService.java @@ -1914,7 +1914,7 @@ private static void validateCompositeTemplate( }); } - private static void validateTemplate(Settings validateSettings, CompressedXContent mappings, IndicesService indicesService) + public static void validateTemplate(Settings validateSettings, CompressedXContent mappings, IndicesService indicesService) throws Exception { // Hard to validate settings if they're non-existent, so used empty ones if none were provided Settings settings = validateSettings; diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamTests.java index ee911edd1789d..87ed16277036a 100644 --- a/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamTests.java @@ -178,6 +178,7 @@ protected DataStream mutateInstance(DataStream instance) { name, generation, metadata, + null, isHidden, isReplicated, isSystem, @@ -2084,6 +2085,7 @@ public void testXContentSerializationWithRolloverAndEffectiveRetention() throws indices, generation, metadata, + null, isSystem, randomBoolean(), isSystem, @@ -2278,6 +2280,7 @@ public void testWriteFailureIndex() { randomNonEmptyIndexInstances(), randomNonNegativeInt(), null, + null, hidden, replicated, system, @@ -2296,6 +2299,7 @@ public void testWriteFailureIndex() { randomNonEmptyIndexInstances(), randomNonNegativeInt(), null, + null, hidden, replicated, system, @@ -2321,6 +2325,7 @@ public void testWriteFailureIndex() { randomNonEmptyIndexInstances(), randomNonNegativeInt(), null, + null, hidden, replicated, system, @@ -2345,6 +2350,7 @@ public void testIsFailureIndex() { backingIndices, randomNonNegativeInt(), null, + null, hidden, replicated, system, @@ -2367,6 +2373,7 @@ public void testIsFailureIndex() { backingIndices, randomNonNegativeInt(), null, + null, hidden, replicated, system, @@ -2398,6 +2405,7 @@ public void testIsFailureIndex() { backingIndices, randomNonNegativeInt(), null, + null, hidden, replicated, system, diff --git a/test/framework/src/main/java/org/elasticsearch/cluster/metadata/DataStreamTestHelper.java b/test/framework/src/main/java/org/elasticsearch/cluster/metadata/DataStreamTestHelper.java index 233826b168379..c3e4f5725dd80 100644 --- a/test/framework/src/main/java/org/elasticsearch/cluster/metadata/DataStreamTestHelper.java +++ b/test/framework/src/main/java/org/elasticsearch/cluster/metadata/DataStreamTestHelper.java @@ -357,6 +357,7 @@ public static DataStream randomInstance(String dataStreamName, LongSupplier time dataStreamName, generation, metadata, + null, randomBoolean(), replicated, false, // Some tests don't work well with system data streams, since these data streams require special handling diff --git a/x-pack/plugin/core/src/internalClusterTest/java/org/elasticsearch/xpack/core/action/DataStreamLifecycleUsageTransportActionIT.java b/x-pack/plugin/core/src/internalClusterTest/java/org/elasticsearch/xpack/core/action/DataStreamLifecycleUsageTransportActionIT.java index 24a59ad1c6393..c40517ba10102 100644 --- a/x-pack/plugin/core/src/internalClusterTest/java/org/elasticsearch/xpack/core/action/DataStreamLifecycleUsageTransportActionIT.java +++ b/x-pack/plugin/core/src/internalClusterTest/java/org/elasticsearch/xpack/core/action/DataStreamLifecycleUsageTransportActionIT.java @@ -174,6 +174,7 @@ public void testAction() throws Exception { indices, randomLongBetween(0, 1000), Map.of(), + null, systemDataStream || randomBoolean(), replicated, systemDataStream, diff --git a/x-pack/plugin/deprecation/src/test/java/org/elasticsearch/xpack/deprecation/DataStreamDeprecationCheckerTests.java b/x-pack/plugin/deprecation/src/test/java/org/elasticsearch/xpack/deprecation/DataStreamDeprecationCheckerTests.java index e0dfaef605af7..2a1468b4563fd 100644 --- a/x-pack/plugin/deprecation/src/test/java/org/elasticsearch/xpack/deprecation/DataStreamDeprecationCheckerTests.java +++ b/x-pack/plugin/deprecation/src/test/java/org/elasticsearch/xpack/deprecation/DataStreamDeprecationCheckerTests.java @@ -174,6 +174,7 @@ private DataStream createTestDataStream( allIndices, randomNegativeLong(), Map.of(), + null, randomBoolean(), false, false, @@ -264,6 +265,7 @@ public void testOldIndicesIgnoredWarningCheck() { allIndices, randomNegativeLong(), Map.of(), + null, randomBoolean(), false, false, @@ -331,6 +333,7 @@ public void testOldSystemDataStreamIgnored() { allIndices, randomNegativeLong(), Map.of(), + null, true, false, true, diff --git a/x-pack/plugin/deprecation/src/test/java/org/elasticsearch/xpack/deprecation/IndexDeprecationCheckerTests.java b/x-pack/plugin/deprecation/src/test/java/org/elasticsearch/xpack/deprecation/IndexDeprecationCheckerTests.java index 5dd1304890f22..c7d5dc9e1b618 100644 --- a/x-pack/plugin/deprecation/src/test/java/org/elasticsearch/xpack/deprecation/IndexDeprecationCheckerTests.java +++ b/x-pack/plugin/deprecation/src/test/java/org/elasticsearch/xpack/deprecation/IndexDeprecationCheckerTests.java @@ -216,6 +216,7 @@ public void testOldIndicesCheckDataStreamIndex() { List.of(indexMetadata.getIndex()), randomNegativeLong(), Map.of(), + null, randomBoolean(), false, false, diff --git a/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java b/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java index c48e16cb570e5..206c8cdf80862 100644 --- a/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java +++ b/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java @@ -513,6 +513,7 @@ public class Constants { "indices:admin/data_stream/lifecycle/get", "indices:admin/data_stream/lifecycle/put", "indices:admin/data_stream/lifecycle/explain", + "indices:admin/data_stream/update", DataStream.isFailureStoreFeatureFlagEnabled() ? "indices:admin/data_stream/options/delete" : null, DataStream.isFailureStoreFeatureFlagEnabled() ? "indices:admin/data_stream/options/get" : null, DataStream.isFailureStoreFeatureFlagEnabled() ? "indices:admin/data_stream/options/put" : null,