Skip to content

Commit a14e42c

Browse files
committed
storing settings in data streams
1 parent de42ba3 commit a14e42c

File tree

9 files changed

+272
-7
lines changed

9 files changed

+272
-7
lines changed

modules/data-streams/src/test/java/org/elasticsearch/datastreams/UpdateTimeSeriesRangeServiceTests.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -257,6 +257,7 @@ public void testUpdateTimeSeriesTemporalOneBadDataStream() {
257257
ds2Indices,
258258
2,
259259
ds2.getMetadata(),
260+
ds2.getSettings(),
260261
ds2.isHidden(),
261262
ds2.isReplicated(),
262263
ds2.isSystem(),

server/src/main/java/org/elasticsearch/TransportVersions.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -221,6 +221,7 @@ static TransportVersion def(int id) {
221221
public static final TransportVersion AMAZON_BEDROCK_TASK_SETTINGS = def(9_049_0_00);
222222
public static final TransportVersion ESQL_REPORT_SHARD_PARTITIONING = def(9_050_0_00);
223223
public static final TransportVersion ESQL_QUERY_PLANNING_DURATION = def(9_051_0_00);
224+
public static final TransportVersion SETTINGS_IN_DATA_STREAMS = def(9_052_00_0);
224225

225226
/*
226227
* STOP! READ THIS FIRST! No, really,

server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java

Lines changed: 84 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import org.elasticsearch.common.bytes.BytesReference;
3232
import org.elasticsearch.common.io.stream.StreamInput;
3333
import org.elasticsearch.common.io.stream.StreamOutput;
34+
import org.elasticsearch.common.settings.Settings;
3435
import org.elasticsearch.common.time.DateFormatter;
3536
import org.elasticsearch.common.time.DateFormatters;
3637
import org.elasticsearch.common.util.FeatureFlag;
@@ -57,6 +58,7 @@
5758
import java.util.ArrayList;
5859
import java.util.Comparator;
5960
import java.util.HashMap;
61+
import java.util.HashSet;
6062
import java.util.List;
6163
import java.util.Locale;
6264
import java.util.Map;
@@ -67,6 +69,7 @@
6769
import java.util.function.Predicate;
6870
import java.util.stream.Collectors;
6971

72+
import static org.elasticsearch.cluster.metadata.MetadataCreateDataStreamService.lookupTemplateForDataStream;
7073
import static org.elasticsearch.common.xcontent.XContentParserUtils.ensureExpectedToken;
7174
import static org.elasticsearch.index.IndexSettings.LIFECYCLE_ORIGINATION_DATE;
7275
import static org.elasticsearch.index.IndexSettings.PREFER_ILM_SETTING;
@@ -118,6 +121,7 @@ public static boolean isFailureStoreFeatureFlagEnabled() {
118121
private final long generation;
119122
@Nullable
120123
private final Map<String, Object> metadata;
124+
private final Settings settings;
121125
private final boolean hidden;
122126
private final boolean replicated;
123127
private final boolean system;
@@ -137,6 +141,7 @@ public DataStream(
137141
List<Index> indices,
138142
long generation,
139143
Map<String, Object> metadata,
144+
Settings settings,
140145
boolean hidden,
141146
boolean replicated,
142147
boolean system,
@@ -152,6 +157,7 @@ public DataStream(
152157
name,
153158
generation,
154159
metadata,
160+
settings,
155161
hidden,
156162
replicated,
157163
system,
@@ -169,6 +175,7 @@ public DataStream(
169175
String name,
170176
long generation,
171177
Map<String, Object> metadata,
178+
Settings settings,
172179
boolean hidden,
173180
boolean replicated,
174181
boolean system,
@@ -183,6 +190,7 @@ public DataStream(
183190
this.name = name;
184191
this.generation = generation;
185192
this.metadata = metadata;
193+
this.settings = settings;
186194
assert system == false || hidden; // system indices must be hidden
187195
this.hidden = hidden;
188196
this.replicated = replicated;
@@ -235,10 +243,17 @@ public static DataStream read(StreamInput in) throws IOException {
235243
// is still behind a feature flag in previous version we use the default value instead of explicitly disabling it.
236244
dataStreamOptions = failureStoreEnabled ? DataStreamOptions.FAILURE_STORE_ENABLED : null;
237245
}
246+
final Settings settings;
247+
if (in.getTransportVersion().onOrAfter(TransportVersions.SETTINGS_IN_DATA_STREAMS)) {
248+
settings = Settings.readSettingsFromStream(in);
249+
} else {
250+
settings = Settings.EMPTY;
251+
}
238252
return new DataStream(
239253
name,
240254
generation,
241255
metadata,
256+
settings,
242257
hidden,
243258
replicated,
244259
system,
@@ -329,6 +344,45 @@ public boolean rolloverOnWrite() {
329344
return backingIndices.rolloverOnWrite;
330345
}
331346

347+
public ComposableIndexTemplate getEffectiveIndexTemplate(ProjectMetadata projectMetadata) {
348+
return mergeTemplates(getMatchingIndexTemplate(projectMetadata));
349+
}
350+
351+
public Settings getEffectiveSettings(ProjectMetadata projectMetadata) {
352+
return mergeSettings(getMatchingIndexTemplate(projectMetadata));
353+
}
354+
355+
private ComposableIndexTemplate getMatchingIndexTemplate(ProjectMetadata projectMetadata) {
356+
return lookupTemplateForDataStream(name, projectMetadata);
357+
}
358+
359+
private ComposableIndexTemplate mergeTemplates(ComposableIndexTemplate originalTemplate) {
360+
if (Settings.EMPTY.equals(settings)) {
361+
return originalTemplate;
362+
}
363+
ComposableIndexTemplate.Builder mergedIndexTemplateBuilder = originalTemplate.toBuilder();
364+
assert originalTemplate.template() != null : "Matching template is null for data stream ";
365+
Template.Builder mergedTemplateBuilder = Template.builder(originalTemplate.template());
366+
mergedTemplateBuilder.settings(mergeSettings(originalTemplate));
367+
mergedIndexTemplateBuilder.template(mergedTemplateBuilder);
368+
return mergedIndexTemplateBuilder.build();
369+
}
370+
371+
private Settings mergeSettings(ComposableIndexTemplate originalTemplate) {
372+
assert originalTemplate.template() != null : "Matching template is null for data stream ";
373+
Settings originalSettings = originalTemplate.template().settings();
374+
if (Settings.EMPTY.equals(settings)) {
375+
return originalSettings;
376+
}
377+
Settings.Builder settingsBuilder = Settings.builder().put(originalSettings).put(settings);
378+
for (String settingName : new HashSet<>(settingsBuilder.keys())) {
379+
if (settingsBuilder.get(settingName) == null) {
380+
settingsBuilder.remove(settingName);
381+
}
382+
}
383+
return settingsBuilder.build();
384+
}
385+
332386
/**
333387
* We define that a data stream is considered internal either if it is a system index or if
334388
* its name starts with a dot.
@@ -440,6 +494,10 @@ public Map<String, Object> getMetadata() {
440494
return metadata;
441495
}
442496

497+
public Settings getSettings() {
498+
return settings;
499+
}
500+
443501
@Override
444502
public boolean isHidden() {
445503
return hidden;
@@ -1268,6 +1326,9 @@ public void writeTo(StreamOutput out) throws IOException {
12681326
if (out.getTransportVersion().onOrAfter(DataStream.ADD_DATA_STREAM_OPTIONS_VERSION)) {
12691327
out.writeOptionalWriteable(dataStreamOptions.isEmpty() ? null : dataStreamOptions);
12701328
}
1329+
if (out.getTransportVersion().onOrAfter(TransportVersions.SETTINGS_IN_DATA_STREAMS)) {
1330+
settings.writeTo(out);
1331+
}
12711332
}
12721333

12731334
public static final ParseField NAME_FIELD = new ParseField("name");
@@ -1288,26 +1349,28 @@ public void writeTo(StreamOutput out) throws IOException {
12881349
public static final ParseField FAILURE_ROLLOVER_ON_WRITE_FIELD = new ParseField("failure_rollover_on_write");
12891350
public static final ParseField FAILURE_AUTO_SHARDING_FIELD = new ParseField("failure_auto_sharding");
12901351
public static final ParseField DATA_STREAM_OPTIONS_FIELD = new ParseField("options");
1352+
public static final ParseField SETTINGS_FIELD = new ParseField("settings");
12911353

12921354
@SuppressWarnings("unchecked")
12931355
private static final ConstructingObjectParser<DataStream, Void> PARSER = new ConstructingObjectParser<>("data_stream", args -> {
12941356
// Fields behind a feature flag need to be parsed last otherwise the parser will fail when the feature flag is disabled.
12951357
// Until the feature flag is removed we keep them separately to be mindful of this.
1296-
boolean failureStoreEnabled = DataStream.isFailureStoreFeatureFlagEnabled() && args[12] != null && (boolean) args[12];
1358+
boolean failureStoreEnabled = DataStream.isFailureStoreFeatureFlagEnabled() && args[13] != null && (boolean) args[13];
12971359
DataStreamIndices failureIndices = DataStream.isFailureStoreFeatureFlagEnabled()
12981360
? new DataStreamIndices(
12991361
FAILURE_STORE_PREFIX,
1300-
args[13] != null ? (List<Index>) args[13] : List.of(),
1301-
args[14] != null && (boolean) args[14],
1302-
(DataStreamAutoShardingEvent) args[15]
1362+
args[14] != null ? (List<Index>) args[14] : List.of(),
1363+
args[15] != null && (boolean) args[15],
1364+
(DataStreamAutoShardingEvent) args[16]
13031365
)
13041366
: new DataStreamIndices(FAILURE_STORE_PREFIX, List.of(), false, null);
13051367
// We cannot distinguish if failure store was explicitly disabled or not. Given that failure store
13061368
// is still behind a feature flag in previous version we use the default value instead of explicitly disabling it.
13071369
DataStreamOptions dataStreamOptions = DataStreamOptions.EMPTY;
1370+
13081371
if (DataStream.isFailureStoreFeatureFlagEnabled()) {
1309-
if (args[16] != null) {
1310-
dataStreamOptions = (DataStreamOptions) args[16];
1372+
if (args[17] != null) {
1373+
dataStreamOptions = (DataStreamOptions) args[17];
13111374
} else if (failureStoreEnabled) {
13121375
dataStreamOptions = DataStreamOptions.FAILURE_STORE_ENABLED;
13131376
}
@@ -1316,6 +1379,7 @@ public void writeTo(StreamOutput out) throws IOException {
13161379
(String) args[0],
13171380
(Long) args[2],
13181381
(Map<String, Object>) args[3],
1382+
(Settings) args[12],
13191383
args[4] != null && (boolean) args[4],
13201384
args[5] != null && (boolean) args[5],
13211385
args[6] != null && (boolean) args[6],
@@ -1359,6 +1423,7 @@ public void writeTo(StreamOutput out) throws IOException {
13591423
(p, c) -> DataStreamAutoShardingEvent.fromXContent(p),
13601424
AUTO_SHARDING_FIELD
13611425
);
1426+
PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(), (p, c) -> Settings.fromXContent(p), SETTINGS_FIELD);
13621427
// The fields behind the feature flag should always be last.
13631428
if (DataStream.isFailureStoreFeatureFlagEnabled()) {
13641429
// Should be removed after backport
@@ -1415,6 +1480,9 @@ public XContentBuilder toXContent(
14151480
builder.field(REPLICATED_FIELD.getPreferredName(), replicated);
14161481
builder.field(SYSTEM_FIELD.getPreferredName(), system);
14171482
builder.field(ALLOW_CUSTOM_ROUTING.getPreferredName(), allowCustomRouting);
1483+
builder.startObject(SETTINGS_FIELD.getPreferredName());
1484+
this.settings.toXContent(builder, params);
1485+
builder.endObject();
14181486
if (DataStream.isFailureStoreFeatureFlagEnabled()) {
14191487
if (failureIndices.indices.isEmpty() == false) {
14201488
builder.xContentList(FAILURE_INDICES_FIELD.getPreferredName(), failureIndices.indices);
@@ -1455,6 +1523,7 @@ public boolean equals(Object o) {
14551523
return name.equals(that.name)
14561524
&& generation == that.generation
14571525
&& Objects.equals(metadata, that.metadata)
1526+
&& Objects.equals(settings, that.settings)
14581527
&& hidden == that.hidden
14591528
&& system == that.system
14601529
&& replicated == that.replicated
@@ -1472,6 +1541,7 @@ public int hashCode() {
14721541
name,
14731542
generation,
14741543
metadata,
1544+
settings,
14751545
hidden,
14761546
system,
14771547
replicated,
@@ -1784,6 +1854,7 @@ public static class Builder {
17841854
private long generation = 1;
17851855
@Nullable
17861856
private Map<String, Object> metadata = null;
1857+
private Settings settings = Settings.EMPTY;
17871858
private boolean hidden = false;
17881859
private boolean replicated = false;
17891860
private boolean system = false;
@@ -1811,6 +1882,7 @@ private Builder(DataStream dataStream) {
18111882
name = dataStream.name;
18121883
generation = dataStream.generation;
18131884
metadata = dataStream.metadata;
1885+
settings = dataStream.settings;
18141886
hidden = dataStream.hidden;
18151887
replicated = dataStream.replicated;
18161888
system = dataStream.system;
@@ -1842,6 +1914,11 @@ public Builder setMetadata(Map<String, Object> metadata) {
18421914
return this;
18431915
}
18441916

1917+
public Builder setSettings(Settings settings) {
1918+
this.settings = settings;
1919+
return this;
1920+
}
1921+
18451922
public Builder setHidden(boolean hidden) {
18461923
this.hidden = hidden;
18471924
return this;
@@ -1902,6 +1979,7 @@ public DataStream build() {
19021979
name,
19031980
generation,
19041981
metadata,
1982+
settings,
19051983
hidden,
19061984
replicated,
19071985
system,

server/src/main/java/org/elasticsearch/cluster/metadata/MetadataCreateDataStreamService.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -331,6 +331,7 @@ static ClusterState createDataStream(
331331
dataStreamName,
332332
initialGeneration,
333333
template.metadata() != null ? Map.copyOf(template.metadata()) : null,
334+
Settings.EMPTY,
334335
hidden,
335336
false,
336337
isSystem,

0 commit comments

Comments
 (0)