-
Notifications
You must be signed in to change notification settings - Fork 25.6k
Add implementation for exponential histogram merging and percentiles #131220
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 44 commits
43dd073
da159a9
fa4efe0
fba967f
cef3b11
aac9f6d
6a2b62f
eb955cd
2eb5fdd
fd7064e
66b5e2c
2f293d0
91193bc
92efdcf
e6924e9
cab3fdf
454a9cc
486a8bd
7c6655b
a980c0e
a46914a
fefd39b
ac804c7
5e1ca08
c091758
25be13d
6100bc6
7c99c81
b308838
311f44b
54fd41d
7dad089
522a72a
33b626b
28ccef1
fe311d8
89cfb02
39257b8
57fd335
cc13e0f
2513ca2
9f8fa45
dbb3edd
5668fc2
c63ba90
af51b7c
2cea9a7
941223a
0d2a50c
ff9467a
d34da19
b822e68
c44b0b5
8f0456d
d6fc366
f170dcd
e19e944
fdc257d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,99 @@ | ||
/* | ||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
* or more contributor license agreements. Licensed under the "Elastic License | ||
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side | ||
* Public License v 1"; you may not use this file except in compliance with, at | ||
* your election, the "Elastic License 2.0", the "GNU Affero General Public | ||
* License v3.0 only", or the "Server Side Public License, v 1". | ||
*/ | ||
|
||
package org.elasticsearch.benchmark.exponentialhistogram; | ||
|
||
import org.elasticsearch.exponentialhistogram.ExponentialHistogramGenerator; | ||
import org.openjdk.jmh.annotations.Benchmark; | ||
import org.openjdk.jmh.annotations.BenchmarkMode; | ||
import org.openjdk.jmh.annotations.Fork; | ||
import org.openjdk.jmh.annotations.Measurement; | ||
import org.openjdk.jmh.annotations.Mode; | ||
import org.openjdk.jmh.annotations.OutputTimeUnit; | ||
import org.openjdk.jmh.annotations.Param; | ||
import org.openjdk.jmh.annotations.Scope; | ||
import org.openjdk.jmh.annotations.Setup; | ||
import org.openjdk.jmh.annotations.State; | ||
import org.openjdk.jmh.annotations.Threads; | ||
import org.openjdk.jmh.annotations.Warmup; | ||
import org.openjdk.jmh.profile.GCProfiler; | ||
import org.openjdk.jmh.profile.StackProfiler; | ||
import org.openjdk.jmh.runner.Runner; | ||
import org.openjdk.jmh.runner.RunnerException; | ||
import org.openjdk.jmh.runner.options.Options; | ||
import org.openjdk.jmh.runner.options.OptionsBuilder; | ||
|
||
import java.util.Random; | ||
import java.util.concurrent.ThreadLocalRandom; | ||
import java.util.concurrent.TimeUnit; | ||
import java.util.function.Supplier; | ||
|
||
@BenchmarkMode(Mode.AverageTime) | ||
@OutputTimeUnit(TimeUnit.NANOSECONDS) | ||
@Warmup(iterations = 3, time = 3, timeUnit = TimeUnit.SECONDS) | ||
@Measurement(iterations = 5, time = 2, timeUnit = TimeUnit.SECONDS) | ||
@Fork(1) | ||
@Threads(1) | ||
@State(Scope.Thread) | ||
public class ExponentialHistogramGenerationBench { | ||
|
||
@Param({ "100", "500", "1000", "5000", "10000", "20000" }) | ||
int bucketCount; | ||
|
||
@Param({ "NORMAL", "GAUSSIAN" }) | ||
String distribution; | ||
|
||
Random random; | ||
ExponentialHistogramGenerator histoGenerator; | ||
|
||
double[] data = new double[1000000]; | ||
|
||
@Setup | ||
public void setUp() { | ||
random = ThreadLocalRandom.current(); | ||
histoGenerator = new ExponentialHistogramGenerator(bucketCount); | ||
|
||
Supplier<Double> nextRandom = () -> distribution.equals("GAUSSIAN") ? random.nextGaussian() : random.nextDouble(); | ||
|
||
// Make sure that we start with a non-empty histogram, as this distorts initial additions | ||
for (int i = 0; i < 10000; ++i) { | ||
histoGenerator.add(nextRandom.get()); | ||
} | ||
|
||
for (int i = 0; i < data.length; ++i) { | ||
data[i] = nextRandom.get(); | ||
} | ||
} | ||
|
||
@State(Scope.Thread) | ||
public static class ThreadState { | ||
int index = 0; | ||
} | ||
|
||
@Benchmark | ||
@BenchmarkMode(Mode.AverageTime) | ||
@OutputTimeUnit(TimeUnit.MICROSECONDS) | ||
public void add(ThreadState state) { | ||
if (state.index >= data.length) { | ||
state.index = 0; | ||
} | ||
histoGenerator.add(data[state.index++]); | ||
} | ||
|
||
public static void main(String[] args) throws RunnerException { | ||
Options opt = new OptionsBuilder().include(".*" + ExponentialHistogramGenerationBench.class.getSimpleName() + ".*") | ||
.warmupIterations(5) | ||
.measurementIterations(5) | ||
.addProfiler(GCProfiler.class) | ||
.addProfiler(StackProfiler.class) | ||
.build(); | ||
|
||
new Runner(opt).run(); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,113 @@ | ||
/* | ||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
* or more contributor license agreements. Licensed under the "Elastic License | ||
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side | ||
* Public License v 1"; you may not use this file except in compliance with, at | ||
* your election, the "Elastic License 2.0", the "GNU Affero General Public | ||
* License v3.0 only", or the "Server Side Public License, v 1". | ||
*/ | ||
|
||
package org.elasticsearch.benchmark.exponentialhistogram; | ||
|
||
import org.elasticsearch.exponentialhistogram.BucketIterator; | ||
import org.elasticsearch.exponentialhistogram.ExponentialHistogram; | ||
import org.elasticsearch.exponentialhistogram.ExponentialHistogramGenerator; | ||
import org.elasticsearch.exponentialhistogram.ExponentialHistogramMerger; | ||
import org.openjdk.jmh.annotations.Benchmark; | ||
import org.openjdk.jmh.annotations.BenchmarkMode; | ||
import org.openjdk.jmh.annotations.Fork; | ||
import org.openjdk.jmh.annotations.Measurement; | ||
import org.openjdk.jmh.annotations.Mode; | ||
import org.openjdk.jmh.annotations.OutputTimeUnit; | ||
import org.openjdk.jmh.annotations.Param; | ||
import org.openjdk.jmh.annotations.Scope; | ||
import org.openjdk.jmh.annotations.Setup; | ||
import org.openjdk.jmh.annotations.State; | ||
import org.openjdk.jmh.annotations.Threads; | ||
import org.openjdk.jmh.annotations.Warmup; | ||
|
||
import java.util.List; | ||
import java.util.Random; | ||
import java.util.concurrent.ThreadLocalRandom; | ||
import java.util.concurrent.TimeUnit; | ||
|
||
@BenchmarkMode(Mode.AverageTime) | ||
@OutputTimeUnit(TimeUnit.NANOSECONDS) | ||
@Warmup(iterations = 3, time = 3, timeUnit = TimeUnit.SECONDS) | ||
@Measurement(iterations = 5, time = 2, timeUnit = TimeUnit.SECONDS) | ||
@Fork(1) | ||
@Threads(1) | ||
@State(Scope.Thread) | ||
public class ExponentialHistogramMergeBench { | ||
|
||
@Param({ "1000", "5000" }) | ||
int bucketCount; | ||
|
||
@Param({ "0.01", "0.1", "0.25", "0.5", "1.0", "2.0" }) | ||
double mergedHistoSizeFactor; | ||
|
||
Random random; | ||
ExponentialHistogramMerger histoMerger; | ||
|
||
ExponentialHistogram[] toMerge = new ExponentialHistogram[10_000]; | ||
|
||
@Setup | ||
public void setUp() { | ||
random = ThreadLocalRandom.current(); | ||
histoMerger = new ExponentialHistogramMerger(bucketCount); | ||
|
||
ExponentialHistogramGenerator initial = new ExponentialHistogramGenerator(bucketCount); | ||
for (int j = 0; j < bucketCount; j++) { | ||
initial.add(Math.pow(1.001, j)); | ||
} | ||
ExponentialHistogram initialHisto = initial.get(); | ||
int cnt = getBucketCount(initialHisto); | ||
if (cnt < bucketCount) { | ||
throw new IllegalArgumentException("Expected bucket count to be " + bucketCount + ", but was " + cnt); | ||
} | ||
histoMerger.add(initialHisto); | ||
|
||
int dataPointSize = (int) Math.round(bucketCount * mergedHistoSizeFactor); | ||
|
||
for (int i = 0; i < toMerge.length; i++) { | ||
ExponentialHistogramGenerator generator = new ExponentialHistogramGenerator(dataPointSize); | ||
|
||
int bucketIndex = 0; | ||
for (int j = 0; j < dataPointSize; j++) { | ||
bucketIndex += 1 + random.nextInt(bucketCount) % (Math.max(1, bucketCount / dataPointSize)); | ||
generator.add(Math.pow(1.001, bucketIndex)); | ||
} | ||
toMerge[i] = generator.get(); | ||
cnt = getBucketCount(toMerge[i]); | ||
if (cnt < dataPointSize) { | ||
throw new IllegalArgumentException("Expected bucket count to be " + dataPointSize + ", but was " + cnt); | ||
} | ||
} | ||
} | ||
|
||
private static int getBucketCount(ExponentialHistogram histo) { | ||
int cnt = 0; | ||
for (BucketIterator it : List.of(histo.negativeBuckets().iterator(), histo.positiveBuckets().iterator())) { | ||
while (it.hasNext()) { | ||
cnt++; | ||
it.advance(); | ||
} | ||
} | ||
return cnt; | ||
} | ||
|
||
@State(Scope.Thread) | ||
public static class ThreadState { | ||
int index = 0; | ||
} | ||
JonasKunz marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
|
||
@Benchmark | ||
@BenchmarkMode(Mode.AverageTime) | ||
@OutputTimeUnit(TimeUnit.MICROSECONDS) | ||
public void add(ThreadState state) { | ||
if (state.index >= toMerge.length) { | ||
state.index = 0; | ||
} | ||
histoMerger.add(toMerge[state.index++]); | ||
} | ||
} |
JonasKunz marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,103 @@ | ||
This library provides an implementation of merging and analysis algorithms for exponential histograms based on the [OpenTelemetry definition](https://opentelemetry.io/docs/specs/otel/metrics/data-model/#exponentialhistogram). It is designed as a complementary tool to the OpenTelemetry SDK, focusing specifically on efficient histogram merging and accurate percentile estimation. | ||
|
||
## Overview | ||
|
||
The library implements base-2 exponential histograms with perfect subsetting. The most imporant properties are: | ||
JonasKunz marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
|
||
* The histogram has a scale parameter, which defines the accuracy. A higher scale implies a higher accuracy. | ||
* The `base` for the buckets is defined as `base = 2^(2^-scale)`. | ||
* The histogram bucket at index `i` has the range `(base^i, base^(i+1)]` | ||
* Negative values are represented by a separate negative range of buckets with the boundaries `(-base^(i+1), -base^i]` | ||
* Histograms are perfectly subsetting: increasing the scale by one merges each pair of neighboring buckets | ||
JonasKunz marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
* A special zero bucket with a zero-threshold is used to handle zero and close-to-zero values | ||
|
||
For more details please refer to the [OpenTelemetry definition](https://opentelemetry.io/docs/specs/otel/metrics/data-model/#exponentialhistogram). | ||
|
||
The library implements a sparse storage approach where only populated buckets consume memory and count towards the bucket limit. This differs from the OpenTelemetry implementation, which uses dense storage. While dense storage allows for O(1) time insertion of individual values, our sparse representation requires O(log m) time where m is the bucket capacity. However, the sparse representation enables more efficient storage and provides a simple merging algorithm with runtime linear in the number of populated buckets. Additionally, this library also provides an array-backed sparse storage, ensuring cache efficiency. | ||
|
||
The sparse storage approach offers significant advantages for distributions with fewer distinct values than the bucket count, allowing the library to achieve representation of such distributions with an error so small that it won't be noticed in practice. This makes it suitable not only for exponential histograms but also as a universal solution for handling explicit bucket histograms. | ||
|
||
## Merging Algorithm | ||
|
||
The merging algorithm works similarly to the merge-step of merge sort. We simultaneously walk through the buckets of both histograms in order, merging them on the fly as needed. If the total number of buckets in the end would exceed the bucket limit, we scale down as needed. | ||
|
||
Before we merge the buckets, we need to take care of the special zero-bucket and bring both histograms to the same scale. | ||
|
||
For the zero-bucket, we merge the zero threshold from both histograms and collapse any overlapping buckets into the resulting new zero bucket. | ||
|
||
In order to bring both histograms to the same scale, we can make adjustments in both directions: we can increase or decrease the scale of histograms as needed. | ||
|
||
See the [upscaling section](#upscaling) for details on how the upscaling works. Upscaling helps prevent the precision of the result histogram merged from many histograms from being dragged down to the lowest scale of a potentially misconfigured input histogram. For example, if a histogram is recorded with a too low zero threshold, this can result in a degraded scale when using dense histogram storage, even if the histogram only contains two points. | ||
|
||
### Upscaling | ||
|
||
In general, we assume that all values in a bucket lie on a single point: the point of least relative error. This is the point `x` in the bucket such that: | ||
|
||
``` | ||
(x - l) / l = (u - x) / u | ||
``` | ||
|
||
Where `l` is the lower bucket boundary and `u` is the upper bucket boundary. | ||
|
||
This assumption allows us to increase the scale of histograms without increasing the bucket count. Buckets are simply mapped to the ones in the new scale containing the point of least relative error of the original buckets. | ||
|
||
|
||
This can introduce a small error, as the original center might be moved slightly. Therefore, we ensure that the upscaling happens at most once to prevent errors from adding up. The higher the amount of upscaling, the less the error (higher scale means smaller buckets, which in turn means we get a better fit around the original point of least relative error). | ||
|
||
## Distributions with Few Distinct Values | ||
|
||
The sparse storage model only requires memory linear to the total number of buckets, while dense storage needs to store the entire range of the smallest and biggest buckets. | ||
|
||
This offers significant benefits for distributions with fewer distinct values: | ||
If we have at least as many buckets as we have distinct values to store in the histogram, we can represent this distribution with a much smaller error than the dense representation. | ||
This can be achieved by maintaining the scale at the maximum supported value (so the buckets become the smallest). | ||
At the time of writing, the maximum scale is 38, so the relative distance between the lower and upper bucket boundaries is (2^2^(-38)). | ||
|
||
The impact of the error is best shown with a concrete example: | ||
If we store, for example, a duration value of 10^15 nanoseconds (= roughly 11.5 days), this value will be stored in a bucket that guarantees a relative error of at most 2^2^(-38), so roughly 2.5 microseconds in this case. | ||
As long as the number of values we insert is lower than the bucket count, we are guaranteed that no down-scaling happens: In contrast to dense storage, the scale does not depend on the spread between the smallest and largest bucket index. | ||
|
||
To clarify the difference between dense and sparse storage, let's assume that we have an empty histogram and the maximum scale is zero while the maximum bucket count is four. | ||
The same logic applies to higher scales and bucket counts, but we use these values to get easier numbers for this example. | ||
The scale of zero means that our bucket boundaries are `1, 2, 4, 8, 16, 32, 64, 128, 256, ...`. | ||
We now want to insert the value `6` into the histogram. The dense storage works by storing an array for the bucket counts plus an initial offset. | ||
This means that the first slot in the bucket counts array corresponds to the bucket with index `offset` and the last one to `offset + bucketCounts.length - 1`. | ||
So if we add the value `6` to the histogram, it falls into the `(4,8]` bucket, which has the index `2`. | ||
|
||
So our dense histogram looks like this: | ||
``` | ||
offset = 2 | ||
bucketCounts = [1, 0, 0, 0] // represent bucket counts for bucket index 2 to 5 | ||
``` | ||
|
||
If we now insert the value `20` (`(16,32]`, bucket index 4), everything is still fine: | ||
``` | ||
offset = 2 | ||
bucketCounts = [1, 0, 1, 0] // represent bucket counts for bucket index 2 to 5 | ||
``` | ||
|
||
However, we run into trouble if we insert the value `100`, which corresponds to index 6: That index is outside of the bounds of our array. | ||
We can't just increase the `offset`, because the first bucket in our array is populated too. | ||
We have no other option other than decreasing the scale of the histogram, to make sure that our values `6` and `100` fall in the range of four **consecutive** buckets due to the bucket count limit of the dense storage. | ||
|
||
In contrast, a sparse histogram has no trouble storing this data while keeping the scale of zero: | ||
``` | ||
bucketIndiciesToCounts: { | ||
"2" : 1, | ||
"4" : 1, | ||
"6" : 1 | ||
} | ||
``` | ||
|
||
Downscaling on the sparse representation only happens if either: | ||
* The number of populated buckets would become bigger than our maximum bucket count. We have to downscale to make neighboring, populated buckets combine to a single bucket until we are below our limit again. | ||
JonasKunz marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
* The highest or smallest indices require more bits to store than we allow. This does not happen in our implementation for normal inputs, because we allow up to 62 bits for index storage, which fits the entire numeric range of IEEE 754 double precision floats at our maximum scale. | ||
|
||
### Handling Explicit Bucket Histograms | ||
|
||
We can make use of this property to convert explicit bucket histograms (https://opentelemetry.io/docs/specs/otel/metrics/data-model/#histogram) to exponential ones by again assuming that all values in a bucket lie in a single point: | ||
* For each explicit bucket, we take its point of least relative error and add it to the corresponding exponential histogram bucket with the corresponding count. | ||
* The open, upper, and lower buckets, including infinity, will need special treatment, but these are not useful for percentile estimates anyway. | ||
|
||
This gives us a great solution for universally dealing with histograms: | ||
When merging exponential histograms generated from explicit ones, the scale is not decreased (and therefore the error not increased) as long as the number of distinct buckets from the original explicit bucket histograms does not exceed the exponential histogram bucket count. As a result, the computed percentiles will be precise with only the [relative error of the initial conversion](#distributions-with-few-distinct-values). | ||
In addition, this allows us to compute percentiles on mixed explicit bucket histograms or even mix them with exponential ones by just using the exponential histogram algorithms. |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,23 @@ | ||
/* | ||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
* or more contributor license agreements. Licensed under the "Elastic License | ||
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side | ||
* Public License v 1"; you may not use this file except in compliance with, at | ||
* your election, the "Elastic License 2.0", the "GNU Affero General Public | ||
* License v3.0 only", or the "Server Side Public License, v 1". | ||
*/ | ||
|
||
// TODO: publish this when ready? | ||
//apply plugin: 'elasticsearch.publish' | ||
apply plugin: 'elasticsearch.build' | ||
|
||
dependencies { | ||
testImplementation(project(":test:framework")) | ||
testImplementation('ch.obermuhlner:big-math:2.3.2') | ||
testImplementation('org.apache.commons:commons-math3:3.6.1') | ||
} | ||
|
||
tasks.named('forbiddenApisMain').configure { | ||
// this lib does not depend on core, so only jdk signatures should be checked | ||
replaceSignatureFiles 'jdk-signatures' | ||
} |
Uh oh!
There was an error while loading. Please reload this page.