diff --git a/libs/exponential-histogram/src/main/java/org/elasticsearch/exponentialhistogram/ExponentialHistogramXContent.java b/libs/exponential-histogram/src/main/java/org/elasticsearch/exponentialhistogram/ExponentialHistogramXContent.java index 1f6b9c266a826..3227412454e52 100644 --- a/libs/exponential-histogram/src/main/java/org/elasticsearch/exponentialhistogram/ExponentialHistogramXContent.java +++ b/libs/exponential-histogram/src/main/java/org/elasticsearch/exponentialhistogram/ExponentialHistogramXContent.java @@ -21,9 +21,16 @@ package org.elasticsearch.exponentialhistogram; +import org.elasticsearch.core.Nullable; +import org.elasticsearch.core.Types; import org.elasticsearch.xcontent.XContentBuilder; +import org.elasticsearch.xcontent.XContentParser; import java.io.IOException; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.function.BiConsumer; /** * Handles the serialization of an {@link ExponentialHistogram} to XContent. @@ -48,7 +55,11 @@ public class ExponentialHistogramXContent { * @param histogram the ExponentialHistogram to serialize * @throws IOException if the XContentBuilder throws an IOException */ - public static void serialize(XContentBuilder builder, ExponentialHistogram histogram) throws IOException { + public static void serialize(XContentBuilder builder, @Nullable ExponentialHistogram histogram) throws IOException { + if (histogram == null) { + builder.nullValue(); + return; + } builder.startObject(); builder.field(SCALE_FIELD, histogram.scale()); @@ -101,4 +112,63 @@ private static void writeBuckets(XContentBuilder b, String fieldName, Exponentia b.endObject(); } + /** + * Parses an {@link ExponentialHistogram} from the provided {@link XContentParser}. + * This method is neither optimized, nor does it do any validation of the parsed content. + * No estimation for missing sum/min/max is done. + * Therefore only intended for testing! + * + * @param xContent the serialized histogram to read + * @return the deserialized histogram + * @throws IOException if the XContentParser throws an IOException + */ + public static ExponentialHistogram parseForTesting(XContentParser xContent) throws IOException { + if (xContent.currentToken() == null) { + xContent.nextToken(); + } + if (xContent.currentToken() == XContentParser.Token.VALUE_NULL) { + return null; + } + return parseForTesting(xContent.map()); + } + + /** + * Parses an {@link ExponentialHistogram} from a {@link Map}. + * This method is neither optimized, nor does it do any validation of the parsed content. + * No estimation for missing sum/min/max is done. + * Therefore only intended for testing! + * + * @param xContent the serialized histogram as a map + * @return the deserialized histogram + */ + public static ExponentialHistogram parseForTesting(@Nullable Map xContent) { + if (xContent == null) { + return null; + } + int scale = ((Number) xContent.get(SCALE_FIELD)).intValue(); + ExponentialHistogramBuilder builder = ExponentialHistogram.builder(scale, ExponentialHistogramCircuitBreaker.noop()); + + Map zero = Types.forciblyCast(xContent.getOrDefault(ZERO_FIELD, Collections.emptyMap())); + double zeroThreshold = zero.getOrDefault(ZERO_THRESHOLD_FIELD, 0).doubleValue(); + long zeroCount = zero.getOrDefault(ZERO_COUNT_FIELD, 0).longValue(); + builder.zeroBucket(ZeroBucket.create(zeroThreshold, zeroCount)); + + builder.sum(((Number) xContent.getOrDefault(SUM_FIELD, 0)).doubleValue()); + builder.min(((Number) xContent.getOrDefault(MIN_FIELD, Double.NaN)).doubleValue()); + builder.max(((Number) xContent.getOrDefault(MAX_FIELD, Double.NaN)).doubleValue()); + + parseBuckets(Types.forciblyCast(xContent.getOrDefault(NEGATIVE_FIELD, Collections.emptyMap())), builder::setNegativeBucket); + parseBuckets(Types.forciblyCast(xContent.getOrDefault(POSITIVE_FIELD, Collections.emptyMap())), builder::setPositiveBucket); + + return builder.build(); + } + + private static void parseBuckets(Map> serializedBuckets, BiConsumer bucketSetter) { + List indices = serializedBuckets.getOrDefault(BUCKET_INDICES_FIELD, Collections.emptyList()); + List counts = serializedBuckets.getOrDefault(BUCKET_COUNTS_FIELD, Collections.emptyList()); + assert indices.size() == counts.size(); + for (int i = 0; i < indices.size(); i++) { + bucketSetter.accept(indices.get(i).longValue(), counts.get(i).longValue()); + } + } } diff --git a/libs/exponential-histogram/src/test/java/org/elasticsearch/exponentialhistogram/ExponentialHistogramXContentTests.java b/libs/exponential-histogram/src/test/java/org/elasticsearch/exponentialhistogram/ExponentialHistogramXContentTests.java index 495e68031851c..80b788217f27c 100644 --- a/libs/exponential-histogram/src/test/java/org/elasticsearch/exponentialhistogram/ExponentialHistogramXContentTests.java +++ b/libs/exponential-histogram/src/test/java/org/elasticsearch/exponentialhistogram/ExponentialHistogramXContentTests.java @@ -23,6 +23,8 @@ import org.elasticsearch.common.Strings; import org.elasticsearch.xcontent.XContentBuilder; +import org.elasticsearch.xcontent.XContentParser; +import org.elasticsearch.xcontent.XContentParserConfiguration; import org.elasticsearch.xcontent.json.JsonXContent; import java.io.IOException; @@ -31,6 +33,11 @@ public class ExponentialHistogramXContentTests extends ExponentialHistogramTestCase { + public void testNullHistogram() { + assertThat(toJson(null), equalTo("null")); + checkRoundTrip(null); + } + public void testEmptyHistogram() { ExponentialHistogram emptyHistogram = ExponentialHistogram.empty(); assertThat(toJson(emptyHistogram), equalTo("{\"scale\":" + emptyHistogram.scale() + ",\"sum\":0.0}")); @@ -62,11 +69,13 @@ public void testFullHistogram() { + "}" ) ); + checkRoundTrip(histo); } public void testOnlyZeroThreshold() { ExponentialHistogram histo = createAutoReleasedHistogram(b -> b.scale(3).sum(1.1).zeroBucket(ZeroBucket.create(5.0, 0))); assertThat(toJson(histo), equalTo("{\"scale\":3,\"sum\":1.1,\"zero\":{\"threshold\":5.0}}")); + checkRoundTrip(histo); } public void testOnlyZeroCount() { @@ -74,6 +83,7 @@ public void testOnlyZeroCount() { b -> b.zeroBucket(ZeroBucket.create(0.0, 7)).scale(2).sum(1.1).min(0).max(0) ); assertThat(toJson(histo), equalTo("{\"scale\":2,\"sum\":1.1,\"min\":0.0,\"max\":0.0,\"zero\":{\"count\":7}}")); + checkRoundTrip(histo); } public void testOnlyPositiveBuckets() { @@ -84,6 +94,7 @@ public void testOnlyPositiveBuckets() { toJson(histo), equalTo("{\"scale\":4,\"sum\":1.1,\"min\":0.5,\"max\":2.5,\"positive\":{\"indices\":[-1,2],\"counts\":[3,5]}}") ); + checkRoundTrip(histo); } public void testOnlyNegativeBuckets() { @@ -94,6 +105,7 @@ public void testOnlyNegativeBuckets() { toJson(histo), equalTo("{\"scale\":5,\"sum\":1.1,\"min\":-0.5,\"max\":-0.25,\"negative\":{\"indices\":[-1,2],\"counts\":[4,6]}}") ); + checkRoundTrip(histo); } private static String toJson(ExponentialHistogram histo) { @@ -105,4 +117,17 @@ private static String toJson(ExponentialHistogram histo) { } } + private static void checkRoundTrip(ExponentialHistogram histo) { + try (XContentBuilder builder = JsonXContent.contentBuilder()) { + ExponentialHistogramXContent.serialize(builder, histo); + String json = Strings.toString(builder); + try (XContentParser parser = JsonXContent.jsonXContent.createParser(XContentParserConfiguration.EMPTY, json)) { + ExponentialHistogram parsed = ExponentialHistogramXContent.parseForTesting(parser); + assertThat(parsed, equalTo(histo)); + } + } catch (IOException e) { + throw new RuntimeException(e); + } + } + } diff --git a/x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/plugin/EsqlCorePlugin.java b/x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/plugin/EsqlCorePlugin.java index 1b2a59b5c6886..da808b0083d22 100644 --- a/x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/plugin/EsqlCorePlugin.java +++ b/x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/plugin/EsqlCorePlugin.java @@ -7,9 +7,11 @@ package org.elasticsearch.xpack.esql.core.plugin; +import org.elasticsearch.common.util.FeatureFlag; import org.elasticsearch.plugins.ExtensiblePlugin; import org.elasticsearch.plugins.Plugin; public class EsqlCorePlugin extends Plugin implements ExtensiblePlugin { + public static final FeatureFlag EXPONENTIAL_HISTOGRAM_FEATURE_FLAG = new FeatureFlag("esql_exponential_histogram"); } diff --git a/x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/type/DataType.java b/x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/type/DataType.java index b05d6784c072f..35dfcaa96ee7c 100644 --- a/x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/type/DataType.java +++ b/x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/type/DataType.java @@ -340,6 +340,14 @@ public enum DataType implements Writeable { .estimatedSize(Double.BYTES * 3 + Integer.BYTES) .supportedSince(DataTypesTransportVersions.ESQL_AGGREGATE_METRIC_DOUBLE_CREATED_VERSION) ), + + EXPONENTIAL_HISTOGRAM( + builder().esType("exponential_histogram") + .estimatedSize(16 * 160)// guess 160 buckets (OTEL default for positive values only histograms) with 16 bytes per bucket + .docValues() + .underConstruction() + ), + /** * Fields with this type are dense vectors, represented as an array of float values. */ diff --git a/x-pack/plugin/esql/build.gradle b/x-pack/plugin/esql/build.gradle index 734c0b62eb729..54a1b81b9f0c2 100644 --- a/x-pack/plugin/esql/build.gradle +++ b/x-pack/plugin/esql/build.gradle @@ -42,6 +42,7 @@ dependencies { implementation project('compute:ann') implementation project(':libs:dissect') implementation project(':libs:grok') + implementation project(':libs:exponential-histogram') api "org.apache.lucene:lucene-spatial3d:${versions.lucene}" api project(":libs:h3") implementation project('arrow') diff --git a/x-pack/plugin/esql/compute/build.gradle b/x-pack/plugin/esql/compute/build.gradle index bd4bb33873be5..e792bad34f67a 100644 --- a/x-pack/plugin/esql/compute/build.gradle +++ b/x-pack/plugin/esql/compute/build.gradle @@ -16,6 +16,7 @@ dependencies { compileOnly project(xpackModule('ml')) annotationProcessor project('gen') implementation 'com.carrotsearch:hppc:0.8.1' + api project(':libs:exponential-histogram') testImplementation(project(':modules:analysis-common')) testImplementation(project(':test:framework')) diff --git a/x-pack/plugin/esql/compute/src/main/java/module-info.java b/x-pack/plugin/esql/compute/src/main/java/module-info.java index eef946614ae42..53f33058c1cfe 100644 --- a/x-pack/plugin/esql/compute/src/main/java/module-info.java +++ b/x-pack/plugin/esql/compute/src/main/java/module-info.java @@ -21,6 +21,7 @@ requires org.elasticsearch.geo; requires org.elasticsearch.xcore; requires hppc; + requires org.elasticsearch.exponentialhistogram; exports org.elasticsearch.compute; exports org.elasticsearch.compute.aggregation; diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/BlockFactory.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/BlockFactory.java index 53cb0a72f86fc..9a34c010d66ec 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/BlockFactory.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/BlockFactory.java @@ -14,6 +14,7 @@ import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.BytesRefArray; import org.elasticsearch.compute.data.Block.MvOrdering; +import org.elasticsearch.exponentialhistogram.ExponentialHistogram; import java.util.BitSet; @@ -467,6 +468,19 @@ public final AggregateMetricDoubleBlock newConstantAggregateMetricDoubleBlock( } } + public ExponentialHistogramBlockBuilder newExponentialHistogramBlockBuilder(int estimatedSize) { + return new ExponentialHistogramBlockBuilder(estimatedSize, this); + } + + public final ExponentialHistogramBlock newConstantExponentialHistogramBlock(ExponentialHistogram value, int positionCount) { + try (ExponentialHistogramBlockBuilder builder = newExponentialHistogramBlockBuilder(positionCount)) { + for (int i = 0; i < positionCount; i++) { + builder.append(value); + } + return builder.build(); + } + } + public final AggregateMetricDoubleBlock newAggregateMetricDoubleBlock( double[] minValues, double[] maxValues, diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/BlockUtils.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/BlockUtils.java index fa5f139c5c6a1..fd036a9bc1e0d 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/BlockUtils.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/BlockUtils.java @@ -13,6 +13,8 @@ import org.elasticsearch.compute.data.AggregateMetricDoubleBlockBuilder.AggregateMetricDoubleLiteral; import org.elasticsearch.core.Releasable; import org.elasticsearch.core.Releasables; +import org.elasticsearch.exponentialhistogram.ExponentialHistogram; +import org.elasticsearch.exponentialhistogram.ExponentialHistogramCircuitBreaker; import java.util.ArrayList; import java.util.Arrays; @@ -224,6 +226,7 @@ public static void appendValue(Block.Builder builder, Object val, ElementType ty case DOUBLE -> ((DoubleBlock.Builder) builder).appendDouble((Double) val); case BOOLEAN -> ((BooleanBlock.Builder) builder).appendBoolean((Boolean) val); case AGGREGATE_METRIC_DOUBLE -> ((AggregateMetricDoubleBlockBuilder) builder).appendLiteral((AggregateMetricDoubleLiteral) val); + case EXPONENTIAL_HISTOGRAM -> ((ExponentialHistogramBlockBuilder) builder).append((ExponentialHistogram) val); default -> throw new UnsupportedOperationException("unsupported element type [" + type + "]"); } } @@ -253,6 +256,7 @@ private static Block constantBlock(BlockFactory blockFactory, ElementType type, case BOOLEAN -> blockFactory.newConstantBooleanBlockWith((boolean) val, size); case AGGREGATE_METRIC_DOUBLE -> blockFactory.newConstantAggregateMetricDoubleBlock((AggregateMetricDoubleLiteral) val, size); case FLOAT -> blockFactory.newConstantFloatBlockWith((float) val, size); + case EXPONENTIAL_HISTOGRAM -> blockFactory.newConstantExponentialHistogramBlock((ExponentialHistogram) val, size); default -> throw new UnsupportedOperationException("unsupported element type [" + type + "]"); }; } @@ -306,6 +310,12 @@ yield new AggregateMetricDoubleLiteral( aggBlock.countBlock().getInt(offset) ); } + case EXPONENTIAL_HISTOGRAM -> { + ExponentialHistogramBlock histoBlock = (ExponentialHistogramBlock) block; + ExponentialHistogram histogram = new ExponentialHistogramBlockAccessor(histoBlock).get(offset); + // return a copy so that the returned value is not bound to the lifetime of the block + yield ExponentialHistogram.builder(histogram, ExponentialHistogramCircuitBreaker.noop()).build(); + } case UNKNOWN -> throw new IllegalArgumentException("can't read values from [" + block + "]"); }; } diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/ConstantNullBlock.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/ConstantNullBlock.java index 22f4809100014..4586638d6ea5c 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/ConstantNullBlock.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/ConstantNullBlock.java @@ -26,7 +26,8 @@ public final class ConstantNullBlock extends AbstractNonThreadSafeRefCounted FloatBlock, DoubleBlock, BytesRefBlock, - AggregateMetricDoubleBlock { + AggregateMetricDoubleBlock, + ExponentialHistogramBlock { private static final long BASE_RAM_BYTES_USED = RamUsageEstimator.shallowSizeOfInstance(ConstantNullBlock.class); private final int positionCount; diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/ElementType.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/ElementType.java index 5202231067787..52a7853e56182 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/ElementType.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/ElementType.java @@ -11,6 +11,7 @@ import org.elasticsearch.TransportVersion; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.exponentialhistogram.ExponentialHistogram; import java.io.IOException; import java.util.Arrays; @@ -64,6 +65,16 @@ public enum ElementType { "AggregateMetricDouble", BlockFactory::newAggregateMetricDoubleBlockBuilder, AggregateMetricDoubleArrayBlock::readFrom + ), + + /** + * Blocks that contain exponential_histograms. + */ + EXPONENTIAL_HISTOGRAM( + 11, + "ExponentialHistogram", + BlockFactory::newExponentialHistogramBlockBuilder, + ExponentialHistogramArrayBlock::readFrom ); private static final TransportVersion ESQL_SERIALIZE_BLOCK_TYPE_CODE = TransportVersion.fromName("esql_serialize_block_type_code"); @@ -113,6 +124,8 @@ public static ElementType fromJava(Class type) { elementType = BOOLEAN; } else if (type == AggregateMetricDoubleBlockBuilder.AggregateMetricDoubleLiteral.class) { elementType = AGGREGATE_METRIC_DOUBLE; + } else if (type != null && ExponentialHistogram.class.isAssignableFrom(type)) { + elementType = EXPONENTIAL_HISTOGRAM; } else if (type == null || type == Void.class) { elementType = NULL; } else { diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/ExponentialHistogramArrayBlock.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/ExponentialHistogramArrayBlock.java new file mode 100644 index 0000000000000..20f485f913903 --- /dev/null +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/ExponentialHistogramArrayBlock.java @@ -0,0 +1,377 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.compute.data; + +import org.apache.lucene.util.BytesRef; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.core.ReleasableIterator; +import org.elasticsearch.core.Releasables; +import org.elasticsearch.exponentialhistogram.CompressedExponentialHistogram; + +import java.io.IOException; +import java.util.List; + +final class ExponentialHistogramArrayBlock extends AbstractNonThreadSafeRefCounted implements ExponentialHistogramBlock { + + private final DoubleBlock minima; + private final DoubleBlock maxima; + private final DoubleBlock sums; + private final LongBlock valueCounts; + private final DoubleBlock zeroThresholds; + private final BytesRefBlock encodedHistograms; + + ExponentialHistogramArrayBlock( + DoubleBlock minima, + DoubleBlock maxima, + DoubleBlock sums, + LongBlock valueCounts, + DoubleBlock zeroThresholds, + BytesRefBlock encodedHistograms + ) { + this.minima = minima; + this.maxima = maxima; + this.sums = sums; + this.valueCounts = valueCounts; + this.zeroThresholds = zeroThresholds; + this.encodedHistograms = encodedHistograms; + assert assertInvariants(); + } + + private boolean assertInvariants() { + for (Block b : getSubBlocks()) { + assert b.isReleased() == false; + assert b.doesHaveMultivaluedFields() == false + : "ExponentialHistogramArrayBlock sub-blocks can't have multi-values but [" + b + "] does"; + assert b.getPositionCount() == getPositionCount() + : "ExponentialHistogramArrayBlock sub-blocks must have the same position count but [" + + b + + "] has " + + b.getPositionCount() + + " instead of " + + getPositionCount(); + for (int i = 0; i < b.getPositionCount(); i++) { + if (isNull(i)) { + assert b.isNull(i) + : "ExponentialHistogramArrayBlock sub-block [" + b + "] should be null at position " + i + ", but was not"; + } else { + if (b == minima || b == maxima) { + // minima / maxima should be null exactly when value count is 0 or the histogram is null + assert b.isNull(i) == (valueCounts.getLong(valueCounts.getFirstValueIndex(i)) == 0) + : "ExponentialHistogramArrayBlock minima/maxima sub-block [" + b + "] has wrong nullity at position " + i; + } else { + assert b.isNull(i) == false + : "ExponentialHistogramArrayBlock sub-block [" + b + "] should be non-null at position " + i + ", but was not"; + } + } + } + } + return true; + } + + private List getSubBlocks() { + return List.of(sums, valueCounts, zeroThresholds, encodedHistograms, minima, maxima); + } + + void loadValue(int valueIndex, CompressedExponentialHistogram resultHistogram, BytesRef tempBytesRef) { + BytesRef bytes = encodedHistograms.getBytesRef(encodedHistograms.getFirstValueIndex(valueIndex), tempBytesRef); + double zeroThreshold = zeroThresholds.getDouble(zeroThresholds.getFirstValueIndex(valueIndex)); + long valueCount = valueCounts.getLong(valueCounts.getFirstValueIndex(valueIndex)); + double sum = sums.getDouble(sums.getFirstValueIndex(valueIndex)); + double min = valueCount == 0 ? Double.NaN : minima.getDouble(minima.getFirstValueIndex(valueIndex)); + double max = valueCount == 0 ? Double.NaN : maxima.getDouble(maxima.getFirstValueIndex(valueIndex)); + try { + resultHistogram.reset(zeroThreshold, valueCount, sum, min, max, bytes); + } catch (IOException e) { + throw new IllegalStateException("error loading histogram", e); + } + } + + @Override + protected void closeInternal() { + Releasables.close(getSubBlocks()); + } + + @Override + public Vector asVector() { + return null; + } + + @Override + public int getTotalValueCount() { + return encodedHistograms.getTotalValueCount(); + } + + @Override + public int getPositionCount() { + return encodedHistograms.getPositionCount(); + } + + @Override + public int getFirstValueIndex(int position) { + return position; + } + + @Override + public int getValueCount(int position) { + return isNull(position) ? 0 : 1; + } + + @Override + public ElementType elementType() { + return ElementType.EXPONENTIAL_HISTOGRAM; + } + + @Override + public BlockFactory blockFactory() { + return encodedHistograms.blockFactory(); + } + + @Override + public void allowPassingToDifferentDriver() { + getSubBlocks().forEach(Block::allowPassingToDifferentDriver); + } + + @Override + public boolean isNull(int position) { + return encodedHistograms.isNull(position); + } + + @Override + public boolean mayHaveNulls() { + return encodedHistograms.mayHaveNulls(); + } + + @Override + public boolean areAllValuesNull() { + return encodedHistograms.areAllValuesNull(); + } + + @Override + public boolean mayHaveMultivaluedFields() { + return false; + } + + @Override + public boolean doesHaveMultivaluedFields() { + return false; + } + + @Override + public Block filter(int... positions) { + DoubleBlock filteredMinima = null; + DoubleBlock filteredMaxima = null; + DoubleBlock filteredSums = null; + LongBlock filteredValueCounts = null; + DoubleBlock filteredZeroThresholds = null; + BytesRefBlock filteredEncodedHistograms = null; + boolean success = false; + try { + filteredMinima = minima.filter(positions); + filteredMaxima = maxima.filter(positions); + filteredSums = sums.filter(positions); + filteredValueCounts = valueCounts.filter(positions); + filteredZeroThresholds = zeroThresholds.filter(positions); + filteredEncodedHistograms = encodedHistograms.filter(positions); + success = true; + } finally { + if (success == false) { + Releasables.close( + filteredMinima, + filteredMaxima, + filteredSums, + filteredValueCounts, + filteredZeroThresholds, + filteredEncodedHistograms + ); + } + } + return new ExponentialHistogramArrayBlock( + filteredMinima, + filteredMaxima, + filteredSums, + filteredValueCounts, + filteredZeroThresholds, + filteredEncodedHistograms + ); + } + + @Override + public Block keepMask(BooleanVector mask) { + DoubleBlock filteredMinima = null; + DoubleBlock filteredMaxima = null; + DoubleBlock filteredSums = null; + LongBlock filteredValueCounts = null; + DoubleBlock filteredZeroThresholds = null; + BytesRefBlock filteredEncodedHistograms = null; + boolean success = false; + try { + filteredMinima = minima.keepMask(mask); + filteredMaxima = maxima.keepMask(mask); + filteredSums = sums.keepMask(mask); + filteredValueCounts = valueCounts.keepMask(mask); + filteredZeroThresholds = zeroThresholds.keepMask(mask); + filteredEncodedHistograms = encodedHistograms.keepMask(mask); + success = true; + } finally { + if (success == false) { + Releasables.close( + filteredMinima, + filteredMaxima, + filteredSums, + filteredValueCounts, + filteredZeroThresholds, + filteredEncodedHistograms + ); + } + } + return new ExponentialHistogramArrayBlock( + filteredMinima, + filteredMaxima, + filteredSums, + filteredValueCounts, + filteredZeroThresholds, + filteredEncodedHistograms + ); + } + + @Override + public ReleasableIterator lookup(IntBlock positions, ByteSizeValue targetBlockSize) { + throw new UnsupportedOperationException("can't lookup values from ExponentialHistogramArrayBlock"); + } + + @Override + public MvOrdering mvOrdering() { + return MvOrdering.UNORDERED; + } + + @Override + public Block expand() { + // we don't support multivalues so expanding is a no-op + this.incRef(); + return this; + } + + @Override + public ExponentialHistogramArrayBlock deepCopy(BlockFactory blockFactory) { + DoubleBlock copiedMinima = null; + DoubleBlock copiedMaxima = null; + DoubleBlock copiedSums = null; + LongBlock copiedValueCounts = null; + DoubleBlock copiedZeroThresholds = null; + BytesRefBlock copiedEncodedHistograms = null; + boolean success = false; + try { + copiedMinima = minima.deepCopy(blockFactory); + copiedMaxima = maxima.deepCopy(blockFactory); + copiedSums = sums.deepCopy(blockFactory); + copiedValueCounts = valueCounts.deepCopy(blockFactory); + copiedZeroThresholds = zeroThresholds.deepCopy(blockFactory); + copiedEncodedHistograms = encodedHistograms.deepCopy(blockFactory); + success = true; + } finally { + if (success == false) { + Releasables.close(copiedMinima, copiedMaxima, copiedSums, copiedValueCounts, copiedZeroThresholds, copiedEncodedHistograms); + } + } + return new ExponentialHistogramArrayBlock( + copiedMinima, + copiedMaxima, + copiedSums, + copiedValueCounts, + copiedZeroThresholds, + copiedEncodedHistograms + ); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + minima.writeTo(out); + maxima.writeTo(out); + sums.writeTo(out); + valueCounts.writeTo(out); + zeroThresholds.writeTo(out); + encodedHistograms.writeTo(out); + } + + public static ExponentialHistogramArrayBlock readFrom(BlockStreamInput in) throws IOException { + DoubleBlock minima = null; + DoubleBlock maxima = null; + DoubleBlock sums = null; + LongBlock valueCounts = null; + DoubleBlock zeroThresholds = null; + BytesRefBlock encodedHistograms = null; + + boolean success = false; + try { + minima = DoubleBlock.readFrom(in); + maxima = DoubleBlock.readFrom(in); + sums = DoubleBlock.readFrom(in); + valueCounts = LongBlock.readFrom(in); + zeroThresholds = DoubleBlock.readFrom(in); + encodedHistograms = BytesRefBlock.readFrom(in); + success = true; + } finally { + if (success == false) { + Releasables.close(minima, maxima, sums, valueCounts, zeroThresholds, encodedHistograms); + } + } + return new ExponentialHistogramArrayBlock(minima, maxima, sums, valueCounts, zeroThresholds, encodedHistograms); + } + + @Override + public long ramBytesUsed() { + long bytes = 0; + for (Block b : getSubBlocks()) { + bytes += b.ramBytesUsed(); + } + return bytes; + } + + void copyInto( + DoubleBlock.Builder minimaBuilder, + DoubleBlock.Builder maximaBuilder, + DoubleBlock.Builder sumsBuilder, + LongBlock.Builder valueCountsBuilder, + DoubleBlock.Builder zeroThresholdsBuilder, + BytesRefBlock.Builder encodedHistogramsBuilder, + int beginInclusive, + int endExclusive + ) { + minimaBuilder.copyFrom(minima, beginInclusive, endExclusive); + maximaBuilder.copyFrom(maxima, beginInclusive, endExclusive); + sumsBuilder.copyFrom(sums, beginInclusive, endExclusive); + valueCountsBuilder.copyFrom(valueCounts, beginInclusive, endExclusive); + zeroThresholdsBuilder.copyFrom(zeroThresholds, beginInclusive, endExclusive); + encodedHistogramsBuilder.copyFrom(encodedHistograms, beginInclusive, endExclusive); + } + + @Override + public boolean equals(Object o) { + if (o instanceof ExponentialHistogramBlock block) { + return ExponentialHistogramBlock.equals(this, block); + } + return false; + } + + boolean equalsAfterTypeCheck(ExponentialHistogramArrayBlock that) { + return minima.equals(that.minima) + && maxima.equals(that.maxima) + && sums.equals(that.sums) + && valueCounts.equals(that.valueCounts) + && zeroThresholds.equals(that.zeroThresholds) + && encodedHistograms.equals(that.encodedHistograms); + } + + @Override + public int hashCode() { + // for now we use just the hash of encodedHistograms + // this ensures proper equality with null blocks and should be unique enough for practical purposes + return encodedHistograms.hashCode(); + } +} diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/ExponentialHistogramBlock.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/ExponentialHistogramBlock.java new file mode 100644 index 0000000000000..d6da0ad7176e8 --- /dev/null +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/ExponentialHistogramBlock.java @@ -0,0 +1,33 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.compute.data; + +import org.elasticsearch.exponentialhistogram.ExponentialHistogram; + +/** + * A block that holds {@link ExponentialHistogram} values. + * Position access is done through {@link ExponentialHistogramBlockAccessor}. + */ +public sealed interface ExponentialHistogramBlock extends Block permits ConstantNullBlock, ExponentialHistogramArrayBlock { + + static boolean equals(ExponentialHistogramBlock blockA, ExponentialHistogramBlock blockB) { + if (blockA == blockB) { + return true; + } + return switch (blockA) { + case null -> false; + case ConstantNullBlock a -> a.equals(blockB); + case ExponentialHistogramArrayBlock a -> switch (blockB) { + case null -> false; + case ConstantNullBlock b -> b.equals(a); + case ExponentialHistogramArrayBlock b -> a.equalsAfterTypeCheck(b); + }; + }; + } + +} diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/ExponentialHistogramBlockAccessor.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/ExponentialHistogramBlockAccessor.java new file mode 100644 index 0000000000000..bf1fa242f2e96 --- /dev/null +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/ExponentialHistogramBlockAccessor.java @@ -0,0 +1,48 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.compute.data; + +import org.apache.lucene.util.BytesRef; +import org.elasticsearch.exponentialhistogram.CompressedExponentialHistogram; +import org.elasticsearch.exponentialhistogram.ExponentialHistogram; + +/** + * Provides access to the values stored in an {@link ExponentialHistogramBlock} as {@link ExponentialHistogram}s. + */ +public class ExponentialHistogramBlockAccessor { + + private final ExponentialHistogramBlock block; + private BytesRef tempBytesRef; + private CompressedExponentialHistogram reusedHistogram; + + public ExponentialHistogramBlockAccessor(ExponentialHistogramBlock block) { + this.block = block; + } + + /** + * Returns the {@link ExponentialHistogram} at the given value index. + * The return value of this method is reused across invocations, so callers should + * not retain a reference to it. + * In addition, the returned histogram must not be used after the block is released. + * + * @param valueIndex, should be obtained via {@link ExponentialHistogramBlock#getFirstValueIndex(int)}. + * @return null if the the value stored in the block + */ + public ExponentialHistogram get(int valueIndex) { + assert block.isNull(valueIndex) == false; + assert block.isReleased() == false; + ExponentialHistogramArrayBlock arrayBlock = (ExponentialHistogramArrayBlock) block; + if (reusedHistogram == null) { + tempBytesRef = new BytesRef(); + reusedHistogram = new CompressedExponentialHistogram(); + } + arrayBlock.loadValue(valueIndex, reusedHistogram, tempBytesRef); + return reusedHistogram; + } + +} diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/ExponentialHistogramBlockBuilder.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/ExponentialHistogramBlockBuilder.java new file mode 100644 index 0000000000000..5b7c0ceeceb4c --- /dev/null +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/ExponentialHistogramBlockBuilder.java @@ -0,0 +1,188 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.compute.data; + +import org.elasticsearch.common.io.stream.BytesStreamOutput; +import org.elasticsearch.core.Releasables; +import org.elasticsearch.exponentialhistogram.CompressedExponentialHistogram; +import org.elasticsearch.exponentialhistogram.ExponentialHistogram; +import org.elasticsearch.exponentialhistogram.ZeroBucket; + +import java.io.IOException; + +public class ExponentialHistogramBlockBuilder implements Block.Builder { + + private final DoubleBlock.Builder minimaBuilder; + private final DoubleBlock.Builder maximaBuilder; + private final DoubleBlock.Builder sumsBuilder; + private final LongBlock.Builder valueCountsBuilder; + private final DoubleBlock.Builder zeroThresholdsBuilder; + private final BytesRefBlock.Builder encodedHistogramsBuilder; + + ExponentialHistogramBlockBuilder(int estimatedSize, BlockFactory blockFactory) { + DoubleBlock.Builder minimaBuilder = null; + DoubleBlock.Builder maximaBuilder = null; + DoubleBlock.Builder sumsBuilder = null; + LongBlock.Builder valueCountsBuilder = null; + DoubleBlock.Builder zeroThresholdsBuilder = null; + BytesRefBlock.Builder encodedHistogramsBuilder = null; + boolean success = false; + try { + minimaBuilder = blockFactory.newDoubleBlockBuilder(estimatedSize); + maximaBuilder = blockFactory.newDoubleBlockBuilder(estimatedSize); + sumsBuilder = blockFactory.newDoubleBlockBuilder(estimatedSize); + valueCountsBuilder = blockFactory.newLongBlockBuilder(estimatedSize); + zeroThresholdsBuilder = blockFactory.newDoubleBlockBuilder(estimatedSize); + encodedHistogramsBuilder = blockFactory.newBytesRefBlockBuilder(estimatedSize); + this.minimaBuilder = minimaBuilder; + this.maximaBuilder = maximaBuilder; + this.sumsBuilder = sumsBuilder; + this.valueCountsBuilder = valueCountsBuilder; + this.zeroThresholdsBuilder = zeroThresholdsBuilder; + this.encodedHistogramsBuilder = encodedHistogramsBuilder; + success = true; + } finally { + if (success == false) { + Releasables.close( + minimaBuilder, + maximaBuilder, + sumsBuilder, + valueCountsBuilder, + zeroThresholdsBuilder, + encodedHistogramsBuilder + ); + } + } + } + + public ExponentialHistogramBlockBuilder append(ExponentialHistogram histogram) { + assert histogram != null; + // TODO: fix performance and correctness before using in production code + // The current implementation encodes the histogram into the format we use for storage on disk + // This format is optimized for minimal memory usage at the cost of encoding speed + // In addition, it only support storing the zero threshold as a double value, which is lossy when merging histograms + // We should add a dedicated encoding when building a block from computed histograms which do not originate from doc values + // That encoding should be optimized for speed and support storing the zero threshold as (scale, index) pair + ZeroBucket zeroBucket = histogram.zeroBucket(); + assert zeroBucket.compareZeroThreshold(ZeroBucket.minimalEmpty()) == 0 || zeroBucket.isIndexBased() == false + : "Current encoding only supports double-based zero thresholds"; + + BytesStreamOutput encodedBytes = new BytesStreamOutput(); + try { + CompressedExponentialHistogram.writeHistogramBytes( + encodedBytes, + histogram.scale(), + histogram.negativeBuckets().iterator(), + histogram.positiveBuckets().iterator() + ); + } catch (IOException e) { + throw new RuntimeException("Failed to encode histogram", e); + } + if (Double.isNaN(histogram.min())) { + minimaBuilder.appendNull(); + } else { + minimaBuilder.appendDouble(histogram.min()); + } + if (Double.isNaN(histogram.max())) { + maximaBuilder.appendNull(); + } else { + maximaBuilder.appendDouble(histogram.max()); + } + sumsBuilder.appendDouble(histogram.sum()); + valueCountsBuilder.appendLong(histogram.valueCount()); + zeroThresholdsBuilder.appendDouble(zeroBucket.zeroThreshold()); + encodedHistogramsBuilder.appendBytesRef(encodedBytes.bytes().toBytesRef()); + return this; + } + + @Override + public ExponentialHistogramBlock build() { + DoubleBlock minima = null; + DoubleBlock maxima = null; + DoubleBlock sums = null; + LongBlock valueCounts = null; + DoubleBlock zeroThresholds = null; + BytesRefBlock encodedHistograms = null; + boolean success = false; + try { + minima = minimaBuilder.build(); + maxima = maximaBuilder.build(); + sums = sumsBuilder.build(); + valueCounts = valueCountsBuilder.build(); + zeroThresholds = zeroThresholdsBuilder.build(); + encodedHistograms = encodedHistogramsBuilder.build(); + success = true; + return new ExponentialHistogramArrayBlock(minima, maxima, sums, valueCounts, zeroThresholds, encodedHistograms); + } finally { + if (success == false) { + Releasables.close(minima, maxima, sums, valueCounts, zeroThresholds, encodedHistograms); + } + } + } + + @Override + public ExponentialHistogramBlockBuilder appendNull() { + minimaBuilder.appendNull(); + maximaBuilder.appendNull(); + sumsBuilder.appendNull(); + valueCountsBuilder.appendNull(); + zeroThresholdsBuilder.appendNull(); + encodedHistogramsBuilder.appendNull(); + return this; + } + + @Override + public ExponentialHistogramBlockBuilder beginPositionEntry() { + throw new UnsupportedOperationException("ExponentialHistogramBlock does not support multi-values"); + } + + @Override + public ExponentialHistogramBlockBuilder endPositionEntry() { + throw new UnsupportedOperationException("ExponentialHistogramBlock does not support multi-values"); + } + + @Override + public ExponentialHistogramBlockBuilder copyFrom(Block block, int beginInclusive, int endExclusive) { + if (block.areAllValuesNull()) { + for (int i = beginInclusive; i < endExclusive; i++) { + appendNull(); + } + } else { + ExponentialHistogramArrayBlock histoBlock = (ExponentialHistogramArrayBlock) block; + histoBlock.copyInto( + minimaBuilder, + maximaBuilder, + sumsBuilder, + valueCountsBuilder, + zeroThresholdsBuilder, + encodedHistogramsBuilder, + beginInclusive, + endExclusive + ); + } + return this; + } + + @Override + public ExponentialHistogramBlockBuilder mvOrdering(Block.MvOrdering mvOrdering) { + assert mvOrdering == Block.MvOrdering.UNORDERED + : "Exponential histograms don't have a natural order, so it doesn't make sense to call this"; + return this; + } + + @Override + public long estimatedBytes() { + return minimaBuilder.estimatedBytes() + maximaBuilder.estimatedBytes() + sumsBuilder.estimatedBytes() + valueCountsBuilder + .estimatedBytes() + zeroThresholdsBuilder.estimatedBytes() + encodedHistogramsBuilder.estimatedBytes(); + } + + @Override + public void close() { + Releasables.close(minimaBuilder, maximaBuilder, sumsBuilder, valueCountsBuilder, zeroThresholdsBuilder, encodedHistogramsBuilder); + } +} diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/lookup/QueryList.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/lookup/QueryList.java index f300a400e60af..c42946ed71777 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/lookup/QueryList.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/lookup/QueryList.java @@ -188,6 +188,7 @@ public static IntFunction createBlockValueReader(Block block) { case DOC -> throw new IllegalArgumentException("can't read values from [doc] block"); case COMPOSITE -> throw new IllegalArgumentException("can't read values from [composite] block"); case AGGREGATE_METRIC_DOUBLE -> throw new IllegalArgumentException("can't read values from [aggregate metric double] block"); + case EXPONENTIAL_HISTOGRAM -> throw new IllegalArgumentException("can't read values from [exponential histogram] block"); case UNKNOWN -> throw new IllegalArgumentException("can't read values from [" + block + "]"); }; } diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/BlockBuilderCopyFromTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/BlockBuilderCopyFromTests.java index c97e3bf5fc223..752250bce5643 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/BlockBuilderCopyFromTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/BlockBuilderCopyFromTests.java @@ -27,7 +27,11 @@ public class BlockBuilderCopyFromTests extends ESTestCase { public static List params() { List params = new ArrayList<>(); for (ElementType e : ElementType.values()) { - if (e == ElementType.UNKNOWN || e == ElementType.NULL || e == ElementType.DOC || e == ElementType.COMPOSITE) { + if (e == ElementType.UNKNOWN + || e == ElementType.NULL + || e == ElementType.DOC + || e == ElementType.COMPOSITE + || e == ElementType.EXPONENTIAL_HISTOGRAM) { continue; } for (boolean nullAllowed : new boolean[] { false, true }) { diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/BlockBuilderTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/BlockBuilderTests.java index 60c2d7dc0276c..45f452d7ca188 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/BlockBuilderTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/BlockBuilderTests.java @@ -41,6 +41,13 @@ public static List params() { return params; } + private static boolean supportsVectors(ElementType type) { + return switch (type) { + case AGGREGATE_METRIC_DOUBLE, EXPONENTIAL_HISTOGRAM -> false; + default -> true; + }; + } + private final ElementType elementType; BlockFactory blockFactory = BlockFactoryTests.blockFactory(ByteSizeValue.ofGb(1)); @@ -183,8 +190,9 @@ public void testCrankyConstantBlock() { RandomBlock random = RandomBlock.randomBlock(elementType, 1, false, 1, 1, 0, 0); builder.copyFrom(random.block(), 0, random.block().getPositionCount()); try (Block built = builder.build()) { - if (built instanceof AggregateMetricDoubleArrayBlock == false) { - assertThat(built.asVector().isConstant(), is(true)); + Vector vector = built.asVector(); + if (supportsVectors(elementType)) { + assertThat(vector.isConstant(), is(true)); } assertThat(built, equalTo(random.block())); } diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/BlockMultiValuedTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/BlockMultiValuedTests.java index 876f6752d1190..a01ce9664110f 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/BlockMultiValuedTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/BlockMultiValuedTests.java @@ -49,6 +49,7 @@ public static List params() { || e == ElementType.NULL || e == ElementType.DOC || e == ElementType.COMPOSITE + || e == ElementType.EXPONENTIAL_HISTOGRAM // TODO(b/133393): Enable tests once the block supports lookup || e == ElementType.AGGREGATE_METRIC_DOUBLE) { continue; } diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/ExponentialHistogramBlockEqualityTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/ExponentialHistogramBlockEqualityTests.java new file mode 100644 index 0000000000000..92f72ee32a22d --- /dev/null +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/ExponentialHistogramBlockEqualityTests.java @@ -0,0 +1,93 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.compute.data; + +import org.elasticsearch.compute.test.ComputeTestCase; +import org.elasticsearch.compute.test.RandomBlock; +import org.elasticsearch.core.Releasables; +import org.elasticsearch.exponentialhistogram.ExponentialHistogram; +import org.elasticsearch.exponentialhistogram.ExponentialHistogramCircuitBreaker; + +import java.util.List; + +import static org.hamcrest.CoreMatchers.not; +import static org.hamcrest.Matchers.equalTo; + +public class ExponentialHistogramBlockEqualityTests extends ComputeTestCase { + + public void testEmptyBlock() { + List blocks = List.of( + blockFactory().newConstantNullBlock(0), + blockFactory().newExponentialHistogramBlockBuilder(0).build(), + filterAndRelease(blockFactory().newExponentialHistogramBlockBuilder(0).appendNull().build()), + filterAndRelease(blockFactory().newExponentialHistogramBlockBuilder(0).append(ExponentialHistogram.empty()).build()) + ); + for (Block a : blocks) { + for (Block b : blocks) { + assertThat(a, equalTo(b)); + assertThat(a.hashCode(), equalTo(b.hashCode())); + } + } + Releasables.close(blocks); + } + + public void testNullValuesEquality() { + List blocks = List.of( + blockFactory().newConstantNullBlock(2), + blockFactory().newExponentialHistogramBlockBuilder(0).appendNull().appendNull().build() + ); + for (Block a : blocks) { + for (Block b : blocks) { + assertThat(a, equalTo(b)); + assertThat(a.hashCode(), equalTo(b.hashCode())); + } + } + Releasables.close(blocks); + } + + public void testFilteredBlockEquality() { + ExponentialHistogram histo1 = ExponentialHistogram.create(4, ExponentialHistogramCircuitBreaker.noop(), 1, 2, 3, 4, 5); + ExponentialHistogram histo2 = ExponentialHistogram.empty(); + Block block1 = blockFactory().newExponentialHistogramBlockBuilder(0).append(histo1).append(histo1).append(histo2).build(); + + Block block2 = blockFactory().newExponentialHistogramBlockBuilder(0).append(histo1).append(histo2).append(histo2).build(); + + Block block1Filtered = block1.filter(1, 2); + Block block2Filtered = block2.filter(0, 1); + + assertThat(block1, not(equalTo(block2))); + assertThat(block1, not(equalTo(block1Filtered))); + assertThat(block1, not(equalTo(block2Filtered))); + assertThat(block2, not(equalTo(block1))); + assertThat(block2, not(equalTo(block1Filtered))); + assertThat(block2, not(equalTo(block2Filtered))); + + assertThat(block1Filtered, equalTo(block2Filtered)); + assertThat(block1Filtered.hashCode(), equalTo(block2Filtered.hashCode())); + + Releasables.close(block1, block2, block1Filtered, block2Filtered); + } + + public void testRandomBlockEquality() { + int positionCount = randomIntBetween(0, 10_000); + Block expHistoBlock = RandomBlock.randomBlock(blockFactory(), ElementType.EXPONENTIAL_HISTOGRAM, positionCount, true, 1, 10, 0, 10) + .block(); + Block copy = BlockUtils.deepCopyOf(expHistoBlock, blockFactory()); + + assertThat(expHistoBlock, equalTo(copy)); + assertThat(expHistoBlock.hashCode(), equalTo(copy.hashCode())); + + Releasables.close(expHistoBlock, copy); + } + + private static Block filterAndRelease(Block toFilterAndRelease) { + Block filtered = toFilterAndRelease.filter(); + toFilterAndRelease.close(); + return filtered; + } +} diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/VectorBuilderTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/VectorBuilderTests.java index 5cb72177be6f7..91ec105f20091 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/VectorBuilderTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/VectorBuilderTests.java @@ -34,7 +34,8 @@ public static List params() { || e == ElementType.NULL || e == ElementType.DOC || e == ElementType.COMPOSITE - || e == ElementType.AGGREGATE_METRIC_DOUBLE) { + || e == ElementType.AGGREGATE_METRIC_DOUBLE + || e == ElementType.EXPONENTIAL_HISTOGRAM) { continue; } params.add(new Object[] { e }); @@ -118,7 +119,7 @@ public void testCranky() { private Vector.Builder vectorBuilder(int estimatedSize, BlockFactory blockFactory) { return switch (elementType) { - case NULL, DOC, COMPOSITE, AGGREGATE_METRIC_DOUBLE, UNKNOWN -> throw new UnsupportedOperationException(); + case NULL, DOC, COMPOSITE, AGGREGATE_METRIC_DOUBLE, EXPONENTIAL_HISTOGRAM, UNKNOWN -> throw new UnsupportedOperationException(); case BOOLEAN -> blockFactory.newBooleanVectorBuilder(estimatedSize); case BYTES_REF -> blockFactory.newBytesRefVectorBuilder(estimatedSize); case FLOAT -> blockFactory.newFloatVectorBuilder(estimatedSize); diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/VectorFixedBuilderTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/VectorFixedBuilderTests.java index 0dbc177350f0b..b72ee46c4b828 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/VectorFixedBuilderTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/VectorFixedBuilderTests.java @@ -34,7 +34,8 @@ public static List params() { || elementType == ElementType.NULL || elementType == ElementType.DOC || elementType == ElementType.BYTES_REF - || elementType == ElementType.AGGREGATE_METRIC_DOUBLE) { + || elementType == ElementType.AGGREGATE_METRIC_DOUBLE + || elementType == ElementType.EXPONENTIAL_HISTOGRAM) { continue; } params.add(new Object[] { elementType }); @@ -118,7 +119,8 @@ public void testCranky() { private Vector.Builder vectorBuilder(int size, BlockFactory blockFactory) { return switch (elementType) { - case NULL, BYTES_REF, DOC, COMPOSITE, AGGREGATE_METRIC_DOUBLE, UNKNOWN -> throw new UnsupportedOperationException(); + case NULL, BYTES_REF, DOC, COMPOSITE, AGGREGATE_METRIC_DOUBLE, EXPONENTIAL_HISTOGRAM, UNKNOWN -> + throw new UnsupportedOperationException(); case BOOLEAN -> blockFactory.newBooleanVectorFixedBuilder(size); case DOUBLE -> blockFactory.newDoubleVectorFixedBuilder(size); case FLOAT -> blockFactory.newFloatVectorFixedBuilder(size); @@ -129,7 +131,7 @@ private Vector.Builder vectorBuilder(int size, BlockFactory blockFactory) { private void fill(Vector.Builder builder, Vector from) { switch (elementType) { - case NULL, DOC, COMPOSITE, AGGREGATE_METRIC_DOUBLE, UNKNOWN -> throw new UnsupportedOperationException(); + case NULL, DOC, COMPOSITE, AGGREGATE_METRIC_DOUBLE, EXPONENTIAL_HISTOGRAM, UNKNOWN -> throw new UnsupportedOperationException(); case BOOLEAN -> { for (int p = 0; p < from.getPositionCount(); p++) { ((BooleanVector.FixedBuilder) builder).appendBoolean(((BooleanVector) from).getBoolean(p)); diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/mvdedupe/MultivalueDedupeTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/mvdedupe/MultivalueDedupeTests.java index 4caa6415f0cfe..73efcec997eef 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/mvdedupe/MultivalueDedupeTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/mvdedupe/MultivalueDedupeTests.java @@ -64,7 +64,8 @@ public static List supportedTypes() { ElementType.DOC, ElementType.COMPOSITE, ElementType.FLOAT, - ElementType.AGGREGATE_METRIC_DOUBLE + ElementType.AGGREGATE_METRIC_DOUBLE, + ElementType.EXPONENTIAL_HISTOGRAM )) { continue; } diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/topn/ExtractorTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/topn/ExtractorTests.java index f0dfd62978f4d..022055a72bcbb 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/topn/ExtractorTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/topn/ExtractorTests.java @@ -39,11 +39,14 @@ public static Iterable parameters() { BlockFactory blockFactory = TestBlockFactory.getNonBreakingInstance(); List cases = new ArrayList<>(); for (ElementType e : ElementType.values()) { + boolean supportsNull = true; switch (e) { case UNKNOWN -> { + supportsNull = false; } - case COMPOSITE -> { + case COMPOSITE, EXPONENTIAL_HISTOGRAM -> { // TODO: add later + supportsNull = false; } case AGGREGATE_METRIC_DOUBLE -> { cases.add( @@ -64,6 +67,7 @@ public static Iterable parameters() { ); } case FLOAT -> { + supportsNull = false; } case BYTES_REF -> { cases.add(valueTestCase("single alpha", e, TopNEncoder.UTF8, () -> randomAlphaOfLength(5))); @@ -107,22 +111,25 @@ public static Iterable parameters() { ) ); } - case DOC -> cases.add( - new Object[] { - new TestCase( - "doc", - e, - new DocVectorEncoder(AlwaysReferencedIndexedByShardId.INSTANCE), - () -> new DocVector( - AlwaysReferencedIndexedByShardId.INSTANCE, - // Shard ID should be small and non-negative. - blockFactory.newConstantIntBlockWith(randomIntBetween(0, 255), 1).asVector(), - blockFactory.newConstantIntBlockWith(randomInt(), 1).asVector(), - blockFactory.newConstantIntBlockWith(randomInt(), 1).asVector(), - randomBoolean() ? null : randomBoolean() - ).asBlock() - ) } - ); + case DOC -> { + supportsNull = false; + cases.add( + new Object[] { + new TestCase( + "doc", + e, + new DocVectorEncoder(AlwaysReferencedIndexedByShardId.INSTANCE), + () -> new DocVector( + AlwaysReferencedIndexedByShardId.INSTANCE, + // Shard ID should be small and non-negative. + blockFactory.newConstantIntBlockWith(randomIntBetween(0, 255), 1).asVector(), + blockFactory.newConstantIntBlockWith(randomInt(), 1).asVector(), + blockFactory.newConstantIntBlockWith(randomInt(), 1).asVector(), + randomBoolean() ? null : randomBoolean() + ).asBlock() + ) } + ); + } case NULL -> { } default -> { @@ -137,7 +144,7 @@ public static Iterable parameters() { ); } } - if (e != ElementType.UNKNOWN && e != ElementType.COMPOSITE && e != ElementType.FLOAT && e != ElementType.DOC) { + if (supportsNull) { cases.add(valueTestCase("null " + e, e, TopNEncoder.DEFAULT_UNSORTABLE, () -> null)); } } diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/topn/TopNOperatorTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/topn/TopNOperatorTests.java index f13233a48b9fd..72020925d0faa 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/topn/TopNOperatorTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/topn/TopNOperatorTests.java @@ -78,6 +78,7 @@ import static org.elasticsearch.compute.data.ElementType.BYTES_REF; import static org.elasticsearch.compute.data.ElementType.COMPOSITE; import static org.elasticsearch.compute.data.ElementType.DOUBLE; +import static org.elasticsearch.compute.data.ElementType.EXPONENTIAL_HISTOGRAM; import static org.elasticsearch.compute.data.ElementType.FLOAT; import static org.elasticsearch.compute.data.ElementType.INT; import static org.elasticsearch.compute.data.ElementType.LONG; @@ -534,7 +535,7 @@ public void testCollectAllValues() { encoders.add(DEFAULT_SORTABLE); for (ElementType e : ElementType.values()) { - if (e == ElementType.UNKNOWN || e == COMPOSITE) { + if (e == ElementType.UNKNOWN || e == COMPOSITE || e == EXPONENTIAL_HISTOGRAM) { continue; } elementTypes.add(e); @@ -605,7 +606,7 @@ public void testCollectAllValues_RandomMultiValues() { for (int type = 0; type < blocksCount; type++) { ElementType e = randomFrom(ElementType.values()); - if (e == ElementType.UNKNOWN || e == COMPOSITE || e == AGGREGATE_METRIC_DOUBLE) { + if (e == ElementType.UNKNOWN || e == COMPOSITE || e == AGGREGATE_METRIC_DOUBLE || e == EXPONENTIAL_HISTOGRAM) { continue; } elementTypes.add(e); @@ -1037,7 +1038,11 @@ public void testRandomMultiValuesTopN() { for (int type = 0; type < blocksCount; type++) { ElementType e = randomValueOtherThanMany( - t -> t == ElementType.UNKNOWN || t == ElementType.DOC || t == COMPOSITE || t == AGGREGATE_METRIC_DOUBLE, + t -> t == ElementType.UNKNOWN + || t == ElementType.DOC + || t == COMPOSITE + || t == AGGREGATE_METRIC_DOUBLE + || t == EXPONENTIAL_HISTOGRAM, () -> randomFrom(ElementType.values()) ); elementTypes.add(e); diff --git a/x-pack/plugin/esql/compute/test/src/main/java/org/elasticsearch/compute/test/BlockTestUtils.java b/x-pack/plugin/esql/compute/test/src/main/java/org/elasticsearch/compute/test/BlockTestUtils.java index a4c9e312eeb66..5a16d164707c2 100644 --- a/x-pack/plugin/esql/compute/test/src/main/java/org/elasticsearch/compute/test/BlockTestUtils.java +++ b/x-pack/plugin/esql/compute/test/src/main/java/org/elasticsearch/compute/test/BlockTestUtils.java @@ -19,12 +19,18 @@ import org.elasticsearch.compute.data.DocBlock; import org.elasticsearch.compute.data.DoubleBlock; import org.elasticsearch.compute.data.ElementType; +import org.elasticsearch.compute.data.ExponentialHistogramBlock; +import org.elasticsearch.compute.data.ExponentialHistogramBlockAccessor; +import org.elasticsearch.compute.data.ExponentialHistogramBlockBuilder; import org.elasticsearch.compute.data.FloatBlock; import org.elasticsearch.compute.data.IntBlock; import org.elasticsearch.compute.data.LongBlock; import org.elasticsearch.compute.data.OrdinalBytesRefBlock; import org.elasticsearch.compute.data.Page; import org.elasticsearch.core.Releasables; +import org.elasticsearch.exponentialhistogram.ExponentialHistogram; +import org.elasticsearch.exponentialhistogram.ExponentialHistogramCircuitBreaker; +import org.elasticsearch.exponentialhistogram.ReleasableExponentialHistogram; import org.hamcrest.Matcher; import java.util.ArrayList; @@ -32,6 +38,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.stream.IntStream; import static org.elasticsearch.compute.data.BlockUtils.toJavaObject; import static org.elasticsearch.test.ESTestCase.between; @@ -69,6 +76,7 @@ public static Object randomValue(ElementType e) { randomInt(), between(0, Integer.MAX_VALUE) ); + case EXPONENTIAL_HISTOGRAM -> randomExponentialHistogram(); case NULL -> null; case COMPOSITE -> throw new IllegalArgumentException("can't make random values for composite"); case UNKNOWN -> throw new IllegalArgumentException("can't make random values for [" + e + "]"); @@ -216,6 +224,10 @@ public static void append(Block.Builder builder, Object value) { b.appendShard(v.shard()).appendSegment(v.segment()).appendDoc(v.doc()); return; } + if (builder instanceof ExponentialHistogramBlockBuilder b && value instanceof ExponentialHistogram histogram) { + b.append(histogram); + return; + } if (value instanceof List l && l.isEmpty()) { builder.appendNull(); return; @@ -301,6 +313,7 @@ public static List> valuesAtPositions(Block block, int from, int to yield literal; } + case EXPONENTIAL_HISTOGRAM -> new ExponentialHistogramBlockAccessor((ExponentialHistogramBlock) block).get(i); default -> throw new IllegalArgumentException("unsupported element type [" + block.elementType() + "]"); }); } @@ -361,6 +374,29 @@ public static Page convertBytesRefsToOrdinals(Page page) { } } + static ExponentialHistogram randomExponentialHistogram() { + // TODO(b/133393): allow (index,scale) based zero thresholds as soon as we support them in the block + // ideally Replace this with the shared random generation in ExponentialHistogramTestUtils + boolean hasNegativeValues = randomBoolean(); + boolean hasPositiveValues = randomBoolean(); + boolean hasZeroValues = randomBoolean(); + double[] rawValues = IntStream.concat( + IntStream.concat( + hasNegativeValues ? IntStream.range(0, randomIntBetween(1, 1000)).map(i1 -> -1) : IntStream.empty(), + hasPositiveValues ? IntStream.range(0, randomIntBetween(1, 1000)).map(i1 -> 1) : IntStream.empty() + ), + hasZeroValues ? IntStream.range(0, randomIntBetween(1, 100)).map(i1 -> 0) : IntStream.empty() + ).mapToDouble(sign -> sign * (Math.pow(1_000_000, randomDouble()))).toArray(); + + int numBuckets = randomIntBetween(4, 300); + ReleasableExponentialHistogram histo = ExponentialHistogram.create( + numBuckets, + ExponentialHistogramCircuitBreaker.noop(), + rawValues + ); + return histo; + } + private static int dedupe(Map dedupe, BytesRefVector.Builder bytes, BytesRef v) { Integer current = dedupe.get(v); if (current != null) { diff --git a/x-pack/plugin/esql/compute/test/src/main/java/org/elasticsearch/compute/test/RandomBlock.java b/x-pack/plugin/esql/compute/test/src/main/java/org/elasticsearch/compute/test/RandomBlock.java index b458f1f20254c..8281a07aa4df2 100644 --- a/x-pack/plugin/esql/compute/test/src/main/java/org/elasticsearch/compute/test/RandomBlock.java +++ b/x-pack/plugin/esql/compute/test/src/main/java/org/elasticsearch/compute/test/RandomBlock.java @@ -15,9 +15,11 @@ import org.elasticsearch.compute.data.BytesRefBlock; import org.elasticsearch.compute.data.DoubleBlock; import org.elasticsearch.compute.data.ElementType; +import org.elasticsearch.compute.data.ExponentialHistogramBlockBuilder; import org.elasticsearch.compute.data.FloatBlock; import org.elasticsearch.compute.data.IntBlock; import org.elasticsearch.compute.data.LongBlock; +import org.elasticsearch.exponentialhistogram.ExponentialHistogram; import org.elasticsearch.geo.GeometryTestUtils; import org.elasticsearch.geo.ShapeTestUtils; import org.elasticsearch.geometry.Point; @@ -87,6 +89,15 @@ public static RandomBlock randomBlock( ) { List> values = new ArrayList<>(); Block.MvOrdering mvOrdering = Block.MvOrdering.DEDUPLICATED_AND_SORTED_ASCENDING; + if (elementType == ElementType.EXPONENTIAL_HISTOGRAM) { + // histograms do not support multi-values + // TODO(b/133393) remove this when we support multi-values in exponential histogram blocks + minValuesPerPosition = Math.min(1, minValuesPerPosition); + maxValuesPerPosition = Math.min(1, maxValuesPerPosition); + minDupsPerPosition = 0; + maxDupsPerPosition = 0; + mvOrdering = Block.MvOrdering.UNORDERED; // histograms do not support ordering + } try (var builder = elementType.newBlockBuilder(positionCount, blockFactory)) { boolean bytesRefFromPoints = ESTestCase.randomBoolean(); Supplier pointSupplier = ESTestCase.randomBoolean() ? GeometryTestUtils::randomPoint : ShapeTestUtils::randomPoint; @@ -155,6 +166,12 @@ public static RandomBlock randomBlock( b.count().appendInt(count); valuesAtPosition.add(new AggregateMetricDoubleBlockBuilder.AggregateMetricDoubleLiteral(min, max, sum, count)); } + case EXPONENTIAL_HISTOGRAM -> { + ExponentialHistogramBlockBuilder b = (ExponentialHistogramBlockBuilder) builder; + ExponentialHistogram histogram = BlockTestUtils.randomExponentialHistogram(); + b.append(histogram); + valuesAtPosition.add(histogram); + } default -> throw new IllegalArgumentException("unsupported element type [" + elementType + "]"); } } diff --git a/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/RestEsqlIT.java b/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/RestEsqlIT.java index b72ccdb573366..5cb10aa89dddf 100644 --- a/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/RestEsqlIT.java +++ b/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/RestEsqlIT.java @@ -766,6 +766,7 @@ public void testSuggestedCast() throws IOException { shouldBeSupported.remove(DataType.DOC_DATA_TYPE); shouldBeSupported.remove(DataType.TSID_DATA_TYPE); shouldBeSupported.remove(DataType.DENSE_VECTOR); + shouldBeSupported.remove(DataType.EXPONENTIAL_HISTOGRAM); // TODO(b/133393): add support when blockloader is implemented if (EsqlCapabilities.Cap.AGGREGATE_METRIC_DOUBLE_V0.isEnabled() == false) { shouldBeSupported.remove(DataType.AGGREGATE_METRIC_DOUBLE); } diff --git a/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/AllSupportedFieldsTestCase.java b/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/AllSupportedFieldsTestCase.java index 0617f03402730..3455c9814a8a9 100644 --- a/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/AllSupportedFieldsTestCase.java +++ b/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/AllSupportedFieldsTestCase.java @@ -514,6 +514,8 @@ private static boolean supportedInIndex(DataType t) { UNSUPPORTED, PARTIAL_AGG, // You can't index these - they are just constants. DATE_PERIOD, TIME_DURATION, GEOTILE, GEOHASH, GEOHEX, + // TODO(b/133393): BlockLoader for EXPONENTIAL_HISTOGRAM is not implemented yet + EXPONENTIAL_HISTOGRAM, // TODO fix geo CARTESIAN_POINT, CARTESIAN_SHAPE -> false; default -> true; diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/CsvTestUtils.java b/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/CsvTestUtils.java index a6ce1b3b41b2e..a1e1931d83502 100644 --- a/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/CsvTestUtils.java +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/CsvTestUtils.java @@ -593,6 +593,7 @@ public static Type asType(ElementType elementType, Type actualType) { case DOC -> throw new IllegalArgumentException("can't assert on doc blocks"); case COMPOSITE -> throw new IllegalArgumentException("can't assert on composite blocks"); case AGGREGATE_METRIC_DOUBLE -> AGGREGATE_METRIC_DOUBLE; + case EXPONENTIAL_HISTOGRAM -> throw new IllegalArgumentException("exponential histogram blocks not supported yet"); case UNKNOWN -> throw new IllegalArgumentException("Unknown block types cannot be handled"); }; } diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/EsqlTestUtils.java b/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/EsqlTestUtils.java index 39cfe2f28ece7..fe78e40ebb056 100644 --- a/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/EsqlTestUtils.java +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/EsqlTestUtils.java @@ -44,6 +44,9 @@ import org.elasticsearch.core.PathUtils; import org.elasticsearch.core.SuppressForbidden; import org.elasticsearch.core.Tuple; +import org.elasticsearch.exponentialhistogram.ExponentialHistogram; +import org.elasticsearch.exponentialhistogram.ExponentialHistogramCircuitBreaker; +import org.elasticsearch.exponentialhistogram.ReleasableExponentialHistogram; import org.elasticsearch.geo.GeometryTestUtils; import org.elasticsearch.geo.ShapeTestUtils; import org.elasticsearch.geometry.utils.Geohash; @@ -157,6 +160,7 @@ import java.util.Set; import java.util.TreeMap; import java.util.jar.JarInputStream; +import java.util.stream.IntStream; import java.util.zip.ZipEntry; import static java.util.Collections.emptyList; @@ -992,12 +996,36 @@ public static Literal randomLiteral(DataType type) { } case TSID_DATA_TYPE -> randomTsId().toBytesRef(); case DENSE_VECTOR -> Arrays.asList(randomArray(10, 10, i -> new Float[10], ESTestCase::randomFloat)); + case EXPONENTIAL_HISTOGRAM -> new WriteableExponentialHistogram(EsqlTestUtils.randomExponentialHistogram()); case UNSUPPORTED, OBJECT, DOC_DATA_TYPE, PARTIAL_AGG -> throw new IllegalArgumentException( "can't make random values for [" + type.typeName() + "]" ); }, type); } + private static ExponentialHistogram randomExponentialHistogram() { + // TODO(b/133393): allow (index,scale) based zero thresholds as soon as we support them in the block + // ideally Replace this with the shared random generation in ExponentialHistogramTestUtils + boolean hasNegativeValues = randomBoolean(); + boolean hasPositiveValues = randomBoolean(); + boolean hasZeroValues = randomBoolean(); + double[] rawValues = IntStream.concat( + IntStream.concat( + hasNegativeValues ? IntStream.range(0, randomIntBetween(1, 1000)).map(i1 -> -1) : IntStream.empty(), + hasPositiveValues ? IntStream.range(0, randomIntBetween(1, 1000)).map(i1 -> 1) : IntStream.empty() + ), + hasZeroValues ? IntStream.range(0, randomIntBetween(1, 100)).map(i1 -> 0) : IntStream.empty() + ).mapToDouble(sign -> sign * (Math.pow(1_000_000, randomDouble()))).toArray(); + + int numBuckets = randomIntBetween(4, 300); + ReleasableExponentialHistogram histo = ExponentialHistogram.create( + numBuckets, + ExponentialHistogramCircuitBreaker.noop(), + rawValues + ); + return histo; + } + static Version randomVersion() { // TODO degenerate versions and stuff return switch (between(0, 2)) { diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/WriteableExponentialHistogram.java b/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/WriteableExponentialHistogram.java new file mode 100644 index 0000000000000..bbcdb989e7f8a --- /dev/null +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/WriteableExponentialHistogram.java @@ -0,0 +1,154 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.esql; + +import org.elasticsearch.TransportVersion; +import org.elasticsearch.common.io.stream.GenericNamedWriteable; +import org.elasticsearch.common.io.stream.NamedWriteableRegistry; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.exponentialhistogram.AbstractExponentialHistogram; +import org.elasticsearch.exponentialhistogram.BucketIterator; +import org.elasticsearch.exponentialhistogram.ExponentialHistogram; +import org.elasticsearch.exponentialhistogram.ExponentialHistogramBuilder; +import org.elasticsearch.exponentialhistogram.ExponentialHistogramCircuitBreaker; +import org.elasticsearch.exponentialhistogram.ZeroBucket; + +import java.io.IOException; + +/** + * A wrapper around ExponentialHistogram to make it writeable to @link StreamOutput} + * so that it can be used e.g. in {@link org.elasticsearch.xpack.esql.core.expression.Literal}s. + * Only intended for testing purposes. + */ +public class WriteableExponentialHistogram extends AbstractExponentialHistogram implements GenericNamedWriteable { + + // TODO(b/133393): as it turns out, this is also required in production. Therefore we have to properly register this class, + // like in https://github.com/elastic/elasticsearch/pull/135054 + + private static final String WRITEABLE_NAME = "test_exponential_histogram"; + + public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry( + GenericNamedWriteable.class, + WRITEABLE_NAME, + WriteableExponentialHistogram::readFrom + ); + + private final ExponentialHistogram delegate; + + WriteableExponentialHistogram(ExponentialHistogram delegate) { + this.delegate = delegate; + } + + @Override + public int scale() { + return delegate.scale(); + } + + @Override + public ZeroBucket zeroBucket() { + return delegate.zeroBucket(); + } + + @Override + public Buckets positiveBuckets() { + return delegate.positiveBuckets(); + } + + @Override + public Buckets negativeBuckets() { + return delegate.negativeBuckets(); + } + + @Override + public double sum() { + return delegate.sum(); + } + + @Override + public long valueCount() { + return delegate.valueCount(); + } + + @Override + public double min() { + return delegate.min(); + } + + @Override + public double max() { + return delegate.max(); + } + + @Override + public long ramBytesUsed() { + return 0; + } + + @Override + public String getWriteableName() { + return WRITEABLE_NAME; + } + + @Override + public boolean supportsVersion(TransportVersion version) { + return true; + } + + @Override + public TransportVersion getMinimalSupportedVersion() { + assert false : "must not be called when overriding supportsVersion"; + throw new UnsupportedOperationException("must not be called when overriding supportsVersion"); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeByte((byte) scale()); + out.writeDouble(sum()); + out.writeDouble(min()); + out.writeDouble(max()); + out.writeDouble(zeroBucket().zeroThreshold()); + out.writeLong(zeroBucket().count()); + writeBuckets(out, negativeBuckets()); + writeBuckets(out, positiveBuckets()); + } + + private static void writeBuckets(StreamOutput out, Buckets buckets) throws IOException { + int count = 0; + BucketIterator iterator = buckets.iterator(); + while (iterator.hasNext()) { + count++; + iterator.advance(); + } + out.writeInt(count); + iterator = buckets.iterator(); + while (iterator.hasNext()) { + out.writeLong(iterator.peekIndex()); + out.writeLong(iterator.peekCount()); + iterator.advance(); + } + } + + private static WriteableExponentialHistogram readFrom(StreamInput in) throws IOException { + byte scale = in.readByte(); + ExponentialHistogramBuilder builder = ExponentialHistogram.builder(scale, ExponentialHistogramCircuitBreaker.noop()); + builder.sum(in.readDouble()); + builder.min(in.readDouble()); + builder.max(in.readDouble()); + builder.zeroBucket(ZeroBucket.create(in.readDouble(), in.readLong())); + int negBucketCount = in.readInt(); + for (int i = 0; i < negBucketCount; i++) { + builder.setNegativeBucket(in.readLong(), in.readLong()); + } + int posBucketCount = in.readInt(); + for (int i = 0; i < posBucketCount; i++) { + builder.setPositiveBucket(in.readLong(), in.readLong()); + } + return new WriteableExponentialHistogram(builder.build()); + } +} diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/LookupJoinTypesIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/LookupJoinTypesIT.java index cd5b9df03c0e4..dc17e8b7e5f81 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/LookupJoinTypesIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/LookupJoinTypesIT.java @@ -59,6 +59,7 @@ import static org.elasticsearch.xpack.esql.core.type.DataType.DENSE_VECTOR; import static org.elasticsearch.xpack.esql.core.type.DataType.DOC_DATA_TYPE; import static org.elasticsearch.xpack.esql.core.type.DataType.DOUBLE; +import static org.elasticsearch.xpack.esql.core.type.DataType.EXPONENTIAL_HISTOGRAM; import static org.elasticsearch.xpack.esql.core.type.DataType.FLOAT; import static org.elasticsearch.xpack.esql.core.type.DataType.GEOHASH; import static org.elasticsearch.xpack.esql.core.type.DataType.GEOHEX; @@ -246,6 +247,7 @@ public LookupJoinTypesIT(BinaryComparisonOperation operation) { || type == TSID_DATA_TYPE || type == AGGREGATE_METRIC_DOUBLE // need special handling for loads at the moment || type == DENSE_VECTOR // need special handling for loads at the moment + || type == EXPONENTIAL_HISTOGRAM || type == GEOHASH || type == GEOTILE || type == GEOHEX diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java index e637481c7e058..2337febf882ce 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java @@ -19,6 +19,8 @@ import java.util.Locale; import java.util.Set; +import static org.elasticsearch.xpack.esql.core.plugin.EsqlCorePlugin.EXPONENTIAL_HISTOGRAM_FEATURE_FLAG; + /** * A {@link Set} of "capabilities" supported by the {@link RestEsqlQueryAction} * and {@link RestEsqlAsyncQueryAction} APIs. These are exposed over the @@ -1527,6 +1529,11 @@ public enum Cap { */ PACK_DIMENSIONS_IN_TS, + /** + * Support for exponential_histogram type + */ + EXPONENTIAL_HISTOGRAM(EXPONENTIAL_HISTOGRAM_FEATURE_FLAG), + /** * Create new block when filtering OrdinalBytesRefBlock */ diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/PositionToXContent.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/PositionToXContent.java index 2e8a6d2941b4f..350012d138d22 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/PositionToXContent.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/PositionToXContent.java @@ -15,9 +15,13 @@ import org.elasticsearch.compute.data.BooleanBlock; import org.elasticsearch.compute.data.BytesRefBlock; import org.elasticsearch.compute.data.DoubleBlock; +import org.elasticsearch.compute.data.ExponentialHistogramBlock; +import org.elasticsearch.compute.data.ExponentialHistogramBlockAccessor; import org.elasticsearch.compute.data.FloatBlock; import org.elasticsearch.compute.data.IntBlock; import org.elasticsearch.compute.data.LongBlock; +import org.elasticsearch.exponentialhistogram.ExponentialHistogram; +import org.elasticsearch.exponentialhistogram.ExponentialHistogramXContent; import org.elasticsearch.index.mapper.TimeSeriesIdFieldMapper; import org.elasticsearch.xcontent.ToXContent; import org.elasticsearch.xcontent.XContentBuilder; @@ -168,6 +172,18 @@ protected XContentBuilder valueToXContent(XContentBuilder builder, ToXContent.Pa return builder.value(aggregateMetricDoubleBlockToString((AggregateMetricDoubleBlock) block, valueIndex)); } }; + case EXPONENTIAL_HISTOGRAM -> new PositionToXContent(block) { + + ExponentialHistogramBlockAccessor accessor = new ExponentialHistogramBlockAccessor((ExponentialHistogramBlock) block); + + @Override + protected XContentBuilder valueToXContent(XContentBuilder builder, ToXContent.Params params, int valueIndex) + throws IOException { + ExponentialHistogram histogram = accessor.get(valueIndex); + ExponentialHistogramXContent.serialize(builder, histogram); + return builder; + } + }; case NULL -> new PositionToXContent(block) { @Override protected XContentBuilder valueToXContent(XContentBuilder builder, ToXContent.Params params, int valueIndex) diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/ResponseValueUtils.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/ResponseValueUtils.java index e013836fde15f..f3681d7b2357a 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/ResponseValueUtils.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/ResponseValueUtils.java @@ -16,6 +16,7 @@ import org.elasticsearch.compute.data.BooleanBlock; import org.elasticsearch.compute.data.BytesRefBlock; import org.elasticsearch.compute.data.DoubleBlock; +import org.elasticsearch.compute.data.ExponentialHistogramBlock; import org.elasticsearch.compute.data.FloatBlock; import org.elasticsearch.compute.data.IntBlock; import org.elasticsearch.compute.data.LongBlock; @@ -35,6 +36,7 @@ import static org.elasticsearch.xpack.esql.core.util.NumericUtils.unsignedLongAsNumber; import static org.elasticsearch.xpack.esql.type.EsqlDataTypeConverter.aggregateMetricDoubleBlockToString; import static org.elasticsearch.xpack.esql.type.EsqlDataTypeConverter.dateTimeToString; +import static org.elasticsearch.xpack.esql.type.EsqlDataTypeConverter.exponentialHistogramBlockToString; import static org.elasticsearch.xpack.esql.type.EsqlDataTypeConverter.geoGridToString; import static org.elasticsearch.xpack.esql.type.EsqlDataTypeConverter.ipToString; import static org.elasticsearch.xpack.esql.type.EsqlDataTypeConverter.nanoTimeToString; @@ -139,6 +141,7 @@ private static Object valueAt(DataType dataType, Block block, int offset, BytesR ); case GEOHEX, GEOHASH, GEOTILE -> geoGridToString(((LongBlock) block).getLong(offset), dataType); case AGGREGATE_METRIC_DOUBLE -> aggregateMetricDoubleBlockToString((AggregateMetricDoubleBlock) block, offset); + case EXPONENTIAL_HISTOGRAM -> exponentialHistogramBlockToString((ExponentialHistogramBlock) block, offset); case UNSUPPORTED -> (String) null; case SOURCE -> { BytesRef val = ((BytesRefBlock) block).getBytesRef(offset, scratch); diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/scalar/nulls/Coalesce.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/scalar/nulls/Coalesce.java index 642eb27f7fb63..9283bc7b0900e 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/scalar/nulls/Coalesce.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/scalar/nulls/Coalesce.java @@ -209,9 +209,9 @@ public ExpressionEvaluator.Factory toEvaluator(ToEvaluator toEvaluator) { .toEvaluator(toEvaluator, children()); case NULL -> EvalOperator.CONSTANT_NULL_FACTORY; case UNSUPPORTED, SHORT, BYTE, DATE_PERIOD, OBJECT, DOC_DATA_TYPE, SOURCE, TIME_DURATION, FLOAT, HALF_FLOAT, TSID_DATA_TYPE, - SCALED_FLOAT, PARTIAL_AGG, AGGREGATE_METRIC_DOUBLE, DENSE_VECTOR -> throw new UnsupportedOperationException( - dataType() + " can’t be coalesced" - ); + SCALED_FLOAT, PARTIAL_AGG, AGGREGATE_METRIC_DOUBLE, EXPONENTIAL_HISTOGRAM, DENSE_VECTOR -> + throw new UnsupportedOperationException(dataType() + " can’t be coalesced"); + // TODO(b/133393): Implement coalesce for exponential histograms }; } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/join/Join.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/join/Join.java index 6bece6a612392..3410f8be0c027 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/join/Join.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/join/Join.java @@ -50,6 +50,7 @@ import static org.elasticsearch.xpack.esql.core.type.DataType.DATE_PERIOD; import static org.elasticsearch.xpack.esql.core.type.DataType.DENSE_VECTOR; import static org.elasticsearch.xpack.esql.core.type.DataType.DOC_DATA_TYPE; +import static org.elasticsearch.xpack.esql.core.type.DataType.EXPONENTIAL_HISTOGRAM; import static org.elasticsearch.xpack.esql.core.type.DataType.GEOHASH; import static org.elasticsearch.xpack.esql.core.type.DataType.GEOHEX; import static org.elasticsearch.xpack.esql.core.type.DataType.GEOTILE; @@ -96,6 +97,7 @@ public class Join extends BinaryPlan implements PostAnalysisVerificationAware, S TSID_DATA_TYPE, PARTIAL_AGG, AGGREGATE_METRIC_DOUBLE, + EXPONENTIAL_HISTOGRAM, DENSE_VECTOR }; private final JoinConfig config; diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/AggregateMapper.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/AggregateMapper.java index c2848c93096a2..8d4e5ab0346dd 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/AggregateMapper.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/AggregateMapper.java @@ -112,7 +112,7 @@ private static DataType toDataType(ElementType elementType) { case LONG -> DataType.LONG; case DOUBLE -> DataType.DOUBLE; case DOC -> DataType.DOC_DATA_TYPE; - case FLOAT, NULL, COMPOSITE, AGGREGATE_METRIC_DOUBLE, UNKNOWN -> throw new EsqlIllegalArgumentException( + case FLOAT, NULL, COMPOSITE, AGGREGATE_METRIC_DOUBLE, EXPONENTIAL_HISTOGRAM, UNKNOWN -> throw new EsqlIllegalArgumentException( "unsupported agg type: " + elementType ); }; diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java index 5c87b1fea06d4..7eadaa979d3c8 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java @@ -487,7 +487,8 @@ private PhysicalOperation planTopN(TopNExec topNExec, LocalExecutionPlannerConte case BOOLEAN, NULL, BYTE, SHORT, INTEGER, LONG, DOUBLE, FLOAT, HALF_FLOAT, DATETIME, DATE_NANOS, DATE_PERIOD, TIME_DURATION, OBJECT, SCALED_FLOAT, UNSIGNED_LONG, TSID_DATA_TYPE -> TopNEncoder.DEFAULT_SORTABLE; case GEO_POINT, CARTESIAN_POINT, GEO_SHAPE, CARTESIAN_SHAPE, COUNTER_LONG, COUNTER_INTEGER, COUNTER_DOUBLE, SOURCE, - AGGREGATE_METRIC_DOUBLE, DENSE_VECTOR, GEOHASH, GEOTILE, GEOHEX -> TopNEncoder.DEFAULT_UNSORTABLE; + AGGREGATE_METRIC_DOUBLE, DENSE_VECTOR, GEOHASH, GEOTILE, GEOHEX, EXPONENTIAL_HISTOGRAM -> + TopNEncoder.DEFAULT_UNSORTABLE; // unsupported fields are encoded as BytesRef, we'll use the same encoder; all values should be null at this point case PARTIAL_AGG, UNSUPPORTED -> TopNEncoder.UNSUPPORTED; }; diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/PlannerUtils.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/PlannerUtils.java index 1adc65b633366..2ea6c3c8f5ed3 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/PlannerUtils.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/PlannerUtils.java @@ -371,6 +371,7 @@ public static ElementType toElementType(DataType dataType, MappedFieldType.Field case GEO_SHAPE, CARTESIAN_SHAPE -> fieldExtractPreference == EXTRACT_SPATIAL_BOUNDS ? ElementType.INT : ElementType.BYTES_REF; case PARTIAL_AGG -> ElementType.COMPOSITE; case AGGREGATE_METRIC_DOUBLE -> ElementType.AGGREGATE_METRIC_DOUBLE; + case EXPONENTIAL_HISTOGRAM -> ElementType.EXPONENTIAL_HISTOGRAM; case DENSE_VECTOR -> ElementType.FLOAT; case SHORT, BYTE, DATE_PERIOD, TIME_DURATION, OBJECT, FLOAT, HALF_FLOAT, SCALED_FLOAT -> throw EsqlIllegalArgumentException .illegalDataType(dataType); diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/type/EsqlDataTypeConverter.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/type/EsqlDataTypeConverter.java index 98b89d1983958..4437cf05c41d0 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/type/EsqlDataTypeConverter.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/type/EsqlDataTypeConverter.java @@ -20,8 +20,12 @@ import org.elasticsearch.compute.data.AggregateMetricDoubleBlockBuilder; import org.elasticsearch.compute.data.AggregateMetricDoubleBlockBuilder.Metric; import org.elasticsearch.compute.data.DoubleBlock; +import org.elasticsearch.compute.data.ExponentialHistogramBlock; +import org.elasticsearch.compute.data.ExponentialHistogramBlockAccessor; import org.elasticsearch.compute.data.IntBlock; import org.elasticsearch.core.Booleans; +import org.elasticsearch.exponentialhistogram.ExponentialHistogram; +import org.elasticsearch.exponentialhistogram.ExponentialHistogramXContent; import org.elasticsearch.geometry.utils.Geohash; import org.elasticsearch.h3.H3; import org.elasticsearch.search.DocValueFormat; @@ -785,6 +789,20 @@ public static String aggregateMetricDoubleBlockToString(AggregateMetricDoubleBlo } } + public static String exponentialHistogramToString(ExponentialHistogram histo) { + try (XContentBuilder builder = JsonXContent.contentBuilder()) { + ExponentialHistogramXContent.serialize(builder, histo); + return Strings.toString(builder); + } catch (IOException e) { + throw new IllegalStateException("error rendering exponential histogram", e); + } + } + + public static String exponentialHistogramBlockToString(ExponentialHistogramBlock histoBlock, int index) { + ExponentialHistogram histo = new ExponentialHistogramBlockAccessor(histoBlock).get(index); + return exponentialHistogramToString(histo); + } + public static String aggregateMetricDoubleLiteralToString(AggregateMetricDoubleBlockBuilder.AggregateMetricDoubleLiteral aggMetric) { try (XContentBuilder builder = JsonXContent.contentBuilder()) { builder.startObject(); diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/SerializationTestUtils.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/SerializationTestUtils.java index 9521a74c1db2b..237dc568fc9f3 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/SerializationTestUtils.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/SerializationTestUtils.java @@ -125,6 +125,7 @@ public static NamedWriteableRegistry writableRegistry() { AggregateMetricDoubleBlockBuilder.AggregateMetricDoubleLiteral::new ) ); + entries.add(WriteableExponentialHistogram.ENTRY); return new NamedWriteableRegistry(entries); } diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponseTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponseTests.java index 77f73f430868e..edac6b1c5b3eb 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponseTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponseTests.java @@ -25,6 +25,7 @@ import org.elasticsearch.compute.data.BooleanBlock; import org.elasticsearch.compute.data.BytesRefBlock; import org.elasticsearch.compute.data.DoubleBlock; +import org.elasticsearch.compute.data.ExponentialHistogramBlockBuilder; import org.elasticsearch.compute.data.FloatBlock; import org.elasticsearch.compute.data.IntBlock; import org.elasticsearch.compute.data.LongBlock; @@ -38,6 +39,10 @@ import org.elasticsearch.core.Nullable; import org.elasticsearch.core.Releasables; import org.elasticsearch.core.TimeValue; +import org.elasticsearch.core.Types; +import org.elasticsearch.exponentialhistogram.ExponentialHistogram; +import org.elasticsearch.exponentialhistogram.ExponentialHistogramCircuitBreaker; +import org.elasticsearch.exponentialhistogram.ExponentialHistogramXContent; import org.elasticsearch.geo.GeometryTestUtils; import org.elasticsearch.geo.ShapeTestUtils; import org.elasticsearch.geometry.Point; @@ -290,6 +295,17 @@ private Page randomPage(List columns) { BytesRef tsIdValue = (BytesRef) EsqlTestUtils.randomLiteral(DataType.TSID_DATA_TYPE).value(); ((BytesRefBlock.Builder) builder).appendBytesRef(tsIdValue); } + case EXPONENTIAL_HISTOGRAM -> { + ExponentialHistogramBlockBuilder expBuilder = (ExponentialHistogramBlockBuilder) builder; + int valueCount = randomIntBetween(0, 500); + int bucketCount = randomIntBetween(4, Math.max(4, valueCount)); + ExponentialHistogram histo = ExponentialHistogram.create( + bucketCount, + ExponentialHistogramCircuitBreaker.noop(), + randomDoubles(valueCount).toArray() + ); + expBuilder.append(histo); + } // default -> throw new UnsupportedOperationException("unsupported data type [" + c + "]"); } return builder.build(); @@ -1262,6 +1278,16 @@ static Page valuesToPage(BlockFactory blockFactory, List columns byte[] decode = Base64.getUrlDecoder().decode(value.toString()); ((BytesRefBlock.Builder) builder).appendBytesRef(new BytesRef(decode)); } + case EXPONENTIAL_HISTOGRAM -> { + ExponentialHistogramBlockBuilder expHistoBuilder = (ExponentialHistogramBlockBuilder) builder; + Map serializedHisto = Types.forciblyCast(value); + ExponentialHistogram parsed = ExponentialHistogramXContent.parseForTesting(serializedHisto); + if (parsed == null) { + expHistoBuilder.appendNull(); + } else { + expHistoBuilder.append(parsed); + } + } } } } diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/scalar/conditional/CaseTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/scalar/conditional/CaseTests.java index 4059ad709d0f5..f4fa6e7574d51 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/scalar/conditional/CaseTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/scalar/conditional/CaseTests.java @@ -66,6 +66,7 @@ public class CaseTests extends AbstractScalarFunctionTestCase { if (Build.current().isSnapshot()) { t.addAll( DataType.UNDER_CONSTRUCTION.stream() + .filter(type -> type != DataType.EXPONENTIAL_HISTOGRAM) // TODO(b/133393): implement .filter(type -> type != DataType.AGGREGATE_METRIC_DOUBLE && type != DataType.DENSE_VECTOR) .toList() ); diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plan/physical/AbstractPhysicalPlanSerializationTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plan/physical/AbstractPhysicalPlanSerializationTests.java index 219b0a3586a6c..e71b4defdc403 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plan/physical/AbstractPhysicalPlanSerializationTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plan/physical/AbstractPhysicalPlanSerializationTests.java @@ -11,6 +11,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.compute.data.AggregateMetricDoubleBlockBuilder; import org.elasticsearch.search.SearchModule; +import org.elasticsearch.xpack.esql.WriteableExponentialHistogram; import org.elasticsearch.xpack.esql.core.tree.Node; import org.elasticsearch.xpack.esql.expression.ExpressionWritables; import org.elasticsearch.xpack.esql.expression.predicate.operator.arithmetic.Add; @@ -56,6 +57,7 @@ protected final NamedWriteableRegistry getNamedWriteableRegistry() { entries.add(Add.ENTRY); // Used by the eval tests entries.add(AggregateMetricDoubleBlockBuilder.AggregateMetricDoubleLiteral.ENTRY); entries.add(LookupJoinExec.ENTRY); + entries.add(WriteableExponentialHistogram.ENTRY); return new NamedWriteableRegistry(entries); }