Skip to content

Commit 528bd9c

Browse files
authored
Adding mappings to data streams (#129787)
1 parent 5be4100 commit 528bd9c

File tree

9 files changed

+369
-26
lines changed

9 files changed

+369
-26
lines changed

modules/data-streams/src/main/java/org/elasticsearch/datastreams/action/TransportGetDataStreamsAction.java

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@
5454
import org.elasticsearch.threadpool.ThreadPool;
5555
import org.elasticsearch.transport.TransportService;
5656

57+
import java.io.IOException;
5758
import java.time.Instant;
5859
import java.util.ArrayList;
5960
import java.util.Arrays;
@@ -261,13 +262,17 @@ static GetDataStreamAction.Response innerOperation(
261262
Settings settings = dataStream.getEffectiveSettings(state.metadata());
262263
ilmPolicyName = settings.get(IndexMetadata.LIFECYCLE_NAME);
263264
if (indexMode == null && state.metadata().templatesV2().get(indexTemplate) != null) {
264-
indexMode = resolveMode(
265-
state,
266-
indexSettingProviders,
267-
dataStream,
268-
settings,
269-
dataStream.getEffectiveIndexTemplate(state.metadata())
270-
);
265+
try {
266+
indexMode = resolveMode(
267+
state,
268+
indexSettingProviders,
269+
dataStream,
270+
settings,
271+
dataStream.getEffectiveIndexTemplate(state.metadata())
272+
);
273+
} catch (IOException e) {
274+
throw new RuntimeException(e);
275+
}
271276
}
272277
indexTemplatePreferIlmValue = PREFER_ILM_SETTING.get(settings);
273278
} else {

modules/data-streams/src/test/java/org/elasticsearch/datastreams/UpdateTimeSeriesRangeServiceTests.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -258,6 +258,7 @@ public void testUpdateTimeSeriesTemporalOneBadDataStream() {
258258
2,
259259
ds2.getMetadata(),
260260
ds2.getSettings(),
261+
ds2.getMappings(),
261262
ds2.isHidden(),
262263
ds2.isReplicated(),
263264
ds2.isSystem(),

server/src/main/java/org/elasticsearch/TransportVersions.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -323,6 +323,7 @@ static TransportVersion def(int id) {
323323
public static final TransportVersion ML_INFERENCE_ELASTIC_DENSE_TEXT_EMBEDDINGS_ADDED = def(9_109_00_0);
324324
public static final TransportVersion ML_INFERENCE_COHERE_API_VERSION = def(9_110_0_00);
325325
public static final TransportVersion ESQL_PROFILE_INCLUDE_PLAN = def(9_111_0_00);
326+
public static final TransportVersion MAPPINGS_IN_DATA_STREAMS = def(9_112_0_00);
326327

327328
/*
328329
* STOP! READ THIS FIRST! No, really,

server/src/main/java/org/elasticsearch/cluster/metadata/ComposableIndexTemplate.java

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,18 +19,24 @@
1919
import org.elasticsearch.common.io.stream.StreamOutput;
2020
import org.elasticsearch.common.io.stream.Writeable;
2121
import org.elasticsearch.common.settings.Settings;
22+
import org.elasticsearch.common.xcontent.XContentHelper;
2223
import org.elasticsearch.core.Nullable;
2324
import org.elasticsearch.index.mapper.DataStreamTimestampFieldMapper;
2425
import org.elasticsearch.index.mapper.MapperService;
26+
import org.elasticsearch.index.mapper.Mapping;
2527
import org.elasticsearch.xcontent.ConstructingObjectParser;
2628
import org.elasticsearch.xcontent.ParseField;
2729
import org.elasticsearch.xcontent.ToXContentObject;
2830
import org.elasticsearch.xcontent.XContentBuilder;
31+
import org.elasticsearch.xcontent.XContentFactory;
2932
import org.elasticsearch.xcontent.XContentParser;
33+
import org.elasticsearch.xcontent.XContentParserConfiguration;
34+
import org.elasticsearch.xcontent.XContentType;
3035

3136
import java.io.IOException;
3237
import java.util.ArrayList;
3338
import java.util.Collections;
39+
import java.util.HashMap;
3440
import java.util.List;
3541
import java.util.Map;
3642
import java.util.Objects;
@@ -51,6 +57,14 @@ public class ComposableIndexTemplate implements SimpleDiffable<ComposableIndexTe
5157
private static final ParseField ALLOW_AUTO_CREATE = new ParseField("allow_auto_create");
5258
private static final ParseField IGNORE_MISSING_COMPONENT_TEMPLATES = new ParseField("ignore_missing_component_templates");
5359
private static final ParseField DEPRECATED = new ParseField("deprecated");
60+
public static final CompressedXContent EMPTY_MAPPINGS;
61+
static {
62+
try {
63+
EMPTY_MAPPINGS = new CompressedXContent(Map.of());
64+
} catch (IOException e) {
65+
throw new RuntimeException(e);
66+
}
67+
}
5468

5569
@SuppressWarnings("unchecked")
5670
public static final ConstructingObjectParser<ComposableIndexTemplate, Void> PARSER = new ConstructingObjectParser<>(
@@ -338,6 +352,64 @@ public ComposableIndexTemplate mergeSettings(Settings settings) {
338352
return mergedIndexTemplateBuilder.build();
339353
}
340354

355+
public ComposableIndexTemplate mergeMappings(CompressedXContent mappings) throws IOException {
356+
Objects.requireNonNull(mappings);
357+
if (Mapping.EMPTY.toCompressedXContent().equals(mappings) && this.template() != null && this.template().mappings() != null) {
358+
return this;
359+
}
360+
ComposableIndexTemplate.Builder mergedIndexTemplateBuilder = this.toBuilder();
361+
Template.Builder mergedTemplateBuilder;
362+
CompressedXContent templateMappings;
363+
if (this.template() == null) {
364+
mergedTemplateBuilder = Template.builder();
365+
templateMappings = null;
366+
} else {
367+
mergedTemplateBuilder = Template.builder(this.template());
368+
templateMappings = this.template().mappings();
369+
}
370+
mergedTemplateBuilder.mappings(templateMappings == null ? mappings : merge(templateMappings, mappings));
371+
mergedIndexTemplateBuilder.template(mergedTemplateBuilder);
372+
return mergedIndexTemplateBuilder.build();
373+
}
374+
375+
@SuppressWarnings("unchecked")
376+
private CompressedXContent merge(CompressedXContent originalMapping, CompressedXContent mappingAddition) throws IOException {
377+
Map<String, Object> mappingAdditionMap = XContentHelper.convertToMap(mappingAddition.uncompressed(), true, XContentType.JSON).v2();
378+
Map<String, Object> combinedMappingMap = new HashMap<>();
379+
if (originalMapping != null) {
380+
Map<String, Object> originalMappingMap = XContentHelper.convertToMap(originalMapping.uncompressed(), true, XContentType.JSON)
381+
.v2();
382+
if (originalMappingMap.containsKey(MapperService.SINGLE_MAPPING_NAME)) {
383+
combinedMappingMap.putAll((Map<String, ?>) originalMappingMap.get(MapperService.SINGLE_MAPPING_NAME));
384+
} else {
385+
combinedMappingMap.putAll(originalMappingMap);
386+
}
387+
}
388+
XContentHelper.update(combinedMappingMap, mappingAdditionMap, true);
389+
return convertMappingMapToXContent(combinedMappingMap);
390+
}
391+
392+
private static CompressedXContent convertMappingMapToXContent(Map<String, Object> rawAdditionalMapping) throws IOException {
393+
CompressedXContent compressedXContent;
394+
if (rawAdditionalMapping.isEmpty()) {
395+
compressedXContent = EMPTY_MAPPINGS;
396+
} else {
397+
try (var parser = XContentHelper.mapToXContentParser(XContentParserConfiguration.EMPTY, rawAdditionalMapping)) {
398+
compressedXContent = mappingFromXContent(parser);
399+
}
400+
}
401+
return compressedXContent;
402+
}
403+
404+
private static CompressedXContent mappingFromXContent(XContentParser parser) throws IOException {
405+
XContentParser.Token token = parser.nextToken();
406+
if (token == XContentParser.Token.START_OBJECT) {
407+
return new CompressedXContent(Strings.toString(XContentFactory.jsonBuilder().map(parser.mapOrdered())));
408+
} else {
409+
throw new IllegalArgumentException("Unexpected token: " + token);
410+
}
411+
}
412+
341413
@Override
342414
public int hashCode() {
343415
return Objects.hash(

server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java

Lines changed: 68 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import org.elasticsearch.common.ParsingException;
3030
import org.elasticsearch.common.Strings;
3131
import org.elasticsearch.common.bytes.BytesReference;
32+
import org.elasticsearch.common.compress.CompressedXContent;
3233
import org.elasticsearch.common.io.stream.StreamInput;
3334
import org.elasticsearch.common.io.stream.StreamOutput;
3435
import org.elasticsearch.common.settings.Settings;
@@ -47,9 +48,11 @@
4748
import org.elasticsearch.index.mapper.DateFieldMapper;
4849
import org.elasticsearch.indices.SystemIndices;
4950
import org.elasticsearch.xcontent.ConstructingObjectParser;
51+
import org.elasticsearch.xcontent.ObjectParser;
5052
import org.elasticsearch.xcontent.ParseField;
5153
import org.elasticsearch.xcontent.ToXContentObject;
5254
import org.elasticsearch.xcontent.XContentBuilder;
55+
import org.elasticsearch.xcontent.XContentFactory;
5356
import org.elasticsearch.xcontent.XContentParser;
5457
import org.elasticsearch.xcontent.XContentParserConfiguration;
5558
import org.elasticsearch.xcontent.XContentType;
@@ -58,6 +61,7 @@
5861
import java.time.Instant;
5962
import java.time.temporal.ChronoUnit;
6063
import java.util.ArrayList;
64+
import java.util.Base64;
6165
import java.util.Comparator;
6266
import java.util.HashMap;
6367
import java.util.List;
@@ -70,6 +74,7 @@
7074
import java.util.function.Predicate;
7175
import java.util.stream.Collectors;
7276

77+
import static org.elasticsearch.cluster.metadata.ComposableIndexTemplate.EMPTY_MAPPINGS;
7378
import static org.elasticsearch.cluster.metadata.MetadataCreateDataStreamService.lookupTemplateForDataStream;
7479
import static org.elasticsearch.common.xcontent.XContentParserUtils.ensureExpectedToken;
7580
import static org.elasticsearch.index.IndexSettings.LIFECYCLE_ORIGINATION_DATE;
@@ -89,6 +94,7 @@ public final class DataStream implements SimpleDiffable<DataStream>, ToXContentO
8994
public static final String FAILURE_STORE_PREFIX = ".fs-";
9095
public static final DateFormatter DATE_FORMATTER = DateFormatter.forPattern("uuuu.MM.dd");
9196
public static final String TIMESTAMP_FIELD_NAME = "@timestamp";
97+
9298
// Timeseries indices' leaf readers should be sorted by desc order of their timestamp field, as it allows search time optimizations
9399
public static final Comparator<LeafReader> TIMESERIES_LEAF_READERS_SORTER = Comparator.comparingLong((LeafReader r) -> {
94100
try {
@@ -120,6 +126,7 @@ public final class DataStream implements SimpleDiffable<DataStream>, ToXContentO
120126
@Nullable
121127
private final Map<String, Object> metadata;
122128
private final Settings settings;
129+
private final CompressedXContent mappings;
123130
private final boolean hidden;
124131
private final boolean replicated;
125132
private final boolean system;
@@ -156,6 +163,7 @@ public DataStream(
156163
generation,
157164
metadata,
158165
Settings.EMPTY,
166+
EMPTY_MAPPINGS,
159167
hidden,
160168
replicated,
161169
system,
@@ -176,6 +184,7 @@ public DataStream(
176184
long generation,
177185
Map<String, Object> metadata,
178186
Settings settings,
187+
CompressedXContent mappings,
179188
boolean hidden,
180189
boolean replicated,
181190
boolean system,
@@ -192,6 +201,7 @@ public DataStream(
192201
generation,
193202
metadata,
194203
settings,
204+
mappings,
195205
hidden,
196206
replicated,
197207
system,
@@ -210,6 +220,7 @@ public DataStream(
210220
long generation,
211221
Map<String, Object> metadata,
212222
Settings settings,
223+
CompressedXContent mappings,
213224
boolean hidden,
214225
boolean replicated,
215226
boolean system,
@@ -225,6 +236,7 @@ public DataStream(
225236
this.generation = generation;
226237
this.metadata = metadata;
227238
this.settings = Objects.requireNonNull(settings);
239+
this.mappings = Objects.requireNonNull(mappings);
228240
assert system == false || hidden; // system indices must be hidden
229241
this.hidden = hidden;
230242
this.replicated = replicated;
@@ -286,11 +298,18 @@ public static DataStream read(StreamInput in) throws IOException {
286298
} else {
287299
settings = Settings.EMPTY;
288300
}
301+
CompressedXContent mappings;
302+
if (in.getTransportVersion().onOrAfter(TransportVersions.MAPPINGS_IN_DATA_STREAMS)) {
303+
mappings = CompressedXContent.readCompressedString(in);
304+
} else {
305+
mappings = EMPTY_MAPPINGS;
306+
}
289307
return new DataStream(
290308
name,
291309
generation,
292310
metadata,
293311
settings,
312+
mappings,
294313
hidden,
295314
replicated,
296315
system,
@@ -381,8 +400,8 @@ public boolean rolloverOnWrite() {
381400
return backingIndices.rolloverOnWrite;
382401
}
383402

384-
public ComposableIndexTemplate getEffectiveIndexTemplate(ProjectMetadata projectMetadata) {
385-
return getMatchingIndexTemplate(projectMetadata).mergeSettings(settings);
403+
public ComposableIndexTemplate getEffectiveIndexTemplate(ProjectMetadata projectMetadata) throws IOException {
404+
return getMatchingIndexTemplate(projectMetadata).mergeSettings(settings).mergeMappings(mappings);
386405
}
387406

388407
public Settings getEffectiveSettings(ProjectMetadata projectMetadata) {
@@ -391,6 +410,10 @@ public Settings getEffectiveSettings(ProjectMetadata projectMetadata) {
391410
return templateSettings.merge(settings);
392411
}
393412

413+
public CompressedXContent getEffectiveMappings(ProjectMetadata projectMetadata) throws IOException {
414+
return getMatchingIndexTemplate(projectMetadata).mergeMappings(mappings).template().mappings();
415+
}
416+
394417
private ComposableIndexTemplate getMatchingIndexTemplate(ProjectMetadata projectMetadata) {
395418
return lookupTemplateForDataStream(name, projectMetadata);
396419
}
@@ -510,6 +533,10 @@ public Settings getSettings() {
510533
return settings;
511534
}
512535

536+
public CompressedXContent getMappings() {
537+
return mappings;
538+
}
539+
513540
@Override
514541
public boolean isHidden() {
515542
return hidden;
@@ -1354,6 +1381,9 @@ public void writeTo(StreamOutput out) throws IOException {
13541381
|| out.getTransportVersion().isPatchFrom(TransportVersions.SETTINGS_IN_DATA_STREAMS_8_19)) {
13551382
settings.writeTo(out);
13561383
}
1384+
if (out.getTransportVersion().onOrAfter(TransportVersions.MAPPINGS_IN_DATA_STREAMS)) {
1385+
mappings.writeTo(out);
1386+
}
13571387
}
13581388

13591389
public static final ParseField NAME_FIELD = new ParseField("name");
@@ -1376,6 +1406,7 @@ public void writeTo(StreamOutput out) throws IOException {
13761406
public static final ParseField FAILURE_AUTO_SHARDING_FIELD = new ParseField("failure_auto_sharding");
13771407
public static final ParseField DATA_STREAM_OPTIONS_FIELD = new ParseField("options");
13781408
public static final ParseField SETTINGS_FIELD = new ParseField("settings");
1409+
public static final ParseField MAPPINGS_FIELD = new ParseField("mappings");
13791410

13801411
@SuppressWarnings("unchecked")
13811412
private static final ConstructingObjectParser<DataStream, Void> PARSER = new ConstructingObjectParser<>(
@@ -1385,6 +1416,7 @@ public void writeTo(StreamOutput out) throws IOException {
13851416
(Long) args[2],
13861417
(Map<String, Object>) args[3],
13871418
args[17] == null ? Settings.EMPTY : (Settings) args[17],
1419+
args[18] == null ? EMPTY_MAPPINGS : (CompressedXContent) args[18],
13881420
args[4] != null && (boolean) args[4],
13891421
args[5] != null && (boolean) args[5],
13901422
args[6] != null && (boolean) args[6],
@@ -1456,6 +1488,18 @@ public void writeTo(StreamOutput out) throws IOException {
14561488
DATA_STREAM_OPTIONS_FIELD
14571489
);
14581490
PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(), (p, c) -> Settings.fromXContent(p), SETTINGS_FIELD);
1491+
PARSER.declareField(ConstructingObjectParser.optionalConstructorArg(), (p, c) -> {
1492+
XContentParser.Token token = p.currentToken();
1493+
if (token == XContentParser.Token.VALUE_STRING) {
1494+
return new CompressedXContent(Base64.getDecoder().decode(p.text()));
1495+
} else if (token == XContentParser.Token.VALUE_EMBEDDED_OBJECT) {
1496+
return new CompressedXContent(p.binaryValue());
1497+
} else if (token == XContentParser.Token.START_OBJECT) {
1498+
return new CompressedXContent(Strings.toString(XContentFactory.jsonBuilder().map(p.mapOrdered())));
1499+
} else {
1500+
throw new IllegalArgumentException("Unexpected token: " + token);
1501+
}
1502+
}, MAPPINGS_FIELD, ObjectParser.ValueType.VALUE_OBJECT_ARRAY);
14591503
}
14601504

14611505
public static DataStream fromXContent(XContentParser parser) throws IOException {
@@ -1520,6 +1564,20 @@ public XContentBuilder toXContent(
15201564
builder.startObject(SETTINGS_FIELD.getPreferredName());
15211565
this.settings.toXContent(builder, params);
15221566
builder.endObject();
1567+
1568+
String context = params.param(Metadata.CONTEXT_MODE_PARAM, Metadata.CONTEXT_MODE_API);
1569+
boolean binary = params.paramAsBoolean("binary", false);
1570+
if (Metadata.CONTEXT_MODE_API.equals(context) || binary == false) {
1571+
Map<String, Object> uncompressedMapping = XContentHelper.convertToMap(this.mappings.uncompressed(), true, XContentType.JSON)
1572+
.v2();
1573+
if (uncompressedMapping.isEmpty() == false) {
1574+
builder.field(MAPPINGS_FIELD.getPreferredName());
1575+
builder.map(uncompressedMapping);
1576+
}
1577+
} else {
1578+
builder.field(MAPPINGS_FIELD.getPreferredName(), mappings.compressed());
1579+
}
1580+
15231581
builder.endObject();
15241582
return builder;
15251583
}
@@ -1864,6 +1922,7 @@ public static class Builder {
18641922
@Nullable
18651923
private Map<String, Object> metadata = null;
18661924
private Settings settings = Settings.EMPTY;
1925+
private CompressedXContent mappings = EMPTY_MAPPINGS;
18671926
private boolean hidden = false;
18681927
private boolean replicated = false;
18691928
private boolean system = false;
@@ -1892,6 +1951,7 @@ private Builder(DataStream dataStream) {
18921951
generation = dataStream.generation;
18931952
metadata = dataStream.metadata;
18941953
settings = dataStream.settings;
1954+
mappings = dataStream.mappings;
18951955
hidden = dataStream.hidden;
18961956
replicated = dataStream.replicated;
18971957
system = dataStream.system;
@@ -1928,6 +1988,11 @@ public Builder setSettings(Settings settings) {
19281988
return this;
19291989
}
19301990

1991+
public Builder setMappings(CompressedXContent mappings) {
1992+
this.mappings = mappings;
1993+
return this;
1994+
}
1995+
19311996
public Builder setHidden(boolean hidden) {
19321997
this.hidden = hidden;
19331998
return this;
@@ -1989,6 +2054,7 @@ public DataStream build() {
19892054
generation,
19902055
metadata,
19912056
settings,
2057+
mappings,
19922058
hidden,
19932059
replicated,
19942060
system,

0 commit comments

Comments
 (0)