@@ -24,6 +24,7 @@
import java.util.stream.DoubleStream;
import java.util.stream.IntStream;

import static org.elasticsearch.exponentialhistogram.ExponentialHistogram.MAX_SCALE;
import static org.elasticsearch.test.ESTestCase.randomBoolean;
import static org.elasticsearch.test.ESTestCase.randomDouble;
import static org.elasticsearch.test.ESTestCase.randomIntBetween;
@@ -35,6 +36,10 @@ public static ExponentialHistogram randomHistogram() {
}

public static ReleasableExponentialHistogram randomHistogram(ExponentialHistogramCircuitBreaker breaker) {
return randomHistogram(randomIntBetween(4, 300), breaker);
}

public static ReleasableExponentialHistogram randomHistogram(int numBuckets, ExponentialHistogramCircuitBreaker breaker) {
boolean hasNegativeValues = randomBoolean();
boolean hasPositiveValues = randomBoolean();
boolean hasZeroValues = randomBoolean();
@@ -46,16 +51,30 @@ public static ReleasableExponentialHistogram randomHistogram(ExponentialHistogra
hasZeroValues ? IntStream.range(0, randomIntBetween(1, 100)).map(i1 -> 0) : IntStream.empty()
).mapToDouble(sign -> sign * (Math.pow(1_000_000, randomDouble()))).toArray();

ReleasableExponentialHistogram histo = ExponentialHistogram.create(numBuckets, breaker, rawValues);
// With random chance, set up a proper zeroThreshold (kept below the smallest non-zero value)
if (histo.zeroBucket().count() > 0 && randomBoolean()) {
double smallestNonZeroValue = DoubleStream.of(rawValues).map(Math::abs).filter(val -> val != 0).min().orElse(0.0);
double zeroThreshold = smallestNonZeroValue * randomDouble();
try (ReleasableExponentialHistogram releaseAfterCopy = histo) {
ZeroBucket zeroBucket;
if (zeroThreshold == 0 || randomBoolean()) {
zeroBucket = ZeroBucket.create(zeroThreshold, histo.zeroBucket().count());
} else {
// define the zero bucket using index and scale as it can have an impact on serialization
int scale = randomIntBetween(0, MAX_SCALE);
long index = ExponentialScaleUtils.computeIndex(zeroThreshold, scale) - 1;
zeroBucket = ZeroBucket.create(index, scale, histo.zeroBucket().count());
}
ExponentialHistogramBuilder builder = ExponentialHistogram.builder(histo, breaker).zeroBucket(zeroBucket);

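// Note (assumed semantics): values in the zero bucket may lie anywhere in
// [-zeroThreshold, zeroThreshold], so min/max are widened below whenever the
// new threshold extends past them.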
if ((Double.isNaN(histo.min()) || histo.min() > -zeroThreshold)) {
builder.min(-zeroThreshold);
}
if ((Double.isNaN(histo.max()) || histo.max() < zeroThreshold)) {
builder.max(zeroThreshold);
}
histo = builder.build();
}
}
return histo;
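For context on the index/scale form of the zero bucket used above: a minimal sketch of how a bucket index and scale encode a boundary, assuming standard base-2 exponential bucketing (the precise ZeroBucket semantics are an assumption here, not shown in this diff):

// Assumption: bucket `index` at `scale` covers (2^(index * 2^-scale), 2^((index + 1) * 2^-scale)].
static double upperBucketBoundary(long index, int scale) {
    return Math.pow(2.0, (index + 1) * Math.pow(2.0, -scale));
}
// Under that assumption, computeIndex(zeroThreshold, scale) - 1 picks a bucket whose
// upper boundary sits at or below zeroThreshold, so a ZeroBucket defined via
// (index, scale) does not absorb values above the intended threshold.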
5 changes: 5 additions & 0 deletions server/build.gradle
@@ -78,6 +78,11 @@ dependencies {
// tests use the locally compiled version of server
exclude group: 'org.elasticsearch', module: 'server'
}
testImplementation(testArtifact(project(":libs:exponential-histogram"))) {
// The test framework is manually included with excludes above
exclude group: 'org.elasticsearch.test', module: 'framework'
}

internalClusterTestImplementation(project(":test:framework")) {
exclude group: 'org.elasticsearch', module: 'server'
}
@@ -20,10 +20,17 @@
import org.elasticsearch.exponentialhistogram.ExponentialHistogramBuilder;
import org.elasticsearch.exponentialhistogram.ExponentialHistogramCircuitBreaker;
import org.elasticsearch.exponentialhistogram.ExponentialHistogramMerger;
import org.elasticsearch.exponentialhistogram.ExponentialHistogramQuantile;
import org.elasticsearch.exponentialhistogram.ExponentialScaleUtils;
import org.elasticsearch.exponentialhistogram.ReleasableExponentialHistogram;
import org.elasticsearch.exponentialhistogram.ZeroBucket;
import org.elasticsearch.tdigest.Centroid;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

import static org.elasticsearch.exponentialhistogram.ExponentialHistogram.MAX_SCALE;
import static org.elasticsearch.exponentialhistogram.ExponentialHistogram.MIN_SCALE;
@@ -117,6 +124,96 @@ public void add(ExponentialHistogram histogram) {
mergedHistograms.add(histogram);
}

/**
* Returns the number of scalar values added to this histogram, i.e. the sum
* of {@link ExponentialHistogram#valueCount()} over all histograms added.
* @return the number of values
*/
public long size() {
return histogram().valueCount();
}

/**
* Returns the fraction of all points added which are &le; x. Points
* that are exactly equal get half credit (i.e. we use the mid-point
* rule)
*
* @param x The cutoff for the cdf.
* @return The fraction of all data which is less than or equal to x.
*/
public double cdf(double x) {
ExponentialHistogram histogram = histogram();
long numValuesLess = ExponentialHistogramQuantile.estimateRank(histogram, x, false);
long numValuesLessOrEqual = ExponentialHistogramQuantile.estimateRank(histogram, x, true);
long numValuesEqual = numValuesLessOrEqual - numValuesLess;
// Just like for t-digest, equal values get half credit
return (numValuesLess + numValuesEqual / 2.0) / histogram.valueCount();
}
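To illustrate the mid-point rule, a small hedged example (it mirrors the create/add usage from the tests in this PR; `breaker` stands in for any CircuitBreaker):

try (ExponentialHistogramState state = ExponentialHistogramState.create(breaker)) {
    state.add(ExponentialHistogram.create(100, ExponentialHistogramCircuitBreaker.noop(),
        new double[] { 1.0, 2.0, 2.0, 3.0 }));
    // one value strictly below 2.0, two values equal to 2.0 at half credit each:
    // cdf(2.0) ~= (1 + 2 / 2.0) / 4 = 0.5, up to exponential-bucketing error
    double p = state.cdf(2.0);
}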

/**
* Returns an estimate of a cutoff such that a specified fraction of the data
* added to this histogram would be less than or equal to the cutoff.
*
* @param q The desired fraction
* @return The smallest value x such that cdf(x) &ge; q
*/
public double quantile(double q) {
return ExponentialHistogramQuantile.getQuantile(histogram(), q);
}

/**
* @return the mean values of the populated histogram buckets with their counts, in ascending order of the means
*/
public Collection<Centroid> centroids() {
List<Centroid> centroids = new ArrayList<>();
Review comment (Member): would it make sense to pre-allocate the list using centroidCount?

addBucketCentersAsCentroids(centroids, histogram().negativeBuckets().iterator(), -1);
// negative buckets iterate in ascending index order, i.e. descending value order; reverse to get ascending values
Collections.reverse(centroids);
Review comment (Member), on lines +170 to +171:

I'm a bit confused by this. This had me believe that you start with the lowest values and go up to the highest ones:

// They store all buckets for the negative range first, with the bucket indices in ascending order,
// followed by all buckets for the positive range, also with their indices in ascending order.
// This means we store the buckets ordered by their boundaries in ascending order (from -INF to +INF).
private final long[] bucketIndices;
private final long[] bucketCounts;

I guess the last sentence in the comment is wrong then? The indices are ascending but the highest index for the negative scale has the lowest value. Did I get that right?

if (histogram().zeroBucket().count() > 0) {
centroids.add(new Centroid(0.0, histogram().zeroBucket().count()));
}
addBucketCentersAsCentroids(centroids, histogram().positiveBuckets().iterator(), 1);
return centroids;
}

private void addBucketCentersAsCentroids(List<Centroid> result, BucketIterator buckets, int sign) {
while (buckets.hasNext()) {
double center = sign * ExponentialScaleUtils.getPointOfLeastRelativeError(buckets.peekIndex(), buckets.scale());
long count = buckets.peekCount();
result.add(new Centroid(center, count));
buckets.advance();
}
}
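To make the reversal in centroids() concrete, a sketch of the ordering (scale-0 boundaries, where bucket i covers (2^i, 2^(i+1)]; the builder calls mirror testCentroids elsewhere in this PR):

ExponentialHistogram histo = ExponentialHistogram.builder(0, ExponentialHistogramCircuitBreaker.noop())
    .setNegativeBucket(-1, 1) // |v| in (0.5, 1] -> centroid mean ~ -0.7
    .setNegativeBucket(2, 1)  // |v| in (4, 8]   -> centroid mean ~ -5.7
    .build();
// Iterating negative buckets by ascending index yields ~-0.7 before ~-5.7,
// i.e. descending means; Collections.reverse(...) restores ascending order
// so that centroids() runs from -INF to +INF.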

/**
* @return the number of centroids returned by {@link #centroids()}.
*/
public int centroidCount() {
ExponentialHistogram histo = histogram();
int count = histo.zeroBucket().count() > 0 ? 1 : 0;
count += histo.negativeBuckets().bucketCount();
count += histo.positiveBuckets().bucketCount();
return count;
}

/**
* The minimum value of the histogram, or {@code Double.POSITIVE_INFINITY} if the histogram is empty.
* @return the minimum
*/
public double getMin() {
double min = histogram().min();
return Double.isNaN(min) ? Double.POSITIVE_INFINITY : min;
}

/**
* The maximum value of the histogram, or {@code Double.NEGATIVE_INFINITY} if the histogram is empty.
* @return the maximum
*/
public double getMax() {
double max = histogram().max();
return Double.isNaN(max) ? Double.NEGATIVE_INFINITY : max;
}
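These sentinels follow the TDigest convention for empty digests; for example (hedged sketch, `breaker` assumed):

try (ExponentialHistogramState state = ExponentialHistogramState.create(breaker)) {
    // nothing added yet: the internal NaN min/max are reported as +/- infinity
    assert state.getMin() == Double.POSITIVE_INFINITY;
    assert state.getMax() == Double.NEGATIVE_INFINITY;
}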

public void write(StreamOutput out) throws IOException {
if (isEmpty()) {
out.writeByte(EMPTY_HISTOGRAM_MARKER_SCALE);
@@ -15,19 +15,26 @@
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.exponentialhistogram.ExponentialHistogram;
import org.elasticsearch.exponentialhistogram.ExponentialHistogramBuilder;
import org.elasticsearch.exponentialhistogram.ExponentialHistogramCircuitBreaker;
import org.elasticsearch.exponentialhistogram.ExponentialHistogramTestUtils;
import org.elasticsearch.exponentialhistogram.ExponentialScaleUtils;
import org.elasticsearch.exponentialhistogram.ReleasableExponentialHistogram;
import org.elasticsearch.exponentialhistogram.ZeroBucket;
import org.elasticsearch.tdigest.Centroid;
import org.elasticsearch.test.ESTestCase;

import java.io.EOFException;
import java.io.IOException;
import java.util.Arrays;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.stream.IntStream;

import static org.elasticsearch.exponentialhistogram.ExponentialHistogram.MAX_SCALE;
import static org.hamcrest.Matchers.closeTo;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;

public class ExponentialHistogramStateTests extends ESTestCase {
@@ -115,35 +122,120 @@ public void testAdd() throws IOException {
Releasables.close(state);
}

public void testEmpty() {
try (ExponentialHistogramState state = ExponentialHistogramState.create(breaker())) {
assertThat(state.isEmpty(), equalTo(true));
assertThat(state.centroids(), empty());
assertThat(state.centroidCount(), equalTo(0));
assertThat(state.size(), equalTo(0L));
assertThat(state.quantile(0.0), equalTo(Double.NaN));
assertThat(state.quantile(0.5), equalTo(Double.NaN));
assertThat(state.quantile(1.0), equalTo(Double.NaN));
assertThat(state.getMin(), equalTo(Double.POSITIVE_INFINITY));
assertThat(state.getMax(), equalTo(Double.NEGATIVE_INFINITY));
}
}

public void testQuantiles() {
try (ExponentialHistogramState state = ExponentialHistogramState.create(breaker())) {
ExponentialHistogram sample = ExponentialHistogram.create(
100,
ExponentialHistogramCircuitBreaker.noop(),
new double[] { 10.0, 11.0, 12.0, 13.0, 14.0, 15.0, 16.0, 17.0, 18.0, 19.0, 20.0 }
);
state.add(sample);

assertThat(state.quantile(0.0), closeTo(10.0, 0.0000001));
assertThat(state.quantile(0.1), closeTo(11.0, 0.0000001));
assertThat(state.quantile(0.5), closeTo(15.0, 0.0000001));
assertThat(state.quantile(0.9), closeTo(19.0, 0.0000001));
assertThat(state.quantile(1.0), closeTo(20.0, 0.0000001));
assertThat(state.getMin(), equalTo(10.0));
assertThat(state.getMax(), equalTo(20.0));
}
}

public void testCDF() {
try (ExponentialHistogramState state = ExponentialHistogramState.create(breaker())) {
ExponentialHistogram sample = ExponentialHistogram.create(
100,
ExponentialHistogramCircuitBreaker.noop(),
IntStream.range(0, 100).mapToDouble(i -> i).toArray()
);
state.add(sample);

assertThat(state.cdf(0.0 - 0.001), closeTo(0.0, 0.0000001));
assertThat(state.cdf(10.0 - 0.001), closeTo(0.1, 0.0000001));
assertThat(state.cdf(50.0 - 0.001), closeTo(0.5, 0.0000001));
assertThat(state.cdf(90.0 - 0.001), closeTo(0.9, 0.0000001));
assertThat(state.cdf(100.0 - 0.001), closeTo(1.0, 0.0000001));
}
}

public void testSizeMinMax() {
try (ExponentialHistogramState state = ExponentialHistogramState.create(breaker())) {
ExponentialHistogram sample;
do {
sample = randomHistogram(randomIntBetween(4, 100));
} while (sample.valueCount() == 0);
state.add(sample);

assertThat(state.size(), equalTo(sample.valueCount()));
assertThat(state.getMin(), equalTo(sample.min()));
assertThat(state.getMax(), equalTo(sample.max()));
}
}

public void testCentroids() {
try (ExponentialHistogramState state = ExponentialHistogramState.create(breaker())) {
List<Centroid> expectedCentroids = new ArrayList<>();
ExponentialHistogramBuilder builder = ExponentialHistogram.builder(0, ExponentialHistogramCircuitBreaker.noop());

if (randomBoolean()) {
builder.setNegativeBucket(-1, 11).setNegativeBucket(2, 22);
expectedCentroids.add(new Centroid(-ExponentialScaleUtils.getPointOfLeastRelativeError(2, 0), 22));
expectedCentroids.add(new Centroid(-ExponentialScaleUtils.getPointOfLeastRelativeError(-1, 0), 11));
}
if (randomBoolean()) {
builder.zeroBucket(ZeroBucket.create(0.0, 123));
expectedCentroids.add(new Centroid(0.0, 123));
}
if (randomBoolean()) {
builder.setPositiveBucket(-11, 40).setPositiveBucket(12, 41);
expectedCentroids.add(new Centroid(ExponentialScaleUtils.getPointOfLeastRelativeError(-11, 0), 40));
expectedCentroids.add(new Centroid(ExponentialScaleUtils.getPointOfLeastRelativeError(12, 0), 41));
}

state.add(builder.build());

Collection<Centroid> centroids = state.centroids();
assertThat(centroids.size(), equalTo(expectedCentroids.size()));
assertThat(state.centroidCount(), equalTo(expectedCentroids.size()));

Iterator<Centroid> actualIt = centroids.iterator();
Iterator<Centroid> expectedIt = expectedCentroids.iterator();
for (int i = 0; i < expectedCentroids.size(); i++) {
Centroid actual = actualIt.next();
Centroid expected = expectedIt.next();
assertThat(actual.mean(), closeTo(expected.mean(), 0.00001));
assertThat(actual.count(), equalTo(expected.count()));
}
}
}

public void testCentroidSorted() {
try (ExponentialHistogramState state = ExponentialHistogramState.create(breaker())) {
state.add(randomHistogram(randomIntBetween(4, 500)));

List<Double> actualCentroidMeans = state.centroids().stream().map(Centroid::mean).toList();
List<Double> sortedMeans = new ArrayList<>(actualCentroidMeans);
Collections.sort(sortedMeans);
assertThat(actualCentroidMeans, equalTo(sortedMeans));
}
}

private static ReleasableExponentialHistogram randomHistogram(int maxBuckets) {
return ExponentialHistogramTestUtils.randomHistogram(maxBuckets, ExponentialHistogramCircuitBreaker.noop());
}

private CircuitBreaker breaker() {