Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
93a59b9
Add mapper for exponential_histogram type
JonasKunz Aug 6, 2025
ce6796e
Fix exception types, remove todos
JonasKunz Aug 6, 2025
d861def
Fix javadoc
JonasKunz Aug 6, 2025
f48361b
Fix zerobucket comments and visibility
JonasKunz Aug 7, 2025
a6f53fa
Merge remote-tracking branch 'elastic/main' into exp-histo-mapper
JonasKunz Aug 8, 2025
875c34d
Fix benchmark changes
JonasKunz Aug 8, 2025
69cd05b
Fix yaml test name
JonasKunz Aug 8, 2025
ff78a50
Review fixes
JonasKunz Aug 15, 2025
70892ab
Split encoding and histogram implementation, general code cleanup
JonasKunz Aug 15, 2025
e86fdaa
Merge remote-tracking branch 'elastic/main' into exp-histo-mapper
JonasKunz Aug 15, 2025
3049825
Merge branch 'main' into exp-histo-mapper
JonasKunz Aug 18, 2025
53e1cb0
Update x-pack/plugin/mapper-exponential-histogram/src/main/java/org/e…
JonasKunz Aug 19, 2025
20f6597
Update x-pack/plugin/mapper-exponential-histogram/src/main/java/org/e…
JonasKunz Aug 19, 2025
184904a
[CI] Auto commit changes from spotless
Aug 19, 2025
cfe3d1f
use multi-line comment to make spotless not mess up the formatting
JonasKunz Aug 19, 2025
116b64b
Merge remote-tracking branch 'origin/exp-histo-mapper' into exp-histo…
JonasKunz Aug 19, 2025
977c26a
Rename "load" to "decode"
JonasKunz Aug 19, 2025
b71dc55
Add comment explaning why invalid mapping test does not apply
JonasKunz Aug 19, 2025
e0298da
Refactor bucket encoding, fix comment
JonasKunz Aug 19, 2025
4096604
Merge remote-tracking branch 'elastic/main' into exp-histo-mapper
JonasKunz Aug 19, 2025
0f845dd
Apply suggestions from code review
JonasKunz Aug 20, 2025
b65efb0
Merge branch 'main' into exp-histo-mapper
JonasKunz Aug 20, 2025
f539f5a
checkstyle
JonasKunz Aug 20, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions benchmarks/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ dependencies {
api(project(':x-pack:plugin:core'))
api(project(':x-pack:plugin:esql'))
api(project(':x-pack:plugin:esql:compute'))
api(project(':x-pack:plugin:mapper-exponential-histogram'))
implementation project(path: ':libs:native')
implementation project(path: ':libs:simdvec')
implementation project(path: ':libs:exponential-histogram')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,14 @@

package org.elasticsearch.benchmark.exponentialhistogram;

import org.apache.lucene.util.BytesRef;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.exponentialhistogram.BucketIterator;
import org.elasticsearch.exponentialhistogram.ExponentialHistogram;
import org.elasticsearch.exponentialhistogram.ExponentialHistogramGenerator;
import org.elasticsearch.exponentialhistogram.ExponentialHistogramMerger;
import org.elasticsearch.xpack.exponentialhistogram.CompressedExponentialHistogram;
import org.elasticsearch.xpack.exponentialhistogram.IndexWithCount;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
Expand All @@ -26,6 +30,8 @@
import org.openjdk.jmh.annotations.Threads;
import org.openjdk.jmh.annotations.Warmup;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ThreadLocalRandom;
Expand All @@ -46,6 +52,9 @@ public class ExponentialHistogramMergeBench {
@Param({ "0.01", "0.1", "0.25", "0.5", "1.0", "2.0" })
double mergedHistoSizeFactor;

@Param({ "array-backed", "compressed" })
String histoImplementation;

Random random;
ExponentialHistogramMerger histoMerger;

Expand Down Expand Up @@ -79,16 +88,54 @@ public void setUp() {
bucketIndex += 1 + random.nextInt(bucketCount) % (Math.max(1, bucketCount / dataPointSize));
generator.add(Math.pow(1.001, bucketIndex));
}
toMerge[i] = generator.get();
cnt = getBucketCount(toMerge[i]);
ExponentialHistogram histogram = generator.get();
cnt = getBucketCount(histogram);
if (cnt < dataPointSize) {
throw new IllegalArgumentException("Expected bucket count to be " + dataPointSize + ", but was " + cnt);
throw new IllegalStateException("Expected bucket count to be " + dataPointSize + ", but was " + cnt);
}

if ("array-backed".equals(histoImplementation)) {
toMerge[i] = histogram;
} else if ("compressed".equals(histoImplementation)) {
toMerge[i] = asCompressedHistogram(histogram);
} else {
throw new IllegalArgumentException("Unknown implementation: " + histoImplementation);
}
}

index = 0;
}

