Commit 292f65b

Add coerce setting to histogram field mapper (#137191)
1 parent ae8cbcf commit 292f65b

7 files changed: +339 / -7 lines changed

x-pack/plugin/analytics/build.gradle

Lines changed: 2 additions & 0 deletions
@@ -27,7 +27,9 @@ dependencies {
   implementation(project(":libs:exponential-histogram"))
   compileOnly project(path: xpackModule('core'))
   compileOnly project(":server")
+
   testImplementation project(path: ':modules:aggregations')
+  testImplementation project(xpackModule('otel-data')) // for coerce compatibility tests
   testImplementation(testArtifact(project(xpackModule('core'))))
   testImplementation(testArtifact(project(":libs:exponential-histogram")))
 }

x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/mapper/ExponentialHistogramParser.java

Lines changed: 15 additions & 0 deletions
@@ -21,6 +21,7 @@
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
+import java.util.Set;

 import static org.elasticsearch.common.xcontent.XContentParserUtils.ensureExpectedToken;

@@ -68,6 +69,20 @@ public class ExponentialHistogramParser {

     public static final FeatureFlag EXPONENTIAL_HISTOGRAM_FEATURE = new FeatureFlag("exponential_histogram");

+    private static final Set<String> ROOT_FIELD_NAMES = Set.of(
+        SCALE_FIELD.getPreferredName(),
+        SUM_FIELD.getPreferredName(),
+        MIN_FIELD.getPreferredName(),
+        MAX_FIELD.getPreferredName(),
+        ZERO_FIELD.getPreferredName(),
+        POSITIVE_FIELD.getPreferredName(),
+        NEGATIVE_FIELD.getPreferredName()
+    );
+
+    public static boolean isExponentialHistogramSubFieldName(String subFieldName) {
+        return ROOT_FIELD_NAMES.contains(subFieldName);
+    }
+
     /**
      * Represents a parsed exponential histogram.
      * The values are validated, excepted for {@link #sum()}, {@link #min()} and {@link #max()}.
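The new helper lets the histogram mapper recognize exponential-histogram-shaped input by its first sub-field name. A minimal, purely illustrative sketch of the distinction it draws (field names taken from the parsers in this commit):

    // "scale" is a root field of the exponential histogram format...
    boolean isExpHisto = ExponentialHistogramParser.isExponentialHistogramSubFieldName("scale");   // true
    // ...while "values"/"counts" belong to the regular (t-digest) histogram format
    boolean isTDigest = ExponentialHistogramParser.isExponentialHistogramSubFieldName("values");   // false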

x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/mapper/HistogramFieldMapper.java

Lines changed: 40 additions & 6 deletions
@@ -19,6 +19,7 @@
 import org.elasticsearch.common.Explicit;
 import org.elasticsearch.common.io.stream.ByteArrayStreamInput;
 import org.elasticsearch.common.io.stream.BytesStreamOutput;
+import org.elasticsearch.common.settings.Setting;
 import org.elasticsearch.common.util.BigArrays;
 import org.elasticsearch.index.fielddata.FieldDataContext;
 import org.elasticsearch.index.fielddata.FormattedDocValues;
@@ -37,6 +38,7 @@
 import org.elasticsearch.index.mapper.IndexType;
 import org.elasticsearch.index.mapper.MappedFieldType;
 import org.elasticsearch.index.mapper.MapperBuilderContext;
+import org.elasticsearch.index.mapper.NumberFieldMapper;
 import org.elasticsearch.index.mapper.SourceLoader;
 import org.elasticsearch.index.mapper.SourceValueFetcher;
 import org.elasticsearch.index.mapper.ValueFetcher;
@@ -62,8 +64,12 @@
  * Field Mapper for pre-aggregated histograms.
  */
 public class HistogramFieldMapper extends FieldMapper {
+
     public static final String CONTENT_TYPE = "histogram";

+    // use the same default as numbers
+    private static final Setting<Boolean> COERCE_SETTING = NumberFieldMapper.COERCE_SETTING;
+
     private static HistogramFieldMapper toType(FieldMapper in) {
         return (HistogramFieldMapper) in;
     }
@@ -72,20 +78,26 @@ public static class Builder extends FieldMapper.Builder {

         private final Parameter<Map<String, String>> meta = Parameter.metaParam();
         private final Parameter<Explicit<Boolean>> ignoreMalformed;
+        private final Parameter<Explicit<Boolean>> coerce;

-        public Builder(String name, boolean ignoreMalformedByDefault) {
+        public Builder(String name, boolean ignoreMalformedByDefault, boolean coerceByDefault) {
             super(name);
             this.ignoreMalformed = Parameter.explicitBoolParam(
                 "ignore_malformed",
                 true,
                 m -> toType(m).ignoreMalformed,
                 ignoreMalformedByDefault
             );
+            this.coerce = Parameter.explicitBoolParam("coerce", true, m -> toType(m).coerce, coerceByDefault);
         }

         @Override
         protected Parameter<?>[] getParameters() {
-            return new Parameter<?>[] { ignoreMalformed, meta };
+            if (ExponentialHistogramParser.EXPONENTIAL_HISTOGRAM_FEATURE.isEnabled()) {
+                return new Parameter<?>[] { ignoreMalformed, coerce, meta };
+            } else {
+                return new Parameter<?>[] { ignoreMalformed, meta };
+            }
         }

         @Override
@@ -100,32 +112,41 @@ public HistogramFieldMapper build(MapperBuilderContext context) {
     }

     public static final TypeParser PARSER = new TypeParser(
-        (n, c) -> new Builder(n, IGNORE_MALFORMED_SETTING.get(c.getSettings())),
+        (n, c) -> new Builder(n, IGNORE_MALFORMED_SETTING.get(c.getSettings()), COERCE_SETTING.get(c.getSettings())),
         notInMultiFields(CONTENT_TYPE)
     );

     private final Explicit<Boolean> ignoreMalformed;
     private final boolean ignoreMalformedByDefault;

+    private final Explicit<Boolean> coerce;
+    private final boolean coerceByDefault;
+
     public HistogramFieldMapper(String simpleName, MappedFieldType mappedFieldType, BuilderParams builderParams, Builder builder) {
         super(simpleName, mappedFieldType, builderParams);
         this.ignoreMalformed = builder.ignoreMalformed.getValue();
         this.ignoreMalformedByDefault = builder.ignoreMalformed.getDefaultValue().value();
+        this.coerce = builder.coerce.getValue();
+        this.coerceByDefault = builder.coerce.getDefaultValue().value();
     }

     @Override
     public boolean ignoreMalformed() {
         return ignoreMalformed.value();
     }

+    boolean coerce() {
+        return coerce.value();
+    }
+
     @Override
     protected String contentType() {
         return CONTENT_TYPE;
     }

     @Override
     public FieldMapper.Builder getMergeBuilder() {
-        return new Builder(leafName(), ignoreMalformedByDefault).init(this);
+        return new Builder(leafName(), ignoreMalformedByDefault, coerceByDefault).init(this);
     }

     @Override
@@ -300,7 +321,20 @@ public void parse(DocumentParserContext context) throws IOException {
             subParser = new XContentSubParser(context.parser());
         }
         subParser.nextToken();
-        HistogramParser.ParsedHistogram parsedHistogram = HistogramParser.parse(fullPath(), subParser);
+
+        HistogramParser.ParsedHistogram parsedHistogram;
+        if (ExponentialHistogramParser.EXPONENTIAL_HISTOGRAM_FEATURE.isEnabled()
+            && coerce()
+            && subParser.currentToken() == XContentParser.Token.FIELD_NAME
+            && ExponentialHistogramParser.isExponentialHistogramSubFieldName(subParser.currentName())) {
+            ExponentialHistogramParser.ParsedExponentialHistogram parsedExponential = ExponentialHistogramParser.parse(
+                fullPath(),
+                subParser
+            );
+            parsedHistogram = ParsedHistogramConverter.exponentialToTDigest(parsedExponential);
+        } else {
+            parsedHistogram = HistogramParser.parse(fullPath(), subParser);
+        }

         BytesStreamOutput streamOutput = new BytesStreamOutput();
         for (int i = 0; i < parsedHistogram.values().size(); i++) {
@@ -358,7 +392,7 @@ public void parse(DocumentParserContext context) throws IOException {
     }

     /** re-usable {@link HistogramValue} implementation */
-    private static class InternalHistogramValue extends HistogramValue {
+    static class InternalHistogramValue extends HistogramValue {
         double value;
         long count;
         boolean isExhausted;
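With the exponential_histogram feature flag enabled, the new coerce parameter (default true, reusing the numeric fields' setting) lets parse() accept exponential-histogram-shaped values and convert them to the t-digest representation; with coerce set to false such values fail parsing with an "unknown parameter" error, as the tests below show. A minimal sketch of a mapping that opts out of the coercion; the field name "latency" is hypothetical and used only for illustration:

    import org.elasticsearch.xcontent.XContentBuilder;
    import org.elasticsearch.xcontent.XContentFactory;

    // Sketch only: a "histogram" field mapping with coercion explicitly disabled.
    XContentBuilder mapping = XContentFactory.jsonBuilder();
    mapping.startObject()
        .startObject("properties")
            .startObject("latency") // hypothetical field name
                .field("type", "histogram")
                .field("coerce", false) // exponential-histogram-shaped input will be rejected
            .endObject()
        .endObject()
        .endObject();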
x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/mapper/ParsedHistogramConverter.java

Lines changed: 58 additions & 0 deletions

@@ -0,0 +1,58 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.analytics.mapper;
+
+import org.elasticsearch.exponentialhistogram.ExponentialScaleUtils;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class ParsedHistogramConverter {
+
+    /**
+     * Converts exponential histograms to t-digests using the very same algorithm as the built-in OTLP metrics endpoint.
+     *
+     * @param expHisto the exponential histogram to convert
+     * @return the resulting t-digest histogram
+     */
+    public static HistogramParser.ParsedHistogram exponentialToTDigest(ExponentialHistogramParser.ParsedExponentialHistogram expHisto) {
+        // We don't want to reuse the code across the OTLP intake and the field mappers because they use different data models
+        // and shuffling the data into a common format or interface would be more expensive and complex than just duplicating the logic.
+        List<Double> centroids = new ArrayList<>(); // filled in ascending order
+        List<Long> counts = new ArrayList<>();
+
+        List<IndexWithCount> neg = expHisto.negativeBuckets();
+        for (int i = neg.size() - 1; i >= 0; i--) {
+            appendBucketCentroid(centroids, counts, neg.get(i), expHisto.scale(), -1);
+        }
+        if (expHisto.zeroCount() > 0) {
+            centroids.add(0.0);
+            counts.add(expHisto.zeroCount());
+        }
+        for (IndexWithCount positiveBucket : expHisto.positiveBuckets()) {
+            appendBucketCentroid(centroids, counts, positiveBucket, expHisto.scale(), 1);
+        }
+        assert centroids.size() == counts.size();
+        assert centroids.stream().sorted().toList().equals(centroids);
+        return new HistogramParser.ParsedHistogram(centroids, counts);
+    }
+
+    private static void appendBucketCentroid(
+        List<Double> centroids,
+        List<Long> counts,
+        IndexWithCount expHistoBucket,
+        int scale,
+        int sign
+    ) {
+        double lowerBound = ExponentialScaleUtils.getLowerBucketBoundary(expHistoBucket.index(), scale);
+        double upperBound = ExponentialScaleUtils.getUpperBucketBoundary(expHistoBucket.index(), scale);
+        double center = sign * (lowerBound + upperBound) / 2.0;
+        centroids.add(center);
+        counts.add(expHistoBucket.count());
+    }
+}
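Each exponential-histogram bucket is collapsed into a single centroid at the arithmetic midpoint of the bucket's boundaries, carrying the bucket's count; buckets from the negative range get the negated midpoint. A rough sketch of that midpoint computation for one positive bucket (the scale and index values are made up, and the index type is assumed to be long here):

    import org.elasticsearch.exponentialhistogram.ExponentialScaleUtils;

    // Hypothetical bucket: index 2 at scale 0, chosen only for illustration.
    int scale = 0;
    long index = 2;
    double lower = ExponentialScaleUtils.getLowerBucketBoundary(index, scale);
    double upper = ExponentialScaleUtils.getUpperBucketBoundary(index, scale);
    // The midpoint becomes the t-digest centroid, weighted by the bucket count.
    double centroid = (lower + upper) / 2.0;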

x-pack/plugin/analytics/src/test/java/org/elasticsearch/xpack/analytics/mapper/HistogramFieldMapperTests.java

Lines changed: 69 additions & 0 deletions
@@ -6,8 +6,13 @@
  */
 package org.elasticsearch.xpack.analytics.mapper;

+import org.apache.lucene.index.IndexableField;
 import org.elasticsearch.common.Strings;
+import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.core.CheckedConsumer;
+import org.elasticsearch.exponentialhistogram.ExponentialHistogram;
+import org.elasticsearch.exponentialhistogram.ExponentialHistogramTestUtils;
+import org.elasticsearch.exponentialhistogram.ExponentialHistogramXContent;
 import org.elasticsearch.index.mapper.DocumentMapper;
 import org.elasticsearch.index.mapper.DocumentParsingException;
 import org.elasticsearch.index.mapper.MappedFieldType;
@@ -17,6 +22,10 @@
 import org.elasticsearch.index.mapper.SourceToParse;
 import org.elasticsearch.plugins.Plugin;
 import org.elasticsearch.xcontent.XContentBuilder;
+import org.elasticsearch.xcontent.XContentFactory;
+import org.elasticsearch.xcontent.XContentParser;
+import org.elasticsearch.xcontent.XContentParserConfiguration;
+import org.elasticsearch.xcontent.XContentType;
 import org.elasticsearch.xcontent.json.JsonXContent;
 import org.elasticsearch.xpack.analytics.AnalyticsPlugin;
 import org.junit.AssumptionViolatedException;
@@ -29,6 +38,7 @@
 import java.util.Map;

 import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.notNullValue;
 import static org.hamcrest.Matchers.nullValue;

@@ -57,6 +67,9 @@ protected void minimalMapping(XContentBuilder b) throws IOException {
     @Override
     protected void registerParameters(ParameterChecker checker) throws IOException {
         checker.registerUpdateCheck(b -> b.field("ignore_malformed", true), m -> assertTrue(((HistogramFieldMapper) m).ignoreMalformed()));
+        if (ExponentialHistogramParser.EXPONENTIAL_HISTOGRAM_FEATURE.isEnabled()) {
+            checker.registerUpdateCheck(b -> b.field("coerce", false), m -> assertFalse(((HistogramFieldMapper) m).coerce()));
+        }
     }

     @Override
@@ -116,6 +129,62 @@ public void testMissingFieldCounts() throws Exception {
         assertThat(e.getCause().getMessage(), containsString("expected field called [counts]"));
     }

+    public void testCoerce() throws IOException {
+        ExponentialHistogram input = ExponentialHistogramTestUtils.randomHistogram();
+
+        XContentBuilder inputJson = XContentFactory.jsonBuilder();
+        inputJson.startObject().field("field");
+        ExponentialHistogramXContent.serialize(inputJson, input);
+        inputJson.endObject();
+        BytesReference inputDocBytes = BytesReference.bytes(inputJson);
+
+        XContentParser docParser = XContentType.JSON.xContent()
+            .createParser(XContentParserConfiguration.EMPTY, inputDocBytes.streamInput());
+        docParser.nextToken(); // start object
+        docParser.nextToken(); // field name
+        docParser.nextToken(); // start object
+        docParser.nextToken(); // point at first sub-field
+        HistogramParser.ParsedHistogram expectedCoerced = ParsedHistogramConverter.exponentialToTDigest(
+            ExponentialHistogramParser.parse("field", docParser)
+        );
+
+        DocumentMapper defaultMapper = createDocumentMapper(fieldMapping(this::minimalMapping));
+        if (ExponentialHistogramParser.EXPONENTIAL_HISTOGRAM_FEATURE.isEnabled() == false) {
+            // feature flag is disabled, so coerce should not work
+            ThrowingRunnable runnable = () -> defaultMapper.parse(new SourceToParse("1", inputDocBytes, XContentType.JSON));
+            DocumentParsingException e = expectThrows(DocumentParsingException.class, runnable);
+            assertThat(e.getCause().getMessage(), containsString("unknown parameter [scale]"));
+        } else {
+            ParsedDocument doc = defaultMapper.parse(new SourceToParse("1", inputDocBytes, XContentType.JSON));
+            List<IndexableField> fields = doc.rootDoc().getFields("field");
+            assertThat(fields.size(), equalTo(1));
+            assertThat(docValueToParsedHistogram(fields.getFirst()), equalTo(expectedCoerced));
+
+            DocumentMapper coerceDisabledMapper = createDocumentMapper(
+                fieldMapping(b -> b.field("type", "histogram").field("coerce", false))
+            );
+            ThrowingRunnable runnable = () -> coerceDisabledMapper.parse(new SourceToParse("1", inputDocBytes, XContentType.JSON));
+            DocumentParsingException e = expectThrows(DocumentParsingException.class, runnable);
+            assertThat(e.getCause().getMessage(), containsString("unknown parameter [scale]"));
+        }
+    }
+
+    private static HistogramParser.ParsedHistogram docValueToParsedHistogram(IndexableField indexableField) {
+        HistogramFieldMapper.InternalHistogramValue histogramValue = new HistogramFieldMapper.InternalHistogramValue();
+        histogramValue.reset(indexableField.binaryValue());
+        List<Long> counts = new ArrayList<>();
+        List<Double> values = new ArrayList<>();
+        try {
+            while (histogramValue.next()) {
+                counts.add(histogramValue.count());
+                values.add(histogramValue.value());
+            }
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+        return new HistogramParser.ParsedHistogram(values, counts);
+    }
+
     @Override
     protected boolean supportsIgnoreMalformed() {
         return true;