Skip to content

Commit 7f7e44e

Browse files
not-napoleon and elasticsearchmachine
authored
T digest csv test support (#138391)
Wire up the t-digest field type to the ESQL CSV tests. Mostly this involves adding the element type and plumbing it through, with some additional work filling out the block builder behaviors. --------- Co-authored-by: elasticsearchmachine <[email protected]>
1 parent f1539fd commit 7f7e44e

File tree

43 files changed

+1021
-76
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

43 files changed

+1021
-76
lines changed

libs/tdigest/build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ apply plugin: 'elasticsearch.publish'
2323

2424
dependencies {
2525
api project(':libs:core')
26+
api project(':libs:x-content')
2627
api "org.apache.lucene:lucene-core:${versions.lucene}"
2728

2829
testImplementation(project(":test:framework")) {

libs/tdigest/src/main/java/module-info.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,9 @@
2020
module org.elasticsearch.tdigest {
2121
requires org.elasticsearch.base;
2222
requires org.apache.lucene.core;
23+
requires org.elasticsearch.xcontent;
2324

2425
exports org.elasticsearch.tdigest;
2526
exports org.elasticsearch.tdigest.arrays;
27+
exports org.elasticsearch.tdigest.parsing;
2628
}
Lines changed: 80 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -1,28 +1,41 @@
11
/*
2-
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3-
* or more contributor license agreements. Licensed under the Elastic License
4-
* 2.0; you may not use this file except in compliance with the Elastic License
5-
* 2.0.
2+
* Licensed to Elasticsearch B.V. under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch B.V. licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*
19+
* This project is based on a modification of https://github.com/tdunning/t-digest which is licensed under the Apache 2.0 License.
620
*/
721

8-
package org.elasticsearch.xpack.analytics.mapper;
22+
package org.elasticsearch.tdigest.parsing;
923

10-
import org.elasticsearch.index.mapper.DocumentParsingException;
1124
import org.elasticsearch.xcontent.ParseField;
25+
import org.elasticsearch.xcontent.XContentLocation;
1226
import org.elasticsearch.xcontent.XContentParser;
1327

1428
import java.io.IOException;
1529
import java.util.ArrayList;
1630
import java.util.List;
17-
18-
import static org.elasticsearch.common.xcontent.XContentParserUtils.ensureExpectedToken;
19-
import static org.elasticsearch.xpack.analytics.mapper.TDigestFieldMapper.CENTROIDS_NAME;
20-
import static org.elasticsearch.xpack.analytics.mapper.TDigestFieldMapper.COUNTS_NAME;
21-
import static org.elasticsearch.xpack.analytics.mapper.TDigestFieldMapper.MAX_FIELD_NAME;
22-
import static org.elasticsearch.xpack.analytics.mapper.TDigestFieldMapper.MIN_FIELD_NAME;
23-
import static org.elasticsearch.xpack.analytics.mapper.TDigestFieldMapper.SUM_FIELD_NAME;
31+
import java.util.function.BiFunction;
2432

2533
public class TDigestParser {
34+
public static final String CENTROIDS_NAME = "centroids";
35+
public static final String COUNTS_NAME = "counts";
36+
public static final String SUM_FIELD_NAME = "sum";
37+
public static final String MIN_FIELD_NAME = "min";
38+
public static final String MAX_FIELD_NAME = "max";
2639

2740
private static final ParseField COUNTS_FIELD = new ParseField(COUNTS_NAME);
2841
private static final ParseField CENTROIDS_FIELD = new ParseField(CENTROIDS_NAME);
@@ -91,9 +104,15 @@ public Long count() {
91104
*
92105
* @param mappedFieldName the name of the field being parsed, used for error messages
93106
* @param parser the parser to use
107+
* @param documentParsingExceptionProvider factory function for generating document parsing exceptions. Required for visibility.
94108
* @return the parsed histogram
95109
*/
96-
public static ParsedTDigest parse(String mappedFieldName, XContentParser parser) throws IOException {
110+
public static ParsedTDigest parse(
111+
String mappedFieldName,
112+
XContentParser parser,
113+
BiFunction<XContentLocation, String, RuntimeException> documentParsingExceptionProvider,
114+
ParsingExceptionProvider parsingExceptionProvider
115+
) throws IOException {
97116
ArrayList<Double> centroids = null;
98117
ArrayList<Long> counts = null;
99118
Double sum = null;
@@ -102,46 +121,46 @@ public static ParsedTDigest parse(String mappedFieldName, XContentParser parser)
102121
XContentParser.Token token = parser.currentToken();
103122
while (token != XContentParser.Token.END_OBJECT) {
104123
// should be a field
105-
ensureExpectedToken(XContentParser.Token.FIELD_NAME, token, parser);
124+
ensureExpectedToken(XContentParser.Token.FIELD_NAME, token, parser, parsingExceptionProvider);
106125
String fieldName = parser.currentName();
107126
if (fieldName.equals(CENTROIDS_FIELD.getPreferredName())) {
108-
centroids = getCentroids(mappedFieldName, parser);
127+
centroids = getCentroids(mappedFieldName, parser, documentParsingExceptionProvider, parsingExceptionProvider);
109128
} else if (fieldName.equals(COUNTS_FIELD.getPreferredName())) {
110-
counts = getCounts(mappedFieldName, parser);
129+
counts = getCounts(mappedFieldName, parser, documentParsingExceptionProvider, parsingExceptionProvider);
111130
} else if (fieldName.equals(SUM_FIELD.getPreferredName())) {
112131
token = parser.nextToken();
113-
ensureExpectedToken(XContentParser.Token.VALUE_NUMBER, token, parser);
132+
ensureExpectedToken(XContentParser.Token.VALUE_NUMBER, token, parser, parsingExceptionProvider);
114133
sum = parser.doubleValue();
115134
} else if (fieldName.equals(MIN_FIELD.getPreferredName())) {
116135
token = parser.nextToken();
117-
ensureExpectedToken(XContentParser.Token.VALUE_NUMBER, token, parser);
136+
ensureExpectedToken(XContentParser.Token.VALUE_NUMBER, token, parser, parsingExceptionProvider);
118137
min = parser.doubleValue();
119138
} else if (fieldName.equals(MAX_FIELD.getPreferredName())) {
120139
token = parser.nextToken();
121-
ensureExpectedToken(XContentParser.Token.VALUE_NUMBER, token, parser);
140+
ensureExpectedToken(XContentParser.Token.VALUE_NUMBER, token, parser, parsingExceptionProvider);
122141
max = parser.doubleValue();
123142
} else {
124-
throw new DocumentParsingException(
143+
throw documentParsingExceptionProvider.apply(
125144
parser.getTokenLocation(),
126145
"error parsing field [" + mappedFieldName + "], with unknown parameter [" + fieldName + "]"
127146
);
128147
}
129148
token = parser.nextToken();
130149
}
131150
if (centroids == null) {
132-
throw new DocumentParsingException(
151+
throw documentParsingExceptionProvider.apply(
133152
parser.getTokenLocation(),
134153
"error parsing field [" + mappedFieldName + "], expected field called [" + CENTROIDS_FIELD.getPreferredName() + "]"
135154
);
136155
}
137156
if (counts == null) {
138-
throw new DocumentParsingException(
157+
throw documentParsingExceptionProvider.apply(
139158
parser.getTokenLocation(),
140159
"error parsing field [" + mappedFieldName + "], expected field called [" + COUNTS_FIELD.getPreferredName() + "]"
141160
);
142161
}
143162
if (centroids.size() != counts.size()) {
144-
throw new DocumentParsingException(
163+
throw documentParsingExceptionProvider.apply(
145164
parser.getTokenLocation(),
146165
"error parsing field ["
147166
+ mappedFieldName
@@ -165,20 +184,25 @@ public static ParsedTDigest parse(String mappedFieldName, XContentParser parser)
165184
return new ParsedTDigest(centroids, counts, sum, min, max);
166185
}
167186

168-
private static ArrayList<Long> getCounts(String mappedFieldName, XContentParser parser) throws IOException {
187+
private static ArrayList<Long> getCounts(
188+
String mappedFieldName,
189+
XContentParser parser,
190+
BiFunction<XContentLocation, String, RuntimeException> documentParsingExceptionProvider,
191+
ParsingExceptionProvider parsingExceptionProvider
192+
) throws IOException {
169193
ArrayList<Long> counts;
170194
XContentParser.Token token;
171195
token = parser.nextToken();
172196
// should be an array
173-
ensureExpectedToken(XContentParser.Token.START_ARRAY, token, parser);
197+
ensureExpectedToken(XContentParser.Token.START_ARRAY, token, parser, parsingExceptionProvider);
174198
counts = new ArrayList<>();
175199
token = parser.nextToken();
176200
while (token != XContentParser.Token.END_ARRAY) {
177201
// should be a number
178-
ensureExpectedToken(XContentParser.Token.VALUE_NUMBER, token, parser);
202+
ensureExpectedToken(XContentParser.Token.VALUE_NUMBER, token, parser, parsingExceptionProvider);
179203
long count = parser.longValue();
180204
if (count < 0) {
181-
throw new DocumentParsingException(
205+
throw documentParsingExceptionProvider.apply(
182206
parser.getTokenLocation(),
183207
"error parsing field [" + mappedFieldName + "], [" + COUNTS_FIELD + "] elements must be >= 0 but got " + count
184208
);
@@ -189,22 +213,27 @@ private static ArrayList<Long> getCounts(String mappedFieldName, XContentParser
189213
return counts;
190214
}
191215

192-
private static ArrayList<Double> getCentroids(String mappedFieldName, XContentParser parser) throws IOException {
216+
private static ArrayList<Double> getCentroids(
217+
String mappedFieldName,
218+
XContentParser parser,
219+
BiFunction<XContentLocation, String, RuntimeException> documentParsingExceptionProvider,
220+
ParsingExceptionProvider parsingExceptionProvider
221+
) throws IOException {
193222
XContentParser.Token token;
194223
ArrayList<Double> centroids;
195224
token = parser.nextToken();
196225
// should be an array
197-
ensureExpectedToken(XContentParser.Token.START_ARRAY, token, parser);
226+
ensureExpectedToken(XContentParser.Token.START_ARRAY, token, parser, parsingExceptionProvider);
198227
centroids = new ArrayList<>();
199228
token = parser.nextToken();
200229
double previousVal = -Double.MAX_VALUE;
201230
while (token != XContentParser.Token.END_ARRAY) {
202231
// should be a number
203-
ensureExpectedToken(XContentParser.Token.VALUE_NUMBER, token, parser);
232+
ensureExpectedToken(XContentParser.Token.VALUE_NUMBER, token, parser, parsingExceptionProvider);
204233
double val = parser.doubleValue();
205234
if (val < previousVal) {
206235
// centroids must be in increasing order
207-
throw new DocumentParsingException(
236+
throw documentParsingExceptionProvider.apply(
208237
parser.getTokenLocation(),
209238
"error parsing field ["
210239
+ mappedFieldName
@@ -224,4 +253,23 @@ private static ArrayList<Double> getCentroids(String mappedFieldName, XContentPa
224253
return centroids;
225254
}
226255

256+
/**
257+
* Interface for throwing a parsing exception, needed for visibility
258+
*/
259+
@FunctionalInterface
260+
public interface ParsingExceptionProvider {
261+
RuntimeException apply(XContentParser parser, XContentParser.Token expected, XContentParser.Token actual) throws IOException;
262+
}
263+
264+
public static void ensureExpectedToken(
265+
XContentParser.Token expected,
266+
XContentParser.Token actual,
267+
XContentParser parser,
268+
ParsingExceptionProvider parsingExceptionProvider
269+
) throws IOException {
270+
if (actual != expected) {
271+
throw parsingExceptionProvider.apply(parser, expected, actual);
272+
}
273+
}
274+
227275
}
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
/*
2+
* Licensed to Elasticsearch B.V. under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch B.V. licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*
19+
* This project is based on a modification of https://github.com/tdunning/t-digest which is licensed under the Apache 2.0 License.
20+
*/
21+
22+
/**
23+
* Parsing package contains Elasticsearch specific classes for serializing and deserializing
24+
* t-digests from various formats via Elasticsearch's XContent abstraction layer.
25+
*/
26+
27+
package org.elasticsearch.tdigest.parsing;

server/src/main/java/org/elasticsearch/common/xcontent/XContentParserUtils.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ public static void expectValueToken(Token actual, XContentParser parser) {
8686
}
8787
}
8888

89-
private static ParsingException parsingException(XContentParser parser, Token expected, Token actual) {
89+
public static ParsingException parsingException(XContentParser parser, Token expected, Token actual) {
9090
return new ParsingException(
9191
parser.getTokenLocation(),
9292
String.format(Locale.ROOT, "Failed to parse object: expecting token of type [%s] but found [%s]", expected, actual)

x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/mapper/TDigestFieldMapper.java

Lines changed: 27 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.elasticsearch.common.io.stream.BytesStreamOutput;
2424
import org.elasticsearch.common.util.BigArrays;
2525
import org.elasticsearch.common.util.FeatureFlag;
26+
import org.elasticsearch.common.xcontent.XContentParserUtils;
2627
import org.elasticsearch.common.xcontent.support.XContentMapValues;
2728
import org.elasticsearch.index.fielddata.FieldDataContext;
2829
import org.elasticsearch.index.fielddata.FormattedDocValues;
@@ -54,6 +55,7 @@
5455
import org.elasticsearch.search.aggregations.metrics.TDigestState;
5556
import org.elasticsearch.search.sort.BucketedSort;
5657
import org.elasticsearch.search.sort.SortOrder;
58+
import org.elasticsearch.tdigest.parsing.TDigestParser;
5759
import org.elasticsearch.xcontent.CopyingXContentParser;
5860
import org.elasticsearch.xcontent.XContentBuilder;
5961
import org.elasticsearch.xcontent.XContentParser;
@@ -62,6 +64,7 @@
6264

6365
import java.io.IOException;
6466
import java.io.UncheckedIOException;
67+
import java.util.List;
6568
import java.util.Map;
6669
import java.util.Objects;
6770

@@ -368,21 +371,14 @@ public void parse(DocumentParserContext context) throws IOException {
368371
}
369372
subParser.nextToken();
370373
// TODO: Here we should build a t-digest out of the input, based on the settings on the field
371-
TDigestParser.ParsedTDigest parsedTDigest = TDigestParser.parse(fullPath(), subParser);
372-
373-
BytesStreamOutput streamOutput = new BytesStreamOutput();
374-
375-
for (int i = 0; i < parsedTDigest.centroids().size(); i++) {
376-
long count = parsedTDigest.counts().get(i);
377-
assert count >= 0;
378-
// we do not add elements with count == 0
379-
if (count > 0) {
380-
streamOutput.writeVLong(count);
381-
streamOutput.writeDouble(parsedTDigest.centroids().get(i));
382-
}
383-
}
374+
TDigestParser.ParsedTDigest parsedTDigest = TDigestParser.parse(
375+
fullPath(),
376+
subParser,
377+
DocumentParsingException::new,
378+
XContentParserUtils::parsingException
379+
);
384380

385-
BytesRef docValue = streamOutput.bytes().toBytesRef();
381+
BytesRef docValue = encodeCentroidsAndCounts(parsedTDigest.centroids(), parsedTDigest.counts());
386382
Field digestField = new BinaryDocValuesField(fullPath(), docValue);
387383

388384
// Add numeric doc values fields for the summary data
@@ -458,6 +454,23 @@ public void parse(DocumentParserContext context) throws IOException {
458454
context.path().remove();
459455
}
460456

457+
private static BytesRef encodeCentroidsAndCounts(List<Double> centroids, List<Long> counts) throws IOException {
458+
BytesStreamOutput streamOutput = new BytesStreamOutput();
459+
460+
for (int i = 0; i < centroids.size(); i++) {
461+
long count = counts.get(i);
462+
assert count >= 0;
463+
// we do not add elements with count == 0
464+
if (count > 0) {
465+
streamOutput.writeVLong(count);
466+
streamOutput.writeDouble(centroids.get(i));
467+
}
468+
}
469+
470+
BytesRef docValue = streamOutput.bytes().toBytesRef();
471+
return docValue;
472+
}
473+
461474
private static String valuesCountSubFieldName(String fullPath) {
462475
return fullPath + "._values_count";
463476
}

x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/plugin/EsqlCorePlugin.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,4 +14,8 @@
1414
public class EsqlCorePlugin extends Plugin implements ExtensiblePlugin {
1515

1616
public static final FeatureFlag EXPONENTIAL_HISTOGRAM_FEATURE_FLAG = new FeatureFlag("esql_exponential_histogram");
17+
18+
// Note, there is also a feature flag for the field type in the analytics plugin, but for visibility reasons we need
19+
// another one here.
20+
public static final FeatureFlag T_DIGEST_ESQL_SUPPORT = new FeatureFlag("esql_t_digest_support");
1721
}

x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/type/DataType.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -349,6 +349,16 @@ public enum DataType implements Writeable {
349349
.underConstruction()
350350
),
351351

352+
/*
353+
TDIGEST(
354+
builder().esType("exponential_histogram")
355+
.estimatedSize(16 * 160)// guess 160 buckets (OTEL default for positive values only histograms) with 16 bytes per bucket
356+
.docValues()
357+
.underConstruction()
358+
),
359+
360+
*/
361+
352362
/**
353363
* Fields with this type are dense vectors, represented as an array of float values.
354364
*/

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/BlockFactory.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -496,10 +496,18 @@ public ExponentialHistogramBlockBuilder newExponentialHistogramBlockBuilder(int
496496
return new ExponentialHistogramBlockBuilder(estimatedSize, this);
497497
}
498498

499+
public TDigestBlockBuilder newTDigestBlockBuilder(int estimatedSize) {
500+
return new TDigestBlockBuilder(estimatedSize, this);
501+
}
502+
499503
public final ExponentialHistogramBlock newConstantExponentialHistogramBlock(ExponentialHistogram value, int positionCount) {
500504
return ExponentialHistogramArrayBlock.createConstant(value, positionCount, this);
501505
}
502506

507+
public final TDigestBlock newConstantTDigestBlock(TDigestHolder value, int positions) {
508+
return TDigestArrayBlock.createConstant(value, positions, this);
509+
}
510+
503511
public BlockLoader.Block newExponentialHistogramBlockFromDocValues(
504512
DoubleBlock minima,
505513
DoubleBlock maxima,

0 commit comments

Comments
 (0)