diff --git a/docs/changelog/125980.yaml b/docs/changelog/125980.yaml new file mode 100644 index 0000000000000..7556b57102e12 --- /dev/null +++ b/docs/changelog/125980.yaml @@ -0,0 +1,5 @@ +pr: 125980 +summary: Data stream template experiment +area: Data streams +type: enhancement +issues: [] diff --git a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/action/CreateDataStreamTransportAction.java b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/action/CreateDataStreamTransportAction.java index af166893660e4..bd880dcdd73aa 100644 --- a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/action/CreateDataStreamTransportAction.java +++ b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/action/CreateDataStreamTransportAction.java @@ -16,6 +16,7 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockLevel; +import org.elasticsearch.cluster.metadata.ComposableIndexTemplate; import org.elasticsearch.cluster.metadata.MetadataCreateDataStreamService; import org.elasticsearch.cluster.project.ProjectResolver; import org.elasticsearch.cluster.service.ClusterService; @@ -68,6 +69,13 @@ protected void masterOperation( request.getName(), threadPool.getThreadContext() ); + ComposableIndexTemplate indexTemplate = request.getIndexTemplate(); + if (indexTemplate != null) { + if (indexTemplate.indexPatterns() != null && indexTemplate.indexPatterns().isEmpty() == false) { + // TODO lots of other things aren't supported + throw new IllegalArgumentException("Cannot specify index_patterns"); + } + } MetadataCreateDataStreamService.CreateDataStreamClusterStateUpdateRequest updateRequest = new MetadataCreateDataStreamService.CreateDataStreamClusterStateUpdateRequest( projectResolver.getProjectId(), @@ -76,7 +84,8 @@ protected void masterOperation( systemDataStreamDescriptor, request.masterNodeTimeout(), request.ackTimeout(), - true + true, + indexTemplate ); metadataCreateDataStreamService.createDataStream(updateRequest, listener); } diff --git a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/action/TransportGetDataStreamsAction.java b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/action/TransportGetDataStreamsAction.java index 1ffc82a263e42..77569ddc7d845 100644 --- a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/action/TransportGetDataStreamsAction.java +++ b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/action/TransportGetDataStreamsAction.java @@ -257,9 +257,15 @@ static GetDataStreamAction.Response innerOperation( indexTemplatePreferIlmValue = PREFER_ILM_SETTING.get(settings); } } else { + ComposableIndexTemplate effectiveTemplate = dataStream.getEffectiveIndexTemplate(); indexTemplate = MetadataIndexTemplateService.findV2Template(state.metadata(), dataStream.getName(), false); - if (indexTemplate != null) { - Settings settings = MetadataIndexTemplateService.resolveSettings(state.metadata(), indexTemplate); + if (indexTemplate != null || effectiveTemplate.template() != null) { + Settings settings; + if (effectiveTemplate.template() != null && effectiveTemplate.template().settings() != null) { + settings = effectiveTemplate.template().settings(); + } else { + settings = MetadataIndexTemplateService.resolveSettings(state.metadata(), indexTemplate); + } ilmPolicyName = settings.get(IndexMetadata.LIFECYCLE_NAME); if (indexMode == null && state.metadata().templatesV2().get(indexTemplate) != null) { indexMode = resolveMode( diff --git a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/rest/RestCreateDataStreamAction.java b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/rest/RestCreateDataStreamAction.java index 0b35a0f23d683..14c7324f3fbb4 100644 --- a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/rest/RestCreateDataStreamAction.java +++ b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/rest/RestCreateDataStreamAction.java @@ -10,12 +10,14 @@ import org.elasticsearch.action.datastreams.CreateDataStreamAction; import org.elasticsearch.client.internal.node.NodeClient; +import org.elasticsearch.cluster.metadata.ComposableIndexTemplate; import org.elasticsearch.rest.BaseRestHandler; import org.elasticsearch.rest.RestRequest; import org.elasticsearch.rest.RestUtils; import org.elasticsearch.rest.Scope; import org.elasticsearch.rest.ServerlessScope; import org.elasticsearch.rest.action.RestToXContentListener; +import org.elasticsearch.xcontent.XContentParser; import java.io.IOException; import java.util.List; @@ -37,10 +39,19 @@ public List routes() { @Override protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException { + ComposableIndexTemplate indexTemplate; + if (request.hasContent()) { + try (XContentParser parser = request.contentOrSourceParamParser()) { + indexTemplate = ComposableIndexTemplate.parse(parser); + } + } else { + indexTemplate = null; + } CreateDataStreamAction.Request putDataStreamRequest = new CreateDataStreamAction.Request( RestUtils.getMasterNodeTimeout(request), RestUtils.getAckTimeout(request), - request.param("name") + request.param("name"), + indexTemplate ); return channel -> client.execute(CreateDataStreamAction.INSTANCE, putDataStreamRequest, new RestToXContentListener<>(channel)); } diff --git a/modules/data-streams/src/test/java/org/elasticsearch/datastreams/DataStreamGetWriteIndexTests.java b/modules/data-streams/src/test/java/org/elasticsearch/datastreams/DataStreamGetWriteIndexTests.java index e1fcfdec7b039..1f1f2344b4e2f 100644 --- a/modules/data-streams/src/test/java/org/elasticsearch/datastreams/DataStreamGetWriteIndexTests.java +++ b/modules/data-streams/src/test/java/org/elasticsearch/datastreams/DataStreamGetWriteIndexTests.java @@ -318,7 +318,8 @@ private ClusterState createDataStream(ClusterState state, String name, Instant t null, TimeValue.ZERO, TimeValue.ZERO, - false + false, + null ); return createDataStreamService.createDataStream(request, state, ActionListener.noop(), false); } diff --git a/server/src/main/java/org/elasticsearch/TransportVersions.java b/server/src/main/java/org/elasticsearch/TransportVersions.java index eececd187f11e..5c0ce63e7dc28 100644 --- a/server/src/main/java/org/elasticsearch/TransportVersions.java +++ b/server/src/main/java/org/elasticsearch/TransportVersions.java @@ -209,6 +209,7 @@ static TransportVersion def(int id) { public static final TransportVersion INDEX_STATS_AND_METADATA_INCLUDE_PEAK_WRITE_LOAD = def(9_041_0_00); public static final TransportVersion REPOSITORIES_METADATA_AS_PROJECT_CUSTOM = def(9_042_0_00); public static final TransportVersion BATCHED_QUERY_PHASE_VERSION = def(9_043_0_00); + public static final TransportVersion TEMPLATES_IN_DATA_STREAMS = def(9_044_0_00); /* * STOP! READ THIS FIRST! No, really, diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/rollover/MetadataRolloverService.java b/server/src/main/java/org/elasticsearch/action/admin/indices/rollover/MetadataRolloverService.java index f4eee1b0ecb65..c059f461ab919 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/rollover/MetadataRolloverService.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/rollover/MetadataRolloverService.java @@ -325,7 +325,12 @@ private RolloverResult rolloverDataStream( final SystemDataStreamDescriptor systemDataStreamDescriptor; if (dataStream.isSystem() == false) { systemDataStreamDescriptor = null; - templateV2 = lookupTemplateForDataStream(dataStreamName, metadata); + ComposableIndexTemplate effectiveIndexTemplate = dataStream.getEffectiveIndexTemplate(); + if (effectiveIndexTemplate == null) { + templateV2 = lookupTemplateForDataStream(dataStreamName, metadata, true); + } else { + templateV2 = effectiveIndexTemplate; + } } else { systemDataStreamDescriptor = systemIndices.findMatchingDataStreamDescriptor(dataStreamName); if (systemDataStreamDescriptor == null) { diff --git a/server/src/main/java/org/elasticsearch/action/datastreams/CreateDataStreamAction.java b/server/src/main/java/org/elasticsearch/action/datastreams/CreateDataStreamAction.java index 48b2407e89b97..2ab99720c6bd4 100644 --- a/server/src/main/java/org/elasticsearch/action/datastreams/CreateDataStreamAction.java +++ b/server/src/main/java/org/elasticsearch/action/datastreams/CreateDataStreamAction.java @@ -8,6 +8,7 @@ */ package org.elasticsearch.action.datastreams; +import org.elasticsearch.TransportVersions; import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.ActionType; import org.elasticsearch.action.IndicesRequest; @@ -15,6 +16,7 @@ import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.action.support.master.AcknowledgedRequest; import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.cluster.metadata.ComposableIndexTemplate; import org.elasticsearch.common.Strings; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; @@ -36,16 +38,33 @@ public static class Request extends AcknowledgedRequest implements Indi private final String name; private final long startTime; + private final ComposableIndexTemplate indexTemplate; public Request(TimeValue masterNodeTimeout, TimeValue ackTimeout, String name) { + this(masterNodeTimeout, ackTimeout, name, null); + } + + public Request(TimeValue masterNodeTimeout, TimeValue ackTimeout, String name, ComposableIndexTemplate indexTemplate) { super(masterNodeTimeout, ackTimeout); this.name = name; + this.indexTemplate = indexTemplate; this.startTime = System.currentTimeMillis(); } public Request(TimeValue masterNodeTimeout, TimeValue ackTimeout, String name, long startTime) { + this(masterNodeTimeout, ackTimeout, name, null, startTime); + } + + public Request( + TimeValue masterNodeTimeout, + TimeValue ackTimeout, + String name, + ComposableIndexTemplate indexTemplate, + long startTime + ) { super(masterNodeTimeout, ackTimeout); this.name = name; + this.indexTemplate = indexTemplate; this.startTime = startTime; } @@ -53,6 +72,10 @@ public String getName() { return name; } + public ComposableIndexTemplate getIndexTemplate() { + return indexTemplate; + } + public long getStartTime() { return startTime; } @@ -70,6 +93,11 @@ public Request(StreamInput in) throws IOException { super(in); this.name = in.readString(); this.startTime = in.readVLong(); + if (in.getTransportVersion().onOrAfter(TransportVersions.TEMPLATES_IN_DATA_STREAMS)) { + this.indexTemplate = in.readOptionalWriteable(ComposableIndexTemplate::new); + } else { + this.indexTemplate = null; + } } @Override @@ -77,6 +105,9 @@ public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); out.writeString(name); out.writeVLong(startTime); + if (out.getTransportVersion().onOrAfter(TransportVersions.TEMPLATES_IN_DATA_STREAMS)) { + out.writeOptionalWriteable(indexTemplate); + } } @Override @@ -84,12 +115,12 @@ public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; Request request = (Request) o; - return name.equals(request.name) && startTime == request.startTime; + return name.equals(request.name) && startTime == request.startTime && Objects.equals(indexTemplate, request.indexTemplate); } @Override public int hashCode() { - return Objects.hash(name, startTime); + return Objects.hash(name, startTime, indexTemplate); } @Override diff --git a/server/src/main/java/org/elasticsearch/action/datastreams/GetDataStreamAction.java b/server/src/main/java/org/elasticsearch/action/datastreams/GetDataStreamAction.java index 147c34222138e..3ee3e704fc076 100644 --- a/server/src/main/java/org/elasticsearch/action/datastreams/GetDataStreamAction.java +++ b/server/src/main/java/org/elasticsearch/action/datastreams/GetDataStreamAction.java @@ -18,6 +18,7 @@ import org.elasticsearch.action.support.local.LocalClusterStateRequest; import org.elasticsearch.cluster.SimpleDiffable; import org.elasticsearch.cluster.health.ClusterHealthStatus; +import org.elasticsearch.cluster.metadata.ComposableIndexTemplate; import org.elasticsearch.cluster.metadata.DataStream; import org.elasticsearch.cluster.metadata.DataStreamAutoShardingEvent; import org.elasticsearch.cluster.metadata.DataStreamGlobalRetention; @@ -213,6 +214,9 @@ public static class DataStreamInfo implements SimpleDiffable, To public static final ParseField STATUS_FIELD = new ParseField("status"); public static final ParseField INDEX_TEMPLATE_FIELD = new ParseField("template"); + public static final ParseField INDEX_TEMPLATE_CONTENTS_FIELD = new ParseField("index_template"); + public static final ParseField INDEX_TEMPLATE_OVERRIDES_FIELD = new ParseField("index_template_overrides"); + public static final ParseField EFFECTIVE_INDEX_TEMPLATE_FIELD = new ParseField("effective_index_template"); public static final ParseField PREFER_ILM = new ParseField("prefer_ilm"); public static final ParseField MANAGED_BY = new ParseField("managed_by"); public static final ParseField NEXT_GENERATION_INDEX_MANAGED_BY = new ParseField("next_generation_managed_by"); @@ -426,6 +430,20 @@ public XContentBuilder toXContent( indicesToXContent(builder, dataStream.getFailureIndices()); addAutoShardingEvent(builder, params, dataStream.getFailureComponent().getAutoShardingEvent()); builder.endObject(); + + if (dataStream.getIndexTemplate() != null) { + builder.field(INDEX_TEMPLATE_CONTENTS_FIELD.getPreferredName()); + dataStream.getIndexTemplate().toXContent(builder, params); + } + if (dataStream.getIndexTemplateOverrides() != null) { + builder.field(INDEX_TEMPLATE_OVERRIDES_FIELD.getPreferredName()); + dataStream.getIndexTemplateOverrides().toXContent(builder, params); + } + ComposableIndexTemplate effectiveTemplate = dataStream.getEffectiveIndexTemplate(); + if (effectiveTemplate != null) { + builder.field(EFFECTIVE_INDEX_TEMPLATE_FIELD.getPreferredName()); + effectiveTemplate.toXContent(builder, params); + } } builder.endObject(); return builder; diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/ComposableIndexTemplate.java b/server/src/main/java/org/elasticsearch/cluster/metadata/ComposableIndexTemplate.java index 923ce3df1e510..dd4bdd72f6ecb 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/ComposableIndexTemplate.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/ComposableIndexTemplate.java @@ -70,7 +70,7 @@ public class ComposableIndexTemplate implements SimpleDiffable failureIndices, boolean rolloverOnWrite, @Nullable DataStreamAutoShardingEvent autoShardingEvent + ) { + this( + name, + indices, + generation, + null, + null, + metadata, + hidden, + replicated, + system, + allowCustomRouting, + indexMode, + lifecycle, + dataStreamOptions, + failureIndices, + rolloverOnWrite, + autoShardingEvent + ); + } + + public DataStream( + String name, + List indices, + long generation, + ComposableIndexTemplate indexTemplate, + ComposableIndexTemplate indexTemplateOverrides, + Map metadata, + boolean hidden, + boolean replicated, + boolean system, + boolean allowCustomRouting, + IndexMode indexMode, + DataStreamLifecycle lifecycle, + @Nullable DataStreamOptions dataStreamOptions, + List failureIndices, + boolean rolloverOnWrite, + @Nullable DataStreamAutoShardingEvent autoShardingEvent ) { this( name, generation, + indexTemplate, + indexTemplateOverrides, metadata, hidden, replicated, @@ -164,6 +210,8 @@ public DataStream( DataStream( String name, long generation, + ComposableIndexTemplate indexTemplate, + ComposableIndexTemplate indexTemplateOverrides, Map metadata, boolean hidden, boolean replicated, @@ -178,6 +226,8 @@ public DataStream( ) { this.name = name; this.generation = generation; + this.indexTemplate = indexTemplate; + this.indexTemplateOverrides = indexTemplateOverrides; this.metadata = metadata; assert system == false || hidden; // system indices must be hidden this.hidden = hidden; @@ -231,9 +281,20 @@ public static DataStream read(StreamInput in) throws IOException { // is still behind a feature flag in previous version we use the default value instead of explicitly disabling it. dataStreamOptions = failureStoreEnabled ? DataStreamOptions.FAILURE_STORE_ENABLED : null; } + final ComposableIndexTemplate indexTemplate; + final ComposableIndexTemplate indexTemplateOverrides; + if (in.getTransportVersion().onOrAfter(TransportVersions.TEMPLATES_IN_DATA_STREAMS)) { + indexTemplate = in.readOptionalWriteable(ComposableIndexTemplate::new); + indexTemplateOverrides = in.readOptionalWriteable(ComposableIndexTemplate::new); + } else { + indexTemplate = null; + indexTemplateOverrides = null; + } return new DataStream( name, generation, + indexTemplate, + indexTemplateOverrides, metadata, hidden, replicated, @@ -325,6 +386,101 @@ public boolean rolloverOnWrite() { return backingIndices.rolloverOnWrite; } + public ComposableIndexTemplate getIndexTemplate() { + return indexTemplate; + } + + public ComposableIndexTemplate getIndexTemplateOverrides() { + return indexTemplateOverrides; + } + + public ComposableIndexTemplate getEffectiveIndexTemplate() { + return mergeTemplates(indexTemplate, indexTemplateOverrides); + } + + public static ComposableIndexTemplate mergeTemplates( + ComposableIndexTemplate originalTemplate, + ComposableIndexTemplate templateOverrides + ) { + if (originalTemplate == null) { + return templateOverrides; + } + if (templateOverrides == null) { + return originalTemplate; + } + ComposableIndexTemplate.Builder mergedIndexTemplateBuilder = originalTemplate.toBuilder(); + Template.Builder mergedTemplateBuilder; + if (originalTemplate.template() == null) { + if (templateOverrides.template() == null) { + return originalTemplate; // no merging can be done + } + mergedTemplateBuilder = Template.builder(templateOverrides.template()); + } else { + mergedTemplateBuilder = Template.builder(originalTemplate.template()); + } + if (originalTemplate.template() != null && templateOverrides.template() != null) { + CompressedXContent originalMappings = originalTemplate.template().mappings(); + CompressedXContent overriddenMappings = templateOverrides.template().mappings(); + CompressedXContent mergedMappings = null; + try { + mergedMappings = mergeMappings(originalMappings, overriddenMappings); + } catch (IOException e) { + throw new RuntimeException(e); + } + mergedTemplateBuilder.mappings(mergedMappings); + + Settings originalSettings = originalTemplate.template().settings(); + Settings overriddenSettings = templateOverrides.template().settings(); + Settings.Builder settingsBuilder = Settings.builder().put(originalSettings).put(overriddenSettings); + mergedTemplateBuilder.settings(settingsBuilder); + mergedIndexTemplateBuilder.template(mergedTemplateBuilder); + } + return mergedIndexTemplateBuilder.build(); + } + + private static CompressedXContent mergeMappings(@Nullable CompressedXContent originalMapping, CompressedXContent mappingAddition) + throws IOException { + if (mappingAddition == null) { + return originalMapping; + } + if (originalMapping == null) { + return mappingAddition; + } + Map combinedMappingMap = new HashMap<>(); + combinedMappingMap.putAll(XContentHelper.convertToMap(originalMapping.uncompressed(), true, XContentType.JSON).v2()); + XContentHelper.update( + combinedMappingMap, + XContentHelper.convertToMap(mappingAddition.uncompressed(), true, XContentType.JSON).v2(), + true + ); + if (combinedMappingMap.isEmpty()) { + return null; + } else { + return convertMappingMapToXContent(combinedMappingMap); + } + } + + private static CompressedXContent convertMappingMapToXContent(Map rawAdditionalMapping) throws IOException { + CompressedXContent compressedXContent; + if (rawAdditionalMapping == null || rawAdditionalMapping.isEmpty()) { + compressedXContent = null; + } else { + try (var parser = XContentHelper.mapToXContentParser(XContentParserConfiguration.EMPTY, rawAdditionalMapping)) { + compressedXContent = mappingFromXContent(parser); + } + } + return compressedXContent; + } + + private static CompressedXContent mappingFromXContent(XContentParser parser) throws IOException { + XContentParser.Token token = parser.nextToken(); + if (token == XContentParser.Token.START_OBJECT) { + return new CompressedXContent(Strings.toString(XContentFactory.jsonBuilder().map(parser.mapOrdered()))); + } else { + throw new IllegalArgumentException("Unexpected token: " + token); + } + } + /** * We define that a data stream is considered internal either if it is a system index or if * its name starts with a dot. @@ -1258,6 +1414,10 @@ public void writeTo(StreamOutput out) throws IOException { if (out.getTransportVersion().onOrAfter(DataStream.ADD_DATA_STREAM_OPTIONS_VERSION)) { out.writeOptionalWriteable(dataStreamOptions.isEmpty() ? null : dataStreamOptions); } + if (out.getTransportVersion().onOrAfter(TransportVersions.TEMPLATES_IN_DATA_STREAMS)) { + out.writeOptionalWriteable(indexTemplate); + out.writeOptionalWriteable(indexTemplateOverrides); + } } public static final ParseField NAME_FIELD = new ParseField("name"); @@ -1278,6 +1438,8 @@ public void writeTo(StreamOutput out) throws IOException { public static final ParseField FAILURE_ROLLOVER_ON_WRITE_FIELD = new ParseField("failure_rollover_on_write"); public static final ParseField FAILURE_AUTO_SHARDING_FIELD = new ParseField("failure_auto_sharding"); public static final ParseField DATA_STREAM_OPTIONS_FIELD = new ParseField("options"); + public static final ParseField INDEX_TEMPLATE_FIELD = new ParseField("index_template"); + public static final ParseField INDEX_TEMPLATE_OVERRIDES_FIELD = new ParseField("index_template_overrides"); @SuppressWarnings("unchecked") private static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>("data_stream", args -> { @@ -1302,9 +1464,23 @@ public void writeTo(StreamOutput out) throws IOException { dataStreamOptions = DataStreamOptions.FAILURE_STORE_ENABLED; } } + ComposableIndexTemplate indexTemplate; + if (args[17] != null) { + indexTemplate = (ComposableIndexTemplate) args[17]; + } else { + indexTemplate = null; + } + ComposableIndexTemplate indexTemplateOverrides; + if (args[18] != null) { + indexTemplateOverrides = (ComposableIndexTemplate) args[18]; + } else { + indexTemplateOverrides = null; + } return new DataStream( (String) args[0], (Long) args[2], + indexTemplate, + indexTemplateOverrides, (Map) args[3], args[4] != null && (boolean) args[4], args[5] != null && (boolean) args[5], @@ -1369,6 +1545,16 @@ public void writeTo(StreamOutput out) throws IOException { (p, c) -> DataStreamOptions.fromXContent(p), DATA_STREAM_OPTIONS_FIELD ); + PARSER.declareObject( + ConstructingObjectParser.optionalConstructorArg(), + (p, c) -> ComposableIndexTemplate.parse(p), + INDEX_TEMPLATE_FIELD + ); + PARSER.declareObject( + ConstructingObjectParser.optionalConstructorArg(), + (p, c) -> ComposableIndexTemplate.parse(p), + INDEX_TEMPLATE_OVERRIDES_FIELD + ); } } @@ -1419,6 +1605,14 @@ public XContentBuilder toXContent( builder.field(DATA_STREAM_OPTIONS_FIELD.getPreferredName()); dataStreamOptions.toXContent(builder, params); } + if (indexTemplate != null) { + builder.field(INDEX_TEMPLATE_FIELD.getPreferredName()); + indexTemplate.toXContent(builder, params); + } + if (indexTemplateOverrides != null) { + builder.field(INDEX_TEMPLATE_OVERRIDES_FIELD.getPreferredName()); + indexTemplateOverrides.toXContent(builder, params); + } } if (indexMode != null) { builder.field(INDEX_MODE.getPreferredName(), indexMode); @@ -1772,6 +1966,8 @@ public static class Builder { private LongSupplier timeProvider = System::currentTimeMillis; private String name; private long generation = 1; + ComposableIndexTemplate indexTemplate = null; + ComposableIndexTemplate indexTemplateOverrides = null; @Nullable private Map metadata = null; private boolean hidden = false; @@ -1800,6 +1996,8 @@ private Builder(DataStream dataStream) { timeProvider = dataStream.timeProvider; name = dataStream.name; generation = dataStream.generation; + indexTemplate = dataStream.indexTemplate; + indexTemplateOverrides = dataStream.indexTemplateOverrides; metadata = dataStream.metadata; hidden = dataStream.hidden; replicated = dataStream.replicated; @@ -1891,6 +2089,8 @@ public DataStream build() { return new DataStream( name, generation, + indexTemplate, + indexTemplateOverrides, metadata, hidden, replicated, diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataCreateDataStreamService.java b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataCreateDataStreamService.java index 46e5aa4ec1a38..e6fa38b0661b8 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataCreateDataStreamService.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataCreateDataStreamService.java @@ -150,7 +150,8 @@ public record CreateDataStreamClusterStateUpdateRequest( @Nullable SystemDataStreamDescriptor systemDataStreamDescriptor, TimeValue masterNodeTimeout, TimeValue ackTimeout, - boolean performReroute + boolean performReroute, + ComposableIndexTemplate indexTemplate ) { public CreateDataStreamClusterStateUpdateRequest { Objects.requireNonNull(name); @@ -170,7 +171,16 @@ public CreateDataStreamClusterStateUpdateRequest( TimeValue ackTimeout, boolean performReroute ) { - this(projectId, name, System.currentTimeMillis(), systemDataStreamDescriptor, masterNodeTimeout, ackTimeout, performReroute); + this( + projectId, + name, + System.currentTimeMillis(), + systemDataStreamDescriptor, + masterNodeTimeout, + ackTimeout, + performReroute, + null + ); } public boolean isSystem() { @@ -258,9 +268,22 @@ static ClusterState createDataStream( } final boolean isSystem = systemDataStreamDescriptor != null; + ComposableIndexTemplate templateOverrides = request.indexTemplate(); final ComposableIndexTemplate template = isSystem ? systemDataStreamDescriptor.getComposableIndexTemplate() - : lookupTemplateForDataStream(dataStreamName, currentProject); + : lookupTemplateForDataStream(dataStreamName, currentProject, false); + ComposableIndexTemplate mergedTemplate = DataStream.mergeTemplates(template, templateOverrides); + if (mergedTemplate == null) { + throw new IllegalArgumentException("no matching index template found for data stream [" + dataStreamName + "]"); + } else if (mergedTemplate.getDataStreamTemplate() == null) { + throw new IllegalArgumentException( + "matching index template [" + + MetadataIndexTemplateService.findV2Template(currentProject, dataStreamName, false) + + "] for data stream [" + + dataStreamName + + "] has no data stream template" + ); + } // The initial backing index and the initial failure store index will have the same initial generation. // This is not a problem as both have different prefixes (`.ds-` vs `.fs-`) and both will be using the same `generation` field // when rolling over in the future. @@ -268,7 +291,7 @@ static ClusterState createDataStream( final DataStreamOptions dataStreamOptions = resolveDataStreamOptions( currentProject, systemDataStreamDescriptor, - template, + mergedTemplate, isSystem ); @@ -290,7 +313,7 @@ static ClusterState createDataStream( currentState, request.startTime(), dataStreamName, - template, + mergedTemplate, failureStoreIndexName, null ); @@ -307,7 +330,7 @@ static ClusterState createDataStream( dataStreamName, systemDataStreamDescriptor, isSystem, - template, + mergedTemplate, firstBackingIndexName ); writeIndex = currentState.metadata().getProject(request.projectId()).index(firstBackingIndexName); @@ -325,19 +348,26 @@ static ClusterState createDataStream( .map(IndexMetadata::getIndex) .collect(Collectors.toCollection(ArrayList::new)); dsBackingIndices.add(writeIndex.getIndex()); - boolean hidden = isSystem || template.getDataStreamTemplate().isHidden(); - final IndexMode indexMode = newProject.retrieveIndexModeFromTemplate(template); - final DataStreamLifecycle lifecycle = resolveDataStreamLifecycle(currentProject, systemDataStreamDescriptor, template, isSystem); + boolean hidden = isSystem || mergedTemplate.getDataStreamTemplate().isHidden(); + final IndexMode indexMode = newProject.retrieveIndexModeFromTemplate(mergedTemplate); + final DataStreamLifecycle lifecycle = resolveDataStreamLifecycle( + currentProject, + systemDataStreamDescriptor, + mergedTemplate, + isSystem + ); List failureIndices = failureStoreIndex == null ? List.of() : List.of(failureStoreIndex.getIndex()); DataStream newDataStream = new DataStream( dataStreamName, initialGeneration, - template.metadata() != null ? Map.copyOf(template.metadata()) : null, + template, + templateOverrides, + mergedTemplate.metadata() != null ? Map.copyOf(mergedTemplate.metadata()) : null, hidden, false, isSystem, System::currentTimeMillis, - template.getDataStreamTemplate().isAllowCustomRouting(), + mergedTemplate.getDataStreamTemplate().isAllowCustomRouting(), indexMode, lifecycle == null && isDslOnlyMode ? DataStreamLifecycle.DEFAULT : lifecycle, dataStreamOptions, @@ -348,7 +378,7 @@ static ClusterState createDataStream( ); ProjectMetadata.Builder builder = ProjectMetadata.builder(newProject).put(newDataStream); List aliases = new ArrayList<>(); - var resolvedAliases = MetadataIndexTemplateService.resolveAliases(newProject, template); + var resolvedAliases = MetadataIndexTemplateService.resolveAliases(newProject, mergedTemplate); for (var resolvedAliasMap : resolvedAliases) { for (var alias : resolvedAliasMap.values()) { aliases.add(alias.getAlias()); @@ -463,16 +493,30 @@ public static ClusterState createFailureStoreIndex( return currentState; } - public static ComposableIndexTemplate lookupTemplateForDataStream(String dataStreamName, ProjectMetadata projectMetadata) { - final String v2Template = MetadataIndexTemplateService.findV2Template(projectMetadata, dataStreamName, false); - if (v2Template == null) { - throw new IllegalArgumentException("no matching index template found for data stream [" + dataStreamName + "]"); + public static ComposableIndexTemplate lookupTemplateForDataStream( + String dataStreamName, + ProjectMetadata projectMetadata, + boolean throwExceptionIfMissing + ) throws IOException { + DataStream dataStream = projectMetadata.dataStreams().get(dataStreamName); + ComposableIndexTemplate composableIndexTemplate = null; + if (dataStream != null) { + composableIndexTemplate = projectMetadata.dataStreams().get(dataStreamName).getEffectiveIndexTemplate(); } - ComposableIndexTemplate composableIndexTemplate = projectMetadata.templatesV2().get(v2Template); - if (composableIndexTemplate.getDataStreamTemplate() == null) { - throw new IllegalArgumentException( - "matching index template [" + v2Template + "] for data stream [" + dataStreamName + "] has no data stream template" - ); + if (composableIndexTemplate == null) { + final String v2Template = MetadataIndexTemplateService.findV2Template(projectMetadata, dataStreamName, false); + if (v2Template == null) { + if (throwExceptionIfMissing) { + throw new IllegalArgumentException("no matching index template found for data stream [" + dataStreamName + "]"); + } + } else { + composableIndexTemplate = projectMetadata.templatesV2().get(v2Template); + if (composableIndexTemplate.getDataStreamTemplate() == null && throwExceptionIfMissing) { + throw new IllegalArgumentException( + "matching index template [" + v2Template + "] for data stream [" + dataStreamName + "] has no data stream template" + ); + } + } } return composableIndexTemplate; } diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataCreateIndexService.java b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataCreateIndexService.java index 6355fdc8387f9..bc9719da313af 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataCreateIndexService.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataCreateIndexService.java @@ -441,6 +441,17 @@ public ClusterState applyCreateIndexRequest( ? IndexMetadata.INDEX_HIDDEN_SETTING.get(request.settings()) : null; + ComposableIndexTemplate templateFromRequest = request.matchingTemplate(); + if (templateFromRequest != null) { + return applyCreateIndexRequestWithV2Template( + currentState, + request, + silent, + templateFromRequest, + metadataTransformer, + rerouteListener + ); + } // Check to see if a v2 template matched final String v2Template = MetadataIndexTemplateService.findV2Template( projectMetadata, @@ -770,6 +781,71 @@ private ClusterState applyCreateIndexRequestWithV2Template( ); } + private ClusterState applyCreateIndexRequestWithV2Template( + final ClusterState currentState, + final CreateIndexClusterStateUpdateRequest request, + final boolean silent, + final ComposableIndexTemplate template, + final BiConsumer projectMetadataTransformer, + final ActionListener rerouteListener + ) throws Exception { + + final Metadata metadata = currentState.getMetadata(); + final ProjectMetadata projectMetadata = metadata.getProject(request.projectId()); + final RoutingTable routingTable = currentState.routingTable(request.projectId()); + + final boolean isDataStream = template.getDataStreamTemplate() != null; + + final List mappings = collectV2Mappings( + request.mappings(), + projectMetadata, + template, + xContentRegistry, + request.index() + ); + final Settings aggregatedIndexSettings = aggregateIndexSettings( + metadata, + projectMetadata, + currentState.nodes(), + currentState.blocks(), + routingTable, + request, + resolveSettings(template, projectMetadata.componentTemplates()), + mappings, + null, + settings, + indexScopedSettings, + shardLimitValidator, + indexSettingProviders + ); + int routingNumShards = getIndexNumberOfRoutingShards(aggregatedIndexSettings, null); + IndexMetadata tmpImd = buildAndValidateTemporaryIndexMetadata(aggregatedIndexSettings, request, routingNumShards); + + return applyCreateIndexWithTemporaryService( + currentState, + request, + silent, + null, + tmpImd, + mappings, + indexService -> resolveAndValidateAliases( + request.index(), + // data stream aliases are created separately in MetadataCreateDataStreamService::createDataStream + isDataStream ? Set.of() : request.aliases(), + isDataStream ? List.of() : MetadataIndexTemplateService.resolveAliases(projectMetadata, template), + projectMetadata, + xContentRegistry, + // the context is used ony for validation so it's fine to pass fake values for the shard id and the current timestamp + indexService.newSearchExecutionContext(0, 0, null, () -> 0L, null, emptyMap()), + IndexService.dateMathExpressionResolverAt(request.getNameResolvedAt()), + systemIndices::isSystemName + ), + Collections.singletonList("provided in request"), + projectMetadataTransformer, + rerouteListener + ); + } + private ClusterState applyCreateIndexRequestForSystemIndex( final ClusterState currentState, final CreateIndexClusterStateUpdateRequest request, @@ -917,6 +993,21 @@ public static List collectV2Mappings( return collectV2Mappings(requestMappings, templateMappings, xContentRegistry); } + public static List collectV2Mappings( + @Nullable final String requestMappings, + final ProjectMetadata projectMetadata, + final ComposableIndexTemplate template, + final NamedXContentRegistry xContentRegistry, + final String indexName + ) throws Exception { + List templateMappings = MetadataIndexTemplateService.collectMappings( + template, + projectMetadata.componentTemplates(), + indexName + ); + return collectV2Mappings(requestMappings, templateMappings, xContentRegistry); + } + private static List collectV2Mappings( @Nullable final String requestMappings, final List templateMappings, diff --git a/server/src/main/java/org/elasticsearch/snapshots/RestoreService.java b/server/src/main/java/org/elasticsearch/snapshots/RestoreService.java index 5cd45dba4551e..8c7c8262d1238 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/RestoreService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/RestoreService.java @@ -551,7 +551,9 @@ static void warnIfIndexTemplateMissing( for (var entry : dataStreamsToRestore.entrySet()) { String name = entry.getKey(); DataStream dataStream = entry.getValue(); - if (dataStream.isSystem() == false && templatePatterns.stream().noneMatch(pattern -> Regex.simpleMatch(pattern, name))) { + if (dataStream.isSystem() == false + && templatePatterns.stream().noneMatch(pattern -> Regex.simpleMatch(pattern, name)) + && dataStreamsToRestore.get(name).getEffectiveIndexTemplate() == null) { String warningMessage = format( "Snapshot [%s] contains data stream [%s] but custer does not have a matching index template. This will cause" + " rollover to fail until a matching index template is created", diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamTests.java index c93c9a93b9e50..b3ce45ad66d03 100644 --- a/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamTests.java @@ -177,6 +177,8 @@ protected DataStream mutateInstance(DataStream instance) { return new DataStream( name, generation, + null, + null, metadata, isHidden, isReplicated, diff --git a/test/framework/src/main/java/org/elasticsearch/cluster/metadata/DataStreamTestHelper.java b/test/framework/src/main/java/org/elasticsearch/cluster/metadata/DataStreamTestHelper.java index d171d89a2457c..58960d8bf18d7 100644 --- a/test/framework/src/main/java/org/elasticsearch/cluster/metadata/DataStreamTestHelper.java +++ b/test/framework/src/main/java/org/elasticsearch/cluster/metadata/DataStreamTestHelper.java @@ -355,6 +355,8 @@ public static DataStream randomInstance(String dataStreamName, LongSupplier time return new DataStream( dataStreamName, generation, + null, + null, metadata, randomBoolean(), replicated,