Skip to content

Commit c71588b

Browse files
authored
[Downsampling++] Allow merging of passthrough mappers with object mappers under certain conditions. (#135431)
This PR is an alternative of #134996: This fixes a bug that would be triggered when we would downsample an index that had a metric field under a passthrough field. For example: ``` "metrics": { "type": "passthrough", "priority": 10, "properties": { "cpu_usage": { "type": "double", "time_series_metric": "gauge" } } } ``` In this PR, we extend the merging possibilities of a passthrough field to allow merging with object mappers that have subobjects false and they are not a root object. They also fix merging an object mapper with a passthrough field and ensure that the result will either be a passthrough field or an error if the objects are incompatible.
1 parent 400cf61 commit c71588b

File tree

7 files changed

+219
-14
lines changed

7 files changed

+219
-14
lines changed

docs/changelog/135431.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
pr: 135431
2+
summary: "[Downsampling++] Allow merging of passthrough mappers with object mappers\
3+
\ under certain conditions"
4+
area: Mapping
5+
type: bug
6+
issues: []

server/src/main/java/org/elasticsearch/index/mapper/MapperErrors.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,12 @@ static void throwObjectMappingConflictError(String fieldName) throws IllegalArgu
1414
throw new IllegalArgumentException("can't merge a non object mapping [" + fieldName + "] with an object mapping");
1515
}
1616

17+
static void throwPassThroughMappingConflictError(String fieldName) throws IllegalArgumentException {
18+
throw new IllegalArgumentException(
19+
"can't merge a passthrough mapping [" + fieldName + "] with an object mapping that is either root or has subobjects enabled"
20+
);
21+
}
22+
1723
static void throwNestedMappingConflictError(String fieldName) throws IllegalArgumentException {
1824
throw new IllegalArgumentException("can't merge a non-nested mapping [" + fieldName + "] with a nested mapping");
1925
}

