Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/125980.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 125980
summary: Data stream template experiment
area: Data streams
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.ComposableIndexTemplate;
import org.elasticsearch.cluster.metadata.MetadataCreateDataStreamService;
import org.elasticsearch.cluster.project.ProjectResolver;
import org.elasticsearch.cluster.service.ClusterService;
Expand Down Expand Up @@ -68,6 +69,13 @@ protected void masterOperation(
request.getName(),
threadPool.getThreadContext()
);
ComposableIndexTemplate indexTemplate = request.getIndexTemplate();
if (indexTemplate != null) {
if (indexTemplate.indexPatterns() != null && indexTemplate.indexPatterns().isEmpty() == false) {
// TODO lots of other things aren't supported
throw new IllegalArgumentException("Cannot specify index_patterns");
}
}
MetadataCreateDataStreamService.CreateDataStreamClusterStateUpdateRequest updateRequest =
new MetadataCreateDataStreamService.CreateDataStreamClusterStateUpdateRequest(
projectResolver.getProjectId(),
Expand All @@ -76,7 +84,8 @@ protected void masterOperation(
systemDataStreamDescriptor,
request.masterNodeTimeout(),
request.ackTimeout(),
true
true,
indexTemplate
);
metadataCreateDataStreamService.createDataStream(updateRequest, listener);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -257,9 +257,15 @@ static GetDataStreamAction.Response innerOperation(
indexTemplatePreferIlmValue = PREFER_ILM_SETTING.get(settings);
}
} else {
ComposableIndexTemplate effectiveTemplate = dataStream.getEffectiveIndexTemplate();
indexTemplate = MetadataIndexTemplateService.findV2Template(state.metadata(), dataStream.getName(), false);
if (indexTemplate != null) {
Settings settings = MetadataIndexTemplateService.resolveSettings(state.metadata(), indexTemplate);
if (indexTemplate != null || effectiveTemplate.template() != null) {
Settings settings;
if (effectiveTemplate.template() != null && effectiveTemplate.template().settings() != null) {
settings = effectiveTemplate.template().settings();
} else {
settings = MetadataIndexTemplateService.resolveSettings(state.metadata(), indexTemplate);
}
ilmPolicyName = settings.get(IndexMetadata.LIFECYCLE_NAME);
if (indexMode == null && state.metadata().templatesV2().get(indexTemplate) != null) {
indexMode = resolveMode(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,14 @@

import org.elasticsearch.action.datastreams.CreateDataStreamAction;
import org.elasticsearch.client.internal.node.NodeClient;
import org.elasticsearch.cluster.metadata.ComposableIndexTemplate;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.RestUtils;
import org.elasticsearch.rest.Scope;
import org.elasticsearch.rest.ServerlessScope;
import org.elasticsearch.rest.action.RestToXContentListener;
import org.elasticsearch.xcontent.XContentParser;

import java.io.IOException;
import java.util.List;
Expand All @@ -37,10 +39,19 @@ public List<Route> routes() {

@Override
protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException {
ComposableIndexTemplate indexTemplate;
if (request.hasContent()) {
try (XContentParser parser = request.contentOrSourceParamParser()) {
indexTemplate = ComposableIndexTemplate.parse(parser);
}
} else {
indexTemplate = null;
}
CreateDataStreamAction.Request putDataStreamRequest = new CreateDataStreamAction.Request(
RestUtils.getMasterNodeTimeout(request),
RestUtils.getAckTimeout(request),
request.param("name")
request.param("name"),
indexTemplate
);
return channel -> client.execute(CreateDataStreamAction.INSTANCE, putDataStreamRequest, new RestToXContentListener<>(channel));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,8 @@ private ClusterState createDataStream(ClusterState state, String name, Instant t
null,
TimeValue.ZERO,
TimeValue.ZERO,
false
false,
null
);
return createDataStreamService.createDataStream(request, state, ActionListener.noop(), false);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,7 @@ static TransportVersion def(int id) {
public static final TransportVersion INDEX_STATS_AND_METADATA_INCLUDE_PEAK_WRITE_LOAD = def(9_041_0_00);
public static final TransportVersion REPOSITORIES_METADATA_AS_PROJECT_CUSTOM = def(9_042_0_00);
public static final TransportVersion BATCHED_QUERY_PHASE_VERSION = def(9_043_0_00);
public static final TransportVersion TEMPLATES_IN_DATA_STREAMS = def(9_044_0_00);

/*
* STOP! READ THIS FIRST! No, really,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,12 @@ private RolloverResult rolloverDataStream(
final SystemDataStreamDescriptor systemDataStreamDescriptor;
if (dataStream.isSystem() == false) {
systemDataStreamDescriptor = null;
templateV2 = lookupTemplateForDataStream(dataStreamName, metadata);
ComposableIndexTemplate effectiveIndexTemplate = dataStream.getEffectiveIndexTemplate();
if (effectiveIndexTemplate == null) {
templateV2 = lookupTemplateForDataStream(dataStreamName, metadata, true);
} else {
templateV2 = effectiveIndexTemplate;
}
} else {
systemDataStreamDescriptor = systemIndices.findMatchingDataStreamDescriptor(dataStreamName);
if (systemDataStreamDescriptor == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,15 @@
*/
package org.elasticsearch.action.datastreams;

import org.elasticsearch.TransportVersions;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.ValidateActions;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.master.AcknowledgedRequest;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.cluster.metadata.ComposableIndexTemplate;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
Expand All @@ -36,23 +38,44 @@ public static class Request extends AcknowledgedRequest<Request> implements Indi

private final String name;
private final long startTime;
private final ComposableIndexTemplate indexTemplate;

public Request(TimeValue masterNodeTimeout, TimeValue ackTimeout, String name) {
this(masterNodeTimeout, ackTimeout, name, null);
}

public Request(TimeValue masterNodeTimeout, TimeValue ackTimeout, String name, ComposableIndexTemplate indexTemplate) {
super(masterNodeTimeout, ackTimeout);
this.name = name;
this.indexTemplate = indexTemplate;
this.startTime = System.currentTimeMillis();
}

public Request(TimeValue masterNodeTimeout, TimeValue ackTimeout, String name, long startTime) {
this(masterNodeTimeout, ackTimeout, name, null, startTime);
}

public Request(
TimeValue masterNodeTimeout,
TimeValue ackTimeout,
String name,
ComposableIndexTemplate indexTemplate,
long startTime
) {
super(masterNodeTimeout, ackTimeout);
this.name = name;
this.indexTemplate = indexTemplate;
this.startTime = startTime;
}

public String getName() {
return name;
}

public ComposableIndexTemplate getIndexTemplate() {
return indexTemplate;
}

public long getStartTime() {
return startTime;
}
Expand All @@ -70,26 +93,34 @@ public Request(StreamInput in) throws IOException {
super(in);
this.name = in.readString();
this.startTime = in.readVLong();
if (in.getTransportVersion().onOrAfter(TransportVersions.TEMPLATES_IN_DATA_STREAMS)) {
this.indexTemplate = in.readOptionalWriteable(ComposableIndexTemplate::new);
} else {
this.indexTemplate = null;
}
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(name);
out.writeVLong(startTime);
if (out.getTransportVersion().onOrAfter(TransportVersions.TEMPLATES_IN_DATA_STREAMS)) {
out.writeOptionalWriteable(indexTemplate);
}
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Request request = (Request) o;
return name.equals(request.name) && startTime == request.startTime;
return name.equals(request.name) && startTime == request.startTime && Objects.equals(indexTemplate, request.indexTemplate);
}

@Override
public int hashCode() {
return Objects.hash(name, startTime);
return Objects.hash(name, startTime, indexTemplate);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.elasticsearch.action.support.local.LocalClusterStateRequest;
import org.elasticsearch.cluster.SimpleDiffable;
import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.elasticsearch.cluster.metadata.ComposableIndexTemplate;
import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.cluster.metadata.DataStreamAutoShardingEvent;
import org.elasticsearch.cluster.metadata.DataStreamGlobalRetention;
Expand Down Expand Up @@ -213,6 +214,9 @@ public static class DataStreamInfo implements SimpleDiffable<DataStreamInfo>, To

public static final ParseField STATUS_FIELD = new ParseField("status");
public static final ParseField INDEX_TEMPLATE_FIELD = new ParseField("template");
public static final ParseField INDEX_TEMPLATE_CONTENTS_FIELD = new ParseField("index_template");
public static final ParseField INDEX_TEMPLATE_OVERRIDES_FIELD = new ParseField("index_template_overrides");
public static final ParseField EFFECTIVE_INDEX_TEMPLATE_FIELD = new ParseField("effective_index_template");
public static final ParseField PREFER_ILM = new ParseField("prefer_ilm");
public static final ParseField MANAGED_BY = new ParseField("managed_by");
public static final ParseField NEXT_GENERATION_INDEX_MANAGED_BY = new ParseField("next_generation_managed_by");
Expand Down Expand Up @@ -426,6 +430,20 @@ public XContentBuilder toXContent(
indicesToXContent(builder, dataStream.getFailureIndices());
addAutoShardingEvent(builder, params, dataStream.getFailureComponent().getAutoShardingEvent());
builder.endObject();

if (dataStream.getIndexTemplate() != null) {
builder.field(INDEX_TEMPLATE_CONTENTS_FIELD.getPreferredName());
dataStream.getIndexTemplate().toXContent(builder, params);
}
if (dataStream.getIndexTemplateOverrides() != null) {
builder.field(INDEX_TEMPLATE_OVERRIDES_FIELD.getPreferredName());
dataStream.getIndexTemplateOverrides().toXContent(builder, params);
}
ComposableIndexTemplate effectiveTemplate = dataStream.getEffectiveIndexTemplate();
if (effectiveTemplate != null) {
builder.field(EFFECTIVE_INDEX_TEMPLATE_FIELD.getPreferredName());
effectiveTemplate.toXContent(builder, params);
}
}
builder.endObject();
return builder;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public class ComposableIndexTemplate implements SimpleDiffable<ComposableIndexTe
);

static {
PARSER.declareStringArray(ConstructingObjectParser.constructorArg(), INDEX_PATTERNS);
PARSER.declareStringArray(ConstructingObjectParser.optionalConstructorArg(), INDEX_PATTERNS);
PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(), Template.PARSER, TEMPLATE);
PARSER.declareStringArray(ConstructingObjectParser.optionalConstructorArg(), COMPOSED_OF);
PARSER.declareLong(ConstructingObjectParser.optionalConstructorArg(), PRIORITY);
Expand Down
Loading
Loading