Skip to content

Commit e93ea67

Browse files
committed
Adding settings to data streams
1 parent e42c118 commit e93ea67

File tree

9 files changed

+279
-7
lines changed

9 files changed

+279
-7
lines changed

modules/data-streams/src/test/java/org/elasticsearch/datastreams/UpdateTimeSeriesRangeServiceTests.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -257,6 +257,7 @@ public void testUpdateTimeSeriesTemporalOneBadDataStream() {
257257
ds2Indices,
258258
2,
259259
ds2.getMetadata(),
260+
ds2.getSettings(),
260261
ds2.isHidden(),
261262
ds2.isReplicated(),
262263
ds2.isSystem(),

server/src/main/java/org/elasticsearch/TransportVersions.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -223,6 +223,7 @@ static TransportVersion def(int id) {
223223
public static final TransportVersion ESQL_REPORT_SHARD_PARTITIONING = def(9_050_0_00);
224224
public static final TransportVersion ESQL_QUERY_PLANNING_DURATION = def(9_051_0_00);
225225
public static final TransportVersion ESQL_DOCUMENTS_FOUND_AND_VALUES_LOADED = def(9_052_0_00);
226+
public static final TransportVersion SETTINGS_IN_DATA_STREAMS = def(9_053_00_0);
226227

227228
/*
228229
* STOP! READ THIS FIRST! No, really,

server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java

Lines changed: 91 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import org.elasticsearch.common.bytes.BytesReference;
3232
import org.elasticsearch.common.io.stream.StreamInput;
3333
import org.elasticsearch.common.io.stream.StreamOutput;
34+
import org.elasticsearch.common.settings.Settings;
3435
import org.elasticsearch.common.time.DateFormatter;
3536
import org.elasticsearch.common.time.DateFormatters;
3637
import org.elasticsearch.common.util.FeatureFlag;
@@ -57,6 +58,7 @@
5758
import java.util.ArrayList;
5859
import java.util.Comparator;
5960
import java.util.HashMap;
61+
import java.util.HashSet;
6062
import java.util.List;
6163
import java.util.Locale;
6264
import java.util.Map;
@@ -67,6 +69,7 @@
6769
import java.util.function.Predicate;
6870
import java.util.stream.Collectors;
6971

72+
import static org.elasticsearch.cluster.metadata.MetadataCreateDataStreamService.lookupTemplateForDataStream;
7073
import static org.elasticsearch.common.xcontent.XContentParserUtils.ensureExpectedToken;
7174
import static org.elasticsearch.index.IndexSettings.LIFECYCLE_ORIGINATION_DATE;
7275
import static org.elasticsearch.index.IndexSettings.PREFER_ILM_SETTING;
@@ -118,6 +121,7 @@ public static boolean isFailureStoreFeatureFlagEnabled() {
118121
private final long generation;
119122
@Nullable
120123
private final Map<String, Object> metadata;
124+
private final Settings settings;
121125
private final boolean hidden;
122126
private final boolean replicated;
123127
private final boolean system;
@@ -137,6 +141,7 @@ public DataStream(
137141
List<Index> indices,
138142
long generation,
139143
Map<String, Object> metadata,
144+
Settings settings,
140145
boolean hidden,
141146
boolean replicated,
142147
boolean system,
@@ -152,6 +157,7 @@ public DataStream(
152157
name,
153158
generation,
154159
metadata,
160+
settings,
155161
hidden,
156162
replicated,
157163
system,
@@ -169,6 +175,7 @@ public DataStream(
169175
String name,
170176
long generation,
171177
Map<String, Object> metadata,
178+
Settings settings,
172179
boolean hidden,
173180
boolean replicated,
174181
boolean system,
@@ -183,6 +190,7 @@ public DataStream(
183190
this.name = name;
184191
this.generation = generation;
185192
this.metadata = metadata;
193+
this.settings = Objects.requireNonNull(settings);
186194
assert system == false || hidden; // system indices must be hidden
187195
this.hidden = hidden;
188196
this.replicated = replicated;
@@ -235,10 +243,17 @@ public static DataStream read(StreamInput in) throws IOException {
235243
// is still behind a feature flag in previous version we use the default value instead of explicitly disabling it.
236244
dataStreamOptions = failureStoreEnabled ? DataStreamOptions.FAILURE_STORE_ENABLED : null;
237245
}
246+
final Settings settings;
247+
if (in.getTransportVersion().onOrAfter(TransportVersions.SETTINGS_IN_DATA_STREAMS)) {
248+
settings = Settings.readSettingsFromStream(in);
249+
} else {
250+
settings = Settings.EMPTY;
251+
}
238252
return new DataStream(
239253
name,
240254
generation,
241255
metadata,
256+
settings,
242257
hidden,
243258
replicated,
244259
system,
@@ -329,6 +344,52 @@ public boolean rolloverOnWrite() {
329344
return backingIndices.rolloverOnWrite;
330345
}
331346

347+
public ComposableIndexTemplate getEffectiveIndexTemplate(ProjectMetadata projectMetadata) {
348+
return mergeSettingsIntoTemplate(lookupTemplateForDataStream(name, projectMetadata), settings);
349+
}
350+
351+
public static ComposableIndexTemplate getEffectiveIndexTemplate(
352+
String dataStreamName,
353+
ProjectMetadata projectMetadata,
354+
Settings settingsOverrides
355+
) {
356+
return mergeSettingsIntoTemplate(lookupTemplateForDataStream(dataStreamName, projectMetadata), settingsOverrides);
357+
}
358+
359+
public Settings getEffectiveSettings(ProjectMetadata projectMetadata) {
360+
ComposableIndexTemplate template = getMatchingIndexTemplate(projectMetadata);
361+
return mergeSettings(template.template() == null ? Settings.EMPTY : template.template().settings(), settings);
362+
}
363+
364+
private ComposableIndexTemplate getMatchingIndexTemplate(ProjectMetadata projectMetadata) {
365+
return lookupTemplateForDataStream(name, projectMetadata);
366+
}
367+
368+
public static ComposableIndexTemplate mergeSettingsIntoTemplate(ComposableIndexTemplate template, Settings settings) {
369+
if (Settings.EMPTY.equals(settings)) {
370+
return template;
371+
}
372+
ComposableIndexTemplate.Builder mergedIndexTemplateBuilder = template.toBuilder();
373+
assert template.template() != null : "Template is unexpectedly null";
374+
Template.Builder mergedTemplateBuilder = Template.builder(template.template());
375+
mergedTemplateBuilder.settings(mergeSettings(template.template().settings(), settings));
376+
mergedIndexTemplateBuilder.template(mergedTemplateBuilder);
377+
return mergedIndexTemplateBuilder.build();
378+
}
379+
380+
private static Settings mergeSettings(Settings originalSettings, Settings newSettings) {
381+
if (Settings.EMPTY.equals(newSettings)) {
382+
return Objects.requireNonNullElse(originalSettings, Settings.EMPTY);
383+
}
384+
Settings.Builder settingsBuilder = Settings.builder().put(originalSettings).put(newSettings);
385+
for (String settingName : new HashSet<>(settingsBuilder.keys())) {
386+
if (settingsBuilder.get(settingName) == null) {
387+
settingsBuilder.remove(settingName);
388+
}
389+
}
390+
return settingsBuilder.build();
391+
}
392+
332393
/**
333394
* We define that a data stream is considered internal either if it is a system index or if
334395
* its name starts with a dot.
@@ -440,6 +501,10 @@ public Map<String, Object> getMetadata() {
440501
return metadata;
441502
}
442503

504+
public Settings getSettings() {
505+
return settings;
506+
}
507+
443508
@Override
444509
public boolean isHidden() {
445510
return hidden;
@@ -1268,6 +1333,9 @@ public void writeTo(StreamOutput out) throws IOException {
12681333
if (out.getTransportVersion().onOrAfter(DataStream.ADD_DATA_STREAM_OPTIONS_VERSION)) {
12691334
out.writeOptionalWriteable(dataStreamOptions.isEmpty() ? null : dataStreamOptions);
12701335
}
1336+
if (out.getTransportVersion().onOrAfter(TransportVersions.SETTINGS_IN_DATA_STREAMS)) {
1337+
settings.writeTo(out);
1338+
}
12711339
}
12721340

12731341
public static final ParseField NAME_FIELD = new ParseField("name");
@@ -1288,26 +1356,28 @@ public void writeTo(StreamOutput out) throws IOException {
12881356
public static final ParseField FAILURE_ROLLOVER_ON_WRITE_FIELD = new ParseField("failure_rollover_on_write");
12891357
public static final ParseField FAILURE_AUTO_SHARDING_FIELD = new ParseField("failure_auto_sharding");
12901358
public static final ParseField DATA_STREAM_OPTIONS_FIELD = new ParseField("options");
1359+
public static final ParseField SETTINGS_FIELD = new ParseField("settings");
12911360

12921361
@SuppressWarnings("unchecked")
12931362
private static final ConstructingObjectParser<DataStream, Void> PARSER = new ConstructingObjectParser<>("data_stream", args -> {
12941363
// Fields behind a feature flag need to be parsed last otherwise the parser will fail when the feature flag is disabled.
12951364
// Until the feature flag is removed we keep them separately to be mindful of this.
1296-
boolean failureStoreEnabled = DataStream.isFailureStoreFeatureFlagEnabled() && args[12] != null && (boolean) args[12];
1365+
boolean failureStoreEnabled = DataStream.isFailureStoreFeatureFlagEnabled() && args[13] != null && (boolean) args[13];
12971366
DataStreamIndices failureIndices = DataStream.isFailureStoreFeatureFlagEnabled()
12981367
? new DataStreamIndices(
12991368
FAILURE_STORE_PREFIX,
1300-
args[13] != null ? (List<Index>) args[13] : List.of(),
1301-
args[14] != null && (boolean) args[14],
1302-
(DataStreamAutoShardingEvent) args[15]
1369+
args[14] != null ? (List<Index>) args[14] : List.of(),
1370+
args[15] != null && (boolean) args[15],
1371+
(DataStreamAutoShardingEvent) args[16]
13031372
)
13041373
: new DataStreamIndices(FAILURE_STORE_PREFIX, List.of(), false, null);
13051374
// We cannot distinguish if failure store was explicitly disabled or not. Given that failure store
13061375
// is still behind a feature flag in previous version we use the default value instead of explicitly disabling it.
13071376
DataStreamOptions dataStreamOptions = DataStreamOptions.EMPTY;
1377+
13081378
if (DataStream.isFailureStoreFeatureFlagEnabled()) {
1309-
if (args[16] != null) {
1310-
dataStreamOptions = (DataStreamOptions) args[16];
1379+
if (args[17] != null) {
1380+
dataStreamOptions = (DataStreamOptions) args[17];
13111381
} else if (failureStoreEnabled) {
13121382
dataStreamOptions = DataStreamOptions.FAILURE_STORE_ENABLED;
13131383
}
@@ -1316,6 +1386,7 @@ public void writeTo(StreamOutput out) throws IOException {
13161386
(String) args[0],
13171387
(Long) args[2],
13181388
(Map<String, Object>) args[3],
1389+
args[12] == null ? Settings.EMPTY : (Settings) args[12],
13191390
args[4] != null && (boolean) args[4],
13201391
args[5] != null && (boolean) args[5],
13211392
args[6] != null && (boolean) args[6],
@@ -1359,6 +1430,7 @@ public void writeTo(StreamOutput out) throws IOException {
13591430
(p, c) -> DataStreamAutoShardingEvent.fromXContent(p),
13601431
AUTO_SHARDING_FIELD
13611432
);
1433+
PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(), (p, c) -> Settings.fromXContent(p), SETTINGS_FIELD);
13621434
// The fields behind the feature flag should always be last.
13631435
if (DataStream.isFailureStoreFeatureFlagEnabled()) {
13641436
// Should be removed after backport
@@ -1415,6 +1487,9 @@ public XContentBuilder toXContent(
14151487
builder.field(REPLICATED_FIELD.getPreferredName(), replicated);
14161488
builder.field(SYSTEM_FIELD.getPreferredName(), system);
14171489
builder.field(ALLOW_CUSTOM_ROUTING.getPreferredName(), allowCustomRouting);
1490+
builder.startObject(SETTINGS_FIELD.getPreferredName());
1491+
this.settings.toXContent(builder, params);
1492+
builder.endObject();
14181493
if (DataStream.isFailureStoreFeatureFlagEnabled()) {
14191494
if (failureIndices.indices.isEmpty() == false) {
14201495
builder.xContentList(FAILURE_INDICES_FIELD.getPreferredName(), failureIndices.indices);
@@ -1455,6 +1530,7 @@ public boolean equals(Object o) {
14551530
return name.equals(that.name)
14561531
&& generation == that.generation
14571532
&& Objects.equals(metadata, that.metadata)
1533+
&& Objects.equals(settings, that.settings)
14581534
&& hidden == that.hidden
14591535
&& system == that.system
14601536
&& replicated == that.replicated
@@ -1472,6 +1548,7 @@ public int hashCode() {
14721548
name,
14731549
generation,
14741550
metadata,
1551+
settings,
14751552
hidden,
14761553
system,
14771554
replicated,
@@ -1784,6 +1861,7 @@ public static class Builder {
17841861
private long generation = 1;
17851862
@Nullable
17861863
private Map<String, Object> metadata = null;
1864+
private Settings settings = Settings.EMPTY;
17871865
private boolean hidden = false;
17881866
private boolean replicated = false;
17891867
private boolean system = false;
@@ -1811,6 +1889,7 @@ private Builder(DataStream dataStream) {
18111889
name = dataStream.name;
18121890
generation = dataStream.generation;
18131891
metadata = dataStream.metadata;
1892+
settings = dataStream.settings;
18141893
hidden = dataStream.hidden;
18151894
replicated = dataStream.replicated;
18161895
system = dataStream.system;
@@ -1842,6 +1921,11 @@ public Builder setMetadata(Map<String, Object> metadata) {
18421921
return this;
18431922
}
18441923

1924+
public Builder setSettings(Settings settings) {
1925+
this.settings = settings;
1926+
return this;
1927+
}
1928+
18451929
public Builder setHidden(boolean hidden) {
18461930
this.hidden = hidden;
18471931
return this;
@@ -1902,6 +1986,7 @@ public DataStream build() {
19021986
name,
19031987
generation,
19041988
metadata,
1989+
settings,
19051990
hidden,
19061991
replicated,
19071992
system,

server/src/main/java/org/elasticsearch/cluster/metadata/MetadataCreateDataStreamService.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -331,6 +331,7 @@ static ClusterState createDataStream(
331331
dataStreamName,
332332
initialGeneration,
333333
template.metadata() != null ? Map.copyOf(template.metadata()) : null,
334+
Settings.EMPTY,
334335
hidden,
335336
false,
336337
isSystem,

0 commit comments

Comments
 (0)