6 changes: 6 additions & 0 deletions docs/changelog/135431.yaml
@@ -0,0 +1,6 @@
pr: 135431
summary: "[Downsampling++] Allow merging of passthrough mappers with object mappers\
\ under certain conditions"
area: Mapping
type: bug
issues: []
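
For context, here is a minimal sketch of the scenario this changelog entry describes, written with the same builder calls the new testMergingWithPassThrough tests below use (an illustration only, not part of the change): an object mapping for a path such as metrics can now be merged with a passthrough mapping for the same path, provided the object mapping is not the root mapping and does not have subobjects explicitly enabled; the result is a passthrough mapper that keeps the sub-fields from both sides.

// Sketch only; assumes the same test-package context and imports as the
// testMergingWithPassThrough tests further down in this diff.
var objectMapper = new RootObjectMapper.Builder("_doc").add(
    new ObjectMapper.Builder("metrics").add(new KeywordFieldMapper.Builder("cpu_usage", IndexVersion.current()))
).build(MapperBuilderContext.root(false, true));
var passThroughMapper = new RootObjectMapper.Builder("_doc").add(
    new PassThroughObjectMapper.Builder("metrics").setPriority(10)
        .add(new KeywordFieldMapper.Builder("memory_usage", IndexVersion.current()))
).build(MapperBuilderContext.root(false, true));
// With this change the merge yields a PassThroughObjectMapper that contains
// both cpu_usage and memory_usage.
RootObjectMapper merged = objectMapper.merge(
    passThroughMapper,
    MapperMergeContext.root(false, true, MapperService.MergeReason.MAPPING_UPDATE, Long.MAX_VALUE)
);
PassThroughObjectMapper metrics = (PassThroughObjectMapper) merged.getMapper("metrics");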
@@ -14,6 +14,12 @@ static void throwObjectMappingConflictError(String fieldName) throws IllegalArgu
throw new IllegalArgumentException("can't merge a non object mapping [" + fieldName + "] with an object mapping");
}

static void throwPassThroughMappingConflictError(String fieldName) throws IllegalArgumentException {
throw new IllegalArgumentException(
"can't merge a passthrough mapping [" + fieldName + "] with an object mapping that is either root or has subobjects enabled"
);
}

static void throwNestedMappingConflictError(String fieldName) throws IllegalArgumentException {
throw new IllegalArgumentException("can't merge a non-nested mapping [" + fieldName + "] with a nested mapping");
}
@@ -577,6 +577,22 @@ public ObjectMapper merge(Mapper mergeWith, MapperMergeContext parentMergeContex
MapperErrors.throwNestedMappingConflictError(mergeWith.fullPath());
}
var mergeResult = MergeResult.build(this, (ObjectMapper) mergeWith, parentMergeContext);
if (mergeWith instanceof PassThroughObjectMapper passThroughObjectMapper) {
if (PassThroughObjectMapper.isEligibleForMerge(this)) {
return new PassThroughObjectMapper(
leafName(),
fullPath,
mergeResult.enabled,
mergeResult.sourceKeepMode,
mergeResult.dynamic,
mergeResult.mappers,
passThroughObjectMapper.timeSeriesDimensionSubFields(),
passThroughObjectMapper.priority()
);
} else {
MapperErrors.throwPassThroughMappingConflictError(fullPath());
}
}
return new ObjectMapper(
leafName(),
fullPath,
@@ -136,6 +136,10 @@ public int priority() {
return priority;
}

public Explicit<Boolean> timeSeriesDimensionSubFields() {
return timeSeriesDimensionSubFields;
}

@Override
public PassThroughObjectMapper.Builder newBuilder(IndexVersion indexVersionCreated) {
PassThroughObjectMapper.Builder builder = new PassThroughObjectMapper.Builder(leafName());
@@ -148,29 +152,58 @@ public PassThroughObjectMapper.Builder newBuilder(IndexVersion indexVersionCreat

@Override
public PassThroughObjectMapper merge(Mapper mergeWith, MapperMergeContext parentBuilderContext) {
if (mergeWith instanceof PassThroughObjectMapper == false) {
if (mergeWith instanceof ObjectMapper == false) {
MapperErrors.throwObjectMappingConflictError(mergeWith.fullPath());
}
ObjectMapper mergeWithObjectMapper = (ObjectMapper) mergeWith;
if (mergeWithObjectMapper instanceof PassThroughObjectMapper mergeWithPassThrough) {
final var mergeResult = MergeResult.build(this, mergeWithPassThrough, parentBuilderContext);
final Explicit<Boolean> containsDimensions = (mergeWithPassThrough.timeSeriesDimensionSubFields.explicit())
? mergeWithPassThrough.timeSeriesDimensionSubFields
: this.timeSeriesDimensionSubFields;

PassThroughObjectMapper mergeWithObject = (PassThroughObjectMapper) mergeWith;
final var mergeResult = MergeResult.build(this, mergeWithObject, parentBuilderContext);

final Explicit<Boolean> containsDimensions = (mergeWithObject.timeSeriesDimensionSubFields.explicit())
? mergeWithObject.timeSeriesDimensionSubFields
: this.timeSeriesDimensionSubFields;

return new PassThroughObjectMapper(
leafName(),
fullPath(),
mergeResult.enabled(),
mergeResult.sourceKeepMode(),
mergeResult.dynamic(),
mergeResult.mappers(),
containsDimensions,
Math.max(priority, mergeWithPassThrough.priority)
);
}
if (mergeWithObjectMapper instanceof NestedObjectMapper) {
MapperErrors.throwNestedMappingConflictError(fullPath());
}
if (isEligibleForMerge(mergeWithObjectMapper) == false) {
MapperErrors.throwPassThroughMappingConflictError(fullPath());
}
MergeResult mergeResult = MergeResult.build(this, mergeWithObjectMapper, parentBuilderContext);
return new PassThroughObjectMapper(
leafName(),
fullPath(),
mergeResult.enabled(),
mergeResult.sourceKeepMode(),
mergeResult.dynamic(),
mergeResult.mappers(),
containsDimensions,
Math.max(priority, mergeWithObject.priority)
timeSeriesDimensionSubFields,
priority
);
}

/**
* An object mapper is eligible to be merged with a passthrough mapper if:
* - it is not a root mapper
* - it does not have subobjects explicitly enabled
*/
static boolean isEligibleForMerge(ObjectMapper objectMapper) {
return objectMapper.isRoot() == false
&& (objectMapper.subobjects == null
|| objectMapper.subobjects.explicit() == false
|| objectMapper.subobjects.value().equals(Subobjects.DISABLED));
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject(leafName());
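To make the isEligibleForMerge conditions above concrete, a minimal sketch (illustration only; it assumes same-package access to the package-private isEligibleForMerge and the builder calls used by the tests in this PR) of which object mappings a passthrough mapping may be merged with:

// Subobjects left unset (or explicitly disabled): eligible for merging with a passthrough mapper.
ObjectMapper implicitSubobjects = new ObjectMapper.Builder("metrics").build(MapperBuilderContext.root(false, true));
// Subobjects explicitly enabled: not eligible, the merge calls MapperErrors.throwPassThroughMappingConflictError.
ObjectMapper explicitSubobjects = new ObjectMapper.Builder("metrics", Explicit.of(ObjectMapper.Subobjects.ENABLED))
    .build(MapperBuilderContext.root(false, true));

boolean ok = PassThroughObjectMapper.isEligibleForMerge(implicitSubobjects);      // true
boolean notOk = PassThroughObjectMapper.isEligibleForMerge(explicitSubobjects);   // false
// Root object mappers (isRoot() == true) are rejected as well, as exercised by
// PassThroughObjectMapperTests#testMergingWithPassThrough below.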
@@ -344,6 +344,39 @@ public void testConflictingDynamicUpdate() {
assertThat(e.getMessage(), equalTo("mapper [http.status_code] cannot be changed from type [keyword] to [long]"));
}

public void testMergingWithPassThrough() {
boolean isSourceSynthetic = randomBoolean();
var objectMapper = new RootObjectMapper.Builder("_doc").add(
new ObjectMapper.Builder("metrics").add(new KeywordFieldMapper.Builder("cpu_usage", IndexVersion.current()))
).build(MapperBuilderContext.root(isSourceSynthetic, true));
var passThroughMapper = new RootObjectMapper.Builder("_doc").add(
new PassThroughObjectMapper.Builder("metrics").setPriority(10)
.add(new KeywordFieldMapper.Builder("memory_usage", IndexVersion.current()))
).build(MapperBuilderContext.root(isSourceSynthetic, true));
RootObjectMapper merged = objectMapper.merge(
passThroughMapper,
MapperMergeContext.root(isSourceSynthetic, true, MAPPING_UPDATE, Long.MAX_VALUE)
);
assertThat(merged.getMapper("metrics"), instanceOf(PassThroughObjectMapper.class));
PassThroughObjectMapper metrics = (PassThroughObjectMapper) merged.getMapper("metrics");
assertThat(metrics.getMapper("cpu_usage"), instanceOf(KeywordFieldMapper.class));
assertThat(metrics.getMapper("memory_usage"), instanceOf(KeywordFieldMapper.class));

var subobjectsTrueMapper = new RootObjectMapper.Builder("_doc").add(
new ObjectMapper.Builder("metrics", Explicit.of(ObjectMapper.Subobjects.ENABLED)).add(
new KeywordFieldMapper.Builder("cpu_usage", IndexVersion.current())
)
).build(MapperBuilderContext.root(isSourceSynthetic, true));
IllegalArgumentException e = expectThrows(
IllegalArgumentException.class,
() -> subobjectsTrueMapper.merge(passThroughMapper, MapperMergeContext.root(false, false, MAPPING_UPDATE, Long.MAX_VALUE))
);
assertThat(
e.getMessage(),
equalTo("can't merge a passthrough mapping [metrics] with an object mapping that is either root or has subobjects enabled")
);
}

private static RootObjectMapper createRootSubobjectFalseLeafWithDots() {
FieldMapper.Builder fieldBuilder = new KeywordFieldMapper.Builder("host.name", IndexVersion.current());
FieldMapper fieldMapper = fieldBuilder.build(MapperBuilderContext.root(false, false));
@@ -10,13 +10,16 @@
package org.elasticsearch.index.mapper;

import org.elasticsearch.common.Explicit;
import org.elasticsearch.index.IndexVersion;

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Optional;

import static org.elasticsearch.index.mapper.MapperService.MergeReason.MAPPING_UPDATE;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;

public class PassThroughObjectMapperTests extends MapperServiceTestCase {
@@ -182,6 +185,54 @@ public void testCheckForDuplicatePrioritiesEmpty() throws IOException {
PassThroughObjectMapper.checkForDuplicatePriorities(List.of());
}

public void testMergingWithPassThrough() {
boolean isSourceSynthetic = randomBoolean();
var passThroughMapper = new RootObjectMapper.Builder("_doc").add(new PassThroughObjectMapper.Builder("metrics").setPriority(10))
.build(MapperBuilderContext.root(isSourceSynthetic, true));
var objectMapper = new RootObjectMapper.Builder("_doc").add(
new ObjectMapper.Builder("metrics").add(new KeywordFieldMapper.Builder("cpu_usage", IndexVersion.current()))
).build(MapperBuilderContext.root(isSourceSynthetic, true));

RootObjectMapper merged = passThroughMapper.merge(
objectMapper,
MapperMergeContext.root(isSourceSynthetic, true, MAPPING_UPDATE, Long.MAX_VALUE)
);
assertThat(merged.getMapper("metrics"), instanceOf(PassThroughObjectMapper.class));

var objectMapperWithSubObjectTrue = new RootObjectMapper.Builder("_doc").add(
new ObjectMapper.Builder("metrics", Explicit.of(ObjectMapper.Subobjects.ENABLED)).add(
new KeywordFieldMapper.Builder("cpu_usage", IndexVersion.current())
)
).build(MapperBuilderContext.root(isSourceSynthetic, true));

IllegalArgumentException error = expectThrows(
IllegalArgumentException.class,
() -> passThroughMapper.merge(
objectMapperWithSubObjectTrue,
MapperMergeContext.root(isSourceSynthetic, true, MAPPING_UPDATE, Long.MAX_VALUE)
)
);
assertThat(
error.getMessage(),
equalTo("can't merge a passthrough mapping [metrics] with an object mapping that is either root or has subobjects enabled")
);

var rootObjectMapper = new RootObjectMapper.Builder("metrics").add(
new KeywordFieldMapper.Builder("cpu_usage", IndexVersion.current())
).build(MapperBuilderContext.root(isSourceSynthetic, true));

error = expectThrows(
IllegalArgumentException.class,
() -> new PassThroughObjectMapper.Builder("metrics").setPriority(10)
.build(MapperBuilderContext.root(isSourceSynthetic, true))
.merge(rootObjectMapper, MapperMergeContext.root(isSourceSynthetic, true, MAPPING_UPDATE, Long.MAX_VALUE))
);
assertThat(
error.getMessage(),
equalTo("can't merge a passthrough mapping [metrics] with an object mapping that is either root or has subobjects enabled")
);
}

private PassThroughObjectMapper create(String name, int priority) {
return new PassThroughObjectMapper(
name,
@@ -44,22 +44,27 @@ public class DownsampleIT extends DownsamplingIntegTestCase {

public void testDownsamplingPassthroughDimensions() throws Exception {
String dataStreamName = "metrics-foo";
// Set up template
putTSDBIndexTemplate("my-template", List.of("metrics-foo"), null, """
String mapping = """
{
"properties": {
"attributes": {
"type": "passthrough",
"priority": 10,
"time_series_dimension": true
"time_series_dimension": true,
"properties": {
"os.name": {
"type": "keyword",
"time_series_dimension": true
}
}
},
"metrics.cpu_usage": {
"type": "double",
"time_series_metric": "counter"
}
}
}
""", null, null);
""";

// Create data stream by indexing documents
final Instant now = Instant.now();
@@ -70,12 +75,67 @@ public void testDownsamplingPassthroughDimensions() throws Exception {
.startObject()
.field("@timestamp", ts)
.field("attributes.host.name", randomFrom("host1", "host2", "host3"))
.field("attributes.os.name", randomFrom("linux", "windows", "macos"))
.field("metrics.cpu_usage", randomDouble())
.endObject();
} catch (IOException e) {
throw new RuntimeException(e);
}
};
downsampleAndAssert(dataStreamName, mapping, sourceSupplier);
}

public void testDownsamplingPassthroughMetrics() throws Exception {
String dataStreamName = "metrics-foo";
String mapping = """
{
"properties": {
"attributes.os.name": {
"type": "keyword",
"time_series_dimension": true
},
"metrics": {
"type": "passthrough",
"priority": 10,
"properties": {
"cpu_usage": {
"type": "double",
"time_series_metric": "counter"
}
}
}
}
}
""";

// Create data stream by indexing documents
final Instant now = Instant.now();
Supplier<XContentBuilder> sourceSupplier = () -> {
String ts = randomDateForRange(now.minusSeconds(60 * 60).toEpochMilli(), now.plusSeconds(60 * 29).toEpochMilli());
try {
return XContentFactory.jsonBuilder()
.startObject()
.field("@timestamp", ts)
.field("attributes.os.name", randomFrom("linux", "windows", "macos"))
.field("metrics.cpu_usage", randomDouble())
.field("metrics.memory_usage", randomDouble())
.endObject();
} catch (IOException e) {
throw new RuntimeException(e);
}
};
downsampleAndAssert(dataStreamName, mapping, sourceSupplier);
}

/**
* Creates a data stream with the provided mapping and downsamples the first backing index of that data stream. After downsampling has
* completed, it asserts that the downsampled index is as expected.
*/
private void downsampleAndAssert(String dataStreamName, String mapping, Supplier<XContentBuilder> sourceSupplier) throws Exception {
// Set up template
putTSDBIndexTemplate("my-template", List.of(dataStreamName), null, mapping, null, null);

// Create data stream by indexing documents
bulkIndex(dataStreamName, sourceSupplier, 100);
// Rollover to ensure the index we will downsample is not the write index
assertAcked(client().admin().indices().rolloverIndex(new RolloverRequest(dataStreamName, null)));