Merged
Commits
23 commits
93a59b9
Add mapper for exponential_histogram type
JonasKunz Aug 6, 2025
ce6796e
Fix exception types, remove todos
JonasKunz Aug 6, 2025
d861def
Fix javadoc
JonasKunz Aug 6, 2025
f48361b
Fix zerobucket comments and visibility
JonasKunz Aug 7, 2025
a6f53fa
Merge remote-tracking branch 'elastic/main' into exp-histo-mapper
JonasKunz Aug 8, 2025
875c34d
Fix benchmark changes
JonasKunz Aug 8, 2025
69cd05b
Fix yaml test name
JonasKunz Aug 8, 2025
ff78a50
Review fixes
JonasKunz Aug 15, 2025
70892ab
Split encoding and histogram implementation, general code cleanup
JonasKunz Aug 15, 2025
e86fdaa
Merge remote-tracking branch 'elastic/main' into exp-histo-mapper
JonasKunz Aug 15, 2025
3049825
Merge branch 'main' into exp-histo-mapper
JonasKunz Aug 18, 2025
53e1cb0
Update x-pack/plugin/mapper-exponential-histogram/src/main/java/org/e…
JonasKunz Aug 19, 2025
20f6597
Update x-pack/plugin/mapper-exponential-histogram/src/main/java/org/e…
JonasKunz Aug 19, 2025
184904a
[CI] Auto commit changes from spotless
Aug 19, 2025
cfe3d1f
use multi-line comment to make spotless not mess up the formatting
JonasKunz Aug 19, 2025
116b64b
Merge remote-tracking branch 'origin/exp-histo-mapper' into exp-histo…
JonasKunz Aug 19, 2025
977c26a
Rename "load" to "decode"
JonasKunz Aug 19, 2025
b71dc55
Add comment explaning why invalid mapping test does not apply
JonasKunz Aug 19, 2025
e0298da
Refactor bucket encoding, fix comment
JonasKunz Aug 19, 2025
4096604
Merge remote-tracking branch 'elastic/main' into exp-histo-mapper
JonasKunz Aug 19, 2025
0f845dd
Apply suggestions from code review
JonasKunz Aug 20, 2025
b65efb0
Merge branch 'main' into exp-histo-mapper
JonasKunz Aug 20, 2025
f539f5a
checkstyle
JonasKunz Aug 20, 2025
@@ -90,7 +90,7 @@ public void setUp() {
bucketIndex += 1 + random.nextInt(bucketCount) % (Math.max(1, bucketCount / dataPointSize));
generator.add(Math.pow(1.001, bucketIndex));
}
ExponentialHistogram histogram = generator.get();
ExponentialHistogram histogram = generator.getAndClear();
cnt = getBucketCount(histogram);
if (cnt < dataPointSize) {
throw new IllegalStateException("Expected bucket count to be " + dataPointSize + ", but was " + cnt);
@@ -21,6 +21,8 @@

package org.elasticsearch.exponentialhistogram;

import org.apache.lucene.util.RamUsageEstimator;

import static org.elasticsearch.exponentialhistogram.ExponentialHistogram.MAX_INDEX;
import static org.elasticsearch.exponentialhistogram.ExponentialHistogram.MAX_SCALE;
import static org.elasticsearch.exponentialhistogram.ExponentialHistogram.MIN_INDEX;
@@ -38,6 +40,8 @@
*/
public final class ZeroBucket {

public static final long SHALLOW_SIZE = RamUsageEstimator.shallowSizeOfInstance(ZeroBucket.class);

/**
* The exponential histogram scale used for {@link #index}
*/
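
For readers unfamiliar with the pattern: the new SHALLOW_SIZE constant feeds Lucene-style RAM accounting, where the shallow size of a class is computed once and then summed with the sizes of owned structures in ramBytesUsed(). A minimal standalone sketch of that pattern follows; the class and field below are hypothetical and not part of this PR.

import org.apache.lucene.util.Accountable;
import org.apache.lucene.util.RamUsageEstimator;

// Hypothetical example only: shows how a precomputed SHALLOW_SIZE constant is
// typically combined with the sizes of owned data in ramBytesUsed().
final class RamAccountedBuffer implements Accountable {

    private static final long SHALLOW_SIZE = RamUsageEstimator.shallowSizeOfInstance(RamAccountedBuffer.class);

    private final long[] values;

    RamAccountedBuffer(int capacity) {
        this.values = new long[capacity];
    }

    @Override
    public long ramBytesUsed() {
        // shallow object size plus the array this instance owns
        return SHALLOW_SIZE + RamUsageEstimator.sizeOf(values);
    }
}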
@@ -148,17 +148,18 @@ public void testMergeOrderIndependence() {

for (int i = 0; i < 100; i++) {
Collections.shuffle(values, random());
ExponentialHistogram shuffled = ExponentialHistogram.create(20, values.stream().mapToDouble(Double::doubleValue).toArray());

assertThat("Expected same scale", shuffled.scale(), equalTo(reference.scale()));
assertThat(
"Expected same threshold for zero-bucket",
shuffled.zeroBucket().zeroThreshold(),
equalTo(reference.zeroBucket().zeroThreshold())
);
assertThat("Expected same count for zero-bucket", shuffled.zeroBucket().count(), equalTo(reference.zeroBucket().count()));
assertBucketsEqual(shuffled.negativeBuckets(), reference.negativeBuckets());
assertBucketsEqual(shuffled.positiveBuckets(), reference.positiveBuckets());
double[] vals = values.stream().mapToDouble(Double::doubleValue).toArray();
try (ReleasableExponentialHistogram shuffled = ExponentialHistogram.create(20, breaker(), vals)) {
assertThat("Expected same scale", shuffled.scale(), equalTo(reference.scale()));
assertThat(
"Expected same threshold for zero-bucket",
shuffled.zeroBucket().zeroThreshold(),
equalTo(reference.zeroBucket().zeroThreshold())
);
assertThat("Expected same count for zero-bucket", shuffled.zeroBucket().count(), equalTo(reference.zeroBucket().count()));
assertBucketsEqual(shuffled.negativeBuckets(), reference.negativeBuckets());
assertBucketsEqual(shuffled.positiveBuckets(), reference.positiveBuckets());
}
}
}
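
The reworked assertions above wrap histogram creation in try-with-resources: ExponentialHistogram.create(...) now takes a breaker and returns a ReleasableExponentialHistogram that must be closed. A minimal usage sketch, assuming breaker() supplies an org.elasticsearch.common.breaker.CircuitBreaker (for example a NoopCircuitBreaker in tests) and the same create(scale, breaker, values...) overload used in the test above:

// Sketch only, mirroring the call shape in the test above; breaker() is assumed
// to return a CircuitBreaker such as new NoopCircuitBreaker("test").
double[] values = { 0.5, 1.0, 2.5, 42.0 };
try (ReleasableExponentialHistogram histo = ExponentialHistogram.create(20, breaker(), values)) {
    // the histogram is only valid inside this block; close() releases the
    // memory that was reserved on the breaker when it was created
    long positiveCount = histo.positiveBuckets().valueCount();
}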

@@ -43,7 +43,7 @@ public void testMergingPreservesExactThreshold() {
}

public void testBucketCollapsingPreservesExactThreshold() {
FixedCapacityExponentialHistogram histo = new FixedCapacityExponentialHistogram(2);
FixedCapacityExponentialHistogram histo = createAutoReleasedHistogram(2);
histo.resetBuckets(0);
histo.tryAddBucket(0, 42, true); // bucket [1,2]

@@ -8,9 +8,7 @@
package org.elasticsearch.xpack.exponentialhistogram;

import org.apache.lucene.util.BytesRef;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.ByteArrayStreamInput;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.apache.lucene.util.RamUsageEstimator;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.exponentialhistogram.BucketIterator;
import org.elasticsearch.exponentialhistogram.CopyableBucketIterator;
@@ -31,28 +29,19 @@
*/
public class CompressedExponentialHistogram implements ExponentialHistogram {

private static final int SCALE_OFFSET = 11;
private static final int HAS_NEGATIVE_BUCKETS_FLAG = 1 << 6;
private static final int SCALE_MASK = 0x3F;
static {
// protection against changes to MIN_SCALE and MAX_SCALE messing with our encoding
assert MIN_SCALE + SCALE_OFFSET >= 0;
assert MAX_SCALE + SCALE_OFFSET <= SCALE_MASK;
}
private static final long SHALLOW_SIZE = RamUsageEstimator.shallowSizeOfInstance(CompressedExponentialHistogram.class);

private int scale;
private double zeroThreshold;
private long valueCount;
private ZeroBucket lazyZeroBucket;

private final Buckets positiveBuckets = new Buckets();
private final Buckets negativeBuckets = new Buckets();

private byte[] compressedData;
private final EncodedHistogramData encodedData = new EncodedHistogramData();
private final Buckets positiveBuckets = new Buckets(true);
private final Buckets negativeBuckets = new Buckets(false);

@Override
public int scale() {
return scale;
return encodedData.scale();
}

@Override
Expand All @@ -79,46 +68,54 @@ public ExponentialHistogram.Buckets negativeBuckets() {
*
* @param zeroThreshold the zeroThreshold for the histogram, which needs to be stored externally
* @param valueCount the total number of values the histogram contains, needs to be stored externally
* @param histogramData the encoded histogram bytes which previously where generated via {@code #writeHistogramBytes}
* @param encodedHistogramData the encoded histogram bytes which previously where generated via
* {@link #writeHistogramBytes(StreamOutput, int, List, List)}.
*/
public void reset(double zeroThreshold, long valueCount, BytesRef histogramData) throws IOException {
public void reset(double zeroThreshold, long valueCount, BytesRef encodedHistogramData) throws IOException {
lazyZeroBucket = null;

this.zeroThreshold = zeroThreshold;
this.valueCount = valueCount;
this.compressedData = histogramData.bytes;
ByteArrayStreamInput input = new ByteArrayStreamInput();
input.reset(histogramData.bytes, histogramData.offset, histogramData.length);

int scaleWithFlags = input.readByte();
this.scale = (scaleWithFlags & SCALE_MASK) - SCALE_OFFSET;
boolean hasNegativeBuckets = (scaleWithFlags & HAS_NEGATIVE_BUCKETS_FLAG) != 0;
encodedData.load(encodedHistogramData);
negativeBuckets.resetCachedData();
positiveBuckets.resetCachedData();
}

int negativeBucketsLength = 0;
if (hasNegativeBuckets) {
negativeBucketsLength = input.readVInt();
}
/**
* Serializes the given histogram, so that exactly the same data can be reconstructed via {@link #reset(double, long, BytesRef)}.
*
* @param output the output to write the serialized bytes to
* @param scale the scale of the histogram
* @param negativeBuckets the negative buckets of the histogram, sorted by the bucket indices
* @param positiveBuckets the positive buckets of the histogram, sorted by the bucket indices
*/
public static void writeHistogramBytes(
StreamOutput output,
int scale,
List<IndexWithCount> negativeBuckets,
List<IndexWithCount> positiveBuckets
) throws IOException {
EncodedHistogramData.write(output, scale, negativeBuckets, positiveBuckets);
}

negativeBuckets.reset(input.getPosition(), negativeBucketsLength);
input.skipBytes(negativeBucketsLength);
positiveBuckets.reset(input.getPosition(), input.available());
@Override
public long ramBytesUsed() {
return SHALLOW_SIZE + ZeroBucket.SHALLOW_SIZE + 2 * Buckets.SHALLOW_SIZE + EncodedHistogramData.SHALLOW_SIZE;
}

private final class Buckets implements ExponentialHistogram.Buckets {

private int encodedBucketsStart;
private int encodedBucketsLength;
private static final long SHALLOW_SIZE = RamUsageEstimator.shallowSizeOf(Buckets.class);

private final boolean isForPositiveBuckets; // false if for negative buckets
private long cachedValueCount;
private long cachedMaxIndex;

private Buckets() {
reset(0, 0);
private Buckets(boolean isForPositiveBuckets) {
this.isForPositiveBuckets = isForPositiveBuckets;
resetCachedData();
}

private void reset(int encodedBucketsStart, int encodedBucketsLength) {
this.encodedBucketsStart = encodedBucketsStart;
this.encodedBucketsLength = encodedBucketsLength;
private void resetCachedData() {
cachedValueCount = -1;
cachedMaxIndex = Long.MIN_VALUE;
}
@@ -137,7 +134,11 @@ private void computeCachedDataIfRequired() {

@Override
public CopyableBucketIterator iterator() {
return new CompressedBucketsIterator(encodedBucketsStart, encodedBucketsLength);
if (isForPositiveBuckets) {
return new CompressedBucketsIterator(encodedData.positiveBucketsDecoder());
} else {
return new CompressedBucketsIterator(encodedData.negativeBucketsDecoder());
}
}

@Override
@@ -154,182 +155,41 @@ public long valueCount() {

private class CompressedBucketsIterator implements CopyableBucketIterator {

private long currentIndex;
/**
* The count for the bucket this iterator is currently pointing at.
* A value of {@code -1} is used to represent that the end has been reached.
*/
private long currentCount;
private final ByteArrayStreamInput bucketData;

CompressedBucketsIterator(int encodedBucketsStartOffset, int length) {
bucketData = new ByteArrayStreamInput();
if (length > 0) {
bucketData.reset(compressedData, encodedBucketsStartOffset, length);
try {
currentIndex = bucketData.readZLong() - 1;
} catch (IOException e) {
throw new IllegalStateException("Bad histogram bytes", e);
}
currentCount = 0;
advance();
} else {
// no data means we are iterating over an empty set of buckets
markEndReached();
}
}

private void markEndReached() {
currentCount = -1;
}
private final EncodedHistogramData.BucketsDecoder decoder;

CompressedBucketsIterator(CompressedBucketsIterator toCopy) {
bucketData = new ByteArrayStreamInput();
bucketData.reset(compressedData, toCopy.bucketData.getPosition(), toCopy.bucketData.available());
currentCount = toCopy.currentCount;
currentIndex = toCopy.currentIndex;
CompressedBucketsIterator(EncodedHistogramData.BucketsDecoder delegate) {
this.decoder = delegate;
}

@Override
public CopyableBucketIterator copy() {
return new CompressedBucketsIterator(this);
return new CompressedBucketsIterator(decoder.copy());
}

@Override
public final boolean hasNext() {
return currentCount > 0;
return decoder.hasNext();
}

@Override
public final long peekCount() {
ensureEndNotReached();
return currentCount;
return decoder.peekCount();
}

@Override
public final long peekIndex() {
ensureEndNotReached();
return currentIndex;
return decoder.peekIndex();
}

@Override
public int scale() {
return CompressedExponentialHistogram.this.scale();
}

/**
* For details on the encoding, see {@link #writeHistogramBytes(StreamOutput, int, List, List)}.
*/
@Override
public final void advance() {
ensureEndNotReached();
try {
if (bucketData.available() > 0) {
currentIndex++;
long countOrNumEmptyBuckets = bucketData.readZLong();
if (countOrNumEmptyBuckets < 0) {
// we have encountered a negative value, this means we "skip"
// the given amount of empty buckets
long numEmptyBuckets = -countOrNumEmptyBuckets;
currentIndex += numEmptyBuckets;
// after we have skipped empty buckets, we know that the next value is a non-empty bucket
currentCount = bucketData.readZLong();
} else {
currentCount = countOrNumEmptyBuckets;
}
assert currentCount > 0;
} else {
markEndReached();
}
} catch (IOException e) {
throw new IllegalStateException("Bad histogram bytes", e);
}
decoder.advance();
}

private void ensureEndNotReached() {
if (currentCount == -1) {
throw new IllegalStateException("end has already been reached");
}
}
}
}

/**
* Serializes the given histogram, so that exactly the same histogram can be reconstructed via {@link #reset(double, long, BytesRef)}.
*
* @param output the output to write the serialized bytes to
* @param scale the scale of the histogram
* @param negativeBuckets the negative buckets of the histogram, sorted by the bucket indices
* @param positiveBuckets the positive buckets of the histogram, sorted by the bucket indices
*/
public static void writeHistogramBytes(
StreamOutput output,
int scale,
List<IndexWithCount> negativeBuckets,
List<IndexWithCount> positiveBuckets
) throws IOException {
assert scale >= MIN_SCALE && scale <= MAX_SCALE : "scale must be in range [" + MIN_SCALE + ", " + MAX_SCALE + "]";
boolean hasNegativeBuckets = negativeBuckets.isEmpty() == false;
int scaleWithFlags = (scale + SCALE_OFFSET);
if (hasNegativeBuckets) {
scaleWithFlags |= HAS_NEGATIVE_BUCKETS_FLAG;
}
output.writeByte((byte) scaleWithFlags);
if (hasNegativeBuckets) {
BytesStreamOutput temp = new BytesStreamOutput();
serializeBuckets(temp, negativeBuckets);
BytesReference data = temp.bytes();
output.writeVInt(data.length());
output.writeBytes(data.array(), data.arrayOffset(), data.length());
}
serializeBuckets(output, positiveBuckets);
}

/**
* Encodes the given bucket indices and counts as bytes into the given output.
* The following scheme is used to maximize compression:
* <ul>
* <li>if there are no buckets, the result is an empty array ({@code byte[0]})</li>
* <li> write the index of the first bucket as ZigZag-VLong</li>
* <li> write the count of the first bucket as ZigZag-VLong</li>
* <li> for each remaining bucket:
* <ul>
* <li>if the index of the bucket is exactly {@code previousBucketIndex+1}, write the count for the bucket as ZigZag-VLong</li>
* <li>Otherwise there is at least one empty bucket between this one and the previous one.
* We compute that number as {@code n=currentBucketIndex-previousIndex-1} and then write {@code -n} out as
* ZigZag-VLong followed by the count for the bucket as ZigZag-VLong. The negation is performed to allow to
* distinguish the cases when decoding.</li>
* </ul>
* </li>
* </ul>
*
* This encoding provides a compact storage for both dense and sparse histograms:
* For dense histograms it effectively results in encoding the index of the first bucket, followed by just an array of counts.
* For sparse histograms it corresponds to an interleaved encoding of the bucket indices with delta compression and the bucket counts.
* Even partially dense histograms profit from this encoding.
*
* @param out the output to write the encoded buckets to
* @param buckets the indices and counts of the buckets to encode, must be provided sorted based on the indices.
*/
private static void serializeBuckets(StreamOutput out, List<IndexWithCount> buckets) throws IOException {
if (buckets.isEmpty()) {
return; // no buckets, therefore nothing to write
}
long minIndex = buckets.getFirst().index();
out.writeZLong(minIndex);
long prevIndex = minIndex - 1;
for (IndexWithCount indexWithCount : buckets) {
long indexDelta = indexWithCount.index() - prevIndex;
assert indexDelta > 0; // values must be sorted and unique
assert indexWithCount.count() > 0;

long numEmptyBucketsInBetween = indexDelta - 1;
if (numEmptyBucketsInBetween > 0) {
out.writeZLong(-numEmptyBucketsInBetween);
}
out.writeZLong(indexWithCount.count());

prevIndex = indexWithCount.index();
}
}
}
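
The delta encoding that the removed serializeBuckets javadoc describes (and that now lives behind EncodedHistogramData) can be illustrated with a small standalone sketch. The class below is hypothetical: it emits plain longs into a list instead of ZigZag-VLongs on a StreamOutput, and only mirrors the scheme described above.

import java.util.ArrayList;
import java.util.List;

// Standalone illustration of the encoding described in the javadoc above; not the
// production code, which ZigZag-VLong encodes these values via a StreamOutput.
final class BucketEncodingSketch {

    record IndexWithCount(long index, long count) {}

    // Emits the sequence of longs that would each be written as a ZigZag-VLong.
    static List<Long> encode(List<IndexWithCount> buckets) {
        List<Long> out = new ArrayList<>();
        if (buckets.isEmpty()) {
            return out; // no buckets: nothing is written at all
        }
        out.add(buckets.get(0).index()); // absolute index of the first bucket
        long prevIndex = buckets.get(0).index() - 1;
        for (IndexWithCount bucket : buckets) {
            long numEmptyInBetween = bucket.index() - prevIndex - 1;
            if (numEmptyInBetween > 0) {
                out.add(-numEmptyInBetween); // negative marker: skip this many empty buckets
            }
            out.add(bucket.count()); // counts are positive, so they can never be mistaken for a marker
            prevIndex = bucket.index();
        }
        return out;
    }
}

For example, buckets (index 5, count 3), (index 6, count 1) and (index 10, count 7) encode to the sequence 5, 3, 1, -3, 7: the first index is written absolutely, the adjacent buckets 5 and 6 contribute only their counts, and the gap of three empty buckets (7, 8 and 9) before index 10 becomes the marker -3. At write time this data is produced by writeHistogramBytes(output, scale, negativeBuckets, positiveBuckets), and reset(zeroThreshold, valueCount, bytes) rebuilds the same histogram from the stored BytesRef, with the zero threshold and total value count carried separately as the javadocs above note.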