server/src/main/java/org/elasticsearch/index/mapper/ObjectMapper.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -577,6 +577,22 @@ public ObjectMapper merge(Mapper mergeWith, MapperMergeContext parentMergeContex
577577
MapperErrors.throwNestedMappingConflictError(mergeWith.fullPath());
578578
}
579579
var mergeResult = MergeResult.build(this, (ObjectMapper) mergeWith, parentMergeContext);
580+
if (mergeWith instanceof PassThroughObjectMapper passThroughObjectMapper) {
581+
if (PassThroughObjectMapper.isEligibleForMerge(this)) {
582+
return new PassThroughObjectMapper(
583+
leafName(),
584+
fullPath,
585+
mergeResult.enabled,
586+
mergeResult.sourceKeepMode,
587+
mergeResult.dynamic,
588+
mergeResult.mappers,
589+
passThroughObjectMapper.timeSeriesDimensionSubFields(),
590+
passThroughObjectMapper.priority()
591+
);
592+
} else {
593+
MapperErrors.throwPassThroughMappingConflictError(fullPath());
594+
}
595+
}
580596
return new ObjectMapper(
581597
leafName(),
582598
fullPath,

server/src/main/java/org/elasticsearch/index/mapper/PassThroughObjectMapper.java

Lines changed: 43 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,10 @@ public int priority() {
136136
return priority;
137137
}
138138

139+
public Explicit<Boolean> timeSeriesDimensionSubFields() {
140+
return timeSeriesDimensionSubFields;
141+
}
142+
139143
@Override
140144
public PassThroughObjectMapper.Builder newBuilder(IndexVersion indexVersionCreated) {
141145
PassThroughObjectMapper.Builder builder = new PassThroughObjectMapper.Builder(leafName());
@@ -148,29 +152,58 @@ public PassThroughObjectMapper.Builder newBuilder(IndexVersion indexVersionCreat
148152

149153
@Override
150154
public PassThroughObjectMapper merge(Mapper mergeWith, MapperMergeContext parentBuilderContext) {
151-
if (mergeWith instanceof PassThroughObjectMapper == false) {
155+
if (mergeWith instanceof ObjectMapper == false) {
152156
MapperErrors.throwObjectMappingConflictError(mergeWith.fullPath());
153157
}
158+
ObjectMapper mergeWithObjectMapper = (ObjectMapper) mergeWith;
159+
if (mergeWithObjectMapper instanceof PassThroughObjectMapper mergeWithPassThrough) {
160+
final var mergeResult = MergeResult.build(this, mergeWithPassThrough, parentBuilderContext);
161+
final Explicit<Boolean> containsDimensions = (mergeWithPassThrough.timeSeriesDimensionSubFields.explicit())
162+
? mergeWithPassThrough.timeSeriesDimensionSubFields
163+
: this.timeSeriesDimensionSubFields;
154164

155-
PassThroughObjectMapper mergeWithObject = (PassThroughObjectMapper) mergeWith;
156-
final var mergeResult = MergeResult.build(this, mergeWithObject, parentBuilderContext);
157-
158-
final Explicit<Boolean> containsDimensions = (mergeWithObject.timeSeriesDimensionSubFields.explicit())
159-
? mergeWithObject.timeSeriesDimensionSubFields
160-
: this.timeSeriesDimensionSubFields;
161-
165+
return new PassThroughObjectMapper(
166+
leafName(),
167+
fullPath(),
168+
mergeResult.enabled(),
169+
mergeResult.sourceKeepMode(),
170+
mergeResult.dynamic(),
171+
mergeResult.mappers(),
172+
containsDimensions,
173+
Math.max(priority, mergeWithPassThrough.priority)
174+
);
175+
}
176+
if (mergeWithObjectMapper instanceof NestedObjectMapper) {
177+
MapperErrors.throwNestedMappingConflictError(fullPath());
178+
}
179+
if (isEligibleForMerge(mergeWithObjectMapper) == false) {
180+
MapperErrors.throwPassThroughMappingConflictError(fullPath());
181+
}
182+
MergeResult mergeResult = MergeResult.build(this, mergeWithObjectMapper, parentBuilderContext);
162183
return new PassThroughObjectMapper(
163184
leafName(),
164185
fullPath(),
165186
mergeResult.enabled(),
166187
mergeResult.sourceKeepMode(),
167188
mergeResult.dynamic(),
168189
mergeResult.mappers(),
169-
containsDimensions,
170-
Math.max(priority, mergeWithObject.priority)
190+
timeSeriesDimensionSubFields,
191+
priority
171192
);
172193
}
173194

195+
/**
196+
* An object mapper is compatible to be merged with a passthrough mapper if
197+
* - It is not a root mapper
198+
* - If it does not have subobjects true
199+
*/
200+
static boolean isEligibleForMerge(ObjectMapper objectMapper) {
201+
return objectMapper.isRoot() == false
202+
&& (objectMapper.subobjects == null
203+
|| objectMapper.subobjects.explicit() == false
204+
|| objectMapper.subobjects.value().equals(Subobjects.DISABLED));
205+
}
206+
174207
@Override
175208
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
176209
builder.startObject(leafName());

server/src/test/java/org/elasticsearch/index/mapper/ObjectMapperMergeTests.java

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -344,6 +344,39 @@ public void testConflictingDynamicUpdate() {
344344
assertThat(e.getMessage(), equalTo("mapper [http.status_code] cannot be changed from type [keyword] to [long]"));
345345
}
346346

347+
public void testMergingWithPassThrough() {
348+
boolean isSourceSynthetic = randomBoolean();
349+
var objectMapper = new RootObjectMapper.Builder("_doc").add(
350+
new ObjectMapper.Builder("metrics").add(new KeywordFieldMapper.Builder("cpu_usage", IndexVersion.current()))
351+
).build(MapperBuilderContext.root(isSourceSynthetic, true));
352+
var passThroughMapper = new RootObjectMapper.Builder("_doc").add(
353+
new PassThroughObjectMapper.Builder("metrics").setPriority(10)
354+
.add(new KeywordFieldMapper.Builder("memory_usage", IndexVersion.current()))
355+
).build(MapperBuilderContext.root(isSourceSynthetic, true));
356+
RootObjectMapper merged = objectMapper.merge(
357+
passThroughMapper,
358+
MapperMergeContext.root(isSourceSynthetic, true, MAPPING_UPDATE, Long.MAX_VALUE)
359+
);
360+
assertThat(merged.getMapper("metrics"), instanceOf(PassThroughObjectMapper.class));
361+
PassThroughObjectMapper metrics = (PassThroughObjectMapper) merged.getMapper("metrics");
362+
assertThat(metrics.getMapper("cpu_usage"), instanceOf(KeywordFieldMapper.class));
363+
assertThat(metrics.getMapper("memory_usage"), instanceOf(KeywordFieldMapper.class));
364+
365+
var subobjectsTrueMapper = new RootObjectMapper.Builder("_doc").add(
366+
new ObjectMapper.Builder("metrics", Explicit.of(ObjectMapper.Subobjects.ENABLED)).add(
367+
new KeywordFieldMapper.Builder("cpu_usage", IndexVersion.current())
368+
)
369+
).build(MapperBuilderContext.root(isSourceSynthetic, true));
370+
IllegalArgumentException e = expectThrows(
371+
IllegalArgumentException.class,
372+
() -> subobjectsTrueMapper.merge(passThroughMapper, MapperMergeContext.root(false, false, MAPPING_UPDATE, Long.MAX_VALUE))
373+
);
374+
assertThat(
375+
e.getMessage(),
376+
equalTo("can't merge a passthrough mapping [metrics] with an object mapping that is either root or has subobjects enabled")
377+
);
378+
}
379+
347380
private static RootObjectMapper createRootSubobjectFalseLeafWithDots() {
348381
FieldMapper.Builder fieldBuilder = new KeywordFieldMapper.Builder("host.name", IndexVersion.current());
349382
FieldMapper fieldMapper = fieldBuilder.build(MapperBuilderContext.root(false, false));

server/src/test/java/org/elasticsearch/index/mapper/PassThroughObjectMapperTests.java

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,13 +10,16 @@
1010
package org.elasticsearch.index.mapper;
1111

1212
import org.elasticsearch.common.Explicit;
13+
import org.elasticsearch.index.IndexVersion;
1314

1415
import java.io.IOException;
1516
import java.util.List;
1617
import java.util.Map;
1718
import java.util.Optional;
1819

20+
import static org.elasticsearch.index.mapper.MapperService.MergeReason.MAPPING_UPDATE;
1921
import static org.hamcrest.Matchers.containsString;
22+
import static org.hamcrest.Matchers.equalTo;
2023
import static org.hamcrest.Matchers.instanceOf;
2124

2225
public class PassThroughObjectMapperTests extends MapperServiceTestCase {
@@ -182,6 +185,54 @@ public void testCheckForDuplicatePrioritiesEmpty() throws IOException {
182185
PassThroughObjectMapper.checkForDuplicatePriorities(List.of());
183186
}
184187

188+
public void testMergingWithPassThrough() {
189+
boolean isSourceSynthetic = randomBoolean();
190+
var passThroughMapper = new RootObjectMapper.Builder("_doc").add(new PassThroughObjectMapper.Builder("metrics").setPriority(10))
191+
.build(MapperBuilderContext.root(isSourceSynthetic, true));
192+
var objectMapper = new RootObjectMapper.Builder("_doc").add(
193+
new ObjectMapper.Builder("metrics").add(new KeywordFieldMapper.Builder("cpu_usage", IndexVersion.current()))
194+
).build(MapperBuilderContext.root(isSourceSynthetic, true));
195+
196+
RootObjectMapper merged = passThroughMapper.merge(
197+
objectMapper,
198+
MapperMergeContext.root(isSourceSynthetic, true, MAPPING_UPDATE, Long.MAX_VALUE)
199+
);
200+
assertThat(merged.getMapper("metrics"), instanceOf(PassThroughObjectMapper.class));
201+
202+
var objectMapperWithSubObjectTrue = new RootObjectMapper.Builder("_doc").add(
203+
new ObjectMapper.Builder("metrics", Explicit.of(ObjectMapper.Subobjects.ENABLED)).add(
204+
new KeywordFieldMapper.Builder("cpu_usage", IndexVersion.current())
205+
)
206+
).build(MapperBuilderContext.root(isSourceSynthetic, true));
207+
208+
IllegalArgumentException error = expectThrows(
209+
IllegalArgumentException.class,
210+
() -> passThroughMapper.merge(
211+
objectMapperWithSubObjectTrue,
212+
MapperMergeContext.root(isSourceSynthetic, true, MAPPING_UPDATE, Long.MAX_VALUE)
213+
)
214+
);
215+
assertThat(
216+
error.getMessage(),
217+
equalTo("can't merge a passthrough mapping [metrics] with an object mapping that is either root or has subobjects enabled")
218+
);
219+
220+
var rootObjectMapper = new RootObjectMapper.Builder("metrics").add(
221+
new KeywordFieldMapper.Builder("cpu_usage", IndexVersion.current())
222+
).build(MapperBuilderContext.root(isSourceSynthetic, true));
223+
224+
error = expectThrows(
225+
IllegalArgumentException.class,
226+
() -> new PassThroughObjectMapper.Builder("metrics").setPriority(10)
227+
.build(MapperBuilderContext.root(isSourceSynthetic, true))
228+
.merge(rootObjectMapper, MapperMergeContext.root(isSourceSynthetic, true, MAPPING_UPDATE, Long.MAX_VALUE))
229+
);
230+
assertThat(
231+
error.getMessage(),
232+
equalTo("can't merge a passthrough mapping [metrics] with an object mapping that is either root or has subobjects enabled")
233+
);
234+
}
235+
185236
private PassThroughObjectMapper create(String name, int priority) {
186237
return new PassThroughObjectMapper(
187238
name,

x-pack/plugin/downsample/src/internalClusterTest/java/org/elasticsearch/xpack/downsample/DownsampleIT.java

Lines changed: 64 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -44,22 +44,27 @@ public class DownsampleIT extends DownsamplingIntegTestCase {
4444

4545
public void testDownsamplingPassthroughDimensions() throws Exception {
4646
String dataStreamName = "metrics-foo";
47-
// Set up template
48-
putTSDBIndexTemplate("my-template", List.of("metrics-foo"), null, """
47+
String mapping = """
4948
{
5049
"properties": {
5150
"attributes": {
5251
"type": "passthrough",
5352
"priority": 10,
54-
"time_series_dimension": true
53+
"time_series_dimension": true,
54+
"properties": {
55+
"os.name": {
56+
"type": "keyword",
57+
"time_series_dimension": true
58+
}
59+
}
5560
},
5661
"metrics.cpu_usage": {
5762
"type": "double",
5863
"time_series_metric": "counter"
5964
}
6065
}
6166
}
62-
""", null, null);
67+
""";
6368

6469
// Create data stream by indexing documents
6570
final Instant now = Instant.now();
@@ -70,12 +75,67 @@ public void testDownsamplingPassthroughDimensions() throws Exception {
7075
.startObject()
7176
.field("@timestamp", ts)
7277
.field("attributes.host.name", randomFrom("host1", "host2", "host3"))
78+
.field("attributes.os.name", randomFrom("linux", "windows", "macos"))
79+
.field("metrics.cpu_usage", randomDouble())
80+
.endObject();
81+
} catch (IOException e) {
82+
throw new RuntimeException(e);
83+
}
84+
};
85+
downsampleAndAssert(dataStreamName, mapping, sourceSupplier);
86+
}
87+
88+
public void testDownsamplingPassthroughMetrics() throws Exception {
89+
String dataStreamName = "metrics-foo";
90+
String mapping = """
91+
{
92+
"properties": {
93+
"attributes.os.name": {
94+
"type": "keyword",
95+
"time_series_dimension": true
96+
},
97+
"metrics": {
98+
"type": "passthrough",
99+
"priority": 10,
100+
"properties": {
101+
"cpu_usage": {
102+
"type": "double",
103+
"time_series_metric": "counter"
104+
}
105+
}
106+
}
107+
}
108+
}
109+
""";
110+
111+
// Create data stream by indexing documents
112+
final Instant now = Instant.now();
113+
Supplier<XContentBuilder> sourceSupplier = () -> {
114+
String ts = randomDateForRange(now.minusSeconds(60 * 60).toEpochMilli(), now.plusSeconds(60 * 29).toEpochMilli());
115+
try {
116+
return XContentFactory.jsonBuilder()
117+
.startObject()
118+
.field("@timestamp", ts)
119+
.field("attributes.os.name", randomFrom("linux", "windows", "macos"))
73120
.field("metrics.cpu_usage", randomDouble())
121+
.field("metrics.memory_usage", randomDouble())
74122
.endObject();
75123
} catch (IOException e) {
76124
throw new RuntimeException(e);
77125
}
78126
};
127+
downsampleAndAssert(dataStreamName, mapping, sourceSupplier);
128+
}
129+
130+
/**
131+
* Create a data stream with the provided mapping and downsampled the first backing index of this data stream. After downsampling has
132+
* completed, it asserts if the downsampled index is as expected.
133+
*/
134+
private void downsampleAndAssert(String dataStreamName, String mapping, Supplier<XContentBuilder> sourceSupplier) throws Exception {
135+
// Set up template
136+
putTSDBIndexTemplate("my-template", List.of(dataStreamName), null, mapping, null, null);
137+
138+
// Create data stream by indexing documents
79139
bulkIndex(dataStreamName, sourceSupplier, 100);
80140
// Rollover to ensure the index we will downsample is not the write index
81141
assertAcked(client().admin().indices().rolloverIndex(new RolloverRequest(dataStreamName, null)));

0 commit comments

Comments
 (0)