Skip to content
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/changelog/135431.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 135431
summary: "[Downsampling++] Allow merging of passthrough mappers with object mappers\
\ under certain conditions"
area: Mapping
type: bug
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,12 @@ static void throwObjectMappingConflictError(String fieldName) throws IllegalArgu
throw new IllegalArgumentException("can't merge a non object mapping [" + fieldName + "] with an object mapping");
}

static void throwPassThroughMappingConflictError(String fieldName) throws IllegalArgumentException {
throw new IllegalArgumentException(
"can't merge a passthrough mapping [" + fieldName + "] with an object mapping that is either root or has subobjects enabled"
);
}

static void throwNestedMappingConflictError(String fieldName) throws IllegalArgumentException {
throw new IllegalArgumentException("can't merge a non-nested mapping [" + fieldName + "] with a nested mapping");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -577,6 +577,22 @@ public ObjectMapper merge(Mapper mergeWith, MapperMergeContext parentMergeContex
MapperErrors.throwNestedMappingConflictError(mergeWith.fullPath());
}
var mergeResult = MergeResult.build(this, (ObjectMapper) mergeWith, parentMergeContext);
if (mergeWith instanceof PassThroughObjectMapper passThroughObjectMapper) {
if (PassThroughObjectMapper.isEligibleForMerge(this)) {
return new PassThroughObjectMapper(
leafName(),
fullPath,
mergeResult.enabled,
mergeResult.sourceKeepMode,
mergeResult.dynamic,
mergeResult.mappers,
passThroughObjectMapper.timeSeriesDimensionSubFields(),
passThroughObjectMapper.priority()
);
} else {
MapperErrors.throwPassThroughMappingConflictError(fullPath());
}
}
return new ObjectMapper(
leafName(),
fullPath,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,10 @@ public int priority() {
return priority;
}

public Explicit<Boolean> timeSeriesDimensionSubFields() {
return timeSeriesDimensionSubFields;
}

@Override
public PassThroughObjectMapper.Builder newBuilder(IndexVersion indexVersionCreated) {
PassThroughObjectMapper.Builder builder = new PassThroughObjectMapper.Builder(leafName());
Expand All @@ -148,29 +152,58 @@ public PassThroughObjectMapper.Builder newBuilder(IndexVersion indexVersionCreat

@Override
public PassThroughObjectMapper merge(Mapper mergeWith, MapperMergeContext parentBuilderContext) {
if (mergeWith instanceof PassThroughObjectMapper == false) {
if (mergeWith instanceof ObjectMapper == false) {
MapperErrors.throwObjectMappingConflictError(mergeWith.fullPath());
}
ObjectMapper mergeWithObjectMapper = (ObjectMapper) mergeWith;
if (mergeWithObjectMapper instanceof PassThroughObjectMapper mergeWithPassThrough) {
final var mergeResult = MergeResult.build(this, mergeWithPassThrough, parentBuilderContext);
final Explicit<Boolean> containsDimensions = (mergeWithPassThrough.timeSeriesDimensionSubFields.explicit())
? mergeWithPassThrough.timeSeriesDimensionSubFields
: this.timeSeriesDimensionSubFields;

PassThroughObjectMapper mergeWithObject = (PassThroughObjectMapper) mergeWith;
final var mergeResult = MergeResult.build(this, mergeWithObject, parentBuilderContext);

final Explicit<Boolean> containsDimensions = (mergeWithObject.timeSeriesDimensionSubFields.explicit())
? mergeWithObject.timeSeriesDimensionSubFields
: this.timeSeriesDimensionSubFields;

return new PassThroughObjectMapper(
leafName(),
fullPath(),
mergeResult.enabled(),
mergeResult.sourceKeepMode(),
mergeResult.dynamic(),
mergeResult.mappers(),
containsDimensions,
Math.max(priority, mergeWithPassThrough.priority)
);
}
if (mergeWithObjectMapper instanceof NestedObjectMapper) {
MapperErrors.throwNestedMappingConflictError(fullPath());
}
if (isEligibleForMerge(mergeWithObjectMapper) == false) {
MapperErrors.throwPassThroughMappingConflictError(fullPath());
}
MergeResult mergeResult = MergeResult.build(this, mergeWithObjectMapper, parentBuilderContext);
return new PassThroughObjectMapper(
leafName(),
fullPath(),
mergeResult.enabled(),
mergeResult.sourceKeepMode(),
mergeResult.dynamic(),
mergeResult.mappers(),
containsDimensions,
Math.max(priority, mergeWithObject.priority)
timeSeriesDimensionSubFields,
priority
);
}

/**
* An object mapper is compatible to be merged with a passthrough mapper if
* - It is not a root mapper
* - If it does not have subobjects true
*/
static boolean isEligibleForMerge(ObjectMapper objectMapper) {
return objectMapper.isRoot() == false
&& (objectMapper.subobjects == null
|| objectMapper.subobjects.explicit() == false
|| objectMapper.subobjects.value().equals(Subobjects.DISABLED));
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject(leafName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,39 @@ public void testConflictingDynamicUpdate() {
assertThat(e.getMessage(), equalTo("mapper [http.status_code] cannot be changed from type [keyword] to [long]"));
}

public void testMergingWithPassThrough() {
boolean isSourceSynthetic = randomBoolean();
var objectMapper = new RootObjectMapper.Builder("_doc").add(
new ObjectMapper.Builder("metrics").add(new KeywordFieldMapper.Builder("cpu_usage", IndexVersion.current()))
).build(MapperBuilderContext.root(isSourceSynthetic, true));
var passThroughMapper = new RootObjectMapper.Builder("_doc").add(
new PassThroughObjectMapper.Builder("metrics").setPriority(10)
.add(new KeywordFieldMapper.Builder("memory_usage", IndexVersion.current()))
).build(MapperBuilderContext.root(isSourceSynthetic, true));
RootObjectMapper merged = objectMapper.merge(
passThroughMapper,
MapperMergeContext.root(isSourceSynthetic, true, MAPPING_UPDATE, Long.MAX_VALUE)
);
assertThat(merged.getMapper("metrics"), instanceOf(PassThroughObjectMapper.class));
PassThroughObjectMapper metrics = (PassThroughObjectMapper) merged.getMapper("metrics");
assertThat(metrics.getMapper("cpu_usage"), instanceOf(KeywordFieldMapper.class));
assertThat(metrics.getMapper("memory_usage"), instanceOf(KeywordFieldMapper.class));

var subobjectsTrueMapper = new RootObjectMapper.Builder("_doc").add(
new ObjectMapper.Builder("metrics", Explicit.of(ObjectMapper.Subobjects.ENABLED)).add(
new KeywordFieldMapper.Builder("cpu_usage", IndexVersion.current())
)
).build(MapperBuilderContext.root(isSourceSynthetic, true));
IllegalArgumentException e = expectThrows(
IllegalArgumentException.class,
() -> subobjectsTrueMapper.merge(passThroughMapper, MapperMergeContext.root(false, false, MAPPING_UPDATE, Long.MAX_VALUE))
);
assertThat(
e.getMessage(),
equalTo("can't merge a passthrough mapping [metrics] with an object mapping that is either root or has subobjects enabled")
);
}

private static RootObjectMapper createRootSubobjectFalseLeafWithDots() {
FieldMapper.Builder fieldBuilder = new KeywordFieldMapper.Builder("host.name", IndexVersion.current());
FieldMapper fieldMapper = fieldBuilder.build(MapperBuilderContext.root(false, false));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,16 @@
package org.elasticsearch.index.mapper;

import org.elasticsearch.common.Explicit;
import org.elasticsearch.index.IndexVersion;

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Optional;

import static org.elasticsearch.index.mapper.MapperService.MergeReason.MAPPING_UPDATE;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;

public class PassThroughObjectMapperTests extends MapperServiceTestCase {
Expand Down Expand Up @@ -182,6 +185,54 @@ public void testCheckForDuplicatePrioritiesEmpty() throws IOException {
PassThroughObjectMapper.checkForDuplicatePriorities(List.of());
}

public void testMergingWithPassThrough() {
boolean isSourceSynthetic = randomBoolean();
var passThroughMapper = new RootObjectMapper.Builder("_doc").add(new PassThroughObjectMapper.Builder("metrics").setPriority(10))
.build(MapperBuilderContext.root(isSourceSynthetic, true));
var objectMapper = new RootObjectMapper.Builder("_doc").add(
new ObjectMapper.Builder("metrics").add(new KeywordFieldMapper.Builder("cpu_usage", IndexVersion.current()))
).build(MapperBuilderContext.root(isSourceSynthetic, true));

RootObjectMapper merged = passThroughMapper.merge(
objectMapper,
MapperMergeContext.root(isSourceSynthetic, true, MAPPING_UPDATE, Long.MAX_VALUE)
);
assertThat(merged.getMapper("metrics"), instanceOf(PassThroughObjectMapper.class));

var objectMapperWithSubObjectTrue = new RootObjectMapper.Builder("_doc").add(
new ObjectMapper.Builder("metrics", Explicit.of(ObjectMapper.Subobjects.ENABLED)).add(
new KeywordFieldMapper.Builder("cpu_usage", IndexVersion.current())
)
).build(MapperBuilderContext.root(isSourceSynthetic, true));

IllegalArgumentException error = expectThrows(
IllegalArgumentException.class,
() -> passThroughMapper.merge(
objectMapperWithSubObjectTrue,
MapperMergeContext.root(isSourceSynthetic, true, MAPPING_UPDATE, Long.MAX_VALUE)
)
);
assertThat(
error.getMessage(),
equalTo("can't merge a passthrough mapping [metrics] with an object mapping that is either root or has subobjects enabled")
);

var rootObjectMapper = new RootObjectMapper.Builder("metrics").add(
new KeywordFieldMapper.Builder("cpu_usage", IndexVersion.current())
).build(MapperBuilderContext.root(isSourceSynthetic, true));

error = expectThrows(
IllegalArgumentException.class,
() -> new PassThroughObjectMapper.Builder("metrics").setPriority(10)
.build(MapperBuilderContext.root(isSourceSynthetic, true))
.merge(rootObjectMapper, MapperMergeContext.root(isSourceSynthetic, true, MAPPING_UPDATE, Long.MAX_VALUE))
);
assertThat(
error.getMessage(),
equalTo("can't merge a passthrough mapping [metrics] with an object mapping that is either root or has subobjects enabled")
);
}

private PassThroughObjectMapper create(String name, int priority) {
return new PassThroughObjectMapper(
name,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,13 @@ public void testDownsamplingPassthroughDimensions() throws Exception {
"attributes": {
"type": "passthrough",
"priority": 10,
"time_series_dimension": true
"time_series_dimension": true,
"properties": {
"os.name": {
"type": "keyword",
"time_series_dimension": true
}
}
},
"metrics.cpu_usage": {
"type": "double",
Expand All @@ -70,7 +76,83 @@ public void testDownsamplingPassthroughDimensions() throws Exception {
.startObject()
.field("@timestamp", ts)
.field("attributes.host.name", randomFrom("host1", "host2", "host3"))
.field("attributes.os.name", randomFrom("linux", "windows", "macos"))
.field("metrics.cpu_usage", randomDouble())
.endObject();
} catch (IOException e) {
throw new RuntimeException(e);
}
};
bulkIndex(dataStreamName, sourceSupplier, 100);
// Rollover to ensure the index we will downsample is not the write index
assertAcked(client().admin().indices().rolloverIndex(new RolloverRequest(dataStreamName, null)));
List<String> backingIndices = waitForDataStreamBackingIndices(dataStreamName, 2);
String sourceIndex = backingIndices.get(0);
String interval = "5m";
String targetIndex = "downsample-" + interval + "-" + sourceIndex;
// Set the source index to read-only state
assertAcked(
indicesAdmin().prepareUpdateSettings(sourceIndex)
.setSettings(Settings.builder().put(IndexMetadata.INDEX_BLOCKS_WRITE_SETTING.getKey(), true).build())
);

DownsampleConfig downsampleConfig = new DownsampleConfig(new DateHistogramInterval(interval));
assertAcked(
client().execute(
DownsampleAction.INSTANCE,
new DownsampleAction.Request(TEST_REQUEST_TIMEOUT, sourceIndex, targetIndex, TIMEOUT, downsampleConfig)
)
);

// Wait for downsampling to complete
SubscribableListener<Void> listener = ClusterServiceUtils.addMasterTemporaryStateListener(clusterState -> {
final var indexMetadata = clusterState.metadata().getProject().index(targetIndex);
if (indexMetadata == null) {
return false;
}
var downsampleStatus = IndexMetadata.INDEX_DOWNSAMPLE_STATUS.get(indexMetadata.getSettings());
return downsampleStatus == IndexMetadata.DownsampleTaskStatus.SUCCESS;
});
safeAwait(listener);

assertDownsampleIndexFieldsAndDimensions(sourceIndex, targetIndex, downsampleConfig);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider deduplicating the shared parts.

}

public void testDownsamplingPassthroughMetrics() throws Exception {
String dataStreamName = "metrics-foo";
// Set up template
putTSDBIndexTemplate("my-template", List.of("metrics-foo"), null, """
{
"properties": {
"attributes.os.name": {
"type": "keyword",
"time_series_dimension": true
},
"metrics": {
"type": "passthrough",
"priority": 10,
"properties": {
"cpu_usage": {
"type": "double",
"time_series_metric": "counter"
}
}
}
}
}
""", null, null);

// Create data stream by indexing documents
final Instant now = Instant.now();
Supplier<XContentBuilder> sourceSupplier = () -> {
String ts = randomDateForRange(now.minusSeconds(60 * 60).toEpochMilli(), now.plusSeconds(60 * 29).toEpochMilli());
try {
return XContentFactory.jsonBuilder()
.startObject()
.field("@timestamp", ts)
.field("attributes.os.name", randomFrom("linux", "windows", "macos"))
.field("metrics.cpu_usage", randomDouble())
.field("metrics.memory_usage", randomDouble())
.endObject();
} catch (IOException e) {
throw new RuntimeException(e);
Expand Down