1 change: 1 addition & 0 deletions libs/tdigest/build.gradle
@@ -23,6 +23,7 @@ apply plugin: 'elasticsearch.publish'

dependencies {
api project(':libs:core')
api project(':libs:x-content')
api "org.apache.lucene:lucene-core:${versions.lucene}"

testImplementation(project(":test:framework")) {
2 changes: 2 additions & 0 deletions libs/tdigest/src/main/java/module-info.java
@@ -20,7 +20,9 @@
module org.elasticsearch.tdigest {
requires org.elasticsearch.base;
requires org.apache.lucene.core;
requires org.elasticsearch.xcontent;

exports org.elasticsearch.tdigest;
exports org.elasticsearch.tdigest.arrays;
exports org.elasticsearch.tdigest.parsing;
}
@@ -1,28 +1,29 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.xpack.analytics.mapper;
package org.elasticsearch.tdigest.parsing;

import org.elasticsearch.index.mapper.DocumentParsingException;
import org.elasticsearch.xcontent.ParseField;
import org.elasticsearch.xcontent.XContentLocation;
import org.elasticsearch.xcontent.XContentParser;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import static org.elasticsearch.common.xcontent.XContentParserUtils.ensureExpectedToken;
import static org.elasticsearch.xpack.analytics.mapper.TDigestFieldMapper.CENTROIDS_NAME;
import static org.elasticsearch.xpack.analytics.mapper.TDigestFieldMapper.COUNTS_NAME;
import static org.elasticsearch.xpack.analytics.mapper.TDigestFieldMapper.MAX_FIELD_NAME;
import static org.elasticsearch.xpack.analytics.mapper.TDigestFieldMapper.MIN_FIELD_NAME;
import static org.elasticsearch.xpack.analytics.mapper.TDigestFieldMapper.SUM_FIELD_NAME;
import java.util.function.BiFunction;

public class TDigestParser {
public static final String CENTROIDS_NAME = "centroids";
public static final String COUNTS_NAME = "counts";
public static final String SUM_FIELD_NAME = "sum";
public static final String MIN_FIELD_NAME = "min";
public static final String MAX_FIELD_NAME = "max";

private static final ParseField COUNTS_FIELD = new ParseField(COUNTS_NAME);
private static final ParseField CENTROIDS_FIELD = new ParseField(CENTROIDS_NAME);
@@ -91,9 +92,15 @@ public Long count() {
*
* @param mappedFieldName the name of the field being parsed, used for error messages
* @param parser the parser to use
* @param documentParsingExceptionProvider factory function for generating document parsing exceptions; injected because
*                                         this library module cannot depend on the server's exception types
* @param parsingExceptionProvider factory function for token-mismatch parsing exceptions, injected for the same reason
* @return the parsed t-digest
*/
public static ParsedTDigest parse(String mappedFieldName, XContentParser parser) throws IOException {
public static ParsedTDigest parse(
String mappedFieldName,
XContentParser parser,
BiFunction<XContentLocation, String, RuntimeException> documentParsingExceptionProvider,
ParsingExceptionProvider parsingExceptionProvider
) throws IOException {
ArrayList<Double> centroids = null;
ArrayList<Long> counts = null;
Double sum = null;
@@ -102,46 +109,46 @@ public static ParsedTDigest parse(String mappedFieldName, XContentParser parser)
XContentParser.Token token = parser.currentToken();
while (token != XContentParser.Token.END_OBJECT) {
// should be a field
ensureExpectedToken(XContentParser.Token.FIELD_NAME, token, parser);
ensureExpectedToken(XContentParser.Token.FIELD_NAME, token, parser, parsingExceptionProvider);
String fieldName = parser.currentName();
if (fieldName.equals(CENTROIDS_FIELD.getPreferredName())) {
centroids = getCentroids(mappedFieldName, parser);
centroids = getCentroids(mappedFieldName, parser, documentParsingExceptionProvider, parsingExceptionProvider);
} else if (fieldName.equals(COUNTS_FIELD.getPreferredName())) {
counts = getCounts(mappedFieldName, parser);
counts = getCounts(mappedFieldName, parser, documentParsingExceptionProvider, parsingExceptionProvider);
} else if (fieldName.equals(SUM_FIELD.getPreferredName())) {
token = parser.nextToken();
ensureExpectedToken(XContentParser.Token.VALUE_NUMBER, token, parser);
ensureExpectedToken(XContentParser.Token.VALUE_NUMBER, token, parser, parsingExceptionProvider);
sum = parser.doubleValue();
} else if (fieldName.equals(MIN_FIELD.getPreferredName())) {
token = parser.nextToken();
ensureExpectedToken(XContentParser.Token.VALUE_NUMBER, token, parser);
ensureExpectedToken(XContentParser.Token.VALUE_NUMBER, token, parser, parsingExceptionProvider);
min = parser.doubleValue();
} else if (fieldName.equals(MAX_FIELD.getPreferredName())) {
token = parser.nextToken();
ensureExpectedToken(XContentParser.Token.VALUE_NUMBER, token, parser);
ensureExpectedToken(XContentParser.Token.VALUE_NUMBER, token, parser, parsingExceptionProvider);
max = parser.doubleValue();
} else {
throw new DocumentParsingException(
throw documentParsingExceptionProvider.apply(
parser.getTokenLocation(),
"error parsing field [" + mappedFieldName + "], with unknown parameter [" + fieldName + "]"
);
}
token = parser.nextToken();
}
if (centroids == null) {
throw new DocumentParsingException(
throw documentParsingExceptionProvider.apply(
parser.getTokenLocation(),
"error parsing field [" + mappedFieldName + "], expected field called [" + CENTROIDS_FIELD.getPreferredName() + "]"
);
}
if (counts == null) {
throw new DocumentParsingException(
throw documentParsingExceptionProvider.apply(
parser.getTokenLocation(),
"error parsing field [" + mappedFieldName + "], expected field called [" + COUNTS_FIELD.getPreferredName() + "]"
);
}
if (centroids.size() != counts.size()) {
throw new DocumentParsingException(
throw documentParsingExceptionProvider.apply(
parser.getTokenLocation(),
"error parsing field ["
+ mappedFieldName
@@ -165,20 +172,25 @@ public static ParsedTDigest parse(String mappedFieldName, XContentParser parser)
return new ParsedTDigest(centroids, counts, sum, min, max);
}

private static ArrayList<Long> getCounts(String mappedFieldName, XContentParser parser) throws IOException {
private static ArrayList<Long> getCounts(
String mappedFieldName,
XContentParser parser,
BiFunction<XContentLocation, String, RuntimeException> documentParsingExceptionProvider,
ParsingExceptionProvider parsingExceptionProvider
) throws IOException {
ArrayList<Long> counts;
XContentParser.Token token;
token = parser.nextToken();
// should be an array
ensureExpectedToken(XContentParser.Token.START_ARRAY, token, parser);
ensureExpectedToken(XContentParser.Token.START_ARRAY, token, parser, parsingExceptionProvider);
counts = new ArrayList<>();
token = parser.nextToken();
while (token != XContentParser.Token.END_ARRAY) {
// should be a number
ensureExpectedToken(XContentParser.Token.VALUE_NUMBER, token, parser);
ensureExpectedToken(XContentParser.Token.VALUE_NUMBER, token, parser, parsingExceptionProvider);
long count = parser.longValue();
if (count < 0) {
throw new DocumentParsingException(
throw documentParsingExceptionProvider.apply(
parser.getTokenLocation(),
"error parsing field [" + mappedFieldName + "], [" + COUNTS_FIELD + "] elements must be >= 0 but got " + count
);
@@ -189,22 +201,27 @@ private static ArrayList<Long> getCounts(String mappedFieldName, XContentParser parser)
return counts;
}

private static ArrayList<Double> getCentroids(String mappedFieldName, XContentParser parser) throws IOException {
private static ArrayList<Double> getCentroids(
String mappedFieldName,
XContentParser parser,
BiFunction<XContentLocation, String, RuntimeException> documentParsingExceptionProvider,
ParsingExceptionProvider parsingExceptionProvider
) throws IOException {
XContentParser.Token token;
ArrayList<Double> centroids;
token = parser.nextToken();
// should be an array
ensureExpectedToken(XContentParser.Token.START_ARRAY, token, parser);
ensureExpectedToken(XContentParser.Token.START_ARRAY, token, parser, parsingExceptionProvider);
centroids = new ArrayList<>();
token = parser.nextToken();
double previousVal = -Double.MAX_VALUE;
while (token != XContentParser.Token.END_ARRAY) {
// should be a number
ensureExpectedToken(XContentParser.Token.VALUE_NUMBER, token, parser);
ensureExpectedToken(XContentParser.Token.VALUE_NUMBER, token, parser, parsingExceptionProvider);
double val = parser.doubleValue();
if (val < previousVal) {
// centroids must be in increasing order
throw new DocumentParsingException(
throw documentParsingExceptionProvider.apply(
parser.getTokenLocation(),
"error parsing field ["
+ mappedFieldName
@@ -224,4 +241,23 @@ private static ArrayList<Double> getCentroids(String mappedFieldName, XContentParser parser)
return centroids;
}

/**
* Factory interface for the exception thrown on a token mismatch. Callers supply it because this
* library module cannot depend on the server's ParsingException.
*/
@FunctionalInterface
public interface ParsingExceptionProvider {
RuntimeException apply(XContentParser parser, XContentParser.Token expected, XContentParser.Token actual) throws IOException;
}

public static void ensureExpectedToken(
XContentParser.Token expected,
XContentParser.Token actual,
XContentParser parser,
ParsingExceptionProvider parsingExceptionProvider
) throws IOException {
if (actual != expected) {
throw parsingExceptionProvider.apply(parser, expected, actual);
}
}

}
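
The two provider arguments are the parser's only coupling to the caller's exception types. A minimal caller sketch, assuming plain RuntimeExceptions stand in for the server's DocumentParsingException and ParsingException (the JSON, field name, and wiring below are illustrative, not part of the PR):

import org.elasticsearch.tdigest.parsing.TDigestParser;
import org.elasticsearch.xcontent.XContentParser;
import org.elasticsearch.xcontent.XContentParserConfiguration;
import org.elasticsearch.xcontent.XContentType;

String json = "{\"centroids\": [1.0, 2.5, 9.0], \"counts\": [3, 7, 2], \"sum\": 38.5, \"min\": 1.0, \"max\": 9.0}";
try (XContentParser parser = XContentType.JSON.xContent().createParser(XContentParserConfiguration.EMPTY, json)) {
    parser.nextToken(); // advance to START_OBJECT
    parser.nextToken(); // advance to the first FIELD_NAME, where parse() expects to start
    TDigestParser.ParsedTDigest digest = TDigestParser.parse(
        "my_field",
        parser,
        (location, message) -> new RuntimeException(message + " at " + location),
        (p, expected, actual) -> new RuntimeException("expected [" + expected + "] but found [" + actual + "]")
    );
}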
@@ -0,0 +1,15 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

/**
* The parsing package contains Elasticsearch-specific classes for serializing and deserializing
* t-digests from various formats via Elasticsearch's XContent abstraction layer.
*/

package org.elasticsearch.tdigest.parsing;
@@ -86,7 +86,7 @@ public static void expectValueToken(Token actual, XContentParser parser) {
}
}

private static ParsingException parsingException(XContentParser parser, Token expected, Token actual) {
public static ParsingException parsingException(XContentParser parser, Token expected, Token actual) {
return new ParsingException(
parser.getTokenLocation(),
String.format(Locale.ROOT, "Failed to parse object: expecting token of type [%s] but found [%s]", expected, actual)
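
Making parsingException public is what allows it to cross the module boundary as a method reference: its (XContentParser, Token, Token) -> ParsingException shape matches the ParsingExceptionProvider interface added to TDigestParser, e.g. (illustrative):

TDigestParser.ParsingExceptionProvider provider = XContentParserUtils::parsingException;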
@@ -23,6 +23,7 @@
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.FeatureFlag;
import org.elasticsearch.common.xcontent.XContentParserUtils;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.elasticsearch.index.fielddata.FieldDataContext;
import org.elasticsearch.index.fielddata.FormattedDocValues;
@@ -54,6 +55,7 @@
import org.elasticsearch.search.aggregations.metrics.TDigestState;
import org.elasticsearch.search.sort.BucketedSort;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.tdigest.parsing.TDigestParser;
import org.elasticsearch.xcontent.CopyingXContentParser;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.XContentParser;
@@ -62,6 +64,7 @@

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.List;
import java.util.Map;
import java.util.Objects;

@@ -368,21 +371,14 @@ public void parse(DocumentParserContext context) throws IOException {
}
subParser.nextToken();
// TODO: Here we should build a t-digest out of the input, based on the settings on the field
TDigestParser.ParsedTDigest parsedTDigest = TDigestParser.parse(fullPath(), subParser);

BytesStreamOutput streamOutput = new BytesStreamOutput();

for (int i = 0; i < parsedTDigest.centroids().size(); i++) {
long count = parsedTDigest.counts().get(i);
assert count >= 0;
// we do not add elements with count == 0
if (count > 0) {
streamOutput.writeVLong(count);
streamOutput.writeDouble(parsedTDigest.centroids().get(i));
}
}
TDigestParser.ParsedTDigest parsedTDigest = TDigestParser.parse(
fullPath(),
subParser,
DocumentParsingException::new,
XContentParserUtils::parsingException
);

BytesRef docValue = streamOutput.bytes().toBytesRef();
BytesRef docValue = encodeCentroidsAndCounts(parsedTDigest.centroids(), parsedTDigest.counts());
Field digestField = new BinaryDocValuesField(fullPath(), docValue);

// Add numeric doc values fields for the summary data
@@ -458,6 +454,23 @@ public void parse(DocumentParserContext context) throws IOException {
context.path().remove();
}

private static BytesRef encodeCentroidsAndCounts(List<Double> centroids, List<Long> counts) throws IOException {
BytesStreamOutput streamOutput = new BytesStreamOutput();

for (int i = 0; i < centroids.size(); i++) {
long count = counts.get(i);
assert count >= 0;
// we do not add elements with count == 0
if (count > 0) {
streamOutput.writeVLong(count);
streamOutput.writeDouble(centroids.get(i));
}
}

BytesRef docValue = streamOutput.bytes().toBytesRef();
return docValue;
}

private static String valuesCountSubFieldName(String fullPath) {
return fullPath + "._values_count";
}
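
The extracted helper pins down the doc-values layout: a sequence of (vlong count, double centroid) pairs, with zero-count entries omitted. A sketch of the reverse direction, assuming the reader also goes through StreamInput (the method name is illustrative, not part of the PR):

private static void decodeCentroidsAndCounts(BytesRef docValue) throws IOException {
    StreamInput in = StreamInput.wrap(docValue.bytes, docValue.offset, docValue.length);
    while (in.available() > 0) {
        long count = in.readVLong();      // always > 0, zero counts were skipped on write
        double centroid = in.readDouble();
        // hand (centroid, count) to whatever consumes the digest
    }
}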
@@ -14,4 +14,8 @@
public class EsqlCorePlugin extends Plugin implements ExtensiblePlugin {

public static final FeatureFlag EXPONENTIAL_HISTOGRAM_FEATURE_FLAG = new FeatureFlag("esql_exponential_histogram");

// Note: there is also a feature flag for the field type in the analytics plugin, but that flag is not visible
// from this module, so we need another one here.
public static final FeatureFlag T_DIGEST_ESQL_SUPPORT = new FeatureFlag("esql_t_digest_support");
}
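
Since FeatureFlag is checked at runtime, downstream ESQL code would presumably gate t-digest handling the same way the exponential-histogram flag is used (an illustrative guard, not from this diff):

if (EsqlCorePlugin.T_DIGEST_ESQL_SUPPORT.isEnabled()) {
    // enable the t-digest type / functions
}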
@@ -348,6 +348,16 @@ public enum DataType implements Writeable {
.underConstruction()
),

/*
TDIGEST(
[Contributor review comment: Is this commented-out code intentionally in this PR?]
builder().esType("exponential_histogram")
.estimatedSize(16 * 160)// guess 160 buckets (OTEL default for positive values only histograms) with 16 bytes per bucket
.docValues()
.underConstruction()
),
*/

/**
* Fields with this type are dense vectors, represented as an array of float values.
*/
@@ -496,6 +496,10 @@ public ExponentialHistogramBlockBuilder newExponentialHistogramBlockBuilder(int estimatedSize) {
return new ExponentialHistogramBlockBuilder(estimatedSize, this);
}

public TDigestBlockBuilder newTDigestBlockBuilder(int estimatedSize) {
return new TDigestBlockBuilder(estimatedSize, this);
}

public final ExponentialHistogramBlock newConstantExponentialHistogramBlock(ExponentialHistogram value, int positionCount) {
try (ExponentialHistogramBlockBuilder builder = newExponentialHistogramBlockBuilder(positionCount)) {
for (int i = 0; i < positionCount; i++) {
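
The new builder mirrors newExponentialHistogramBlockBuilder, and the newConstantExponentialHistogramBlock helper just below it suggests the analogous constant-block convenience; a sketch, assuming TDigestBlockBuilder.build() yields a TDigestBlock (not part of this diff):

public final TDigestBlock newConstantTDigestBlock(TDigestHolder value, int positionCount) {
    try (TDigestBlockBuilder builder = newTDigestBlockBuilder(positionCount)) {
        for (int i = 0; i < positionCount; i++) {
            builder.append(value);
        }
        return builder.build();
    }
}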
@@ -221,13 +221,16 @@ public static void appendValue(Block.Builder builder, Object val, ElementType type) {
switch (type) {
case LONG -> ((LongBlock.Builder) builder).appendLong((Long) val);
case INT -> ((IntBlock.Builder) builder).appendInt((Integer) val);
case NULL -> {
}
case BYTES_REF -> ((BytesRefBlock.Builder) builder).appendBytesRef(toBytesRef(val));
case FLOAT -> ((FloatBlock.Builder) builder).appendFloat((Float) val);
case DOUBLE -> ((DoubleBlock.Builder) builder).appendDouble((Double) val);
case BOOLEAN -> ((BooleanBlock.Builder) builder).appendBoolean((Boolean) val);
case TDIGEST -> ((TDigestBlockBuilder) builder).append((TDigestHolder) val);
case AGGREGATE_METRIC_DOUBLE -> ((AggregateMetricDoubleBlockBuilder) builder).appendLiteral((AggregateMetricDoubleLiteral) val);
case EXPONENTIAL_HISTOGRAM -> ((ExponentialHistogramBlockBuilder) builder).append((ExponentialHistogram) val);
default -> throw new UnsupportedOperationException("unsupported element type [" + type + "]");
case DOC, COMPOSITE, UNKNOWN -> throw new UnsupportedOperationException("unsupported element type [" + type + "]");
}
}

@@ -316,6 +319,13 @@ yield new AggregateMetricDoubleLiteral(
// return a copy so that the returned value is not bound to the lifetime of the block
yield ExponentialHistogram.builder(histogram, ExponentialHistogramCircuitBreaker.noop()).build();
}
case TDIGEST -> {
TDigestBlock tDigestBlock = (TDigestBlock) block;
// TODO memory tracking? Or do we not care here because this is only called for literals?
BytesRef scratch = new BytesRef();
yield tDigestBlock.getTDigestHolder(offset, scratch);

}
case UNKNOWN -> throw new IllegalArgumentException("can't read values from [" + block + "]");
};
}