
Commit 7ba2b30

ESQL: Implement TopN support for exponential histogram
1 parent aebedd6 commit 7ba2b30

File tree

10 files changed: +259 -11 lines changed

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/ExponentialHistogramArrayBlock.java

Lines changed: 45 additions & 6 deletions
@@ -79,19 +79,58 @@ private List<Block> getSubBlocks() {
     }
 
     void loadValue(int valueIndex, CompressedExponentialHistogram resultHistogram, BytesRef tempBytesRef) {
-        BytesRef bytes = encodedHistograms.getBytesRef(encodedHistograms.getFirstValueIndex(valueIndex), tempBytesRef);
-        double zeroThreshold = zeroThresholds.getDouble(zeroThresholds.getFirstValueIndex(valueIndex));
-        long valueCount = valueCounts.getLong(valueCounts.getFirstValueIndex(valueIndex));
-        double sum = sums.getDouble(sums.getFirstValueIndex(valueIndex));
-        double min = valueCount == 0 ? Double.NaN : minima.getDouble(minima.getFirstValueIndex(valueIndex));
-        double max = valueCount == 0 ? Double.NaN : maxima.getDouble(maxima.getFirstValueIndex(valueIndex));
+        BytesRef bytes = getEncodedHistogramBytes(valueIndex, tempBytesRef);
+        double zeroThreshold = getHistogramZeroThreshold(valueIndex);
+        long valueCount = getHistogramValueCount(valueIndex);
+        double sum = getHistogramSum(valueIndex);
+        double min = getHistogramMin(valueIndex);
+        double max = getHistogramMax(valueIndex);
         try {
             resultHistogram.reset(zeroThreshold, valueCount, sum, min, max, bytes);
         } catch (IOException e) {
             throw new IllegalStateException("error loading histogram", e);
         }
     }
 
+    void serializeValue(int valueIndex, SerializedOutput out, BytesRef tempBytesRef) {
+        long valueCount = getHistogramValueCount(valueIndex);
+        out.appendLong(valueCount);
+        out.appendDouble(getHistogramSum(valueIndex));
+        out.appendDouble(getHistogramZeroThreshold(valueIndex));
+        if (valueCount > 0) {
+            // min / max are only non-null for non-empty histograms
+            out.appendDouble(getHistogramMin(valueIndex));
+            out.appendDouble(getHistogramMax(valueIndex));
+        }
+        out.appendBytesRef(getEncodedHistogramBytes(valueIndex, tempBytesRef));
+    }
+
+    private double getHistogramMin(int valueIndex) {
+        int minimaValIndex = minima.getFirstValueIndex(valueIndex);
+        return minima.isNull(minimaValIndex) ? Double.NaN : minima.getDouble(minimaValIndex);
+    }
+
+    private double getHistogramMax(int valueIndex) {
+        int maximaValIndex = maxima.getFirstValueIndex(valueIndex);
+        return maxima.isNull(maximaValIndex) ? Double.NaN : maxima.getDouble(maximaValIndex);
+    }
+
+    private double getHistogramSum(int valueIndex) {
+        return sums.getDouble(sums.getFirstValueIndex(valueIndex));
+    }
+
+    private long getHistogramValueCount(int valueIndex) {
+        return valueCounts.getLong(valueCounts.getFirstValueIndex(valueIndex));
+    }
+
+    private double getHistogramZeroThreshold(int valueIndex) {
+        return zeroThresholds.getDouble(zeroThresholds.getFirstValueIndex(valueIndex));
+    }
+
+    private BytesRef getEncodedHistogramBytes(int valueIndex, BytesRef tempBytesRef) {
+        return encodedHistograms.getBytesRef(encodedHistograms.getFirstValueIndex(valueIndex), tempBytesRef);
+    }
+
     @Override
     protected void closeInternal() {
         Releasables.close(getSubBlocks());
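The serializeValue method added above defines the per-value wire layout: the value count as a long, then sum and zero threshold as doubles, min and max doubles only when the histogram is non-empty, and finally the encoded bucket bytes. As a minimal sketch of a SerializedOutput sink that captures this layout, the hypothetical class below buffers the fields with plain java.io streams; the name InMemoryHistogramOutput and the int length prefix for the BytesRef payload are illustrative assumptions, while the real TopN path writes through TopNEncoder.DEFAULT_UNSORTABLE (see ValueExtractorForExponentialHistogram further down).

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

import org.apache.lucene.util.BytesRef;
import org.elasticsearch.compute.data.ExponentialHistogramBlock;

// Hypothetical, illustration-only sink for serializeValue: records fields in the
// exact order they are appended (valueCount, sum, zeroThreshold, [min, max], bytes).
class InMemoryHistogramOutput implements ExponentialHistogramBlock.SerializedOutput {
    private final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
    private final DataOutputStream out = new DataOutputStream(buffer);

    @Override
    public void appendDouble(double value) {
        try {
            out.writeDouble(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    @Override
    public void appendLong(long value) {
        try {
            out.writeLong(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    @Override
    public void appendBytesRef(BytesRef value) {
        try {
            // Assumed framing: int length prefix followed by the raw bytes.
            out.writeInt(value.length);
            out.write(value.bytes, value.offset, value.length);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    byte[] toBytes() {
        return buffer.toByteArray();
    }
}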

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/ExponentialHistogramBlock.java

Lines changed: 23 additions & 0 deletions
@@ -7,6 +7,7 @@
 
 package org.elasticsearch.compute.data;
 
+import org.apache.lucene.util.BytesRef;
 import org.elasticsearch.exponentialhistogram.ExponentialHistogram;
 
 /**
@@ -15,6 +16,28 @@
  */
 public sealed interface ExponentialHistogramBlock extends Block permits ConstantNullBlock, ExponentialHistogramArrayBlock {
 
+    /**
+     * Abstraction to use for writing individual values via {@link ExponentialHistogramBlockAccessor#serializeValue(int, SerializedOutput)}.
+     */
+    interface SerializedOutput {
+        void appendDouble(double value);
+
+        void appendLong(long value);
+
+        void appendBytesRef(BytesRef bytesRef);
+    }
+
+    /**
+     * Abstraction to use for reading individual serialized values via {@link ExponentialHistogramBlockBuilder#deserializeAndAppend(SerializedInput)}.
+     */
+    interface SerializedInput {
+        double readDouble();
+
+        long readLong();
+
+        BytesRef readBytesRef(BytesRef tempBytesRef);
+    }
+
     static boolean equals(ExponentialHistogramBlock blockA, ExponentialHistogramBlock blockB) {
         if (blockA == blockB) {
             return true;
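These two interfaces are what the TopN operator implements against, so the block code never needs to know about TopNEncoder. Reads on the input side must happen in exactly the order the fields were appended on the output side. Below is the purely illustrative counterpart to the InMemoryHistogramOutput sketch above; InMemoryHistogramInput is an assumed name, and the int length prefix matches the assumption made there, not any encoding used by the actual TopN encoders.

import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

import org.apache.lucene.util.BytesRef;
import org.elasticsearch.compute.data.ExponentialHistogramBlock;

// Hypothetical, illustration-only source: replays the fields in the same order
// that ExponentialHistogramBlockBuilder#deserializeAndAppend consumes them.
class InMemoryHistogramInput implements ExponentialHistogramBlock.SerializedInput {
    private final DataInputStream in;

    InMemoryHistogramInput(byte[] serialized) {
        this.in = new DataInputStream(new ByteArrayInputStream(serialized));
    }

    @Override
    public double readDouble() {
        try {
            return in.readDouble();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    @Override
    public long readLong() {
        try {
            return in.readLong();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    @Override
    public BytesRef readBytesRef(BytesRef tempBytesRef) {
        try {
            // Matches the assumed int-length-prefix framing of the output sketch.
            int length = in.readInt();
            byte[] payload = new byte[length];
            in.readFully(payload);
            tempBytesRef.bytes = payload;
            tempBytesRef.offset = 0;
            tempBytesRef.length = length;
            return tempBytesRef;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}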

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/ExponentialHistogramBlockAccessor.java

Lines changed: 19 additions & 1 deletion
@@ -38,11 +38,29 @@ public ExponentialHistogram get(int valueIndex) {
         assert block.isReleased() == false;
         ExponentialHistogramArrayBlock arrayBlock = (ExponentialHistogramArrayBlock) block;
         if (reusedHistogram == null) {
-            tempBytesRef = new BytesRef();
             reusedHistogram = new CompressedExponentialHistogram();
         }
+        if (tempBytesRef == null) {
+            tempBytesRef = new BytesRef();
+        }
         arrayBlock.loadValue(valueIndex, reusedHistogram, tempBytesRef);
         return reusedHistogram;
     }
 
+    /**
+     * Encodes and appends a histogram value, so that it can later be deserialized
+     * via {@link ExponentialHistogramBlockBuilder#deserializeAndAppend(ExponentialHistogramBlock.SerializedInput)}.
+     *
+     * @param output the output to serialize into
+     */
+    public void serializeValue(int valueIndex, ExponentialHistogramBlock.SerializedOutput output) {
+        assert block.isNull(valueIndex) == false;
+        assert block.isReleased() == false;
+        ExponentialHistogramArrayBlock arrayBlock = (ExponentialHistogramArrayBlock) block;
+        if (tempBytesRef == null) {
+            tempBytesRef = new BytesRef();
+        }
+        arrayBlock.serializeValue(valueIndex, output, tempBytesRef);
+    }
+
 }

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/ExponentialHistogramBlockBuilder.java

Lines changed: 24 additions & 0 deletions
@@ -7,6 +7,7 @@
 
 package org.elasticsearch.compute.data;
 
+import org.apache.lucene.util.BytesRef;
 import org.elasticsearch.common.io.stream.BytesStreamOutput;
 import org.elasticsearch.core.Releasables;
 import org.elasticsearch.exponentialhistogram.CompressedExponentialHistogram;
@@ -24,6 +25,8 @@ public class ExponentialHistogramBlockBuilder implements Block.Builder {
     private final DoubleBlock.Builder zeroThresholdsBuilder;
     private final BytesRefBlock.Builder encodedHistogramsBuilder;
 
+    private final BytesRef scratch = new BytesRef();
+
     ExponentialHistogramBlockBuilder(int estimatedSize, BlockFactory blockFactory) {
         DoubleBlock.Builder minimaBuilder = null;
         DoubleBlock.Builder maximaBuilder = null;
@@ -100,6 +103,27 @@ public ExponentialHistogramBlockBuilder append(ExponentialHistogram histogram) {
         return this;
     }
 
+    /**
+     * Decodes and appends a value serialized with
+     * {@link ExponentialHistogramBlockAccessor#serializeValue(int, ExponentialHistogramBlock.SerializedOutput)}.
+     *
+     * @param input the input to deserialize from
+     */
+    public void deserializeAndAppend(ExponentialHistogramBlock.SerializedInput input) {
+        long valueCount = input.readLong();
+        valueCountsBuilder.appendLong(valueCount);
+        sumsBuilder.appendDouble(input.readDouble());
+        zeroThresholdsBuilder.appendDouble(input.readDouble());
+        if (valueCount > 0) {
+            minimaBuilder.appendDouble(input.readDouble());
+            maximaBuilder.appendDouble(input.readDouble());
+        } else {
+            minimaBuilder.appendNull();
+            maximaBuilder.appendNull();
+        }
+        encodedHistogramsBuilder.appendBytesRef(input.readBytesRef(scratch));
+    }
+
     @Override
     public ExponentialHistogramBlock build() {
         DoubleBlock minima = null;
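deserializeAndAppend reads the fields in the same order serializeValue writes them and appends nulls for min/max when the value count is zero, since empty histograms carry no min/max on the wire. A hedged round-trip sketch using the two hypothetical in-memory classes from the earlier sketches together with the public accessor and builder methods added in this commit; HistogramValueCopier and copyValue are illustrative names only:

import org.elasticsearch.compute.data.ExponentialHistogramBlock;
import org.elasticsearch.compute.data.ExponentialHistogramBlockAccessor;
import org.elasticsearch.compute.data.ExponentialHistogramBlockBuilder;

final class HistogramValueCopier {
    // Copies one non-null histogram value from a source block into a builder by
    // round-tripping it through the serialized form (valueCount, sum, zeroThreshold,
    // min/max only when valueCount > 0, then the encoded buckets).
    static void copyValue(ExponentialHistogramBlock source, int valueIndex, ExponentialHistogramBlockBuilder target) {
        InMemoryHistogramOutput out = new InMemoryHistogramOutput(); // hypothetical sink from the sketch above
        new ExponentialHistogramBlockAccessor(source).serializeValue(valueIndex, out);
        target.deserializeAndAppend(new InMemoryHistogramInput(out.toBytes()));
    }
}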

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/ResultBuilder.java

Lines changed: 1 addition & 0 deletions
@@ -55,6 +55,7 @@ static ResultBuilder resultBuilderFor(
             case NULL -> new ResultBuilderForNull(blockFactory);
             case DOC -> new ResultBuilderForDoc(blockFactory, (DocVectorEncoder) encoder, positions);
             case AGGREGATE_METRIC_DOUBLE -> new ResultBuilderForAggregateMetricDouble(blockFactory, positions);
+            case EXPONENTIAL_HISTOGRAM -> new ResultBuilderForExponentialHistogram(blockFactory, positions);
             default -> {
                 assert false : "Result builder for [" + elementType + "]";
                 throw new UnsupportedOperationException("Result builder for [" + elementType + "]");

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/ResultBuilderForExponentialHistogram.java

Lines changed: 75 additions & 0 deletions

@@ -0,0 +1,75 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.compute.operator.topn;
+
+import org.apache.lucene.util.BytesRef;
+import org.elasticsearch.compute.data.Block;
+import org.elasticsearch.compute.data.BlockFactory;
+import org.elasticsearch.compute.data.ExponentialHistogramBlock;
+import org.elasticsearch.compute.data.ExponentialHistogramBlockBuilder;
+
+public class ResultBuilderForExponentialHistogram implements ResultBuilder {
+
+    private final ExponentialHistogramBlockBuilder builder;
+    private final ReusableTopNEncoderInput reusableInput = new ReusableTopNEncoderInput();
+
+    ResultBuilderForExponentialHistogram(BlockFactory blockFactory, int positions) {
+        this.builder = blockFactory.newExponentialHistogramBlockBuilder(positions);
+    }
+
+    @Override
+    public void decodeKey(BytesRef keys) {
+        throw new AssertionError("ExponentialHistogramBlock can't be a key");
+    }
+
+    @Override
+    public void decodeValue(BytesRef values) {
+        int count = TopNEncoder.DEFAULT_UNSORTABLE.decodeVInt(values);
+        if (count == 0) {
+            builder.appendNull();
+            return;
+        }
+        assert count == 1 : "ExponentialHistogramBlock does not support multi values";
+        reusableInput.inputValues = values;
+        builder.deserializeAndAppend(reusableInput);
+    }
+
+    @Override
+    public Block build() {
+        return builder.build();
+    }
+
+    @Override
+    public String toString() {
+        return "ResultBuilderForExponentialHistogram";
+    }
+
+    @Override
+    public void close() {
+        builder.close();
+    }
+
+    private class ReusableTopNEncoderInput implements ExponentialHistogramBlock.SerializedInput {
+        BytesRef inputValues;
+
+        @Override
+        public double readDouble() {
+            return TopNEncoder.DEFAULT_UNSORTABLE.decodeDouble(inputValues);
+        }
+
+        @Override
+        public long readLong() {
+            return TopNEncoder.DEFAULT_UNSORTABLE.decodeLong(inputValues);
+        }
+
+        @Override
+        public BytesRef readBytesRef(BytesRef tempBytesRef) {
+            return TopNEncoder.DEFAULT_UNSORTABLE.decodeBytesRef(inputValues, tempBytesRef);
+        }
+    }
+}

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/ValueExtractor.java

Lines changed: 2 additions & 0 deletions
@@ -14,6 +14,7 @@
 import org.elasticsearch.compute.data.DocBlock;
 import org.elasticsearch.compute.data.DoubleBlock;
 import org.elasticsearch.compute.data.ElementType;
+import org.elasticsearch.compute.data.ExponentialHistogramBlock;
 import org.elasticsearch.compute.data.FloatBlock;
 import org.elasticsearch.compute.data.IntBlock;
 import org.elasticsearch.compute.data.LongBlock;
@@ -53,6 +54,7 @@ static ValueExtractor extractorFor(ElementType elementType, TopNEncoder encoder,
             case NULL -> new ValueExtractorForNull();
             case DOC -> new ValueExtractorForDoc(encoder, ((DocBlock) block).asVector());
             case AGGREGATE_METRIC_DOUBLE -> new ValueExtractorForAggregateMetricDouble(encoder, (AggregateMetricDoubleBlock) block);
+            case EXPONENTIAL_HISTOGRAM -> new ValueExtractorForExponentialHistogram(encoder, (ExponentialHistogramBlock) block);
             default -> {
                 assert false : "No value extractor for [" + block.elementType() + "]";
                 throw new UnsupportedOperationException("No value extractor for [" + block.elementType() + "]");

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/ValueExtractorForExponentialHistogram.java

Lines changed: 63 additions & 0 deletions

@@ -0,0 +1,63 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.compute.operator.topn;
+
+import org.apache.lucene.util.BytesRef;
+import org.elasticsearch.compute.data.ExponentialHistogramBlock;
+import org.elasticsearch.compute.data.ExponentialHistogramBlockAccessor;
+import org.elasticsearch.compute.operator.BreakingBytesRefBuilder;
+
+public class ValueExtractorForExponentialHistogram implements ValueExtractor {
+    private final ExponentialHistogramBlock block;
+    private final ExponentialHistogramBlockAccessor accessor;
+    private final ReusableTopNEncoderOutput reusableOutput = new ReusableTopNEncoderOutput();
+
+    ValueExtractorForExponentialHistogram(TopNEncoder encoder, ExponentialHistogramBlock block) {
+        assert encoder == TopNEncoder.DEFAULT_UNSORTABLE;
+        this.block = block;
+        this.accessor = new ExponentialHistogramBlockAccessor(block);
+    }
+
+    @Override
+    public void writeValue(BreakingBytesRefBuilder values, int position) {
+        // number of multi-values first for compatibility with ValueExtractorForNull
+        if (block.isNull(position)) {
+            TopNEncoder.DEFAULT_UNSORTABLE.encodeVInt(0, values);
+        } else {
+            assert block.getValueCount(position) == 1 : "Multi-valued ExponentialHistogram blocks are not supported in TopN";
+            TopNEncoder.DEFAULT_UNSORTABLE.encodeVInt(1, values);
+            int valueIndex = block.getFirstValueIndex(position);
+            reusableOutput.target = values;
+            accessor.serializeValue(valueIndex, reusableOutput);
+        }
+    }
+
+    @Override
+    public String toString() {
+        return "ValueExtractorForExponentialHistogram";
+    }
+
+    private static final class ReusableTopNEncoderOutput implements ExponentialHistogramBlock.SerializedOutput {
+        BreakingBytesRefBuilder target;
+
+        @Override
+        public void appendDouble(double value) {
+            TopNEncoder.DEFAULT_UNSORTABLE.encodeDouble(value, target);
+        }
+
+        @Override
+        public void appendLong(long value) {
+            TopNEncoder.DEFAULT_UNSORTABLE.encodeLong(value, target);
+        }
+
+        @Override
+        public void appendBytesRef(BytesRef value) {
+            TopNEncoder.DEFAULT_UNSORTABLE.encodeBytesRef(value, target);
+        }
+    }
+}

x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/topn/ExtractorTests.java

Lines changed: 4 additions & 1 deletion
@@ -44,7 +44,7 @@ public static Iterable<Object[]> parameters() {
             case UNKNOWN -> {
                 supportsNull = false;
             }
-            case COMPOSITE, EXPONENTIAL_HISTOGRAM -> {
+            case COMPOSITE -> {
                 // TODO: add later
                 supportsNull = false;
             }
@@ -130,6 +130,9 @@ public static Iterable<Object[]> parameters() {
                     ) }
                 );
             }
+            case EXPONENTIAL_HISTOGRAM ->
+                // multi values are not supported
+                cases.add(valueTestCase("single " + e, e, TopNEncoder.DEFAULT_UNSORTABLE, () -> BlockTestUtils.randomValue(e)));
            case NULL -> {
            }
            default -> {

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java

Lines changed: 3 additions & 3 deletions
@@ -487,10 +487,10 @@ private PhysicalOperation planTopN(TopNExec topNExec, LocalExecutionPlannerContext
             case BOOLEAN, NULL, BYTE, SHORT, INTEGER, LONG, DOUBLE, FLOAT, HALF_FLOAT, DATETIME, DATE_NANOS, DATE_PERIOD, TIME_DURATION,
                 OBJECT, SCALED_FLOAT, UNSIGNED_LONG, TSID_DATA_TYPE -> TopNEncoder.DEFAULT_SORTABLE;
             case GEO_POINT, CARTESIAN_POINT, GEO_SHAPE, CARTESIAN_SHAPE, COUNTER_LONG, COUNTER_INTEGER, COUNTER_DOUBLE, SOURCE,
-                AGGREGATE_METRIC_DOUBLE, DENSE_VECTOR, GEOHASH, GEOTILE, GEOHEX -> TopNEncoder.DEFAULT_UNSORTABLE;
+                AGGREGATE_METRIC_DOUBLE, DENSE_VECTOR, GEOHASH, GEOTILE, GEOHEX, EXPONENTIAL_HISTOGRAM ->
+                TopNEncoder.DEFAULT_UNSORTABLE;
             // unsupported fields are encoded as BytesRef, we'll use the same encoder; all values should be null at this point
-            // TODO(b/133393) add exponential histogram support
-            case PARTIAL_AGG, EXPONENTIAL_HISTOGRAM, UNSUPPORTED -> TopNEncoder.UNSUPPORTED;
+            case PARTIAL_AGG, UNSUPPORTED -> TopNEncoder.UNSUPPORTED;
         };
     }
     List<TopNOperator.SortOrder> orders = topNExec.order().stream().map(order -> {
