Skip to content

Commit 11b4c3a

Browse files
JonasKunzKubik42
authored andcommitted
ESQL: Add TopN support for exponential histograms (elastic#137313)
1 parent 0fb5ef3 commit 11b4c3a

File tree

10 files changed

+252
-5
lines changed

10 files changed

+252
-5
lines changed

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/ConstantNullBlock.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -300,6 +300,12 @@ public ExponentialHistogram getExponentialHistogram(int valueIndex, ExponentialH
300300
throw new UnsupportedOperationException("null block");
301301
}
302302

303+
@Override
304+
public void serializeExponentialHistogram(int valueIndex, SerializedOutput out, BytesRef scratch) {
305+
assert false : "null block";
306+
throw new UnsupportedOperationException("null block");
307+
}
308+
303309
@Override
304310
public int getTotalValueCount() {
305311
return 0;

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/ExponentialHistogramArrayBlock.java

Lines changed: 41 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,34 @@
1919

2020
final class ExponentialHistogramArrayBlock extends AbstractNonThreadSafeRefCounted implements ExponentialHistogramBlock {
2121

22+
// Exponential histograms consist of several components that we store in separate blocks
23+
// due to (a) better compression in the field mapper for disk storage and (b) faster computations if only one sub-component is needed
24+
// What are the semantics of positions, multi-value counts and nulls in the exponential histogram block and
25+
// how do they relate to the sub-blocks?
26+
// ExponentialHistogramBlock need to adhere to the contract of Blocks for the access patterns:
27+
//
28+
// for (int position = 0; position < block.getPositionCount(); position++) {
29+
// ...int valueCount = block.getValueCount(position);
30+
// ...for (int valueIndex = 0; valueIndex < valueCount; valueIndex++) {
31+
// ......ExponentialHistogram histo = block.getExponentialHistogram(valueIndex, scratch);
32+
// ...}
33+
// }
34+
//
35+
// That implies that given only a value-index, we need to be able to retrieve all components of the histogram.
36+
// Because we can't make any assumptions on how value indices are laid out in the sub-blocks for multi-values,
37+
// we enforce that the sub-blocks have at most one value per position (i.e., no multi-values).
38+
// Based on this, we can define the valueIndex for ExponentialHistogramArrayBlock to correspond to positions in the sub-blocks.
39+
// So basically the sub-blocks are the "flattened" components of the histograms.
40+
// If we later add multi-value support to ExponentialHistogramArrayBlock,
41+
// we can't use the multi-value support of the sub-blocks to implement that.
42+
// Instead, we need to maintain a firstValueIndex array ourselves in ExponentialHistogramArrayBlock.
43+
2244
private final DoubleBlock minima;
2345
private final DoubleBlock maxima;
2446
private final DoubleBlock sums;
47+
/**
48+
Holds the number of values in each histogram. Note that this is a different concept from getValueCount(position)!
49+
*/
2550
private final LongBlock valueCounts;
2651
private final DoubleBlock zeroThresholds;
2752
private final BytesRefBlock encodedHistograms;
@@ -94,6 +119,22 @@ public ExponentialHistogram getExponentialHistogram(int valueIndex, ExponentialH
94119
}
95120
}
96121

122+
@Override
123+
public void serializeExponentialHistogram(int valueIndex, SerializedOutput out, BytesRef scratch) {
124+
// not that this value count is different from getValueCount(position)!
125+
// this value count represents the number of individual samples the histogram was computed for
126+
long valueCount = valueCounts.getLong(valueCounts.getFirstValueIndex(valueIndex));
127+
out.appendLong(valueCounts.getLong(valueCounts.getFirstValueIndex(valueIndex)));
128+
out.appendDouble(sums.getDouble(sums.getFirstValueIndex(valueIndex)));
129+
out.appendDouble(zeroThresholds.getDouble(zeroThresholds.getFirstValueIndex(valueIndex)));
130+
if (valueCount > 0) {
131+
// min / max are only non-null for non-empty histograms
132+
out.appendDouble(minima.getDouble(minima.getFirstValueIndex(valueIndex)));
133+
out.appendDouble(maxima.getDouble(maxima.getFirstValueIndex(valueIndex)));
134+
}
135+
out.appendBytesRef(encodedHistograms.getBytesRef(encodedHistograms.getFirstValueIndex(valueIndex), scratch));
136+
}
137+
97138
@Override
98139
protected void closeInternal() {
99140
Releasables.close(getSubBlocks());
@@ -376,5 +417,4 @@ public int hashCode() {
376417
// this ensures proper equality with null blocks and should be unique enough for practical purposes
377418
return encodedHistograms.hashCode();
378419
}
379-
380420
}

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/ExponentialHistogramBlock.java

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
package org.elasticsearch.compute.data;
99

10+
import org.apache.lucene.util.BytesRef;
1011
import org.elasticsearch.exponentialhistogram.ExponentialHistogram;
1112

1213
/**
@@ -27,6 +28,16 @@ public sealed interface ExponentialHistogramBlock extends Block permits Constant
2728
*/
2829
ExponentialHistogram getExponentialHistogram(int valueIndex, ExponentialHistogramScratch scratch);
2930

31+
/**
32+
* Serializes the exponential histogram at the given index into the provided output, so that it can be read back
33+
* via {@link ExponentialHistogramBlockBuilder#deserializeAndAppend(SerializedInput)}.
34+
*
35+
* @param valueIndex
36+
* @param out
37+
* @param scratch
38+
*/
39+
void serializeExponentialHistogram(int valueIndex, SerializedOutput out, BytesRef scratch);
40+
3041
static boolean equals(ExponentialHistogramBlock blockA, ExponentialHistogramBlock blockB) {
3142
if (blockA == blockB) {
3243
return true;
@@ -42,4 +53,27 @@ static boolean equals(ExponentialHistogramBlock blockA, ExponentialHistogramBloc
4253
};
4354
}
4455

56+
/**
57+
* Abstraction to use for writing individual values via {@link #serializeExponentialHistogram(int, SerializedOutput, BytesRef)}.
58+
*/
59+
interface SerializedOutput {
60+
void appendDouble(double value);
61+
62+
void appendLong(long value);
63+
64+
void appendBytesRef(BytesRef bytesRef);
65+
}
66+
67+
/**
68+
* Abstraction to use for reading individual serialized via
69+
* {@link ExponentialHistogramBlockBuilder#deserializeAndAppend(SerializedInput)}.
70+
*/
71+
interface SerializedInput {
72+
double readDouble();
73+
74+
long readLong();
75+
76+
BytesRef readBytesRef(BytesRef scratch);
77+
}
78+
4579
}

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/ExponentialHistogramBlockBuilder.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
package org.elasticsearch.compute.data;
99

10+
import org.apache.lucene.util.BytesRef;
1011
import org.elasticsearch.common.io.stream.BytesStreamOutput;
1112
import org.elasticsearch.core.Releasables;
1213
import org.elasticsearch.exponentialhistogram.CompressedExponentialHistogram;
@@ -25,6 +26,8 @@ public class ExponentialHistogramBlockBuilder implements Block.Builder, BlockLoa
2526
private final DoubleBlock.Builder zeroThresholdsBuilder;
2627
private final BytesRefBlock.Builder encodedHistogramsBuilder;
2728

29+
private final BytesRef scratch = new BytesRef();
30+
2831
ExponentialHistogramBlockBuilder(int estimatedSize, BlockFactory blockFactory) {
2932
DoubleBlock.Builder minimaBuilder = null;
3033
DoubleBlock.Builder maximaBuilder = null;
@@ -131,6 +134,27 @@ public ExponentialHistogramBlockBuilder append(ExponentialHistogram histogram) {
131134
return this;
132135
}
133136

137+
/**
138+
* Decodes and appends a value serialized with
139+
* {@link ExponentialHistogramBlock#serializeExponentialHistogram(int, ExponentialHistogramBlock.SerializedOutput, BytesRef)}.
140+
*
141+
* @param input the input to deserialize from
142+
*/
143+
public void deserializeAndAppend(ExponentialHistogramBlock.SerializedInput input) {
144+
long valueCount = input.readLong();
145+
valueCountsBuilder.appendLong(valueCount);
146+
sumsBuilder.appendDouble(input.readDouble());
147+
zeroThresholdsBuilder.appendDouble(input.readDouble());
148+
if (valueCount > 0) {
149+
minimaBuilder.appendDouble(input.readDouble());
150+
maximaBuilder.appendDouble(input.readDouble());
151+
} else {
152+
minimaBuilder.appendNull();
153+
maximaBuilder.appendNull();
154+
}
155+
encodedHistogramsBuilder.appendBytesRef(input.readBytesRef(scratch));
156+
}
157+
134158
@Override
135159
public ExponentialHistogramBlock build() {
136160
DoubleBlock minima = null;

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/ResultBuilder.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ static ResultBuilder resultBuilderFor(
5555
case NULL -> new ResultBuilderForNull(blockFactory);
5656
case DOC -> new ResultBuilderForDoc(blockFactory, (DocVectorEncoder) encoder, positions);
5757
case AGGREGATE_METRIC_DOUBLE -> new ResultBuilderForAggregateMetricDouble(blockFactory, positions);
58+
case EXPONENTIAL_HISTOGRAM -> new ResultBuilderForExponentialHistogram(blockFactory, positions);
5859
default -> {
5960
assert false : "Result builder for [" + elementType + "]";
6061
throw new UnsupportedOperationException("Result builder for [" + elementType + "]");
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.compute.operator.topn;
9+
10+
import org.apache.lucene.util.BytesRef;
11+
import org.elasticsearch.compute.data.Block;
12+
import org.elasticsearch.compute.data.BlockFactory;
13+
import org.elasticsearch.compute.data.ExponentialHistogramBlock;
14+
import org.elasticsearch.compute.data.ExponentialHistogramBlockBuilder;
15+
16+
public class ResultBuilderForExponentialHistogram implements ResultBuilder {
17+
18+
private final ExponentialHistogramBlockBuilder builder;
19+
private final ReusableTopNEncoderInput reusableInput = new ReusableTopNEncoderInput();
20+
21+
ResultBuilderForExponentialHistogram(BlockFactory blockFactory, int positions) {
22+
this.builder = blockFactory.newExponentialHistogramBlockBuilder(positions);
23+
}
24+
25+
@Override
26+
public void decodeKey(BytesRef keys) {
27+
throw new AssertionError("ExponentialHistogramBlock can't be a key");
28+
}
29+
30+
@Override
31+
public void decodeValue(BytesRef values) {
32+
int count = TopNEncoder.DEFAULT_UNSORTABLE.decodeVInt(values);
33+
if (count == 0) {
34+
builder.appendNull();
35+
return;
36+
}
37+
assert count == 1 : "ExponentialHistogramBlock does not support multi values";
38+
reusableInput.inputValues = values;
39+
builder.deserializeAndAppend(reusableInput);
40+
}
41+
42+
@Override
43+
public Block build() {
44+
return builder.build();
45+
}
46+
47+
@Override
48+
public String toString() {
49+
return "ResultBuilderForExponentialHistogram";
50+
}
51+
52+
@Override
53+
public void close() {
54+
builder.close();
55+
}
56+
57+
private static final class ReusableTopNEncoderInput implements ExponentialHistogramBlock.SerializedInput {
58+
BytesRef inputValues;
59+
60+
@Override
61+
public double readDouble() {
62+
return TopNEncoder.DEFAULT_UNSORTABLE.decodeDouble(inputValues);
63+
}
64+
65+
@Override
66+
public long readLong() {
67+
return TopNEncoder.DEFAULT_UNSORTABLE.decodeLong(inputValues);
68+
}
69+
70+
@Override
71+
public BytesRef readBytesRef(BytesRef scratch) {
72+
return TopNEncoder.DEFAULT_UNSORTABLE.decodeBytesRef(inputValues, scratch);
73+
}
74+
}
75+
}

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/ValueExtractor.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import org.elasticsearch.compute.data.DocBlock;
1515
import org.elasticsearch.compute.data.DoubleBlock;
1616
import org.elasticsearch.compute.data.ElementType;
17+
import org.elasticsearch.compute.data.ExponentialHistogramBlock;
1718
import org.elasticsearch.compute.data.FloatBlock;
1819
import org.elasticsearch.compute.data.IntBlock;
1920
import org.elasticsearch.compute.data.LongBlock;
@@ -53,6 +54,7 @@ static ValueExtractor extractorFor(ElementType elementType, TopNEncoder encoder,
5354
case NULL -> new ValueExtractorForNull();
5455
case DOC -> new ValueExtractorForDoc(encoder, ((DocBlock) block).asVector());
5556
case AGGREGATE_METRIC_DOUBLE -> new ValueExtractorForAggregateMetricDouble(encoder, (AggregateMetricDoubleBlock) block);
57+
case EXPONENTIAL_HISTOGRAM -> new ValueExtractorForExponentialHistogram(encoder, (ExponentialHistogramBlock) block);
5658
default -> {
5759
assert false : "No value extractor for [" + block.elementType() + "]";
5860
throw new UnsupportedOperationException("No value extractor for [" + block.elementType() + "]");
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.compute.operator.topn;
9+
10+
import org.apache.lucene.util.BytesRef;
11+
import org.elasticsearch.compute.data.ExponentialHistogramBlock;
12+
import org.elasticsearch.compute.operator.BreakingBytesRefBuilder;
13+
14+
public class ValueExtractorForExponentialHistogram implements ValueExtractor {
15+
private final ExponentialHistogramBlock block;
16+
17+
private final BytesRef scratch = new BytesRef();
18+
private final ReusableTopNEncoderOutput reusableOutput = new ReusableTopNEncoderOutput();
19+
20+
ValueExtractorForExponentialHistogram(TopNEncoder encoder, ExponentialHistogramBlock block) {
21+
assert encoder == TopNEncoder.DEFAULT_UNSORTABLE;
22+
this.block = block;
23+
}
24+
25+
@Override
26+
public void writeValue(BreakingBytesRefBuilder values, int position) {
27+
// number of multi-values first for compatibility with ValueExtractorForNull
28+
if (block.isNull(position)) {
29+
TopNEncoder.DEFAULT_UNSORTABLE.encodeVInt(0, values);
30+
} else {
31+
assert block.getValueCount(position) == 1 : "Multi-valued ExponentialHistogram blocks are not supported in TopN";
32+
TopNEncoder.DEFAULT_UNSORTABLE.encodeVInt(1, values);
33+
int valueIndex = block.getFirstValueIndex(position);
34+
reusableOutput.target = values;
35+
block.serializeExponentialHistogram(valueIndex, reusableOutput, scratch);
36+
}
37+
}
38+
39+
@Override
40+
public String toString() {
41+
return "ValueExtractorForExponentialHistogram";
42+
}
43+
44+
private static final class ReusableTopNEncoderOutput implements ExponentialHistogramBlock.SerializedOutput {
45+
BreakingBytesRefBuilder target;
46+
47+
@Override
48+
public void appendDouble(double value) {
49+
TopNEncoder.DEFAULT_UNSORTABLE.encodeDouble(value, target);
50+
}
51+
52+
@Override
53+
public void appendLong(long value) {
54+
TopNEncoder.DEFAULT_UNSORTABLE.encodeLong(value, target);
55+
}
56+
57+
@Override
58+
public void appendBytesRef(BytesRef value) {
59+
TopNEncoder.DEFAULT_UNSORTABLE.encodeBytesRef(value, target);
60+
}
61+
}
62+
}

x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/topn/ExtractorTests.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ public static Iterable<Object[]> parameters() {
4444
case UNKNOWN -> {
4545
supportsNull = false;
4646
}
47-
case COMPOSITE, EXPONENTIAL_HISTOGRAM -> {
47+
case COMPOSITE -> {
4848
// TODO: add later
4949
supportsNull = false;
5050
}
@@ -130,6 +130,9 @@ public static Iterable<Object[]> parameters() {
130130
) }
131131
);
132132
}
133+
case EXPONENTIAL_HISTOGRAM ->
134+
// multi values are not supported
135+
cases.add(valueTestCase("single " + e, e, TopNEncoder.DEFAULT_UNSORTABLE, () -> BlockTestUtils.randomValue(e)));
133136
case NULL -> {
134137
}
135138
default -> {

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -487,10 +487,10 @@ private PhysicalOperation planTopN(TopNExec topNExec, LocalExecutionPlannerConte
487487
case BOOLEAN, NULL, BYTE, SHORT, INTEGER, LONG, DOUBLE, FLOAT, HALF_FLOAT, DATETIME, DATE_NANOS, DATE_PERIOD, TIME_DURATION,
488488
OBJECT, SCALED_FLOAT, UNSIGNED_LONG, TSID_DATA_TYPE -> TopNEncoder.DEFAULT_SORTABLE;
489489
case GEO_POINT, CARTESIAN_POINT, GEO_SHAPE, CARTESIAN_SHAPE, COUNTER_LONG, COUNTER_INTEGER, COUNTER_DOUBLE, SOURCE,
490-
AGGREGATE_METRIC_DOUBLE, DENSE_VECTOR, GEOHASH, GEOTILE, GEOHEX -> TopNEncoder.DEFAULT_UNSORTABLE;
490+
AGGREGATE_METRIC_DOUBLE, DENSE_VECTOR, GEOHASH, GEOTILE, GEOHEX, EXPONENTIAL_HISTOGRAM ->
491+
TopNEncoder.DEFAULT_UNSORTABLE;
491492
// unsupported fields are encoded as BytesRef, we'll use the same encoder; all values should be null at this point
492-
// TODO(b/133393) add exponential histogram support
493-
case PARTIAL_AGG, EXPONENTIAL_HISTOGRAM, UNSUPPORTED -> TopNEncoder.UNSUPPORTED;
493+
case PARTIAL_AGG, UNSUPPORTED -> TopNEncoder.UNSUPPORTED;
494494
};
495495
}
496496
List<TopNOperator.SortOrder> orders = topNExec.order().stream().map(order -> {

0 commit comments

Comments
 (0)