private ExponentialHistogram asCompressedHistogram(ExponentialHistogram histogram) {
List<IndexWithCount> negativeBuckets = new ArrayList<>();
List<IndexWithCount> positiveBuckets = new ArrayList<>();

BucketIterator it = histogram.negativeBuckets().iterator();
while (it.hasNext()) {
negativeBuckets.add(new IndexWithCount(it.peekIndex(), it.peekCount()));
it.advance();
}
it = histogram.positiveBuckets().iterator();
while (it.hasNext()) {
positiveBuckets.add(new IndexWithCount(it.peekIndex(), it.peekCount()));
it.advance();
}

long totalCount = histogram.zeroBucket().count() + histogram.negativeBuckets().valueCount() + histogram.positiveBuckets()
.valueCount();
BytesStreamOutput histoBytes = new BytesStreamOutput();
try {
CompressedExponentialHistogram.writeHistogramBytes(histoBytes, histogram.scale(), negativeBuckets, positiveBuckets);
CompressedExponentialHistogram result = new CompressedExponentialHistogram();
BytesRef data = histoBytes.bytes().toBytesRef();
result.reset(histogram.zeroBucket().zeroThreshold(), totalCount, data);
return result;
} catch (IOException e) {
throw new RuntimeException(e);
}

}

private static int getBucketCount(ExponentialHistogram histo) {
int cnt = 0;
for (BucketIterator it : List.of(histo.negativeBuckets().iterator(), histo.positiveBuckets().iterator())) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

package org.elasticsearch.exponentialhistogram;

import static org.elasticsearch.exponentialhistogram.ExponentialHistogram.MAX_INDEX;
import static org.elasticsearch.exponentialhistogram.ExponentialHistogram.MAX_SCALE;
import static org.elasticsearch.exponentialhistogram.ExponentialHistogram.MIN_INDEX;
import static org.elasticsearch.exponentialhistogram.ExponentialHistogram.MIN_SCALE;
Expand All @@ -34,13 +35,26 @@
* To allow efficient comparison with bucket boundaries, this class internally
* represents the zero threshold as a exponential histogram bucket index with a scale,
* computed via {@link ExponentialScaleUtils#computeIndex(double, int)}.
*
* @param index The index used with the scale to determine the zero threshold.
* @param scale The scale used with the index to determine the zero threshold.
* @param count The number of values in the zero bucket.
*/
public record ZeroBucket(long index, int scale, long count) {
public final class ZeroBucket {

/**
* The exponential histogram scale used for {@link #index}
*/
private final int scale;

/**
* The exponential histogram bucket index whose upper boundary corresponds to the zero threshold.
* Might be computed lazily from {@link #realThreshold}, uses {@link Long#MAX_VALUE} as placeholder in this case.
*/
private long index;

/**
* Might be computed lazily from {@link #realThreshold}, uses {@link Double#NaN} as placeholder in this case.
*/
private double realThreshold;

private final long count;
// A singleton for an empty zero bucket with the smallest possible threshold.
private static final ZeroBucket MINIMAL_EMPTY = new ZeroBucket(MIN_INDEX, MIN_SCALE, 0);

Expand All @@ -51,7 +65,27 @@ public record ZeroBucket(long index, int scale, long count) {
* @param count The number of values in the bucket.
*/
public ZeroBucket(double zeroThreshold, long count) {
this(computeIndex(zeroThreshold, MAX_SCALE) + 1, MAX_SCALE, count);
assert zeroThreshold >= 0.0 : "zeroThreshold must not be negative";
this.index = Long.MAX_VALUE; // compute lazily when needed
this.scale = MAX_SCALE;
this.realThreshold = zeroThreshold;
this.count = count;
}

private ZeroBucket(long index, int scale, long count) {
assert index >= MIN_INDEX && index <= MAX_INDEX : "index must be in range [" + MIN_INDEX + ", " + MAX_INDEX + "]";
assert scale >= MIN_SCALE && scale <= MAX_SCALE : "scale must be in range [" + MIN_SCALE + ", " + MAX_SCALE + "]";
this.index = index;
this.scale = scale;
this.realThreshold = Double.NaN; // compute lazily when needed
this.count = count;
}

private ZeroBucket(double realThreshold, long index, int scale, long count) {
this.realThreshold = realThreshold;
this.index = index;
this.scale = scale;
this.count = count;
}

/**
Expand All @@ -71,8 +105,33 @@ public static ZeroBucket minimalWithCount(long count) {
if (count == 0) {
return MINIMAL_EMPTY;
} else {
return new ZeroBucket(MINIMAL_EMPTY.index, MINIMAL_EMPTY.scale(), count);
return new ZeroBucket(MINIMAL_EMPTY.zeroThreshold(), MINIMAL_EMPTY.index(), MINIMAL_EMPTY.scale(), count);
}
}

/**
* @return The value of the zero threshold.
*/
public double zeroThreshold() {
if (Double.isNaN(realThreshold)) {
realThreshold = exponentiallyScaledToDoubleValue(index(), scale());
}
return realThreshold;
}

public long index() {
if (index == Long.MAX_VALUE) {
index = computeIndex(zeroThreshold(), scale()) + 1;
}
return index;
}

public int scale() {
return scale;
}

public long count() {
return count;
}

/**
Expand All @@ -95,9 +154,9 @@ public ZeroBucket merge(ZeroBucket other) {
long totalCount = count + other.count;
// Both are populated, so we need to use the higher zero-threshold.
if (this.compareZeroThreshold(other) >= 0) {
return new ZeroBucket(index, scale, totalCount);
return new ZeroBucket(realThreshold, index, scale, totalCount);
} else {
return new ZeroBucket(other.index, other.scale, totalCount);
return new ZeroBucket(other.realThreshold, other.index, other.scale, totalCount);
}
}
}
Expand Down Expand Up @@ -129,14 +188,7 @@ public ZeroBucket collapseOverlappingBucketsForAll(BucketIterator... bucketItera
* equal to, or greater than the other's.
*/
public int compareZeroThreshold(ZeroBucket other) {
return compareExponentiallyScaledValues(index, scale, other.index, other.scale);
}

/**
* @return The value of the zero threshold.
*/
public double zeroThreshold() {
return exponentiallyScaledToDoubleValue(index, scale);
return compareExponentiallyScaledValues(index(), scale(), other.index(), other.scale());
}

/**
Expand All @@ -150,7 +202,7 @@ public ZeroBucket collapseOverlappingBuckets(BucketIterator buckets) {

long collapsedCount = 0;
long highestCollapsedIndex = 0;
while (buckets.hasNext() && compareExponentiallyScaledValues(buckets.peekIndex(), buckets.scale(), index, scale) < 0) {
while (buckets.hasNext() && compareExponentiallyScaledValues(buckets.peekIndex(), buckets.scale(), index(), scale()) < 0) {
highestCollapsedIndex = buckets.peekIndex();
collapsedCount += buckets.peekCount();
buckets.advance();
Expand All @@ -161,9 +213,9 @@ public ZeroBucket collapseOverlappingBuckets(BucketIterator buckets) {
long newZeroCount = count + collapsedCount;
// +1 because we need to adjust the zero threshold to the upper boundary of the collapsed bucket
long collapsedUpperBoundIndex = highestCollapsedIndex + 1;
if (compareExponentiallyScaledValues(index, scale, collapsedUpperBoundIndex, buckets.scale()) >= 0) {
if (compareExponentiallyScaledValues(index(), scale(), collapsedUpperBoundIndex, buckets.scale()) >= 0) {
// Our current zero-threshold is larger than the upper boundary of the largest collapsed bucket, so we keep it.
return new ZeroBucket(index, scale, newZeroCount);
return new ZeroBucket(realThreshold, index, scale, newZeroCount);
} else {
return new ZeroBucket(collapsedUpperBoundIndex, buckets.scale(), newZeroCount);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,12 @@ public void testMergeOrderIndependence() {
ExponentialHistogram shuffled = ExponentialHistogram.create(20, values.stream().mapToDouble(Double::doubleValue).toArray());

assertThat("Expected same scale", shuffled.scale(), equalTo(reference.scale()));
assertThat("Expected same zero-bucket", shuffled.zeroBucket(), equalTo(reference.zeroBucket()));
assertThat(
"Expected same threshold for zero-bucket",
shuffled.zeroBucket().zeroThreshold(),
equalTo(reference.zeroBucket().zeroThreshold())
);
assertThat("Expected same count for zero-bucket", shuffled.zeroBucket().count(), equalTo(reference.zeroBucket().count()));
assertBucketsEqual(shuffled.negativeBuckets(), reference.negativeBuckets());
assertBucketsEqual(shuffled.positiveBuckets(), reference.positiveBuckets());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,33 @@ public class ZeroBucketTests extends ESTestCase {
public void testMinimalBucketHasZeroThreshold() {
assertThat(ZeroBucket.minimalWithCount(42).zeroThreshold(), equalTo(0.0));
}

public void testExactThresholdPreserved() {
ZeroBucket bucket = new ZeroBucket(3.0, 10);
assertThat(bucket.zeroThreshold(), equalTo(3.0));
}

public void testMergingPreservesExactThreshold() {
ZeroBucket bucketA = new ZeroBucket(3.0, 10);
ZeroBucket bucketB = new ZeroBucket(3.5, 20);
ZeroBucket merged = bucketA.merge(bucketB);
assertThat(merged.zeroThreshold(), equalTo(3.5));
assertThat(merged.count(), equalTo(30L));
}

public void testBucketCollapsingPreservesExactThreshold() {
FixedCapacityExponentialHistogram histo = new FixedCapacityExponentialHistogram(2);
histo.resetBuckets(0);
histo.tryAddBucket(0, 42, true); // bucket [1,2]

ZeroBucket bucketA = new ZeroBucket(3.0, 10);

CopyableBucketIterator iterator = histo.positiveBuckets().iterator();
ZeroBucket merged = bucketA.collapseOverlappingBuckets(iterator);

assertThat(iterator.hasNext(), equalTo(false));
assertThat(merged.zeroThreshold(), equalTo(3.0));
assertThat(merged.count(), equalTo(52L));
}

}
31 changes: 31 additions & 0 deletions x-pack/plugin/mapper-exponential-histogram/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

apply plugin: 'elasticsearch.internal-es-plugin'
apply plugin: 'elasticsearch.internal-yaml-rest-test'

esplugin {
name = 'exponential-histogram'
description = 'Module for the exponential_histogram field type'
classname ='org.elasticsearch.xpack.exponentialhistogram.ExponentialHistogramMapperPlugin'
extendedPlugins = ['x-pack-core']
}
base {
archivesName = 'x-pack-exponential-histogram'
}

dependencies {
api project(":libs:exponential-histogram")
compileOnly project(path: xpackModule('core'))
yamlRestTestImplementation(testArtifact(project(xpackModule('core'))))
}

restResources {
restApi {
include '_common', 'indices', 'index', 'get'
}
}
Loading