diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/ConstantNullBlock.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/ConstantNullBlock.java index 0dc4a67ab6100..dda3dbc90c7a2 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/ConstantNullBlock.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/ConstantNullBlock.java @@ -300,6 +300,12 @@ public ExponentialHistogram getExponentialHistogram(int valueIndex, ExponentialH throw new UnsupportedOperationException("null block"); } + @Override + public void serializeExponentialHistogram(int valueIndex, SerializedOutput out, BytesRef scratch) { + assert false : "null block"; + throw new UnsupportedOperationException("null block"); + } + @Override public int getTotalValueCount() { return 0; diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/ExponentialHistogramArrayBlock.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/ExponentialHistogramArrayBlock.java index 074813551ef4d..bfcba10f6c2a8 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/ExponentialHistogramArrayBlock.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/ExponentialHistogramArrayBlock.java @@ -19,9 +19,34 @@ final class ExponentialHistogramArrayBlock extends AbstractNonThreadSafeRefCounted implements ExponentialHistogramBlock { + // Exponential histograms consist of several components that we store in separate blocks + // due to (a) better compression in the field mapper for disk storage and (b) faster computations if only one sub-component is needed + // What are the semantics of positions, multi-value counts and nulls in the exponential histogram block and + // how do they relate to the sub-blocks? 
+ // ExponentialHistogramBlock needs to adhere to the contract of Blocks for the access patterns: + // + // for (int position = 0; position < block.getPositionCount(); position++) { + // ...int valueCount = block.getValueCount(position); + // ...for (int valueIndex = 0; valueIndex < valueCount; valueIndex++) { + // ......ExponentialHistogram histo = block.getExponentialHistogram(valueIndex, scratch); + // ...} + // } + // + // That implies that given only a value-index, we need to be able to retrieve all components of the histogram. + // Because we can't make any assumptions on how value indices are laid out in the sub-blocks for multi-values, + // we enforce that the sub-blocks have at most one value per position (i.e., no multi-values). + // Based on this, we can define the valueIndex for ExponentialHistogramArrayBlock to correspond to positions in the sub-blocks. + // So basically the sub-blocks are the "flattened" components of the histograms. + // If we later add multi-value support to ExponentialHistogramArrayBlock, + // we can't use the multi-value support of the sub-blocks to implement that. + // Instead, we need to maintain a firstValueIndex array ourselves in ExponentialHistogramArrayBlock. + private final DoubleBlock minima; private final DoubleBlock maxima; private final DoubleBlock sums; + /** + * Holds the number of values in each histogram. Note that this is a different concept from getValueCount(position)! + */ private final LongBlock valueCounts; private final DoubleBlock zeroThresholds; private final BytesRefBlock encodedHistograms; @@ -94,6 +119,22 @@ public ExponentialHistogram getExponentialHistogram(int valueIndex, ExponentialH } } + @Override + public void serializeExponentialHistogram(int valueIndex, SerializedOutput out, BytesRef scratch) { + // note that this value count is different from getValueCount(position)! 
+ // this value count represents the number of individual samples the histogram was computed for + long valueCount = valueCounts.getLong(valueCounts.getFirstValueIndex(valueIndex)); + out.appendLong(valueCount); + out.appendDouble(sums.getDouble(sums.getFirstValueIndex(valueIndex))); + out.appendDouble(zeroThresholds.getDouble(zeroThresholds.getFirstValueIndex(valueIndex))); + if (valueCount > 0) { + // min / max are only non-null for non-empty histograms + out.appendDouble(minima.getDouble(minima.getFirstValueIndex(valueIndex))); + out.appendDouble(maxima.getDouble(maxima.getFirstValueIndex(valueIndex))); + } + out.appendBytesRef(encodedHistograms.getBytesRef(encodedHistograms.getFirstValueIndex(valueIndex), scratch)); + } + @Override protected void closeInternal() { Releasables.close(getSubBlocks()); @@ -376,5 +417,4 @@ public int hashCode() { // this ensures proper equality with null blocks and should be unique enough for practical purposes return encodedHistograms.hashCode(); } - } diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/ExponentialHistogramBlock.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/ExponentialHistogramBlock.java index 3aad62ce9355c..3a3b65201d76b 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/ExponentialHistogramBlock.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/ExponentialHistogramBlock.java @@ -7,6 +7,7 @@ package org.elasticsearch.compute.data; +import org.apache.lucene.util.BytesRef; import org.elasticsearch.exponentialhistogram.ExponentialHistogram; /** @@ -27,6 +28,16 @@ public sealed interface ExponentialHistogramBlock extends Block permits Constant */ ExponentialHistogram getExponentialHistogram(int valueIndex, ExponentialHistogramScratch scratch); + /** + * Serializes the exponential histogram at the given index into the provided output, 
so that it can be read back + * via {@link ExponentialHistogramBlockBuilder#deserializeAndAppend(SerializedInput)}. + * + * @param valueIndex the value index of the histogram to serialize + * @param out the output that receives the individual serialized values + * @param scratch a scratch {@link BytesRef} that implementations may use while reading the encoded histogram bytes + */ + void serializeExponentialHistogram(int valueIndex, SerializedOutput out, BytesRef scratch); + static boolean equals(ExponentialHistogramBlock blockA, ExponentialHistogramBlock blockB) { if (blockA == blockB) { return true; @@ -42,4 +53,27 @@ static boolean equals(ExponentialHistogramBlock blockA, ExponentialHistogramBloc }; } + /** + * Abstraction to use for writing individual values via {@link #serializeExponentialHistogram(int, SerializedOutput, BytesRef)}. + */ + interface SerializedOutput { + void appendDouble(double value); + + void appendLong(long value); + + void appendBytesRef(BytesRef bytesRef); + } + + /** + * Abstraction to use for reading individual serialized values via + * {@link ExponentialHistogramBlockBuilder#deserializeAndAppend(SerializedInput)}. + */ + interface SerializedInput { + double readDouble(); + + long readLong(); + + BytesRef readBytesRef(BytesRef scratch); + } + } diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/ExponentialHistogramBlockBuilder.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/ExponentialHistogramBlockBuilder.java index cd3591a19932c..f8aa1a708a3f9 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/ExponentialHistogramBlockBuilder.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/ExponentialHistogramBlockBuilder.java @@ -7,6 +7,7 @@ package org.elasticsearch.compute.data; +import org.apache.lucene.util.BytesRef; import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.core.Releasables; import org.elasticsearch.exponentialhistogram.CompressedExponentialHistogram; @@ -25,6 +26,8 @@ public class ExponentialHistogramBlockBuilder implements Block.Builder, BlockLoa private final 
DoubleBlock.Builder zeroThresholdsBuilder; private final BytesRefBlock.Builder encodedHistogramsBuilder; + private final BytesRef scratch = new BytesRef(); + ExponentialHistogramBlockBuilder(int estimatedSize, BlockFactory blockFactory) { DoubleBlock.Builder minimaBuilder = null; DoubleBlock.Builder maximaBuilder = null; @@ -131,6 +134,27 @@ public ExponentialHistogramBlockBuilder append(ExponentialHistogram histogram) { return this; } + /** + * Decodes and appends a value serialized with + * {@link ExponentialHistogramBlock#serializeExponentialHistogram(int, ExponentialHistogramBlock.SerializedOutput, BytesRef)}. + * + * @param input the input to deserialize from + */ + public void deserializeAndAppend(ExponentialHistogramBlock.SerializedInput input) { + long valueCount = input.readLong(); + valueCountsBuilder.appendLong(valueCount); + sumsBuilder.appendDouble(input.readDouble()); + zeroThresholdsBuilder.appendDouble(input.readDouble()); + if (valueCount > 0) { + minimaBuilder.appendDouble(input.readDouble()); + maximaBuilder.appendDouble(input.readDouble()); + } else { + minimaBuilder.appendNull(); + maximaBuilder.appendNull(); + } + encodedHistogramsBuilder.appendBytesRef(input.readBytesRef(scratch)); + } + @Override public ExponentialHistogramBlock build() { DoubleBlock minima = null; diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/ResultBuilder.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/ResultBuilder.java index 6eee1383e277b..cff31535f82e0 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/ResultBuilder.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/ResultBuilder.java @@ -55,6 +55,7 @@ static ResultBuilder resultBuilderFor( case NULL -> new ResultBuilderForNull(blockFactory); case DOC -> new ResultBuilderForDoc(blockFactory, (DocVectorEncoder) encoder, positions); case 
AGGREGATE_METRIC_DOUBLE -> new ResultBuilderForAggregateMetricDouble(blockFactory, positions); + case EXPONENTIAL_HISTOGRAM -> new ResultBuilderForExponentialHistogram(blockFactory, positions); default -> { assert false : "Result builder for [" + elementType + "]"; throw new UnsupportedOperationException("Result builder for [" + elementType + "]"); diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/ResultBuilderForExponentialHistogram.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/ResultBuilderForExponentialHistogram.java new file mode 100644 index 0000000000000..8f7b8a59e5a35 --- /dev/null +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/ResultBuilderForExponentialHistogram.java @@ -0,0 +1,75 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. 
+ */ + +package org.elasticsearch.compute.operator.topn; + +import org.apache.lucene.util.BytesRef; +import org.elasticsearch.compute.data.Block; +import org.elasticsearch.compute.data.BlockFactory; +import org.elasticsearch.compute.data.ExponentialHistogramBlock; +import org.elasticsearch.compute.data.ExponentialHistogramBlockBuilder; + +public class ResultBuilderForExponentialHistogram implements ResultBuilder { + + private final ExponentialHistogramBlockBuilder builder; + private final ReusableTopNEncoderInput reusableInput = new ReusableTopNEncoderInput(); + + ResultBuilderForExponentialHistogram(BlockFactory blockFactory, int positions) { + this.builder = blockFactory.newExponentialHistogramBlockBuilder(positions); + } + + @Override + public void decodeKey(BytesRef keys) { + throw new AssertionError("ExponentialHistogramBlock can't be a key"); + } + + @Override + public void decodeValue(BytesRef values) { + int count = TopNEncoder.DEFAULT_UNSORTABLE.decodeVInt(values); + if (count == 0) { + builder.appendNull(); + return; + } + assert count == 1 : "ExponentialHistogramBlock does not support multi values"; + reusableInput.inputValues = values; + builder.deserializeAndAppend(reusableInput); + } + + @Override + public Block build() { + return builder.build(); + } + + @Override + public String toString() { + return "ResultBuilderForExponentialHistogram"; + } + + @Override + public void close() { + builder.close(); + } + + private static final class ReusableTopNEncoderInput implements ExponentialHistogramBlock.SerializedInput { + BytesRef inputValues; + + @Override + public double readDouble() { + return TopNEncoder.DEFAULT_UNSORTABLE.decodeDouble(inputValues); + } + + @Override + public long readLong() { + return TopNEncoder.DEFAULT_UNSORTABLE.decodeLong(inputValues); + } + + @Override + public BytesRef readBytesRef(BytesRef scratch) { + return TopNEncoder.DEFAULT_UNSORTABLE.decodeBytesRef(inputValues, scratch); + } + } +} diff --git 
a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/ValueExtractor.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/ValueExtractor.java index b6f3a1198d1ff..d5120cddcb761 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/ValueExtractor.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/ValueExtractor.java @@ -14,6 +14,7 @@ import org.elasticsearch.compute.data.DocBlock; import org.elasticsearch.compute.data.DoubleBlock; import org.elasticsearch.compute.data.ElementType; +import org.elasticsearch.compute.data.ExponentialHistogramBlock; import org.elasticsearch.compute.data.FloatBlock; import org.elasticsearch.compute.data.IntBlock; import org.elasticsearch.compute.data.LongBlock; @@ -53,6 +54,7 @@ static ValueExtractor extractorFor(ElementType elementType, TopNEncoder encoder, case NULL -> new ValueExtractorForNull(); case DOC -> new ValueExtractorForDoc(encoder, ((DocBlock) block).asVector()); case AGGREGATE_METRIC_DOUBLE -> new ValueExtractorForAggregateMetricDouble(encoder, (AggregateMetricDoubleBlock) block); + case EXPONENTIAL_HISTOGRAM -> new ValueExtractorForExponentialHistogram(encoder, (ExponentialHistogramBlock) block); default -> { assert false : "No value extractor for [" + block.elementType() + "]"; throw new UnsupportedOperationException("No value extractor for [" + block.elementType() + "]"); diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/ValueExtractorForExponentialHistogram.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/ValueExtractorForExponentialHistogram.java new file mode 100644 index 0000000000000..0df2383d4bade --- /dev/null +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/ValueExtractorForExponentialHistogram.java @@ -0,0 +1,62 @@ +/* + * Copyright Elasticsearch B.V. 
and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.compute.operator.topn; + +import org.apache.lucene.util.BytesRef; +import org.elasticsearch.compute.data.ExponentialHistogramBlock; +import org.elasticsearch.compute.operator.BreakingBytesRefBuilder; + +public class ValueExtractorForExponentialHistogram implements ValueExtractor { + private final ExponentialHistogramBlock block; + + private final BytesRef scratch = new BytesRef(); + private final ReusableTopNEncoderOutput reusableOutput = new ReusableTopNEncoderOutput(); + + ValueExtractorForExponentialHistogram(TopNEncoder encoder, ExponentialHistogramBlock block) { + assert encoder == TopNEncoder.DEFAULT_UNSORTABLE; + this.block = block; + } + + @Override + public void writeValue(BreakingBytesRefBuilder values, int position) { + // write the value count first (0 for null, 1 otherwise) for compatibility with ValueExtractorForNull + if (block.isNull(position)) { + TopNEncoder.DEFAULT_UNSORTABLE.encodeVInt(0, values); + } else { + assert block.getValueCount(position) == 1 : "Multi-valued ExponentialHistogram blocks are not supported in TopN"; + TopNEncoder.DEFAULT_UNSORTABLE.encodeVInt(1, values); + int valueIndex = block.getFirstValueIndex(position); + reusableOutput.target = values; + block.serializeExponentialHistogram(valueIndex, reusableOutput, scratch); + } + } + + @Override + public String toString() { + return "ValueExtractorForExponentialHistogram"; + } + + private static final class ReusableTopNEncoderOutput implements ExponentialHistogramBlock.SerializedOutput { + BreakingBytesRefBuilder target; + + @Override + public void appendDouble(double value) { + TopNEncoder.DEFAULT_UNSORTABLE.encodeDouble(value, target); + } + + @Override + public void appendLong(long value) { + TopNEncoder.DEFAULT_UNSORTABLE.encodeLong(value, target); + } + 
+ @Override + public void appendBytesRef(BytesRef value) { + TopNEncoder.DEFAULT_UNSORTABLE.encodeBytesRef(value, target); + } + } +} diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/topn/ExtractorTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/topn/ExtractorTests.java index 022055a72bcbb..60b11e5a290e8 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/topn/ExtractorTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/topn/ExtractorTests.java @@ -44,7 +44,7 @@ public static Iterable parameters() { case UNKNOWN -> { supportsNull = false; } - case COMPOSITE, EXPONENTIAL_HISTOGRAM -> { + case COMPOSITE -> { // TODO: add later supportsNull = false; } @@ -130,6 +130,9 @@ public static Iterable parameters() { ) } ); } + case EXPONENTIAL_HISTOGRAM -> + // multi values are not supported + cases.add(valueTestCase("single " + e, e, TopNEncoder.DEFAULT_UNSORTABLE, () -> BlockTestUtils.randomValue(e))); case NULL -> { } default -> { diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java index 908193f4bcb18..7eadaa979d3c8 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java @@ -487,10 +487,10 @@ private PhysicalOperation planTopN(TopNExec topNExec, LocalExecutionPlannerConte case BOOLEAN, NULL, BYTE, SHORT, INTEGER, LONG, DOUBLE, FLOAT, HALF_FLOAT, DATETIME, DATE_NANOS, DATE_PERIOD, TIME_DURATION, OBJECT, SCALED_FLOAT, UNSIGNED_LONG, TSID_DATA_TYPE -> TopNEncoder.DEFAULT_SORTABLE; case GEO_POINT, CARTESIAN_POINT, GEO_SHAPE, CARTESIAN_SHAPE, COUNTER_LONG, COUNTER_INTEGER, COUNTER_DOUBLE, SOURCE, - 
AGGREGATE_METRIC_DOUBLE, DENSE_VECTOR, GEOHASH, GEOTILE, GEOHEX -> TopNEncoder.DEFAULT_UNSORTABLE; + AGGREGATE_METRIC_DOUBLE, DENSE_VECTOR, GEOHASH, GEOTILE, GEOHEX, EXPONENTIAL_HISTOGRAM -> + TopNEncoder.DEFAULT_UNSORTABLE; // unsupported fields are encoded as BytesRef, we'll use the same encoder; all values should be null at this point - // TODO(b/133393) add exponential histogram support - case PARTIAL_AGG, EXPONENTIAL_HISTOGRAM, UNSUPPORTED -> TopNEncoder.UNSUPPORTED; + case PARTIAL_AGG, UNSUPPORTED -> TopNEncoder.UNSUPPORTED; }; } List orders = topNExec.order().stream().map(order -> {