diff --git a/benchmarks/build.gradle b/benchmarks/build.gradle index 96727eb3a2670..9439cca133fd9 100644 --- a/benchmarks/build.gradle +++ b/benchmarks/build.gradle @@ -47,6 +47,7 @@ dependencies { api(project(':x-pack:plugin:core')) api(project(':x-pack:plugin:esql')) api(project(':x-pack:plugin:esql:compute')) + api(project(':x-pack:plugin:mapper-exponential-histogram')) implementation project(path: ':libs:native') implementation project(path: ':libs:simdvec') implementation project(path: ':libs:exponential-histogram') diff --git a/benchmarks/src/main/java/org/elasticsearch/benchmark/exponentialhistogram/ExponentialHistogramMergeBench.java b/benchmarks/src/main/java/org/elasticsearch/benchmark/exponentialhistogram/ExponentialHistogramMergeBench.java index b638ff140bf19..0f7ee6a2c763e 100644 --- a/benchmarks/src/main/java/org/elasticsearch/benchmark/exponentialhistogram/ExponentialHistogramMergeBench.java +++ b/benchmarks/src/main/java/org/elasticsearch/benchmark/exponentialhistogram/ExponentialHistogramMergeBench.java @@ -9,11 +9,15 @@ package org.elasticsearch.benchmark.exponentialhistogram; +import org.apache.lucene.util.BytesRef; +import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.exponentialhistogram.BucketIterator; import org.elasticsearch.exponentialhistogram.ExponentialHistogram; import org.elasticsearch.exponentialhistogram.ExponentialHistogramCircuitBreaker; import org.elasticsearch.exponentialhistogram.ExponentialHistogramGenerator; import org.elasticsearch.exponentialhistogram.ExponentialHistogramMerger; +import org.elasticsearch.xpack.exponentialhistogram.CompressedExponentialHistogram; +import org.elasticsearch.xpack.exponentialhistogram.IndexWithCount; import org.openjdk.jmh.annotations.Benchmark; import org.openjdk.jmh.annotations.BenchmarkMode; import org.openjdk.jmh.annotations.Fork; @@ -27,6 +31,8 @@ import org.openjdk.jmh.annotations.Threads; import org.openjdk.jmh.annotations.Warmup; +import java.io.IOException; +import java.util.ArrayList; import java.util.List; import java.util.Random; import java.util.concurrent.ThreadLocalRandom; @@ -47,6 +53,9 @@ public class ExponentialHistogramMergeBench { @Param({ "0.01", "0.1", "0.25", "0.5", "1.0", "2.0" }) double mergedHistoSizeFactor; + @Param({ "array-backed", "compressed" }) + String histoImplementation; + Random random; ExponentialHistogramMerger histoMerger; @@ -81,16 +90,54 @@ public void setUp() { bucketIndex += 1 + random.nextInt(bucketCount) % (Math.max(1, bucketCount / dataPointSize)); generator.add(Math.pow(1.001, bucketIndex)); } - toMerge[i] = generator.getAndClear(); - cnt = getBucketCount(toMerge[i]); + ExponentialHistogram histogram = generator.getAndClear(); + cnt = getBucketCount(histogram); if (cnt < dataPointSize) { - throw new IllegalArgumentException("Expected bucket count to be " + dataPointSize + ", but was " + cnt); + throw new IllegalStateException("Expected bucket count to be " + dataPointSize + ", but was " + cnt); + } + + if ("array-backed".equals(histoImplementation)) { + toMerge[i] = histogram; + } else if ("compressed".equals(histoImplementation)) { + toMerge[i] = asCompressedHistogram(histogram); + } else { + throw new IllegalArgumentException("Unknown implementation: " + histoImplementation); } } index = 0; } + private ExponentialHistogram asCompressedHistogram(ExponentialHistogram histogram) { + List negativeBuckets = new ArrayList<>(); + List positiveBuckets = new ArrayList<>(); + + BucketIterator it = histogram.negativeBuckets().iterator(); + while 
(it.hasNext()) { + negativeBuckets.add(new IndexWithCount(it.peekIndex(), it.peekCount())); + it.advance(); + } + it = histogram.positiveBuckets().iterator(); + while (it.hasNext()) { + positiveBuckets.add(new IndexWithCount(it.peekIndex(), it.peekCount())); + it.advance(); + } + + long totalCount = histogram.zeroBucket().count() + histogram.negativeBuckets().valueCount() + histogram.positiveBuckets() + .valueCount(); + BytesStreamOutput histoBytes = new BytesStreamOutput(); + try { + CompressedExponentialHistogram.writeHistogramBytes(histoBytes, histogram.scale(), negativeBuckets, positiveBuckets); + CompressedExponentialHistogram result = new CompressedExponentialHistogram(); + BytesRef data = histoBytes.bytes().toBytesRef(); + result.reset(histogram.zeroBucket().zeroThreshold(), totalCount, data); + return result; + } catch (IOException e) { + throw new RuntimeException(e); + } + + } + private static int getBucketCount(ExponentialHistogram histo) { int cnt = 0; for (BucketIterator it : List.of(histo.negativeBuckets().iterator(), histo.positiveBuckets().iterator())) { diff --git a/libs/exponential-histogram/src/main/java/org/elasticsearch/exponentialhistogram/ZeroBucket.java b/libs/exponential-histogram/src/main/java/org/elasticsearch/exponentialhistogram/ZeroBucket.java index 1341f283c5487..6a9d24e87c0e1 100644 --- a/libs/exponential-histogram/src/main/java/org/elasticsearch/exponentialhistogram/ZeroBucket.java +++ b/libs/exponential-histogram/src/main/java/org/elasticsearch/exponentialhistogram/ZeroBucket.java @@ -23,6 +23,7 @@ import org.apache.lucene.util.RamUsageEstimator; +import static org.elasticsearch.exponentialhistogram.ExponentialHistogram.MAX_INDEX; import static org.elasticsearch.exponentialhistogram.ExponentialHistogram.MAX_SCALE; import static org.elasticsearch.exponentialhistogram.ExponentialHistogram.MIN_INDEX; import static org.elasticsearch.exponentialhistogram.ExponentialHistogram.MIN_SCALE; @@ -36,15 +37,28 @@ * To allow efficient comparison with bucket boundaries, this class internally * represents the zero threshold as a exponential histogram bucket index with a scale, * computed via {@link ExponentialScaleUtils#computeIndex(double, int)}. - * - * @param index The index used with the scale to determine the zero threshold. - * @param scale The scale used with the index to determine the zero threshold. - * @param count The number of values in the zero bucket. */ -public record ZeroBucket(long index, int scale, long count) { +public final class ZeroBucket { public static final long SHALLOW_SIZE = RamUsageEstimator.shallowSizeOfInstance(ZeroBucket.class); + /** + * The exponential histogram scale used for {@link #index} + */ + private final int scale; + + /** + * The exponential histogram bucket index whose upper boundary corresponds to the zero threshold. + * Might be computed lazily from {@link #realThreshold}, uses {@link Long#MAX_VALUE} as placeholder in this case. + */ + private long index; + + /** + * Might be computed lazily from {@link #realThreshold}, uses {@link Double#NaN} as placeholder in this case. + */ + private double realThreshold; + + private final long count; // A singleton for an empty zero bucket with the smallest possible threshold. private static final ZeroBucket MINIMAL_EMPTY = new ZeroBucket(MIN_INDEX, MIN_SCALE, 0); @@ -55,7 +69,27 @@ public record ZeroBucket(long index, int scale, long count) { * @param count The number of values in the bucket. 
*/ public ZeroBucket(double zeroThreshold, long count) { - this(computeIndex(zeroThreshold, MAX_SCALE) + 1, MAX_SCALE, count); + assert zeroThreshold >= 0.0 : "zeroThreshold must not be negative"; + this.index = Long.MAX_VALUE; // compute lazily when needed + this.scale = MAX_SCALE; + this.realThreshold = zeroThreshold; + this.count = count; + } + + private ZeroBucket(long index, int scale, long count) { + assert index >= MIN_INDEX && index <= MAX_INDEX : "index must be in range [" + MIN_INDEX + ", " + MAX_INDEX + "]"; + assert scale >= MIN_SCALE && scale <= MAX_SCALE : "scale must be in range [" + MIN_SCALE + ", " + MAX_SCALE + "]"; + this.index = index; + this.scale = scale; + this.realThreshold = Double.NaN; // compute lazily when needed + this.count = count; + } + + private ZeroBucket(double realThreshold, long index, int scale, long count) { + this.realThreshold = realThreshold; + this.index = index; + this.scale = scale; + this.count = count; } /** @@ -75,8 +109,33 @@ public static ZeroBucket minimalWithCount(long count) { if (count == 0) { return MINIMAL_EMPTY; } else { - return new ZeroBucket(MINIMAL_EMPTY.index, MINIMAL_EMPTY.scale(), count); + return new ZeroBucket(MINIMAL_EMPTY.zeroThreshold(), MINIMAL_EMPTY.index(), MINIMAL_EMPTY.scale(), count); + } + } + + /** + * @return The value of the zero threshold. + */ + public double zeroThreshold() { + if (Double.isNaN(realThreshold)) { + realThreshold = exponentiallyScaledToDoubleValue(index(), scale()); } + return realThreshold; + } + + public long index() { + if (index == Long.MAX_VALUE) { + index = computeIndex(zeroThreshold(), scale()) + 1; + } + return index; + } + + public int scale() { + return scale; + } + + public long count() { + return count; } /** @@ -99,9 +158,9 @@ public ZeroBucket merge(ZeroBucket other) { long totalCount = count + other.count; // Both are populated, so we need to use the higher zero-threshold. if (this.compareZeroThreshold(other) >= 0) { - return new ZeroBucket(index, scale, totalCount); + return new ZeroBucket(realThreshold, index, scale, totalCount); } else { - return new ZeroBucket(other.index, other.scale, totalCount); + return new ZeroBucket(other.realThreshold, other.index, other.scale, totalCount); } } } @@ -133,14 +192,7 @@ public ZeroBucket collapseOverlappingBucketsForAll(BucketIterator... bucketItera * equal to, or greater than the other's. */ public int compareZeroThreshold(ZeroBucket other) { - return compareExponentiallyScaledValues(index, scale, other.index, other.scale); - } - - /** - * @return The value of the zero threshold. 
- */ - public double zeroThreshold() { - return exponentiallyScaledToDoubleValue(index, scale); + return compareExponentiallyScaledValues(index(), scale(), other.index(), other.scale()); } /** @@ -154,7 +206,7 @@ public ZeroBucket collapseOverlappingBuckets(BucketIterator buckets) { long collapsedCount = 0; long highestCollapsedIndex = 0; - while (buckets.hasNext() && compareExponentiallyScaledValues(buckets.peekIndex(), buckets.scale(), index, scale) < 0) { + while (buckets.hasNext() && compareExponentiallyScaledValues(buckets.peekIndex(), buckets.scale(), index(), scale()) < 0) { highestCollapsedIndex = buckets.peekIndex(); collapsedCount += buckets.peekCount(); buckets.advance(); @@ -165,9 +217,9 @@ public ZeroBucket collapseOverlappingBuckets(BucketIterator buckets) { long newZeroCount = count + collapsedCount; // +1 because we need to adjust the zero threshold to the upper boundary of the collapsed bucket long collapsedUpperBoundIndex = highestCollapsedIndex + 1; - if (compareExponentiallyScaledValues(index, scale, collapsedUpperBoundIndex, buckets.scale()) >= 0) { + if (compareExponentiallyScaledValues(index(), scale(), collapsedUpperBoundIndex, buckets.scale()) >= 0) { // Our current zero-threshold is larger than the upper boundary of the largest collapsed bucket, so we keep it. - return new ZeroBucket(index, scale, newZeroCount); + return new ZeroBucket(realThreshold, index, scale, newZeroCount); } else { return new ZeroBucket(collapsedUpperBoundIndex, buckets.scale(), newZeroCount); } diff --git a/libs/exponential-histogram/src/test/java/org/elasticsearch/exponentialhistogram/ExponentialHistogramMergerTests.java b/libs/exponential-histogram/src/test/java/org/elasticsearch/exponentialhistogram/ExponentialHistogramMergerTests.java index 5113b73653641..98eca3fe94100 100644 --- a/libs/exponential-histogram/src/test/java/org/elasticsearch/exponentialhistogram/ExponentialHistogramMergerTests.java +++ b/libs/exponential-histogram/src/test/java/org/elasticsearch/exponentialhistogram/ExponentialHistogramMergerTests.java @@ -151,7 +151,12 @@ public void testMergeOrderIndependence() { double[] vals = values.stream().mapToDouble(Double::doubleValue).toArray(); try (ReleasableExponentialHistogram shuffled = ExponentialHistogram.create(20, breaker(), vals)) { assertThat("Expected same scale", shuffled.scale(), equalTo(reference.scale())); - assertThat("Expected same zero-bucket", shuffled.zeroBucket(), equalTo(reference.zeroBucket())); + assertThat( + "Expected same threshold for zero-bucket", + shuffled.zeroBucket().zeroThreshold(), + equalTo(reference.zeroBucket().zeroThreshold()) + ); + assertThat("Expected same count for zero-bucket", shuffled.zeroBucket().count(), equalTo(reference.zeroBucket().count())); assertBucketsEqual(shuffled.negativeBuckets(), reference.negativeBuckets()); assertBucketsEqual(shuffled.positiveBuckets(), reference.positiveBuckets()); } diff --git a/libs/exponential-histogram/src/test/java/org/elasticsearch/exponentialhistogram/ZeroBucketTests.java b/libs/exponential-histogram/src/test/java/org/elasticsearch/exponentialhistogram/ZeroBucketTests.java index fdea89d0421c5..1d2cbd57604ab 100644 --- a/libs/exponential-histogram/src/test/java/org/elasticsearch/exponentialhistogram/ZeroBucketTests.java +++ b/libs/exponential-histogram/src/test/java/org/elasticsearch/exponentialhistogram/ZeroBucketTests.java @@ -28,4 +28,33 @@ public class ZeroBucketTests extends ExponentialHistogramTestCase { public void testMinimalBucketHasZeroThreshold() { 
assertThat(ZeroBucket.minimalWithCount(42).zeroThreshold(), equalTo(0.0)); } + + public void testExactThresholdPreserved() { + ZeroBucket bucket = new ZeroBucket(3.0, 10); + assertThat(bucket.zeroThreshold(), equalTo(3.0)); + } + + public void testMergingPreservesExactThreshold() { + ZeroBucket bucketA = new ZeroBucket(3.0, 10); + ZeroBucket bucketB = new ZeroBucket(3.5, 20); + ZeroBucket merged = bucketA.merge(bucketB); + assertThat(merged.zeroThreshold(), equalTo(3.5)); + assertThat(merged.count(), equalTo(30L)); + } + + public void testBucketCollapsingPreservesExactThreshold() { + FixedCapacityExponentialHistogram histo = createAutoReleasedHistogram(2); + histo.resetBuckets(0); + histo.tryAddBucket(0, 42, true); // bucket [1,2] + + ZeroBucket bucketA = new ZeroBucket(3.0, 10); + + CopyableBucketIterator iterator = histo.positiveBuckets().iterator(); + ZeroBucket merged = bucketA.collapseOverlappingBuckets(iterator); + + assertThat(iterator.hasNext(), equalTo(false)); + assertThat(merged.zeroThreshold(), equalTo(3.0)); + assertThat(merged.count(), equalTo(52L)); + } + } diff --git a/x-pack/plugin/mapper-exponential-histogram/build.gradle b/x-pack/plugin/mapper-exponential-histogram/build.gradle new file mode 100644 index 0000000000000..fc262d693e896 --- /dev/null +++ b/x-pack/plugin/mapper-exponential-histogram/build.gradle @@ -0,0 +1,31 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +apply plugin: 'elasticsearch.internal-es-plugin' +apply plugin: 'elasticsearch.internal-yaml-rest-test' + +esplugin { + name = 'exponential-histogram' + description = 'Module for the exponential_histogram field type' + classname ='org.elasticsearch.xpack.exponentialhistogram.ExponentialHistogramMapperPlugin' + extendedPlugins = ['x-pack-core'] +} +base { + archivesName = 'x-pack-exponential-histogram' +} + +dependencies { + api project(":libs:exponential-histogram") + compileOnly project(path: xpackModule('core')) + yamlRestTestImplementation(testArtifact(project(xpackModule('core')))) +} + +restResources { + restApi { + include '_common', 'indices', 'index', 'get' + } +} diff --git a/x-pack/plugin/mapper-exponential-histogram/src/main/java/org/elasticsearch/xpack/exponentialhistogram/CompressedExponentialHistogram.java b/x-pack/plugin/mapper-exponential-histogram/src/main/java/org/elasticsearch/xpack/exponentialhistogram/CompressedExponentialHistogram.java new file mode 100644 index 0000000000000..8a2e348568bbc --- /dev/null +++ b/x-pack/plugin/mapper-exponential-histogram/src/main/java/org/elasticsearch/xpack/exponentialhistogram/CompressedExponentialHistogram.java @@ -0,0 +1,195 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. 
+ */ + +package org.elasticsearch.xpack.exponentialhistogram; + +import org.apache.lucene.util.BytesRef; +import org.apache.lucene.util.RamUsageEstimator; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.exponentialhistogram.BucketIterator; +import org.elasticsearch.exponentialhistogram.CopyableBucketIterator; +import org.elasticsearch.exponentialhistogram.ExponentialHistogram; +import org.elasticsearch.exponentialhistogram.ZeroBucket; + +import java.io.IOException; +import java.util.List; +import java.util.OptionalLong; + +/** + * Implementation of a {@link ExponentialHistogram} optimized for a minimal memory footprint. + * The compression used here also corresponds to how exponential_histogram fields are stored in + * doc values by {@link ExponentialHistogramFieldMapper}. + *
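+ * <p>
+ * A typical decode-and-read sequence looks roughly as follows (illustrative sketch, not part of the original
+ * documentation; the variable names are hypothetical):
+ * <pre>{@code
+ * CompressedExponentialHistogram histo = new CompressedExponentialHistogram();
+ * // zeroThreshold and totalValueCount are stored outside the encoded bytes, see reset()
+ * histo.reset(zeroThreshold, totalValueCount, encodedHistogramBytesRef);
+ * long positiveValueCount = histo.positiveBuckets().valueCount();
+ * }</pre>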

+ * While this implementation is optimized for a minimal memory footprint, it is still a fully compliant {@link ExponentialHistogram} + * and can therefore be directly consumed for merging / quantile estimation without requiring any prior copying or decoding. + */ +public class CompressedExponentialHistogram implements ExponentialHistogram { + + private static final long SHALLOW_SIZE = RamUsageEstimator.shallowSizeOfInstance(CompressedExponentialHistogram.class); + + private double zeroThreshold; + private long valueCount; + private ZeroBucket lazyZeroBucket; + + private final EncodedHistogramData encodedData = new EncodedHistogramData(); + private final Buckets positiveBuckets = new Buckets(true); + private final Buckets negativeBuckets = new Buckets(false); + + @Override + public int scale() { + return encodedData.scale(); + } + + @Override + public ZeroBucket zeroBucket() { + if (lazyZeroBucket == null) { + long zeroCount = valueCount - negativeBuckets.valueCount() - positiveBuckets.valueCount(); + lazyZeroBucket = new ZeroBucket(zeroThreshold, zeroCount); + } + return lazyZeroBucket; + } + + @Override + public ExponentialHistogram.Buckets positiveBuckets() { + return positiveBuckets; + } + + @Override + public ExponentialHistogram.Buckets negativeBuckets() { + return negativeBuckets; + } + + /** + * Resets this instance to decode the provided histogram data. + * + * @param zeroThreshold the zeroThreshold for the histogram, which needs to be stored externally + * @param valueCount the total number of values the histogram contains, needs to be stored externally + * @param encodedHistogramData the encoded histogram bytes which previously where generated via + * {@link #writeHistogramBytes(StreamOutput, int, List, List)}. + */ + public void reset(double zeroThreshold, long valueCount, BytesRef encodedHistogramData) throws IOException { + lazyZeroBucket = null; + this.zeroThreshold = zeroThreshold; + this.valueCount = valueCount; + encodedData.decode(encodedHistogramData); + negativeBuckets.resetCachedData(); + positiveBuckets.resetCachedData(); + } + + /** + * Serializes the given histogram, so that exactly the same data can be reconstructed via {@link #reset(double, long, BytesRef)}. 
+ * + * @param output the output to write the serialized bytes to + * @param scale the scale of the histogram + * @param negativeBuckets the negative buckets of the histogram, sorted by the bucket indices + * @param positiveBuckets the positive buckets of the histogram, sorted by the bucket indices + */ + public static void writeHistogramBytes( + StreamOutput output, + int scale, + List negativeBuckets, + List positiveBuckets + ) throws IOException { + EncodedHistogramData.write(output, scale, negativeBuckets, positiveBuckets); + } + + @Override + public long ramBytesUsed() { + return SHALLOW_SIZE + ZeroBucket.SHALLOW_SIZE + 2 * Buckets.SHALLOW_SIZE + EncodedHistogramData.SHALLOW_SIZE; + } + + private final class Buckets implements ExponentialHistogram.Buckets { + + private static final long SHALLOW_SIZE = RamUsageEstimator.shallowSizeOf(Buckets.class); + + private final boolean isForPositiveBuckets; // false if for negative buckets + private long cachedValueCount; + private long cachedMaxIndex; + + private Buckets(boolean isForPositiveBuckets) { + this.isForPositiveBuckets = isForPositiveBuckets; + resetCachedData(); + } + + private void resetCachedData() { + cachedValueCount = -1; + cachedMaxIndex = Long.MIN_VALUE; + } + + private void computeCachedDataIfRequired() { + if (cachedValueCount == -1) { + cachedValueCount = 0; + BucketIterator it = iterator(); + while (it.hasNext()) { + cachedMaxIndex = it.peekIndex(); + cachedValueCount += it.peekCount(); + it.advance(); + } + } + } + + @Override + public CopyableBucketIterator iterator() { + if (isForPositiveBuckets) { + return new CompressedBucketsIterator(encodedData.positiveBucketsDecoder()); + } else { + return new CompressedBucketsIterator(encodedData.negativeBucketsDecoder()); + } + } + + @Override + public OptionalLong maxBucketIndex() { + computeCachedDataIfRequired(); + return cachedValueCount > 0 ? OptionalLong.of(cachedMaxIndex) : OptionalLong.empty(); + } + + @Override + public long valueCount() { + computeCachedDataIfRequired(); + return cachedValueCount; + } + + private class CompressedBucketsIterator implements CopyableBucketIterator { + + private final EncodedHistogramData.BucketsDecoder decoder; + + CompressedBucketsIterator(EncodedHistogramData.BucketsDecoder delegate) { + this.decoder = delegate; + } + + @Override + public CopyableBucketIterator copy() { + return new CompressedBucketsIterator(decoder.copy()); + } + + @Override + public final boolean hasNext() { + return decoder.hasNext(); + } + + @Override + public final long peekCount() { + return decoder.peekCount(); + } + + @Override + public final long peekIndex() { + return decoder.peekIndex(); + } + + @Override + public int scale() { + return CompressedExponentialHistogram.this.scale(); + } + + @Override + public final void advance() { + decoder.advance(); + } + } + } +} diff --git a/x-pack/plugin/mapper-exponential-histogram/src/main/java/org/elasticsearch/xpack/exponentialhistogram/EncodedHistogramData.java b/x-pack/plugin/mapper-exponential-histogram/src/main/java/org/elasticsearch/xpack/exponentialhistogram/EncodedHistogramData.java new file mode 100644 index 0000000000000..43db0c5fc1739 --- /dev/null +++ b/x-pack/plugin/mapper-exponential-histogram/src/main/java/org/elasticsearch/xpack/exponentialhistogram/EncodedHistogramData.java @@ -0,0 +1,239 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. 
Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.exponentialhistogram; + +import org.apache.lucene.util.BytesRef; +import org.apache.lucene.util.RamUsageEstimator; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.io.stream.ByteArrayStreamInput; +import org.elasticsearch.common.io.stream.BytesStreamOutput; +import org.elasticsearch.common.io.stream.StreamOutput; + +import java.io.IOException; +import java.util.List; + +import static org.elasticsearch.exponentialhistogram.ExponentialHistogram.MAX_SCALE; +import static org.elasticsearch.exponentialhistogram.ExponentialHistogram.MIN_SCALE; + +/** + * Encodes the data of an exponential histogram in a compact format as byte array. + * Note that some data of exponential histograms is stored outside of this array as separate doc values for better compression, + * e.g. the zero threshold. This data is therefore not part of this encoding. + **/ +final class EncodedHistogramData { + + /* + The encoding has the following format: + - 1 byte: scale + flags + scale has a range of less than 64, so we can use the two remaining bits for flags + Currently the only flag is HAS_NEGATIVE_BUCKETS_FLAG, which indicates that there are negative buckets + If this flag is not set, we can skip encoding the length of the negative buckets and therefore save a + bit of space for this typical case. + - VInt: (optional) length of encoded negative buckets in bytes, if HAS_NEGATIVE_BUCKETS_FLAG is set + - byte[]: encoded negative buckets, details on the encoding below + - byte[]: encoded positive buckets, details on the encoding below + There is no end marker for the encoded buckets, therefore the total length of the encoded data needs to be known when decoding + + The following scheme is used to encode the negative and positive buckets: + - if there are no buckets, the result is an empty array (byte[0]) + - write the index of the first bucket as ZigZag-VLong + - write the count of the first bucket as ZigZag-VLong + - for each remaining (non-empty) bucket: + - if there was no empty bucket right before this bucket (the index of the bucket is exactly previousBucketIndex+1), + write the count for the bucket as ZigZag-VLong + - Otherwise there is at least one empty bucket between this one and the previous one. + We compute the number of empty buckets as n=currentBucketIndex-previousIndex-1 and then write -n out as + ZigZag-VLong followed by the count for the bucket as ZigZag-VLong. The negation is performed to allow to + distinguish whether a value represents a bucket count (positive number) or the number of empty buckets (negative number) + when decoding. + + While this encoding is designed for sparse histograms, it compresses well for dense histograms too. + For fully dense histograms it effectively results in encoding the index of the first bucket, followed by just an array of counts. + For sparse histograms it corresponds to an interleaved encoding of the bucket indices with delta compression and the bucket counts. + Even mostly sparse histograms that have some dense regions profit from this encoding. 
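+
+    Worked example (illustrative, not part of the original comment): the sorted buckets
+    [(index=4, count=7), (index=5, count=2), (index=9, count=3)] are encoded as the ZigZag-VLong sequence
+    4, 7, 2, -3, 3: first the index (4) and count (7) of the first bucket, then the count (2) of the
+    directly adjacent bucket 5, then -3 to skip the three empty buckets 6, 7 and 8, and finally the count (3) of bucket 9.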
+ */ + + static final long SHALLOW_SIZE = RamUsageEstimator.shallowSizeOfInstance(EncodedHistogramData.class); + + private static final int SCALE_OFFSET = 11; + private static final int HAS_NEGATIVE_BUCKETS_FLAG = 1 << 6; // = 64 + private static final int SCALE_MASK = 0x3F; // = 63 + static { + // protection against changes to MIN_SCALE and MAX_SCALE messing with our encoding + assert MIN_SCALE + SCALE_OFFSET >= 0; + assert MAX_SCALE + SCALE_OFFSET <= SCALE_MASK; + } + + private int scale; + + private byte[] encodedData; + private int negativeBucketsStart; + private int negativeBucketsLength; + private int positiveBucketsLength; + + void decode(BytesRef data) throws IOException { + this.encodedData = data.bytes; + ByteArrayStreamInput input = new ByteArrayStreamInput(); + input.reset(data.bytes, data.offset, data.length); + + int scaleWithFlags = input.readByte(); + this.scale = (scaleWithFlags & SCALE_MASK) - SCALE_OFFSET; + boolean hasNegativeBuckets = (scaleWithFlags & HAS_NEGATIVE_BUCKETS_FLAG) != 0; + + negativeBucketsLength = 0; + if (hasNegativeBuckets) { + negativeBucketsLength = input.readVInt(); + } + + negativeBucketsStart = input.getPosition(); + input.skipBytes(negativeBucketsLength); + positiveBucketsLength = input.available(); + } + + static void write(StreamOutput output, int scale, List negativeBuckets, List positiveBuckets) + throws IOException { + assert scale >= MIN_SCALE && scale <= MAX_SCALE : "scale must be in range [" + MIN_SCALE + ", " + MAX_SCALE + "]"; + boolean hasNegativeBuckets = negativeBuckets.isEmpty() == false; + int scaleWithFlags = (scale + SCALE_OFFSET); + assert scaleWithFlags >= 0 && scaleWithFlags <= SCALE_MASK; + if (hasNegativeBuckets) { + scaleWithFlags |= HAS_NEGATIVE_BUCKETS_FLAG; + } + output.writeByte((byte) scaleWithFlags); + if (hasNegativeBuckets) { + BytesStreamOutput temp = new BytesStreamOutput(); + BucketsDecoder.serializeBuckets(temp, negativeBuckets); + BytesReference data = temp.bytes(); + output.writeVInt(data.length()); + output.writeBytes(data.array(), data.arrayOffset(), data.length()); + } + BucketsDecoder.serializeBuckets(output, positiveBuckets); + } + + int scale() { + return scale; + } + + BucketsDecoder negativeBucketsDecoder() { + return new BucketsDecoder(negativeBucketsStart, negativeBucketsLength); + } + + BucketsDecoder positiveBucketsDecoder() { + return new BucketsDecoder(negativeBucketsStart + negativeBucketsLength, positiveBucketsLength); + } + + final class BucketsDecoder { + + private long currentIndex; + /** + * The count for the bucket this iterator is currently pointing at. + * A value of {@code -1} is used to represent that the end has been reached. 
+ */ + private long currentCount; + + private final ByteArrayStreamInput bucketsStreamInput; + + private BucketsDecoder(int encodedBucketsStartOffset, int length) { + bucketsStreamInput = new ByteArrayStreamInput(); + if (length > 0) { + bucketsStreamInput.reset(encodedData, encodedBucketsStartOffset, length); + try { + currentIndex = bucketsStreamInput.readZLong() - 1; + } catch (IOException e) { + throw new IllegalStateException("Bad histogram bytes", e); + } + currentCount = 0; + advance(); + } else { + // no data means we are iterating over an empty set of buckets + markEndReached(); + } + } + + private BucketsDecoder(BucketsDecoder toCopy) { + bucketsStreamInput = new ByteArrayStreamInput(); + bucketsStreamInput.reset(encodedData, toCopy.bucketsStreamInput.getPosition(), toCopy.bucketsStreamInput.getPosition()); + currentCount = toCopy.currentCount; + currentIndex = toCopy.currentIndex; + } + + BucketsDecoder copy() { + return new BucketsDecoder(this); + } + + private void markEndReached() { + currentCount = -1; + } + + boolean hasNext() { + return currentCount != -1; + } + + long peekCount() { + assert currentCount != -1 : "End has already been reached"; + return currentCount; + } + + long peekIndex() { + assert currentCount != -1 : "End has already been reached"; + return currentIndex; + } + + void advance() { + assert currentCount != -1 : "End has already been reached"; + try { + if (bucketsStreamInput.available() > 0) { + currentIndex++; + long countOrNumEmptyBuckets = bucketsStreamInput.readZLong(); + if (countOrNumEmptyBuckets < 0) { + // we have encountered a negative value, this means we "skip" + // the given amount of empty buckets + long numEmptyBuckets = -countOrNumEmptyBuckets; + currentIndex += numEmptyBuckets; + // after we have skipped empty buckets, we know that the next value is a non-empty bucket + currentCount = bucketsStreamInput.readZLong(); + } else { + currentCount = countOrNumEmptyBuckets; + } + assert currentCount > 0; + } else { + markEndReached(); + } + } catch (IOException e) { + throw new IllegalStateException("Bad histogram bytes", e); + } + } + + private static void serializeBuckets(StreamOutput out, List buckets) throws IOException { + if (buckets.isEmpty()) { + return; // no buckets, therefore nothing to write + } + + IndexWithCount firstBucket = buckets.getFirst(); + out.writeZLong(firstBucket.index()); + out.writeZLong(firstBucket.count()); + long prevIndex = firstBucket.index(); + + for (int i = 1; i < buckets.size(); i++) { + IndexWithCount bucket = buckets.get(i); + + long indexDelta = bucket.index() - prevIndex; + assert indexDelta > 0; // values must be sorted and unique + assert bucket.count() > 0; + + long numEmptyBucketsInBetween = indexDelta - 1; + if (numEmptyBucketsInBetween > 0) { + out.writeZLong(-numEmptyBucketsInBetween); + } + out.writeZLong(bucket.count()); + + prevIndex = bucket.index(); + } + } + } +} diff --git a/x-pack/plugin/mapper-exponential-histogram/src/main/java/org/elasticsearch/xpack/exponentialhistogram/ExponentialHistogramFieldMapper.java b/x-pack/plugin/mapper-exponential-histogram/src/main/java/org/elasticsearch/xpack/exponentialhistogram/ExponentialHistogramFieldMapper.java new file mode 100644 index 0000000000000..4efb7c82b2997 --- /dev/null +++ b/x-pack/plugin/mapper-exponential-histogram/src/main/java/org/elasticsearch/xpack/exponentialhistogram/ExponentialHistogramFieldMapper.java @@ -0,0 +1,688 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. 
under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.exponentialhistogram; + +import org.apache.lucene.document.BinaryDocValuesField; +import org.apache.lucene.document.Field; +import org.apache.lucene.document.NumericDocValuesField; +import org.apache.lucene.index.BinaryDocValues; +import org.apache.lucene.index.LeafReader; +import org.apache.lucene.index.NumericDocValues; +import org.apache.lucene.search.Query; +import org.apache.lucene.util.BytesRef; +import org.apache.lucene.util.NumericUtils; +import org.elasticsearch.common.Explicit; +import org.elasticsearch.common.io.stream.BytesStreamOutput; +import org.elasticsearch.common.util.FeatureFlag; +import org.elasticsearch.exponentialhistogram.BucketIterator; +import org.elasticsearch.exponentialhistogram.ExponentialHistogram; +import org.elasticsearch.index.fielddata.FieldDataContext; +import org.elasticsearch.index.fielddata.IndexFieldData; +import org.elasticsearch.index.mapper.CompositeSyntheticFieldLoader; +import org.elasticsearch.index.mapper.DocumentParserContext; +import org.elasticsearch.index.mapper.DocumentParsingException; +import org.elasticsearch.index.mapper.FieldMapper; +import org.elasticsearch.index.mapper.IgnoreMalformedStoredValues; +import org.elasticsearch.index.mapper.MappedFieldType; +import org.elasticsearch.index.mapper.MapperBuilderContext; +import org.elasticsearch.index.mapper.SourceLoader; +import org.elasticsearch.index.mapper.SourceValueFetcher; +import org.elasticsearch.index.mapper.TextSearchInfo; +import org.elasticsearch.index.mapper.ValueFetcher; +import org.elasticsearch.index.query.SearchExecutionContext; +import org.elasticsearch.xcontent.CopyingXContentParser; +import org.elasticsearch.xcontent.ParseField; +import org.elasticsearch.xcontent.XContentBuilder; +import org.elasticsearch.xcontent.XContentParser; +import org.elasticsearch.xcontent.XContentSubParser; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.Map; + +import static org.elasticsearch.common.xcontent.XContentParserUtils.ensureExpectedToken; +import static org.elasticsearch.exponentialhistogram.ExponentialHistogram.MAX_INDEX; +import static org.elasticsearch.exponentialhistogram.ExponentialHistogram.MIN_INDEX; + +/** + * {@link FieldMapper} for the exponential_histogram field type. The mapped data represents {@link ExponentialHistogram}s. + * + *

+ * Example index mapping with an exponential_histogram field:
+ * <pre>
+ * {
+ *   "mappings": {
+ *     "properties": {
+ *       "my_histo": {
+ *         "type": "exponential_histogram"
+ *       }
+ *     }
+ *   }
+ * }
+ * </pre>
+ *
+ * Example histogram data for a full histogram value:
+ * <pre>
+ * {
+ *   "my_histo": {
+ *     "scale": 12,
+ *     "zero": {
+ *       "threshold": 0.123456,
+ *       "count": 42
+ *     },
+ *     "positive": {
+ *       "indices": [-1000000, -10, 25, 26, 99999999],
+ *       "counts": [1, 2, 3, 4, 5]
+ *     },
+ *     "negative": {
+ *       "indices": [-123, 0, 12345],
+ *       "counts": [20, 30, 40]
+ *     }
+ *   }
+ * }
+ * </pre>
+ */ +public class ExponentialHistogramFieldMapper extends FieldMapper { + + public static final FeatureFlag EXPONENTIAL_HISTOGRAM_FEATURE = new FeatureFlag("exponential_histogram"); + + public static final String CONTENT_TYPE = "exponential_histogram"; + + public static final ParseField SCALE_FIELD = new ParseField("scale"); + public static final ParseField ZERO_FIELD = new ParseField("zero"); + public static final ParseField ZERO_COUNT_FIELD = new ParseField("count"); + public static final ParseField ZERO_THRESHOLD_FIELD = new ParseField("threshold"); + + public static final ParseField POSITIVE_FIELD = new ParseField("positive"); + public static final ParseField NEGATIVE_FIELD = new ParseField("negative"); + public static final ParseField BUCKET_INDICES_FIELD = new ParseField("indices"); + public static final ParseField BUCKET_COUNTS_FIELD = new ParseField("counts"); + + private static ExponentialHistogramFieldMapper toType(FieldMapper in) { + return (ExponentialHistogramFieldMapper) in; + } + + /** + * We store the zero-threshold as a separate doc value with this lucene field name. + * We don't expect the zero threshold to be accessed independently of the other histogram data, + * but this approach should save a lot of storage space: The zero threshold is a value configured by users. + * This means it is expected to not or only rarely change over time and is very often the same across time series. + * Storing it as a separate doc value allows us to compress the zero threshold across documents. + * + * @param fullPath the full path of the mapped field + * @return the name for the lucene field + */ + private static String zeroThresholdSubFieldName(String fullPath) { + return fullPath + "._zero_threshold"; + } + + /** + * We store the sum of the counts over all buckets (including the zero bucket) in this value. + * This is done to allow access to it at query time without having to load the entire histogram. + * + * As a side effect, this is used to avoid storing the count for the zero bucket with the histogram data, + * as it can be computed from this value minus the sum of the counts of all other buckets. 
+ * + * @param fullPath the full path of the mapped field + * @return the name for the lucene field + */ + private static String valuesCountSubFieldName(String fullPath) { + return fullPath + "._values_count"; + } + + static class Builder extends FieldMapper.Builder { + + private final FieldMapper.Parameter> meta = FieldMapper.Parameter.metaParam(); + private final FieldMapper.Parameter> ignoreMalformed; + + Builder(String name, boolean ignoreMalformedByDefault) { + super(name); + this.ignoreMalformed = FieldMapper.Parameter.explicitBoolParam( + "ignore_malformed", + true, + m -> toType(m).ignoreMalformed, + ignoreMalformedByDefault + ); + } + + @Override + protected FieldMapper.Parameter[] getParameters() { + return new FieldMapper.Parameter[] { ignoreMalformed, meta }; + } + + @Override + public ExponentialHistogramFieldMapper build(MapperBuilderContext context) { + return new ExponentialHistogramFieldMapper( + leafName(), + new ExponentialHistogramFieldType(context.buildFullName(leafName()), meta.getValue()), + builderParams(this, context), + this + ); + } + } + + public static final FieldMapper.TypeParser PARSER = new FieldMapper.TypeParser( + (n, c) -> new Builder(n, IGNORE_MALFORMED_SETTING.get(c.getSettings())), + notInMultiFields(CONTENT_TYPE) + ); + + private final Explicit ignoreMalformed; + private final boolean ignoreMalformedByDefault; + + ExponentialHistogramFieldMapper( + String simpleName, + MappedFieldType mappedFieldType, + FieldMapper.BuilderParams builderParams, + Builder builder + ) { + super(simpleName, mappedFieldType, builderParams); + this.ignoreMalformed = builder.ignoreMalformed.getValue(); + this.ignoreMalformedByDefault = builder.ignoreMalformed.getDefaultValue().value(); + } + + @Override + public boolean ignoreMalformed() { + return ignoreMalformed.value(); + } + + @Override + protected String contentType() { + return CONTENT_TYPE; + } + + @Override + public FieldMapper.Builder getMergeBuilder() { + return new Builder(leafName(), ignoreMalformedByDefault).init(this); + } + + @Override + protected void parseCreateField(DocumentParserContext context) { + throw new UnsupportedOperationException("Parsing is implemented in parse(), this method should NEVER be called"); + } + + static class ExponentialHistogramFieldType extends MappedFieldType { + + ExponentialHistogramFieldType(String name, Map meta) { + super(name, false, false, true, TextSearchInfo.NONE, meta); + } + + @Override + public String typeName() { + return CONTENT_TYPE; + } + + @Override + public ValueFetcher valueFetcher(SearchExecutionContext context, String format) { + return SourceValueFetcher.identity(name(), context, format); + } + + @Override + public boolean isAggregatable() { + return false; + } + + @Override + public IndexFieldData.Builder fielddataBuilder(FieldDataContext fieldDataContext) { + throw new IllegalArgumentException("The [" + CONTENT_TYPE + "] field does not support this operation currently"); + } + + @Override + public Query termQuery(Object value, SearchExecutionContext context) { + throw new IllegalArgumentException( + "[" + CONTENT_TYPE + "] field do not support searching, " + "use dedicated aggregations instead: [" + name() + "]" + ); + } + } + + @Override + protected boolean supportsParsingObject() { + return true; + } + + @Override + public void parse(DocumentParserContext context) throws IOException { + context.path().add(leafName()); + + boolean shouldStoreMalformedDataForSyntheticSource = context.mappingLookup().isSourceSynthetic() && ignoreMalformed(); + 
XContentParser.Token token; + XContentSubParser subParser = null; + XContentBuilder malformedDataForSyntheticSource = null; + + try { + token = context.parser().currentToken(); + if (token == XContentParser.Token.VALUE_NULL) { + context.path().remove(); + return; + } + + Integer scale = null; + ParsedZeroBucket zeroBucket = ParsedZeroBucket.DEFAULT; + List negativeBuckets = Collections.emptyList(); + List positiveBuckets = Collections.emptyList(); + + ensureExpectedToken(XContentParser.Token.START_OBJECT, token, context.parser()); + if (shouldStoreMalformedDataForSyntheticSource) { + var copyingParser = new CopyingXContentParser(context.parser()); + malformedDataForSyntheticSource = copyingParser.getBuilder(); + subParser = new XContentSubParser(copyingParser); + } else { + subParser = new XContentSubParser(context.parser()); + } + token = subParser.nextToken(); + while (token != XContentParser.Token.END_OBJECT) { + + ensureExpectedToken(XContentParser.Token.FIELD_NAME, token, subParser); + String fieldName = subParser.currentName(); + if (fieldName.equals(SCALE_FIELD.getPreferredName())) { + token = subParser.nextToken(); + + ensureExpectedToken(XContentParser.Token.VALUE_NUMBER, token, subParser); + scale = subParser.intValue(); + if (scale > ExponentialHistogram.MAX_SCALE || scale < ExponentialHistogram.MIN_SCALE) { + throw new DocumentParsingException( + subParser.getTokenLocation(), + "error parsing field [" + + fullPath() + + "], scale field must be in " + + "range [" + + ExponentialHistogram.MIN_SCALE + + ", " + + ExponentialHistogram.MAX_SCALE + + "] but got " + + scale + ); + } + } else if (fieldName.equals(ZERO_FIELD.getPreferredName())) { + zeroBucket = parseZeroBucket(subParser); + } else if (fieldName.equals(POSITIVE_FIELD.getPreferredName())) { + positiveBuckets = readAndValidateBuckets(POSITIVE_FIELD.getPreferredName(), subParser); + } else if (fieldName.equals(NEGATIVE_FIELD.getPreferredName())) { + negativeBuckets = readAndValidateBuckets(NEGATIVE_FIELD.getPreferredName(), subParser); + } else { + throw new DocumentParsingException( + subParser.getTokenLocation(), + "error parsing field [" + fullPath() + "], with unknown parameter [" + fieldName + "]" + ); + } + token = subParser.nextToken(); + } + if (scale == null) { + throw new DocumentParsingException( + subParser.getTokenLocation(), + "error parsing field [" + fullPath() + "], expected field called [" + SCALE_FIELD.getPreferredName() + "]" + ); + } + + if (context.doc().getByKey(fieldType().name()) != null) { + throw new IllegalArgumentException( + "Field [" + + fullPath() + + "] of type [" + + typeName() + + "] doesn't support indexing multiple values for the same field in the same document" + ); + } + + long totalValueCount; + try { + totalValueCount = getTotalValueCount(zeroBucket, positiveBuckets, negativeBuckets); + } catch (ArithmeticException e) { + throw new IllegalArgumentException( + "Field [" + fullPath() + "] has a total value count exceeding the allowed maximum value of " + Long.MAX_VALUE + ); + } + + BytesStreamOutput histogramBytesOutput = new BytesStreamOutput(); + CompressedExponentialHistogram.writeHistogramBytes(histogramBytesOutput, scale, negativeBuckets, positiveBuckets); + BytesRef histoBytes = histogramBytesOutput.bytes().toBytesRef(); + + Field histoField = new BinaryDocValuesField(fullPath(), histoBytes); + long thresholdAsLong = NumericUtils.doubleToSortableLong(zeroBucket.threshold()); + NumericDocValuesField zeroThresholdField = new 
NumericDocValuesField(zeroThresholdSubFieldName(fullPath()), thresholdAsLong); + NumericDocValuesField valuesCountField = new NumericDocValuesField(valuesCountSubFieldName(fullPath()), totalValueCount); + + context.doc().addWithKey(fieldType().name(), histoField); + context.doc().add(zeroThresholdField); + context.doc().add(valuesCountField); + + } catch (Exception ex) { + if (ignoreMalformed.value() == false) { + throw new DocumentParsingException( + context.parser().getTokenLocation(), + "failed to parse field [" + fieldType().name() + "] of type [" + fieldType().typeName() + "]", + ex + ); + } + + if (subParser != null) { + // close the subParser so we advance to the end of the object + subParser.close(); + } else if (shouldStoreMalformedDataForSyntheticSource) { + // We have a malformed value, but it's not an object given that `subParser` is null. + // So we just remember whatever it is. + malformedDataForSyntheticSource = XContentBuilder.builder(context.parser().contentType().xContent()) + .copyCurrentStructure(context.parser()); + } + + if (malformedDataForSyntheticSource != null) { + context.doc().add(IgnoreMalformedStoredValues.storedField(fullPath(), malformedDataForSyntheticSource)); + } + + context.addIgnoredField(fieldType().name()); + } + context.path().remove(); + } + + private static long getTotalValueCount( + ParsedZeroBucket zeroBucket, + List positiveBuckets, + List negativeBuckets + ) { + long totalValueCount = zeroBucket.count(); + for (IndexWithCount bucket : positiveBuckets) { + totalValueCount = Math.addExact(totalValueCount, bucket.count()); + } + for (IndexWithCount bucket : negativeBuckets) { + totalValueCount = Math.addExact(totalValueCount, bucket.count()); + } + return totalValueCount; + } + + private ParsedZeroBucket parseZeroBucket(XContentSubParser subParser) throws IOException { + long zeroCount = ParsedZeroBucket.DEFAULT.count(); + double zeroThreshold = ParsedZeroBucket.DEFAULT.threshold(); + + XContentParser.Token token = subParser.nextToken(); + ensureExpectedToken(XContentParser.Token.START_OBJECT, token, subParser); + token = subParser.nextToken(); + while (token != XContentParser.Token.END_OBJECT) { + ensureExpectedToken(XContentParser.Token.FIELD_NAME, token, subParser); + String fieldName = subParser.currentName(); + if (fieldName.equals(ZERO_THRESHOLD_FIELD.getPreferredName())) { + token = subParser.nextToken(); + ensureExpectedToken(XContentParser.Token.VALUE_NUMBER, token, subParser); + zeroThreshold = subParser.doubleValue(); + if (zeroThreshold < 0.0 || Double.isFinite(zeroThreshold) == false) { + throw new DocumentParsingException( + subParser.getTokenLocation(), + "error parsing field [" + + fullPath() + + "], zero.threshold field must be a non-negative, finite number but got " + + zeroThreshold + ); + } + } else if (fieldName.equals(ZERO_COUNT_FIELD.getPreferredName())) { + token = subParser.nextToken(); + ensureExpectedToken(XContentParser.Token.VALUE_NUMBER, token, subParser); + zeroCount = subParser.longValue(); + if (zeroCount < 0) { + throw new DocumentParsingException( + subParser.getTokenLocation(), + "error parsing field [" + fullPath() + "], zero.count field must be a non-negative number but got " + zeroCount + ); + } + } else { + throw new DocumentParsingException( + subParser.getTokenLocation(), + "error parsing field [" + fullPath() + "], with unknown parameter for zero sub-object [" + fieldName + "]" + ); + } + token = subParser.nextToken(); + } + return new ParsedZeroBucket(zeroCount, zeroThreshold); + } + + private List 
readAndValidateBuckets(String containerFieldName, XContentSubParser parser) throws IOException { + XContentParser.Token token = parser.nextToken(); + ensureExpectedToken(XContentParser.Token.START_OBJECT, token, parser); + token = parser.nextToken(); + + List indices = Collections.emptyList(); + List counts = Collections.emptyList(); + + while (token != XContentParser.Token.END_OBJECT) { + ensureExpectedToken(XContentParser.Token.FIELD_NAME, token, parser); + String fieldName = parser.currentName(); + if (fieldName.equals(BUCKET_INDICES_FIELD.getPreferredName())) { + indices = new ArrayList<>(); + token = parser.nextToken(); + // should be an array + ensureExpectedToken(XContentParser.Token.START_ARRAY, token, parser); + token = parser.nextToken(); + while (token != XContentParser.Token.END_ARRAY) { + ensureExpectedToken(XContentParser.Token.VALUE_NUMBER, token, parser); + long index = parser.longValue(); + if (index < MIN_INDEX || index > MAX_INDEX) { + throw new DocumentParsingException( + parser.getTokenLocation(), + "error parsing field [" + + fullPath() + + "], " + + containerFieldName + + "." + + BUCKET_INDICES_FIELD.getPreferredName() + + " values must all be in range [" + + MIN_INDEX + + ", " + + MAX_INDEX + + "] but got " + + index + ); + } + indices.add(index); + token = parser.nextToken(); + } + } else if (fieldName.equals(BUCKET_COUNTS_FIELD.getPreferredName())) { + counts = new ArrayList<>(); + token = parser.nextToken(); + // should be an array + ensureExpectedToken(XContentParser.Token.START_ARRAY, token, parser); + token = parser.nextToken(); + while (token != XContentParser.Token.END_ARRAY) { + ensureExpectedToken(XContentParser.Token.VALUE_NUMBER, token, parser); + long count = parser.longValue(); + if (count <= 0) { + throw new DocumentParsingException( + parser.getTokenLocation(), + "error parsing field [" + + fullPath() + + "], " + + containerFieldName + + "." + + BUCKET_COUNTS_FIELD.getPreferredName() + + " values must all be greater than zero but got " + + count + ); + } + counts.add(count); + token = parser.nextToken(); + } + } else { + throw new DocumentParsingException( + parser.getTokenLocation(), + "error parsing field [" + + fullPath() + + "], with unknown parameter for " + + containerFieldName + + " sub-object [" + + fieldName + + "]" + ); + } + token = parser.nextToken(); + } + + if (indices.size() != counts.size()) { + throw new DocumentParsingException( + parser.getTokenLocation(), + "error parsing field [" + + fullPath() + + "], expected same length from [" + + containerFieldName + + "." + + BUCKET_INDICES_FIELD.getPreferredName() + + "] and " + + "[" + + containerFieldName + + "." + + BUCKET_COUNTS_FIELD.getPreferredName() + + "] but got [" + + indices.size() + + " != " + + counts.size() + + "]" + ); + } + + List results = new ArrayList<>(indices.size()); + for (int i = 0; i < indices.size(); i++) { + results.add(new IndexWithCount(indices.get(i), counts.get(i))); + } + results.sort(Comparator.comparing(IndexWithCount::index)); + + for (int i = 1; i < results.size(); i++) { + long index = results.get(i).index(); + if (index == results.get(i - 1).index()) { + throw new DocumentParsingException( + parser.getTokenLocation(), + "error parsing field [" + + fullPath() + + "], expected entries of [" + + containerFieldName + + "." 
+ + BUCKET_INDICES_FIELD.getPreferredName() + + "] to be unique, but got " + + index + + " multiple times" + ); + } + } + return results; + } + + @Override + protected FieldMapper.SyntheticSourceSupport syntheticSourceSupport() { + return new FieldMapper.SyntheticSourceSupport.Native( + () -> new CompositeSyntheticFieldLoader( + leafName(), + fullPath(), + new ExponentialHistogramSyntheticFieldLoader(), + new CompositeSyntheticFieldLoader.MalformedValuesLayer(fullPath()) + ) + ); + } + + private class ExponentialHistogramSyntheticFieldLoader implements CompositeSyntheticFieldLoader.DocValuesLayer { + + private final CompressedExponentialHistogram histogram = new CompressedExponentialHistogram(); + private BytesRef binaryValue; + private double zeroThreshold; + private long valueCount; + + @Override + public SourceLoader.SyntheticFieldLoader.DocValuesLoader docValuesLoader(LeafReader leafReader, int[] docIdsInLeaf) + throws IOException { + BinaryDocValues histoDocValues = leafReader.getBinaryDocValues(fieldType().name()); + if (histoDocValues == null) { + // No values in this leaf + binaryValue = null; + return null; + } + NumericDocValues zeroThresholds = leafReader.getNumericDocValues(zeroThresholdSubFieldName(fullPath())); + NumericDocValues valueCounts = leafReader.getNumericDocValues(valuesCountSubFieldName(fullPath())); + assert zeroThresholds != null; + assert valueCounts != null; + return docId -> { + if (histoDocValues.advanceExact(docId)) { + + boolean zeroThresholdPresent = zeroThresholds.advanceExact(docId); + boolean valueCountsPresent = valueCounts.advanceExact(docId); + assert zeroThresholdPresent && valueCountsPresent; + + binaryValue = histoDocValues.binaryValue(); + zeroThreshold = NumericUtils.sortableLongToDouble(zeroThresholds.longValue()); + valueCount = valueCounts.longValue(); + return true; + } + binaryValue = null; + return false; + }; + } + + @Override + public boolean hasValue() { + return binaryValue != null; + } + + @Override + public void write(XContentBuilder b) throws IOException { + if (binaryValue == null) { + return; + } + + histogram.reset(zeroThreshold, valueCount, binaryValue); + + b.startObject(); + + b.field(SCALE_FIELD.getPreferredName(), histogram.scale()); + double zeroThreshold = histogram.zeroBucket().zeroThreshold(); + long zeroCount = histogram.zeroBucket().count(); + + if (zeroCount != 0 || zeroThreshold != 0) { + b.startObject(ZERO_FIELD.getPreferredName()); + if (zeroCount != 0) { + b.field(ZERO_COUNT_FIELD.getPreferredName(), zeroCount); + } + if (zeroThreshold != 0) { + b.field(ZERO_THRESHOLD_FIELD.getPreferredName(), zeroThreshold); + } + b.endObject(); + } + + writeBuckets(b, POSITIVE_FIELD.getPreferredName(), histogram.positiveBuckets()); + writeBuckets(b, NEGATIVE_FIELD.getPreferredName(), histogram.negativeBuckets()); + + b.endObject(); + } + + private static void writeBuckets(XContentBuilder b, String fieldName, ExponentialHistogram.Buckets buckets) throws IOException { + if (buckets.iterator().hasNext() == false) { + return; + } + b.startObject(fieldName); + BucketIterator it = buckets.iterator(); + b.startArray(BUCKET_INDICES_FIELD.getPreferredName()); + while (it.hasNext()) { + b.value(it.peekIndex()); + it.advance(); + } + b.endArray(); + it = buckets.iterator(); + b.startArray(BUCKET_COUNTS_FIELD.getPreferredName()); + while (it.hasNext()) { + b.value(it.peekCount()); + it.advance(); + } + b.endArray(); + b.endObject(); + } + + @Override + public String fieldName() { + return fullPath(); + } + + @Override + public long 
valueCount() { + return binaryValue != null ? 1 : 0; + } + }; + + private record ParsedZeroBucket(long count, double threshold) { + static final ParsedZeroBucket DEFAULT = new ParsedZeroBucket(0, 0); + } +} diff --git a/x-pack/plugin/mapper-exponential-histogram/src/main/java/org/elasticsearch/xpack/exponentialhistogram/ExponentialHistogramMapperPlugin.java b/x-pack/plugin/mapper-exponential-histogram/src/main/java/org/elasticsearch/xpack/exponentialhistogram/ExponentialHistogramMapperPlugin.java new file mode 100644 index 0000000000000..9f24b1cc4782d --- /dev/null +++ b/x-pack/plugin/mapper-exponential-histogram/src/main/java/org/elasticsearch/xpack/exponentialhistogram/ExponentialHistogramMapperPlugin.java @@ -0,0 +1,30 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.exponentialhistogram; + +import org.elasticsearch.index.mapper.Mapper; +import org.elasticsearch.plugins.MapperPlugin; +import org.elasticsearch.plugins.Plugin; + +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.Map; + +/** + * Plugin adding support for exponential histogram field types. + */ +public class ExponentialHistogramMapperPlugin extends Plugin implements MapperPlugin { + @Override + public Map getMappers() { + Map mappers = new LinkedHashMap<>(); + if (ExponentialHistogramFieldMapper.EXPONENTIAL_HISTOGRAM_FEATURE.isEnabled()) { + mappers.put(ExponentialHistogramFieldMapper.CONTENT_TYPE, ExponentialHistogramFieldMapper.PARSER); + } + return Collections.unmodifiableMap(mappers); + } +} diff --git a/x-pack/plugin/mapper-exponential-histogram/src/main/java/org/elasticsearch/xpack/exponentialhistogram/IndexWithCount.java b/x-pack/plugin/mapper-exponential-histogram/src/main/java/org/elasticsearch/xpack/exponentialhistogram/IndexWithCount.java new file mode 100644 index 0000000000000..6a6d0254ea1bd --- /dev/null +++ b/x-pack/plugin/mapper-exponential-histogram/src/main/java/org/elasticsearch/xpack/exponentialhistogram/IndexWithCount.java @@ -0,0 +1,15 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.exponentialhistogram; + +/** + * An exponential histogram bucket represented by its index and associated bucket count. + * @param index the index of the bucket + * @param count the number of values in that bucket. + */ +public record IndexWithCount(long index, long count) {} diff --git a/x-pack/plugin/mapper-exponential-histogram/src/test/java/org/elasticsearch/xpack/exponentialhistogram/ExponentialHistogramFieldMapperTests.java b/x-pack/plugin/mapper-exponential-histogram/src/test/java/org/elasticsearch/xpack/exponentialhistogram/ExponentialHistogramFieldMapperTests.java new file mode 100644 index 0000000000000..1bf29a4bd8a6a --- /dev/null +++ b/x-pack/plugin/mapper-exponential-histogram/src/test/java/org/elasticsearch/xpack/exponentialhistogram/ExponentialHistogramFieldMapperTests.java @@ -0,0 +1,506 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. 
Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.exponentialhistogram; + +import org.elasticsearch.core.Types; +import org.elasticsearch.index.mapper.DocumentMapper; +import org.elasticsearch.index.mapper.DocumentParsingException; +import org.elasticsearch.index.mapper.MappedFieldType; +import org.elasticsearch.index.mapper.MapperParsingException; +import org.elasticsearch.index.mapper.MapperTestCase; +import org.elasticsearch.index.mapper.SourceToParse; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.xcontent.XContentBuilder; +import org.junit.AssumptionViolatedException; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.NavigableMap; +import java.util.Set; +import java.util.TreeMap; + +import static org.elasticsearch.exponentialhistogram.ExponentialHistogram.MAX_INDEX; +import static org.elasticsearch.exponentialhistogram.ExponentialHistogram.MAX_SCALE; +import static org.elasticsearch.exponentialhistogram.ExponentialHistogram.MIN_INDEX; +import static org.elasticsearch.exponentialhistogram.ExponentialHistogram.MIN_SCALE; +import static org.hamcrest.Matchers.containsString; + +public class ExponentialHistogramFieldMapperTests extends MapperTestCase { + + protected Collection getPlugins() { + return Collections.singletonList(new ExponentialHistogramMapperPlugin()); + } + + @Override + protected void minimalMapping(XContentBuilder b) throws IOException { + b.field("type", ExponentialHistogramFieldMapper.CONTENT_TYPE); + } + + @Override + protected Object getSampleValueForDocument() { + return Map.of( + "scale", + 10, + "zero", + Map.of("count", 42, "threshold", 1.234), + "positive", + Map.of("indices", List.of(-1, 0, 1), "counts", List.of(2, 3, 4)), + "negative", + Map.of("indices", List.of(-100, 100), "counts", List.of(1000, 2000)) + ); + } + + @Override + protected Object getSampleObjectForDocument() { + return getSampleValueForDocument(); + } + + @Override + protected boolean supportsSearchLookup() { + return false; + } + + @Override + protected boolean supportsStoredFields() { + return false; + } + + @Override + protected boolean supportsIgnoreMalformed() { + return true; + } + + @Override + protected void registerParameters(ParameterChecker checker) throws IOException { + checker.registerUpdateCheck(b -> b.field("ignore_malformed", true), m -> assertTrue(m.ignoreMalformed())); + } + + @Override + protected Object generateRandomInputValue(MappedFieldType ft) { + throw new AssumptionViolatedException("Exponential histograms currently don't support fielddata"); + } + + private static Map createRandomHistogramValue(int maxBucketCount) { + int scale = randomIntBetween(MIN_SCALE, MAX_SCALE); + long maxCounts = Long.MAX_VALUE / (maxBucketCount + 1); + long zeroCount = randomBoolean() ? 0 : randomLongBetween(0, maxCounts); + double zeroThreshold = randomBoolean() ? 
0 : randomDouble(); + List positiveIndices = new ArrayList<>(); + List positiveCounts = new ArrayList<>(); + List negativeIndices = new ArrayList<>(); + List negativeCounts = new ArrayList<>(); + if (randomBoolean()) { + fillBucketsRandomly(positiveIndices, positiveCounts, maxBucketCount / 2); + } + if (randomBoolean()) { + fillBucketsRandomly(negativeIndices, negativeCounts, maxBucketCount / 2); + } + + return Map.of( + "scale", + scale, + "zero", + Map.of("count", zeroCount, "threshold", zeroThreshold), + "positive", + Map.of("indices", positiveIndices, "counts", positiveCounts), + "negative", + Map.of("indices", negativeIndices, "counts", negativeCounts) + ); + } + + private static void fillBucketsRandomly(List indices, List counts, int maxBucketCount) { + int bucketCount = randomIntBetween(0, maxBucketCount); + long maxCounts = Long.MAX_VALUE / (maxBucketCount * 2L + 1); + boolean useDense = randomBoolean(); + if (useDense) { + // Use dense indices, i.e., indices are sequential and start at MIN_INDEX + long startIndex = randomLongBetween(MIN_INDEX, MAX_INDEX - bucketCount); + for (int i = 0; i < bucketCount; i++) { + indices.add(startIndex + i); + counts.add(randomLongBetween(1, maxCounts)); + } + } else { + Set usedIndices = new HashSet<>(); + for (int i = 0; i < bucketCount; i++) { + long index; + do { + index = randomLongBetween(MIN_INDEX, MAX_INDEX); + } while (usedIndices.add(index) == false); + indices.add(index); + counts.add(randomLongBetween(1, maxCounts)); + } + } + } + + @Override + protected List exampleMalformedValues() { + var randomString = randomAlphaOfLengthBetween(1, 10); + var randomLong = randomLong(); + var randomDouble = randomDouble(); + var randomBoolean = randomBoolean(); + + return List.of( + // Basic type validation - non-object values + exampleMalformedValue(b -> b.value(randomString)).errorMatches( + "Failed to parse object: expecting token of type [START_OBJECT]" + ), + exampleMalformedValue(b -> b.value(randomLong)).errorMatches("Failed to parse object: expecting token of type [START_OBJECT]"), + exampleMalformedValue(b -> b.value(randomDouble)).errorMatches( + "Failed to parse object: expecting token of type [START_OBJECT]" + ), + exampleMalformedValue(b -> b.value(randomBoolean)).errorMatches( + "Failed to parse object: expecting token of type [START_OBJECT]" + ), + + // Missing scale field + exampleMalformedValue(b -> b.startObject().endObject()).errorMatches("expected field called [scale]"), + + // Scale field validation + exampleMalformedValue(b -> b.startObject().field("scale", "foo").endObject()).errorMatches( + "Failed to parse object: expecting token of type [VALUE_NUMBER]" + ), + exampleMalformedValue(b -> b.startObject().field("scale", MIN_SCALE - 1).endObject()).errorMatches( + "scale field must be in range [" + MIN_SCALE + ", " + MAX_SCALE + "] but got " + (MIN_SCALE - 1) + ), + exampleMalformedValue(b -> b.startObject().field("scale", MAX_SCALE + 1).endObject()).errorMatches( + "scale field must be in range [" + MIN_SCALE + ", " + MAX_SCALE + "] but got " + (MAX_SCALE + 1) + ), + + // Zero field validation - wrong token type + exampleMalformedValue(b -> b.startObject().field("scale", 0).field("zero", "not_an_object").endObject()).errorMatches( + "Failed to parse object: expecting token of type [START_OBJECT]" + ), + + // Zero.threshold field validation + exampleMalformedValue( + b -> b.startObject().field("scale", 0).startObject("zero").field("threshold", "not_a_number").endObject().endObject() + ).errorMatches("Failed to parse object: 
expecting token of type [VALUE_NUMBER]"), + exampleMalformedValue( + b -> b.startObject().field("scale", 0).startObject("zero").field("threshold", -1.0).endObject().endObject() + ).errorMatches("zero.threshold field must be a non-negative, finite number but got -1.0"), + // Zero.count field validation + exampleMalformedValue( + b -> b.startObject().field("scale", 0).startObject("zero").field("count", "not_a_number").endObject().endObject() + ).errorMatches("Failed to parse object: expecting token of type [VALUE_NUMBER]"), + exampleMalformedValue(b -> b.startObject().field("scale", 0).startObject("zero").field("count", -1).endObject().endObject()) + .errorMatches("zero.count field must be a non-negative number but got -1"), + + // Unknown field in zero sub-object + exampleMalformedValue( + b -> b.startObject().field("scale", 0).startObject("zero").field("unknown_field", 123).endObject().endObject() + ).errorMatches("with unknown parameter for zero sub-object [unknown_field]"), + + // Positive/negative field validation - wrong token type + exampleMalformedValue(b -> b.startObject().field("scale", 0).field("positive", "not_an_object").endObject()).errorMatches( + "Failed to parse object: expecting token of type [START_OBJECT]" + ), + exampleMalformedValue(b -> b.startObject().field("scale", 0).field("negative", "not_an_object").endObject()).errorMatches( + "Failed to parse object: expecting token of type [START_OBJECT]" + ), + + // indices validation - wrong token type + exampleMalformedValue( + b -> b.startObject().field("scale", 0).startObject("positive").field("indices", "not_an_array").endObject().endObject() + ).errorMatches("Failed to parse object: expecting token of type [START_ARRAY]"), + + // counts validation - wrong token type + exampleMalformedValue( + b -> b.startObject().field("scale", 0).startObject("positive").field("counts", "not_an_array").endObject().endObject() + ).errorMatches("Failed to parse object: expecting token of type [START_ARRAY]"), + + // indices array element validation - wrong token type + exampleMalformedValue( + b -> b.startObject() + .field("scale", 0) + .startObject("positive") + .startArray("indices") + .value("not_a_number") + .endArray() + .startArray("counts") + .value(1) + .endArray() + .endObject() + .endObject() + ).errorMatches("Failed to parse object: expecting token of type [VALUE_NUMBER]"), + + // counts array element validation - wrong token type + exampleMalformedValue( + b -> b.startObject() + .field("scale", 0) + .startObject("positive") + .startArray("indices") + .value(1) + .endArray() + .startArray("counts") + .value("not_a_number") + .endArray() + .endObject() + .endObject() + ).errorMatches("Failed to parse object: expecting token of type [VALUE_NUMBER]"), + + // indices value range validation + exampleMalformedValue( + b -> b.startObject() + .field("scale", 0) + .startObject("positive") + .startArray("indices") + .value(MIN_INDEX - 1) + .endArray() + .startArray("counts") + .value(1) + .endArray() + .endObject() + .endObject() + ).errorMatches( + "positive.indices values must all be in range [" + MIN_INDEX + ", " + MAX_INDEX + "] but got " + (MIN_INDEX - 1) + ), + + exampleMalformedValue( + b -> b.startObject() + .field("scale", 0) + .startObject("positive") + .startArray("indices") + .value(MAX_INDEX + 1) + .endArray() + .startArray("counts") + .value(1) + .endArray() + .endObject() + .endObject() + ).errorMatches( + "positive.indices values must all be in range [" + MIN_INDEX + ", " + MAX_INDEX + "] but got " + (MAX_INDEX + 1) + ), + 
+ // counts value validation - zero or negative + exampleMalformedValue( + b -> b.startObject() + .field("scale", 0) + .startObject("positive") + .startArray("indices") + .value(1) + .endArray() + .startArray("counts") + .value(0) + .endArray() + .endObject() + .endObject() + ).errorMatches("positive.counts values must all be greater than zero but got 0"), + exampleMalformedValue( + b -> b.startObject() + .field("scale", 0) + .startObject("positive") + .startArray("indices") + .value(1) + .endArray() + .startArray("counts") + .value(-1) + .endArray() + .endObject() + .endObject() + ).errorMatches("positive.counts values must all be greater than zero but got -1"), + + // Mismatched array lengths + exampleMalformedValue( + b -> b.startObject() + .field("scale", 0) + .startObject("positive") + .startArray("indices") + .value(1) + .value(2) + .endArray() + .startArray("counts") + .value(1) + .endArray() + .endObject() + .endObject() + ).errorMatches("expected same length from [positive.indices] and [positive.counts] but got [2 != 1]"), + + // Duplicate indices + exampleMalformedValue( + b -> b.startObject() + .field("scale", 0) + .startObject("positive") + .startArray("indices") + .value(1) + .value(1) + .endArray() + .startArray("counts") + .value(1) + .value(2) + .endArray() + .endObject() + .endObject() + ).errorMatches("expected entries of [positive.indices] to be unique, but got 1 multiple times"), + + // Unknown field in positive/negative sub-object + exampleMalformedValue( + b -> b.startObject().field("scale", 0).startObject("positive").field("unknown_field", 123).endObject().endObject() + ).errorMatches("with unknown parameter for positive sub-object [unknown_field]"), + + exampleMalformedValue( + b -> b.startObject().field("scale", 0).startObject("negative").field("unknown_field", 123).endObject().endObject() + ).errorMatches("with unknown parameter for negative sub-object [unknown_field]"), + + // Unknown top-level field + exampleMalformedValue(b -> b.startObject().field("scale", 0).field("unknown_field", 123).endObject()).errorMatches( + "with unknown parameter [unknown_field]" + ), + + // Overflow of total value counts + exampleMalformedValue( + b -> b.startObject() + .field("scale", 0) + .startObject("zero") + .field("count", 1) + .endObject() + .startObject("positive") + .startArray("indices") + .value(1) + .endArray() + .startArray("counts") + .value(Long.MAX_VALUE) + .endArray() + .endObject() + .endObject() + ).errorMatches("has a total value count exceeding the allowed maximum value of " + Long.MAX_VALUE) + ); + } + + public void testCannotBeUsedInMultifields() { + Exception e = expectThrows(MapperParsingException.class, () -> createMapperService(fieldMapping(b -> { + b.field("type", "keyword"); + b.startObject("fields"); + b.startObject("hist"); + b.field("type", "exponential_histogram"); + b.endObject(); + b.endObject(); + }))); + assertThat(e.getMessage(), containsString("Field [hist] of type [exponential_histogram] can't be used in multifields")); + } + + public void testCannotUseHistogramInArrays() throws Exception { + DocumentMapper mapper = createDocumentMapper(fieldMapping(this::minimalMapping)); + SourceToParse source = source( + b -> b.startArray("field").startObject().field("scale", 1).endObject().startObject().field("scale", 2).endObject().endArray() + ); + Exception e = expectThrows(DocumentParsingException.class, () -> mapper.parse(source)); + assertThat( + e.getCause().getMessage(), + containsString( + "Field [field] of type [exponential_histogram] doesn't 
support" + + " indexing multiple values for the same field in the same document" + ) + ); + } + + @Override + protected SyntheticSourceSupport syntheticSourceSupport(boolean ignoreMalformed) { + return new SyntheticSourceSupport() { + @Override + public SyntheticSourceExample example(int maxValues) { + Map histogram = createRandomHistogramValue(maxValues); + return new SyntheticSourceExample(histogram, convertHistogramToCanonicalForm(histogram), this::mapping); + } + + private Map convertHistogramToCanonicalForm(Map histogram) { + Map result = new LinkedHashMap<>(); + result.put("scale", histogram.get("scale")); + + Map zeroBucket = convertZeroBucketToCanonicalForm(Types.forciblyCast(histogram.get("zero"))); + if (zeroBucket != null) { + result.put("zero", zeroBucket); + } + + Map positive = convertBucketListToCanonicalForm(Types.forciblyCast(histogram.get("positive"))); + if (positive != null) { + result.put("positive", positive); + } + + Map negative = convertBucketListToCanonicalForm(Types.forciblyCast(histogram.get("negative"))); + if (negative != null) { + result.put("negative", negative); + } + + return result; + } + + private Map convertBucketListToCanonicalForm(Map buckets) { + if (buckets == null) { + return null; + } + List indices = Types.forciblyCast(buckets.get("indices")); + List counts = Types.forciblyCast(buckets.get("counts")); + if (indices == null || indices.isEmpty()) { + return null; + } + NavigableMap indicesToCountsSorted = new TreeMap<>(); + for (int i = 0; i < indices.size(); i++) { + indicesToCountsSorted.put(indices.get(i).longValue(), counts.get(i).longValue()); + } + + List resultIndices = new ArrayList<>(); + List resultCounts = new ArrayList<>(); + indicesToCountsSorted.forEach((index, count) -> { + resultIndices.add(index); + resultCounts.add(count); + }); + + LinkedHashMap result = new LinkedHashMap<>(); + result.put("indices", resultIndices); + result.put("counts", resultCounts); + return result; + } + + private Map convertZeroBucketToCanonicalForm(Map zeroBucket) { + if (zeroBucket == null) { + return null; + } + Map result = new HashMap<>(); + Number threshold = Types.forciblyCast(zeroBucket.get("threshold")); + if (threshold != null && threshold.doubleValue() != 0) { + result.put("threshold", threshold); + } + Number count = Types.forciblyCast(zeroBucket.get("count")); + if (count != null && count.longValue() != 0) { + result.put("count", count); + } + return result.isEmpty() ? 
null : result; + } + + private void mapping(XContentBuilder b) throws IOException { + b.field("type", ExponentialHistogramFieldMapper.CONTENT_TYPE); + if (ignoreMalformed) { + b.field("ignore_malformed", true); + } + } + + @Override + public List invalidExample() { + // We always support synthetic source independent of the configured mapping, so this test does not apply + return List.of(); + } + }; + } + + @Override + public void testSyntheticSourceKeepArrays() { + // exponential_histogram can't be used within an array + } + + @Override + protected IngestScriptSupport ingestScriptSupport() { + throw new AssumptionViolatedException("not yet implemented"); + } +} diff --git a/x-pack/plugin/mapper-exponential-histogram/src/yamlRestTest/java/org/elasticsearch/xpack/exponentialhistogram/ExponentialHistogramYamlTestSuiteIT.java b/x-pack/plugin/mapper-exponential-histogram/src/yamlRestTest/java/org/elasticsearch/xpack/exponentialhistogram/ExponentialHistogramYamlTestSuiteIT.java new file mode 100644 index 0000000000000..6c2ef857127cc --- /dev/null +++ b/x-pack/plugin/mapper-exponential-histogram/src/yamlRestTest/java/org/elasticsearch/xpack/exponentialhistogram/ExponentialHistogramYamlTestSuiteIT.java @@ -0,0 +1,36 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.exponentialhistogram; + +import com.carrotsearch.randomizedtesting.annotations.Name; +import com.carrotsearch.randomizedtesting.annotations.ParametersFactory; + +import org.elasticsearch.test.cluster.ElasticsearchCluster; +import org.elasticsearch.test.rest.yaml.ClientYamlTestCandidate; +import org.elasticsearch.test.rest.yaml.ESClientYamlSuiteTestCase; +import org.junit.ClassRule; + +public class ExponentialHistogramYamlTestSuiteIT extends ESClientYamlSuiteTestCase { + + public ExponentialHistogramYamlTestSuiteIT(@Name("yaml") ClientYamlTestCandidate testCandidate) { + super(testCandidate); + } + + @ParametersFactory + public static Iterable parameters() throws Exception { + return ESClientYamlSuiteTestCase.createParameters(); + } + + @ClassRule + public static ElasticsearchCluster cluster = ElasticsearchCluster.local().module("exponential-histogram").build(); + + @Override + protected String getTestRestCluster() { + return cluster.getHttpAddresses(); + } +} diff --git a/x-pack/plugin/mapper-exponential-histogram/src/yamlRestTest/resources/rest-api-spec/test/10_synthetic_source.yml b/x-pack/plugin/mapper-exponential-histogram/src/yamlRestTest/resources/rest-api-spec/test/10_synthetic_source.yml new file mode 100644 index 0000000000000..461748b552421 --- /dev/null +++ b/x-pack/plugin/mapper-exponential-histogram/src/yamlRestTest/resources/rest-api-spec/test/10_synthetic_source.yml @@ -0,0 +1,235 @@ +setup: + +# - skip: +# version: " - 8.12.99" +# reason: "exponential_histogram was added in 8.13" + + - do: + indices.create: + index: test_exponential_histogram + body: + settings: + index: + mapping: + source: + mode: synthetic + mappings: + properties: + my_histo: + type: exponential_histogram + +--- +"Sparse unsorted histogram": + - do: + index: + index: test_exponential_histogram + id: "1" + refresh: true + body: + my_histo: + scale: 12 + zero: + threshold: 0.123456 + count: 42 + positive: + indices: [25, 26, 99999999, -10, -1000000] + counts: [3, 4, 5, 2, 1] + negative: + indices: [-1000, 
123456, -999] + counts: [10, 11, 20] + + - do: + get: + index: test_exponential_histogram + id: "1" + - match: + _source.my_histo: + scale: 12 + zero: + threshold: 0.123456 + count: 42 + positive: + indices: [-1000000, -10, 25, 26, 99999999] + counts: [1, 2, 3, 4, 5] + negative: + indices: [-1000, -999, 123456] + counts: [10, 20, 11] + +--- +"Dense histogram": + - do: + index: + index: test_exponential_histogram + id: "1" + refresh: true + body: + my_histo: + scale: -10 + positive: + indices: [1,2,3,4,5] + counts: [6,7,8,9,10] + negative: + indices: [-1,0,1,2,3,4,5,6,7,8] + counts: [1,2,3,4,5,6,7,8,9,10] + + - do: + get: + index: test_exponential_histogram + id: "1" + - match: + _source.my_histo: + scale: -10 + positive: + indices: [1,2,3,4,5] + counts: [6,7,8,9,10] + negative: + indices: [-1,0,1,2,3,4,5,6,7,8] + counts: [1,2,3,4,5,6,7,8,9,10] + +--- +"Only positive buckets": + - do: + index: + index: test_exponential_histogram + id: "1" + refresh: true + body: + my_histo: + scale: 0 + positive: + indices: [-100, 10, 20] + counts: [3, 2, 1] + + - do: + get: + index: test_exponential_histogram + id: "1" + - match: + _source.my_histo: + scale: 0 + positive: + indices: [-100, 10, 20] + counts: [3, 2, 1] + +--- +"Only negative buckets": + - do: + index: + index: test_exponential_histogram + id: "1" + refresh: true + body: + my_histo: + scale: 0 + negative: + indices: [-100, 10, 20] + counts: [3, 2, 1] + + - do: + get: + index: test_exponential_histogram + id: "1" + - match: + _source.my_histo: + scale: 0 + negative: + indices: [-100, 10, 20] + counts: [3, 2, 1] +--- +"Empty histogram": + - do: + index: + index: test_exponential_histogram + id: "1" + refresh: true + body: + my_histo: + scale: -7 + - do: + get: + index: test_exponential_histogram + id: "1" + - match: + _source.my_histo: + scale: -7 + +--- +"Empty Zero Bucket but with custom threshold": + - do: + index: + index: test_exponential_histogram + id: "1" + refresh: true + body: + my_histo: + scale: 0 + zero: + threshold: 42.7 + - do: + get: + index: test_exponential_histogram + id: "1" + - match: + _source.my_histo: + scale: 0 + zero: + threshold: 42.7 + +--- +"Non-empty zero bucket with default zero threshold": + - do: + index: + index: test_exponential_histogram + id: "1" + refresh: true + body: + my_histo: + scale: 0 + zero: + count: 101 + - do: + get: + index: test_exponential_histogram + id: "1" + - match: + _source.my_histo: + scale: 0 + zero: + count: 101 + + +--- +"Numeric Limits": + - do: + index: + index: test_exponential_histogram + id: "1" + refresh: true + body: + my_histo: + scale: 38 + zero: + count: 2305843009213693952 # 2^61 to not cause overflows for the total value count sum + threshold: 1E-300 + positive: + indices: [-4611686018427387903, 4611686018427387903] + counts: [1, 2305843009213693952] + negative: + indices: [-4611686018427387903, 4611686018427387903] + counts: [2305843009213693952, 1] + - do: + get: + index: test_exponential_histogram + id: "1" + - match: + _source.my_histo: + scale: 38 + zero: + count: 2305843009213693952 + threshold: 1E-300 + positive: + indices: [-4611686018427387903, 4611686018427387903] + counts: [1, 2305843009213693952] + negative: + indices: [-4611686018427387903, 4611686018427387903] + counts: [2305843009213693952, 1]
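
The remaining paragraphs are illustrative sketches only; none of them are part of the patch. The synthetic-source loader in ExponentialHistogramFieldMapper reads three doc-values entries per document (the encoded buckets, the zero threshold stored as a sortable long, and the total value count) and reassembles them into a histogram before serializing it back into _source. The sketch below restates that recombination step in isolation; the class and method names are made up, and it assumes the CompressedExponentialHistogram API behaves exactly as it is used elsewhere in this patch.

// Illustrative sketch, not part of the patch: rebuilding a histogram from the three
// doc-values entries of a single document, mirroring ExponentialHistogramSyntheticFieldLoader.
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.NumericUtils;
import org.elasticsearch.exponentialhistogram.BucketIterator;
import org.elasticsearch.exponentialhistogram.ExponentialHistogram;
import org.elasticsearch.xpack.exponentialhistogram.CompressedExponentialHistogram;

import java.io.IOException;

class SyntheticSourceRecombinationSketch {

    static ExponentialHistogram rebuild(BytesRef encodedBuckets, long sortableZeroThreshold, long totalValueCount) throws IOException {
        // The zero threshold is stored as a sortable long in a numeric doc-values sub-field.
        double zeroThreshold = NumericUtils.sortableLongToDouble(sortableZeroThreshold);
        CompressedExponentialHistogram histogram = new CompressedExponentialHistogram();
        // reset() re-initializes the reusable wrapper with this document's encoded buckets;
        // scale, bucket indices and bucket counts are decoded from the binary doc value.
        histogram.reset(zeroThreshold, totalValueCount, encodedBuckets);
        return histogram;
    }

    // Buckets iterate in index order, which is also the order synthetic source emits them in.
    static void print(ExponentialHistogram.Buckets buckets) {
        for (BucketIterator it = buckets.iterator(); it.hasNext(); it.advance()) {
            System.out.println(it.peekIndex() + " -> " + it.peekCount());
        }
    }
}

Note that write() in the loader only emits the zero sub-object when the zero count or zero threshold is non-zero, and skips the positive/negative sub-objects entirely when they contain no buckets; the empty-histogram and zero-bucket YAML tests above rely on exactly that behaviour.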
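ExponentialHistogramMapperPlugin registers the exponential_histogram mapper only while its feature flag is enabled. Once registered, a mapping and a document for the field can be built as in the following sketch; the field layout and the ignore_malformed parameter are taken from the tests and YAML fixtures above, while the class name and the use of XContentFactory are illustrative, not part of the patch.

// Illustrative sketch, not part of the patch: a minimal mapping and document for the
// new field type, using the same field layout the tests above exercise.
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.XContentFactory;

import java.io.IOException;

class ExponentialHistogramMappingSketch {

    static XContentBuilder mapping() throws IOException {
        return XContentFactory.jsonBuilder()
            .startObject()
            .startObject("properties")
            .startObject("my_histo")
            .field("type", "exponential_histogram")
            .field("ignore_malformed", true) // the only mapping parameter the tests update dynamically
            .endObject()
            .endObject()
            .endObject();
    }

    static XContentBuilder document() throws IOException {
        return XContentFactory.jsonBuilder()
            .startObject()
            .startObject("my_histo")
            .field("scale", 12)
            .startObject("zero").field("count", 42).field("threshold", 0.123456).endObject()
            .startObject("positive")
            .startArray("indices").value(-10).value(25).value(26).endArray()
            .startArray("counts").value(2).value(3).value(4).endArray()
            .endObject()
            .endObject()
            .endObject();
    }
}

As the multi-field and array tests above show, the field cannot be declared inside "fields" of another mapper and a document may contain at most one histogram value per field.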
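The exampleMalformedValues() cases above double as a specification of what the document parser accepts: a mandatory scale within [MIN_SCALE, MAX_SCALE], a non-negative finite zero threshold, a non-negative zero count, bucket indices within [MIN_INDEX, MAX_INDEX] that are unique, strictly positive bucket counts, indices and counts arrays of equal length, and a total value count that must not overflow a long. The sketch below collects those rules in one place; the class and method names are made up for illustration, and the real checks live in the mapper's parser, which reports violations as malformed values rather than throwing.

// Illustrative sketch, not part of the patch: the value constraints encoded by the
// malformed-value test cases above, collected into a single standalone check.
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import static org.elasticsearch.exponentialhistogram.ExponentialHistogram.MAX_INDEX;
import static org.elasticsearch.exponentialhistogram.ExponentialHistogram.MAX_SCALE;
import static org.elasticsearch.exponentialhistogram.ExponentialHistogram.MIN_INDEX;
import static org.elasticsearch.exponentialhistogram.ExponentialHistogram.MIN_SCALE;

class HistogramValueConstraintsSketch {

    static void check(int scale, long zeroCount, double zeroThreshold, List<Long> indices, List<Long> counts) {
        if (scale < MIN_SCALE || scale > MAX_SCALE) {
            throw new IllegalArgumentException("scale must be in [" + MIN_SCALE + ", " + MAX_SCALE + "]");
        }
        if (zeroCount < 0) {
            throw new IllegalArgumentException("zero.count must be non-negative");
        }
        if (zeroThreshold < 0 || Double.isFinite(zeroThreshold) == false) {
            throw new IllegalArgumentException("zero.threshold must be a non-negative, finite number");
        }
        if (indices.size() != counts.size()) {
            throw new IllegalArgumentException("indices and counts must have the same length");
        }
        Set<Long> seenIndices = new HashSet<>();
        long totalCount = zeroCount;
        for (int i = 0; i < indices.size(); i++) {
            long index = indices.get(i);
            long count = counts.get(i);
            if (index < MIN_INDEX || index > MAX_INDEX) {
                throw new IllegalArgumentException("indices must be in [" + MIN_INDEX + ", " + MAX_INDEX + "]");
            }
            if (count <= 0) {
                throw new IllegalArgumentException("counts must be greater than zero");
            }
            if (seenIndices.add(index) == false) {
                throw new IllegalArgumentException("indices must be unique, got " + index + " multiple times");
            }
            // addExact makes the overflow case explicit here; the mapper instead rejects the document value.
            totalCount = Math.addExact(totalCount, count);
        }
    }
}

This is also why the "Numeric Limits" YAML test uses 2^61 per bucket: the zero, positive and negative counts then sum to well below Long.MAX_VALUE, so the total value count stays representable.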
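Finally, the "Sparse unsorted histogram" YAML test pins down the round-trip contract for synthetic source: buckets may be supplied in any order, but they come back sorted by index with the counts permuted to match, which is the same normalization the test's convertBucketListToCanonicalForm applies. A tiny sketch with the exact numbers from that test; the class name is made up for illustration.

// Illustrative sketch, not part of the patch: the re-ordering applied by the synthetic
// source round trip, using the values from the "Sparse unsorted histogram" YAML test.
import java.util.List;
import java.util.TreeMap;

class CanonicalBucketOrderSketch {

    public static void main(String[] args) {
        List<Long> indices = List.of(25L, 26L, 99999999L, -10L, -1000000L);
        List<Long> counts = List.of(3L, 4L, 5L, 2L, 1L);

        // Sort by index, keeping each count attached to its index.
        TreeMap<Long, Long> sortedByIndex = new TreeMap<>();
        for (int i = 0; i < indices.size(); i++) {
            sortedByIndex.put(indices.get(i), counts.get(i));
        }

        // Prints {-1000000=1, -10=2, 25=3, 26=4, 99999999=5}, matching the expected
        // indices [-1000000, -10, 25, 26, 99999999] and counts [1, 2, 3, 4, 5].
        System.out.println(sortedByIndex);
    }
}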