Skip to content

Commit 3bb1d2b

Browse files
committed
Add mapper for exponential_histogram type
1 parent f170dcd commit 3bb1d2b

File tree

13 files changed

+1990
-25
lines changed

13 files changed

+1990
-25
lines changed

benchmarks/build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ dependencies {
4747
api(project(':x-pack:plugin:core'))
4848
api(project(':x-pack:plugin:esql'))
4949
api(project(':x-pack:plugin:esql:compute'))
50+
api(project(':x-pack:plugin:mapper-exponential-histogram'))
5051
implementation project(path: ':libs:native')
5152
implementation project(path: ':libs:simdvec')
5253
implementation project(path: ':libs:exponential-histogram')

benchmarks/src/main/java/org/elasticsearch/benchmark/exponentialhistogram/ExponentialHistogramMergeBench.java

Lines changed: 51 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,14 @@
99

1010
package org.elasticsearch.benchmark.exponentialhistogram;
1111

12+
import org.apache.lucene.util.BytesRef;
13+
import org.elasticsearch.common.io.stream.BytesStreamOutput;
1214
import org.elasticsearch.exponentialhistogram.BucketIterator;
1315
import org.elasticsearch.exponentialhistogram.ExponentialHistogram;
1416
import org.elasticsearch.exponentialhistogram.ExponentialHistogramGenerator;
1517
import org.elasticsearch.exponentialhistogram.ExponentialHistogramMerger;
18+
import org.elasticsearch.xpack.exponentialhistogram.CompressedExponentialHistogram;
19+
import org.elasticsearch.xpack.exponentialhistogram.IndexWithCount;
1620
import org.openjdk.jmh.annotations.Benchmark;
1721
import org.openjdk.jmh.annotations.BenchmarkMode;
1822
import org.openjdk.jmh.annotations.Fork;
@@ -26,6 +30,8 @@
2630
import org.openjdk.jmh.annotations.Threads;
2731
import org.openjdk.jmh.annotations.Warmup;
2832

33+
import java.io.IOException;
34+
import java.util.ArrayList;
2935
import java.util.List;
3036
import java.util.Random;
3137
import java.util.concurrent.ThreadLocalRandom;
@@ -43,9 +49,12 @@ public class ExponentialHistogramMergeBench {
4349
@Param({ "1000", "2000", "5000" })
4450
int bucketCount;
4551

46-
@Param({ "0.01", "0.1", "0.25", "0.5", "1.0", "2.0" })
52+
@Param({ "1.0" })
4753
double mergedHistoSizeFactor;
4854

55+
@Param({ "array-backed", "compressed" })
56+
String histoImplementation;
57+
4958
Random random;
5059
ExponentialHistogramMerger histoMerger;
5160

@@ -79,16 +88,54 @@ public void setUp() {
7988
bucketIndex += 1 + random.nextInt(bucketCount) % (Math.max(1, bucketCount / dataPointSize));
8089
generator.add(Math.pow(1.001, bucketIndex));
8190
}
82-
toMerge[i] = generator.get();
83-
cnt = getBucketCount(toMerge[i]);
91+
ExponentialHistogram histogram = generator.get();
92+
cnt = getBucketCount(histogram);
8493
if (cnt < dataPointSize) {
85-
throw new IllegalArgumentException("Expected bucket count to be " + dataPointSize + ", but was " + cnt);
94+
throw new IllegalStateException("Expected bucket count to be " + dataPointSize + ", but was " + cnt);
95+
}
96+
97+
if ("array-backed".equals(histoImplementation)) {
98+
toMerge[i] = histogram;
99+
} else if ("compressed".equals(histoImplementation)) {
100+
toMerge[i] = asCompressedHistogram(histogram);
101+
} else {
102+
throw new IllegalArgumentException("Unknown implementation: " + histoImplementation);
86103
}
87104
}
88105

89106
index = 0;
90107
}
91108

