diff --git a/docs/changelog/135431.yaml b/docs/changelog/135431.yaml new file mode 100644 index 0000000000000..e268f948e7eb5 --- /dev/null +++ b/docs/changelog/135431.yaml @@ -0,0 +1,6 @@ +pr: 135431 +summary: "[Downsampling++] Allow merging of passthrough mappers with object mappers\ + \ under certain conditions" +area: Mapping +type: bug +issues: [] diff --git a/server/src/main/java/org/elasticsearch/index/mapper/MapperErrors.java b/server/src/main/java/org/elasticsearch/index/mapper/MapperErrors.java index 7d8df6505a339..ffac92f547589 100644 --- a/server/src/main/java/org/elasticsearch/index/mapper/MapperErrors.java +++ b/server/src/main/java/org/elasticsearch/index/mapper/MapperErrors.java @@ -14,6 +14,12 @@ static void throwObjectMappingConflictError(String fieldName) throws IllegalArgu throw new IllegalArgumentException("can't merge a non object mapping [" + fieldName + "] with an object mapping"); } + static void throwPassThroughMappingConflictError(String fieldName) throws IllegalArgumentException { + throw new IllegalArgumentException( + "can't merge a passthrough mapping [" + fieldName + "] with an object mapping that is either root or has subobjects enabled" + ); + } + static void throwNestedMappingConflictError(String fieldName) throws IllegalArgumentException { throw new IllegalArgumentException("can't merge a non-nested mapping [" + fieldName + "] with a nested mapping"); } diff --git a/server/src/main/java/org/elasticsearch/index/mapper/ObjectMapper.java b/server/src/main/java/org/elasticsearch/index/mapper/ObjectMapper.java index 3c71f6bf2db5b..ad1b77c9a2e3a 100644 --- a/server/src/main/java/org/elasticsearch/index/mapper/ObjectMapper.java +++ b/server/src/main/java/org/elasticsearch/index/mapper/ObjectMapper.java @@ -567,6 +567,22 @@ public ObjectMapper merge(Mapper mergeWith, MapperMergeContext parentMergeContex MapperErrors.throwNestedMappingConflictError(mergeWith.fullPath()); } var mergeResult = MergeResult.build(this, (ObjectMapper) mergeWith, parentMergeContext); + if (mergeWith instanceof PassThroughObjectMapper passThroughObjectMapper) { + if (PassThroughObjectMapper.isEligibleForMerge(this)) { + return new PassThroughObjectMapper( + leafName(), + fullPath, + mergeResult.enabled, + mergeResult.sourceKeepMode, + mergeResult.dynamic, + mergeResult.mappers, + passThroughObjectMapper.timeSeriesDimensionSubFields(), + passThroughObjectMapper.priority() + ); + } else { + MapperErrors.throwPassThroughMappingConflictError(fullPath()); + } + } return new ObjectMapper( leafName(), fullPath, @@ -605,6 +621,8 @@ static MergeResult build(ObjectMapper existing, ObjectMapper mergeWithObject, Ma if (mergeWithObject.subobjects.isPresent()) { if (reason == MergeReason.INDEX_TEMPLATE) { subObjects = mergeWithObject.subobjects; + } else if (mergeWithObject instanceof PassThroughObjectMapper && existing.subobjects.isEmpty()) { + subObjects = mergeWithObject.subobjects; } else if (existing.subobjects() != mergeWithObject.subobjects()) { throw new MapperException( "the [subobjects] parameter can't be updated for the object mapping [" + existing.fullPath() + "]" diff --git a/server/src/main/java/org/elasticsearch/index/mapper/PassThroughObjectMapper.java b/server/src/main/java/org/elasticsearch/index/mapper/PassThroughObjectMapper.java index fbf8dd4538037..a6169931b4357 100644 --- a/server/src/main/java/org/elasticsearch/index/mapper/PassThroughObjectMapper.java +++ b/server/src/main/java/org/elasticsearch/index/mapper/PassThroughObjectMapper.java @@ -136,6 +136,10 @@ public int priority() { return priority; } + public Explicit timeSeriesDimensionSubFields() { + return timeSeriesDimensionSubFields; + } + @Override public PassThroughObjectMapper.Builder newBuilder(IndexVersion indexVersionCreated) { PassThroughObjectMapper.Builder builder = new PassThroughObjectMapper.Builder(leafName()); @@ -148,17 +152,34 @@ public PassThroughObjectMapper.Builder newBuilder(IndexVersion indexVersionCreat @Override public PassThroughObjectMapper merge(Mapper mergeWith, MapperMergeContext parentBuilderContext) { - if (mergeWith instanceof PassThroughObjectMapper == false) { + if (mergeWith instanceof ObjectMapper == false) { MapperErrors.throwObjectMappingConflictError(mergeWith.fullPath()); } + ObjectMapper mergeWithObjectMapper = (ObjectMapper) mergeWith; + if (mergeWithObjectMapper instanceof PassThroughObjectMapper mergeWithPassThrough) { + final var mergeResult = MergeResult.build(this, mergeWithPassThrough, parentBuilderContext); + final Explicit containsDimensions = (mergeWithPassThrough.timeSeriesDimensionSubFields.explicit()) + ? mergeWithPassThrough.timeSeriesDimensionSubFields + : this.timeSeriesDimensionSubFields; - PassThroughObjectMapper mergeWithObject = (PassThroughObjectMapper) mergeWith; - final var mergeResult = MergeResult.build(this, mergeWithObject, parentBuilderContext); - - final Explicit containsDimensions = (mergeWithObject.timeSeriesDimensionSubFields.explicit()) - ? mergeWithObject.timeSeriesDimensionSubFields - : this.timeSeriesDimensionSubFields; - + return new PassThroughObjectMapper( + leafName(), + fullPath(), + mergeResult.enabled(), + mergeResult.sourceKeepMode(), + mergeResult.dynamic(), + mergeResult.mappers(), + containsDimensions, + Math.max(priority, mergeWithPassThrough.priority) + ); + } + if (mergeWithObjectMapper instanceof NestedObjectMapper) { + MapperErrors.throwNestedMappingConflictError(fullPath()); + } + if (isEligibleForMerge(mergeWithObjectMapper) == false) { + MapperErrors.throwPassThroughMappingConflictError(fullPath()); + } + MergeResult mergeResult = MergeResult.build(this, mergeWithObjectMapper, parentBuilderContext); return new PassThroughObjectMapper( leafName(), fullPath(), @@ -166,11 +187,23 @@ public PassThroughObjectMapper merge(Mapper mergeWith, MapperMergeContext parent mergeResult.sourceKeepMode(), mergeResult.dynamic(), mergeResult.mappers(), - containsDimensions, - Math.max(priority, mergeWithObject.priority) + timeSeriesDimensionSubFields, + priority ); } + /** + * An object mapper is compatible to be merged with a passthrough mapper if + * - It is not a root mapper + * - If it does not have subobjects true + */ + static boolean isEligibleForMerge(ObjectMapper objectMapper) { + return objectMapper.isRoot() == false + && (objectMapper.subobjects == null + || objectMapper.subobjects.isEmpty() + || objectMapper.subobjects.get().equals(Subobjects.DISABLED)); + } + @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(leafName()); diff --git a/server/src/test/java/org/elasticsearch/index/mapper/ObjectMapperMergeTests.java b/server/src/test/java/org/elasticsearch/index/mapper/ObjectMapperMergeTests.java index 04e8121fdd90e..a6f827ce23d4c 100644 --- a/server/src/test/java/org/elasticsearch/index/mapper/ObjectMapperMergeTests.java +++ b/server/src/test/java/org/elasticsearch/index/mapper/ObjectMapperMergeTests.java @@ -353,6 +353,36 @@ public void testConflictingDynamicUpdate() { assertThat(e.getMessage(), equalTo("mapper [http.status_code] cannot be changed from type [keyword] to [long]")); } + public void testMergingWithPassThrough() { + boolean isSourceSynthetic = randomBoolean(); + var objectMapper = new RootObjectMapper.Builder("_doc", Optional.empty()).add( + new ObjectMapper.Builder("metrics", Optional.empty()).add(new KeywordFieldMapper.Builder("cpu_usage", IndexVersion.current())) + ).build(MapperBuilderContext.root(isSourceSynthetic, true)); + var passThroughMapper = new RootObjectMapper.Builder("_doc", Optional.empty()).add( + new PassThroughObjectMapper.Builder("metrics").setPriority(10) + .add(new KeywordFieldMapper.Builder("memory_usage", IndexVersion.current())) + ).build(MapperBuilderContext.root(isSourceSynthetic, true)); + RootObjectMapper merged = objectMapper.merge( + passThroughMapper, + MapperMergeContext.root(isSourceSynthetic, true, MAPPING_UPDATE, Long.MAX_VALUE) + ); + assertThat(merged.getMapper("metrics"), instanceOf(PassThroughObjectMapper.class)); + PassThroughObjectMapper metrics = (PassThroughObjectMapper) merged.getMapper("metrics"); + assertThat(metrics.getMapper("cpu_usage"), instanceOf(KeywordFieldMapper.class)); + assertThat(metrics.getMapper("memory_usage"), instanceOf(KeywordFieldMapper.class)); + + var subobjectsTrueMapper = new RootObjectMapper.Builder("_doc", Optional.empty()).add( + new ObjectMapper.Builder("metrics", Optional.of(ObjectMapper.Subobjects.ENABLED)).add( + new KeywordFieldMapper.Builder("cpu_usage", IndexVersion.current()) + ) + ).build(MapperBuilderContext.root(isSourceSynthetic, true)); + MapperException e = expectThrows( + MapperException.class, + () -> subobjectsTrueMapper.merge(passThroughMapper, MapperMergeContext.root(false, false, MAPPING_UPDATE, Long.MAX_VALUE)) + ); + assertThat(e.getMessage(), equalTo("the [subobjects] parameter can't be updated for the object mapping [metrics]")); + } + private static RootObjectMapper createRootSubobjectFalseLeafWithDots() { FieldMapper.Builder fieldBuilder = new KeywordFieldMapper.Builder("host.name", IndexVersion.current()); FieldMapper fieldMapper = fieldBuilder.build(MapperBuilderContext.root(false, false)); diff --git a/server/src/test/java/org/elasticsearch/index/mapper/PassThroughObjectMapperTests.java b/server/src/test/java/org/elasticsearch/index/mapper/PassThroughObjectMapperTests.java index d8d41b1634f15..8acf11db76e1a 100644 --- a/server/src/test/java/org/elasticsearch/index/mapper/PassThroughObjectMapperTests.java +++ b/server/src/test/java/org/elasticsearch/index/mapper/PassThroughObjectMapperTests.java @@ -10,13 +10,16 @@ package org.elasticsearch.index.mapper; import org.elasticsearch.common.Explicit; +import org.elasticsearch.index.IndexVersion; import java.io.IOException; import java.util.List; import java.util.Map; import java.util.Optional; +import static org.elasticsearch.index.mapper.MapperService.MergeReason.MAPPING_UPDATE; import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.instanceOf; public class PassThroughObjectMapperTests extends MapperServiceTestCase { @@ -182,6 +185,55 @@ public void testCheckForDuplicatePrioritiesEmpty() throws IOException { PassThroughObjectMapper.checkForDuplicatePriorities(List.of()); } + public void testMergingWithPassThrough() { + boolean isSourceSynthetic = randomBoolean(); + var passThroughMapper = new RootObjectMapper.Builder("_doc", Optional.empty()).add( + new PassThroughObjectMapper.Builder("metrics").setPriority(10) + ).build(MapperBuilderContext.root(isSourceSynthetic, true)); + var objectMapper = new RootObjectMapper.Builder("_doc", Optional.empty()).add( + new ObjectMapper.Builder("metrics", Optional.empty()).add(new KeywordFieldMapper.Builder("cpu_usage", IndexVersion.current())) + ).build(MapperBuilderContext.root(isSourceSynthetic, true)); + + RootObjectMapper merged = passThroughMapper.merge( + objectMapper, + MapperMergeContext.root(isSourceSynthetic, true, MAPPING_UPDATE, Long.MAX_VALUE) + ); + assertThat(merged.getMapper("metrics"), instanceOf(PassThroughObjectMapper.class)); + + var objectMapperWithSubObjectTrue = new RootObjectMapper.Builder("_doc", Optional.empty()).add( + new ObjectMapper.Builder("metrics", Optional.of(ObjectMapper.Subobjects.ENABLED)).add( + new KeywordFieldMapper.Builder("cpu_usage", IndexVersion.current()) + ) + ).build(MapperBuilderContext.root(isSourceSynthetic, true)); + + IllegalArgumentException error = expectThrows( + IllegalArgumentException.class, + () -> passThroughMapper.merge( + objectMapperWithSubObjectTrue, + MapperMergeContext.root(isSourceSynthetic, true, MAPPING_UPDATE, Long.MAX_VALUE) + ) + ); + assertThat( + error.getMessage(), + equalTo("can't merge a passthrough mapping [metrics] with an object mapping that is either root or has subobjects enabled") + ); + + var rootObjectMapper = new RootObjectMapper.Builder("metrics", Optional.empty()).add( + new KeywordFieldMapper.Builder("cpu_usage", IndexVersion.current()) + ).build(MapperBuilderContext.root(isSourceSynthetic, true)); + + error = expectThrows( + IllegalArgumentException.class, + () -> new PassThroughObjectMapper.Builder("metrics").setPriority(10) + .build(MapperBuilderContext.root(isSourceSynthetic, true)) + .merge(rootObjectMapper, MapperMergeContext.root(isSourceSynthetic, true, MAPPING_UPDATE, Long.MAX_VALUE)) + ); + assertThat( + error.getMessage(), + equalTo("can't merge a passthrough mapping [metrics] with an object mapping that is either root or has subobjects enabled") + ); + } + private PassThroughObjectMapper create(String name, int priority) { return new PassThroughObjectMapper( name, diff --git a/x-pack/plugin/downsample/src/internalClusterTest/java/org/elasticsearch/xpack/downsample/DownsampleIT.java b/x-pack/plugin/downsample/src/internalClusterTest/java/org/elasticsearch/xpack/downsample/DownsampleIT.java index 70150d4f95bc9..69ae6c2e8b1be 100644 --- a/x-pack/plugin/downsample/src/internalClusterTest/java/org/elasticsearch/xpack/downsample/DownsampleIT.java +++ b/x-pack/plugin/downsample/src/internalClusterTest/java/org/elasticsearch/xpack/downsample/DownsampleIT.java @@ -30,14 +30,19 @@ public class DownsampleIT extends DownsamplingIntegTestCase { public void testDownsamplingPassthroughDimensions() throws Exception { String dataStreamName = "metrics-foo"; - // Set up template - putTSDBIndexTemplate("my-template", List.of("metrics-foo"), null, """ + String mapping = """ { "properties": { "attributes": { "type": "passthrough", "priority": 10, - "time_series_dimension": true + "time_series_dimension": true, + "properties": { + "os.name": { + "type": "keyword", + "time_series_dimension": true + } + } }, "metrics.cpu_usage": { "type": "double", @@ -45,7 +50,7 @@ public void testDownsamplingPassthroughDimensions() throws Exception { } } } - """, null, null); + """; // Create data stream by indexing documents final Instant now = Instant.now(); @@ -56,12 +61,67 @@ public void testDownsamplingPassthroughDimensions() throws Exception { .startObject() .field("@timestamp", ts) .field("attributes.host.name", randomFrom("host1", "host2", "host3")) + .field("attributes.os.name", randomFrom("linux", "windows", "macos")) + .field("metrics.cpu_usage", randomDouble()) + .endObject(); + } catch (IOException e) { + throw new RuntimeException(e); + } + }; + downsampleAndAssert(dataStreamName, mapping, sourceSupplier); + } + + public void testDownsamplingPassthroughMetrics() throws Exception { + String dataStreamName = "metrics-foo"; + String mapping = """ + { + "properties": { + "attributes.os.name": { + "type": "keyword", + "time_series_dimension": true + }, + "metrics": { + "type": "passthrough", + "priority": 10, + "properties": { + "cpu_usage": { + "type": "double", + "time_series_metric": "counter" + } + } + } + } + } + """; + + // Create data stream by indexing documents + final Instant now = Instant.now(); + Supplier sourceSupplier = () -> { + String ts = randomDateForRange(now.minusSeconds(60 * 60).toEpochMilli(), now.plusSeconds(60 * 29).toEpochMilli()); + try { + return XContentFactory.jsonBuilder() + .startObject() + .field("@timestamp", ts) + .field("attributes.os.name", randomFrom("linux", "windows", "macos")) .field("metrics.cpu_usage", randomDouble()) + .field("metrics.memory_usage", randomDouble()) .endObject(); } catch (IOException e) { throw new RuntimeException(e); } }; + downsampleAndAssert(dataStreamName, mapping, sourceSupplier); + } + + /** + * Create a data stream with the provided mapping and downsampled the first backing index of this data stream. After downsampling has + * completed, it asserts if the downsampled index is as expected. + */ + private void downsampleAndAssert(String dataStreamName, String mapping, Supplier sourceSupplier) throws Exception { + // Set up template + putTSDBIndexTemplate("my-template", List.of(dataStreamName), null, mapping, null, null); + + // Create data stream by indexing documents bulkIndex(dataStreamName, sourceSupplier, 100); // Rollover to ensure the index we will downsample is not the write index assertAcked(client().admin().indices().rolloverIndex(new RolloverRequest(dataStreamName, null)));