109+
private ExponentialHistogram asCompressedHistogram(ExponentialHistogram histogram) {
110+
List<IndexWithCount> negativeBuckets = new ArrayList<>();
111+
List<IndexWithCount> positiveBuckets = new ArrayList<>();
112+
113+
BucketIterator it = histogram.negativeBuckets().iterator();
114+
while (it.hasNext()) {
115+
negativeBuckets.add(new IndexWithCount(it.peekIndex(), it.peekCount()));
116+
it.advance();
117+
}
118+
it = histogram.positiveBuckets().iterator();
119+
while (it.hasNext()) {
120+
positiveBuckets.add(new IndexWithCount(it.peekIndex(), it.peekCount()));
121+
it.advance();
122+
}
123+
124+
long totalCount = histogram.zeroBucket().count() + histogram.negativeBuckets().valueCount() + histogram.positiveBuckets()
125+
.valueCount();
126+
BytesStreamOutput histoBytes = new BytesStreamOutput();
127+
try {
128+
CompressedExponentialHistogram.writeHistogramBytes(histoBytes, histogram.scale(), negativeBuckets, positiveBuckets);
129+
CompressedExponentialHistogram result = new CompressedExponentialHistogram();
130+
BytesRef data = histoBytes.bytes().toBytesRef();
131+
result.reset(histogram.zeroBucket().zeroThreshold(), totalCount, data);
132+
return result;
133+
} catch (IOException e) {
134+
throw new RuntimeException(e);
135+
}
136+
137+
}
138+
92139
private static int getBucketCount(ExponentialHistogram histo) {
93140
int cnt = 0;
94141
for (BucketIterator it : List.of(histo.negativeBuckets().iterator(), histo.positiveBuckets().iterator())) {

libs/exponential-histogram/src/main/java/org/elasticsearch/exponentialhistogram/ZeroBucket.java

Lines changed: 72 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
package org.elasticsearch.exponentialhistogram;
2323

24+
import static org.elasticsearch.exponentialhistogram.ExponentialHistogram.MAX_INDEX;
2425
import static org.elasticsearch.exponentialhistogram.ExponentialHistogram.MAX_SCALE;
2526
import static org.elasticsearch.exponentialhistogram.ExponentialHistogram.MIN_INDEX;
2627
import static org.elasticsearch.exponentialhistogram.ExponentialHistogram.MIN_SCALE;
@@ -34,13 +35,26 @@
3435
* To allow efficient comparison with bucket boundaries, this class internally
3536
* represents the zero threshold as an exponential histogram bucket index with a scale,
3637
* computed via {@link ExponentialScaleUtils#computeIndex(double, int)}.
37-
*
38-
* @param index The index used with the scale to determine the zero threshold.
39-
* @param scale The scale used with the index to determine the zero threshold.
40-
* @param count The number of values in the zero bucket.
4138
*/
42-
public record ZeroBucket(long index, int scale, long count) {
39+
public final class ZeroBucket {
4340

41+
/**
42+
* The exponential histogram scale used for {@link #index}
43+
*/
44+
private final int scale;
45+
46+
/**
47+
* The exponential histogram bucket index whose upper boundary corresponds to the zero threshold.
48+
* Might be computed lazily from {@link #realThreshold}, uses {@link Long#MAX_VALUE} as placeholder in this case.
49+
*/
50+
private long index;
51+
52+
/**
53+
* The zero threshold as a double. Might be computed lazily from {@link #index} and {@link #scale}, uses {@link Double#NaN} as placeholder in this case.
54+
*/
55+
private double realThreshold;
56+
57+
private final long count;
4458
// A singleton for an empty zero bucket with the smallest possible threshold.
4559
private static final ZeroBucket MINIMAL_EMPTY = new ZeroBucket(MIN_INDEX, MIN_SCALE, 0);
4660

@@ -51,7 +65,27 @@ public record ZeroBucket(long index, int scale, long count) {
5165
* @param count The number of values in the bucket.
5266
*/
5367
public ZeroBucket(double zeroThreshold, long count) {
54-
this(computeIndex(zeroThreshold, MAX_SCALE) + 1, MAX_SCALE, count);
68+
assert zeroThreshold >= 0.0 : "zeroThreshold must not be negative";
69+
this.index = Long.MAX_VALUE; // compute lazily when needed
70+
this.scale = MAX_SCALE;
71+
this.realThreshold = zeroThreshold;
72+
this.count = count;
73+
}
74+
75+
ZeroBucket(long index, int scale, long count) {
76+
assert index >= MIN_INDEX && index <= MAX_INDEX : "index must be in range [" + MIN_INDEX + ", " + MAX_INDEX + "]";
77+
assert scale >= MIN_SCALE && scale <= MAX_SCALE : "scale must be in range [" + MIN_SCALE + ", " + MAX_SCALE + "]";
78+
this.index = index;
79+
this.scale = scale;
80+
this.realThreshold = Double.NaN; // compute lazily when needed
81+
this.count = count;
82+
}
83+
84+
private ZeroBucket(double realThreshold, long index, int scale, long count) {
85+
this.index = index;
86+
this.scale = scale;
87+
this.realThreshold = realThreshold; // may be NaN, in which case it is computed lazily when needed
88+
this.count = count;
5589
}
5690

5791
/**
@@ -71,8 +105,33 @@ public static ZeroBucket minimalWithCount(long count) {
71105
if (count == 0) {
72106
return MINIMAL_EMPTY;
73107
} else {
74-
return new ZeroBucket(MINIMAL_EMPTY.index, MINIMAL_EMPTY.scale(), count);
108+
return new ZeroBucket(MINIMAL_EMPTY.zeroThreshold(), MINIMAL_EMPTY.index(), MINIMAL_EMPTY.scale(), count);
109+
}
110+
}
111+
112+
/**
113+
* @return The value of the zero threshold.
114+
*/
115+
public double zeroThreshold() {
116+
if (Double.isNaN(realThreshold)) {
117+
realThreshold = exponentiallyScaledToDoubleValue(index(), scale());
75118
}
119+
return realThreshold;
120+
}
121+
122+
public long index() {
123+
if (index == Long.MAX_VALUE) {
124+
index = computeIndex(zeroThreshold(), scale()) + 1;
125+
}
126+
return index;
127+
}
128+
129+
public int scale() {
130+
return scale;
131+
}
132+
133+
public long count() {
134+
return count;
76135
}
77136

78137
/**
@@ -95,9 +154,9 @@ public ZeroBucket merge(ZeroBucket other) {
95154
long totalCount = count + other.count;
96155
// Both are populated, so we need to use the higher zero-threshold.
97156
if (this.compareZeroThreshold(other) >= 0) {
98-
return new ZeroBucket(index, scale, totalCount);
157+
return new ZeroBucket(realThreshold, index, scale, totalCount);
99158
} else {
100-
return new ZeroBucket(other.index, other.scale, totalCount);
159+
return new ZeroBucket(other.realThreshold, other.index, other.scale, totalCount);
101160
}
102161
}
103162
}
@@ -129,14 +188,7 @@ public ZeroBucket collapseOverlappingBucketsForAll(BucketIterator... bucketItera
129188
* equal to, or greater than the other's.
130189
*/
131190
public int compareZeroThreshold(ZeroBucket other) {
132-
return compareExponentiallyScaledValues(index, scale, other.index, other.scale);
133-
}
134-
135-
/**
136-
* @return The value of the zero threshold.
137-
*/
138-
public double zeroThreshold() {
139-
return exponentiallyScaledToDoubleValue(index, scale);
191+
return compareExponentiallyScaledValues(index(), scale(), other.index(), other.scale());
140192
}
141193

142194
/**
@@ -150,7 +202,7 @@ public ZeroBucket collapseOverlappingBuckets(BucketIterator buckets) {
150202

151203
long collapsedCount = 0;
152204
long highestCollapsedIndex = 0;
153-
while (buckets.hasNext() && compareExponentiallyScaledValues(buckets.peekIndex(), buckets.scale(), index, scale) < 0) {
205+
while (buckets.hasNext() && compareExponentiallyScaledValues(buckets.peekIndex(), buckets.scale(), index(), scale()) < 0) {
154206
highestCollapsedIndex = buckets.peekIndex();
155207
collapsedCount += buckets.peekCount();
156208
buckets.advance();
@@ -161,9 +213,9 @@ public ZeroBucket collapseOverlappingBuckets(BucketIterator buckets) {
161213
long newZeroCount = count + collapsedCount;
162214
// +1 because we need to adjust the zero threshold to the upper boundary of the collapsed bucket
163215
long collapsedUpperBoundIndex = highestCollapsedIndex + 1;
164-
if (compareExponentiallyScaledValues(index, scale, collapsedUpperBoundIndex, buckets.scale()) >= 0) {
216+
if (compareExponentiallyScaledValues(index(), scale(), collapsedUpperBoundIndex, buckets.scale()) >= 0) {
165217
// Our current zero-threshold is larger than the upper boundary of the largest collapsed bucket, so we keep it.
166-
return new ZeroBucket(index, scale, newZeroCount);
218+
return new ZeroBucket(realThreshold, index, scale, newZeroCount);
167219
} else {
168220
return new ZeroBucket(collapsedUpperBoundIndex, buckets.scale(), newZeroCount);
169221
}

libs/exponential-histogram/src/test/java/org/elasticsearch/exponentialhistogram/ExponentialHistogramMergerTests.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -145,7 +145,12 @@ public void testMergeOrderIndependence() {
145145
ExponentialHistogram shuffled = ExponentialHistogram.create(20, values.stream().mapToDouble(Double::doubleValue).toArray());
146146

147147
assertThat("Expected same scale", shuffled.scale(), equalTo(reference.scale()));
148-
assertThat("Expected same zero-bucket", shuffled.zeroBucket(), equalTo(reference.zeroBucket()));
148+
assertThat(
149+
"Expected same threshold for zero-bucket",
150+
shuffled.zeroBucket().zeroThreshold(),
151+
equalTo(reference.zeroBucket().zeroThreshold())
152+
);
153+
assertThat("Expected same count for zero-bucket", shuffled.zeroBucket().count(), equalTo(reference.zeroBucket().count()));
149154
assertBucketsEqual(shuffled.negativeBuckets(), reference.negativeBuckets());
150155
assertBucketsEqual(shuffled.positiveBuckets(), reference.positiveBuckets());
151156
}

libs/exponential-histogram/src/test/java/org/elasticsearch/exponentialhistogram/ZeroBucketTests.java

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,4 +30,33 @@ public class ZeroBucketTests extends ESTestCase {
3030
public void testMinimalBucketHasZeroThreshold() {
3131
assertThat(ZeroBucket.minimalWithCount(42).zeroThreshold(), equalTo(0.0));
3232
}
33+
34+
public void testExactThresholdPreserved() {
35+
ZeroBucket bucket = new ZeroBucket(3.0, 10);
36+
assertThat(bucket.zeroThreshold(), equalTo(3.0));
37+
}
38+
39+
public void testMergingPreservesExactThreshold() {
40+
ZeroBucket bucketA = new ZeroBucket(3.0, 10);
41+
ZeroBucket bucketB = new ZeroBucket(3.5, 20);
42+
ZeroBucket merged = bucketA.merge(bucketB);
43+
assertThat(merged.zeroThreshold(), equalTo(3.5));
44+
assertThat(merged.count(), equalTo(30L));
45+
}
46+
47+
public void testBucketCollapsingPreservesExactThreshold() {
48+
FixedCapacityExponentialHistogram histo = new FixedCapacityExponentialHistogram(2);
49+
histo.resetBuckets(0);
50+
histo.tryAddBucket(0, 42, true); // bucket [1,2]
51+
52+
ZeroBucket bucketA = new ZeroBucket(3.0, 10);
53+
54+
CopyableBucketIterator iterator = histo.positiveBuckets().iterator();
55+
ZeroBucket merged = bucketA.collapseOverlappingBuckets(iterator);
56+
57+
assertThat(iterator.hasNext(), equalTo(false));
58+
assertThat(merged.zeroThreshold(), equalTo(3.0));
59+
assertThat(merged.count(), equalTo(52L));
60+
}
61+
3362
}
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
apply plugin: 'elasticsearch.internal-es-plugin'
9+
apply plugin: 'elasticsearch.internal-yaml-rest-test'
10+
11+
esplugin {
12+
name = 'exponential-histogram'
13+
description = 'Module for the exponential_histogram field type'
14+
classname ='org.elasticsearch.xpack.exponentialhistogram.ExponentialHistogramMapperPlugin'
15+
extendedPlugins = ['x-pack-core']
16+
}
17+
base {
18+
archivesName = 'x-pack-exponential-histogram'
19+
}
20+
21+
dependencies {
22+
api project(":libs:exponential-histogram")
23+
compileOnly project(path: xpackModule('core'))
24+
yamlRestTestImplementation(testArtifact(project(xpackModule('core'))))
25+
}
26+
27+
restResources {
28+
restApi {
29+
include '_common', 'indices', 'index', 'get'
30+
}
31+
}

0 commit comments

Comments
 (0)