Skip to content
Open
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ final class FixedCapacityExponentialHistogram extends AbstractExponentialHistogr
// When we use term "index", we mean the exponential histogram bucket index.
// They store all buckets for the negative range first, with the bucket indices in ascending order,
// followed by all buckets for the positive range, also with their indices in ascending order.
// This means we store all the negative buckets first, ordered by their boundaries in descending order (from 0 to -INF),
// followed by all the positive buckets, ordered by their boundaries in ascending order (from 0 to +INF).
private final long[] bucketIndices;
private final long[] bucketCounts;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.stream.DoubleStream;
import java.util.stream.IntStream;

import static org.elasticsearch.exponentialhistogram.ExponentialHistogram.MAX_SCALE;
import static org.elasticsearch.test.ESTestCase.randomBoolean;
import static org.elasticsearch.test.ESTestCase.randomDouble;
import static org.elasticsearch.test.ESTestCase.randomIntBetween;
Expand All @@ -35,6 +36,10 @@ public static ExponentialHistogram randomHistogram() {
}

/**
 * Creates a random histogram with a randomly chosen maximum bucket count between 4 and 300.
 *
 * @param breaker the circuit breaker used to account for the histogram's allocations
 * @return a histogram which must be released by the caller
 */
public static ReleasableExponentialHistogram randomHistogram(ExponentialHistogramCircuitBreaker breaker) {
return randomHistogram(randomIntBetween(4, 300), breaker);
}

public static ReleasableExponentialHistogram randomHistogram(int numBuckets, ExponentialHistogramCircuitBreaker breaker) {
boolean hasNegativeValues = randomBoolean();
boolean hasPositiveValues = randomBoolean();
boolean hasZeroValues = randomBoolean();
Expand All @@ -46,16 +51,30 @@ public static ReleasableExponentialHistogram randomHistogram(ExponentialHistogra
hasZeroValues ? IntStream.range(0, randomIntBetween(1, 100)).map(i1 -> 0) : IntStream.empty()
).mapToDouble(sign -> sign * (Math.pow(1_000_000, randomDouble()))).toArray();

int numBuckets = randomIntBetween(4, 300);
ReleasableExponentialHistogram histo = ExponentialHistogram.create(numBuckets, breaker, rawValues);
// Setup a proper zeroThreshold based on a random chance
if (histo.zeroBucket().count() > 0 && randomBoolean()) {
double smallestNonZeroValue = DoubleStream.of(rawValues).map(Math::abs).filter(val -> val != 0).min().orElse(0.0);
double zeroThreshold = smallestNonZeroValue * randomDouble();
try (ReleasableExponentialHistogram releaseAfterCopy = histo) {
histo = ExponentialHistogram.builder(histo, breaker)
.zeroBucket(ZeroBucket.create(zeroThreshold, histo.zeroBucket().count()))
.build();
ZeroBucket zeroBucket;
if (zeroThreshold == 0 || randomBoolean()) {
zeroBucket = ZeroBucket.create(zeroThreshold, histo.zeroBucket().count());
} else {
// define the zero bucket using index and scale as it can have an impact on serialization
int scale = randomIntBetween(0, MAX_SCALE);
long index = ExponentialScaleUtils.computeIndex(zeroThreshold, scale) - 1;
zeroBucket = ZeroBucket.create(index, scale, histo.zeroBucket().count());
}
ExponentialHistogramBuilder builder = ExponentialHistogram.builder(histo, breaker).zeroBucket(zeroBucket);

if ((Double.isNaN(histo.min()) || histo.min() > -zeroThreshold)) {
builder.min(-zeroThreshold);
}
if ((Double.isNaN(histo.max()) || histo.max() < zeroThreshold)) {
builder.max(zeroThreshold);
}
histo = builder.build();
}
}
return histo;
Expand Down
5 changes: 5 additions & 0 deletions server/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,11 @@ dependencies {
// tests use the locally compiled version of server
exclude group: 'org.elasticsearch', module: 'server'
}
testImplementation(testArtifact(project(":libs:exponential-histogram"))) {
// The test framework is manually included with excludes above
exclude group: 'org.elasticsearch.test', module: 'framework'
}

internalClusterTestImplementation(project(":test:framework")) {
exclude group: 'org.elasticsearch', module: 'server'
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,17 @@
import org.elasticsearch.exponentialhistogram.ExponentialHistogramBuilder;
import org.elasticsearch.exponentialhistogram.ExponentialHistogramCircuitBreaker;
import org.elasticsearch.exponentialhistogram.ExponentialHistogramMerger;
import org.elasticsearch.exponentialhistogram.ExponentialHistogramQuantile;
import org.elasticsearch.exponentialhistogram.ExponentialScaleUtils;
import org.elasticsearch.exponentialhistogram.ReleasableExponentialHistogram;
import org.elasticsearch.exponentialhistogram.ZeroBucket;
import org.elasticsearch.tdigest.Centroid;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

import static org.elasticsearch.exponentialhistogram.ExponentialHistogram.MAX_SCALE;
import static org.elasticsearch.exponentialhistogram.ExponentialHistogram.MIN_SCALE;
Expand Down Expand Up @@ -117,6 +124,96 @@ public void add(ExponentialHistogram histogram) {
mergedHistograms.add(histogram);
}

/**
* Returns the number of scalar values added to this histogram, i.e. the sum
* of {@link ExponentialHistogram#valueCount()} across all histograms added.
* @return the number of values
*/
public long size() {
    // Total number of raw values across everything merged into this state so far.
    ExponentialHistogram merged = histogram();
    return merged.valueCount();
}

/**
* Returns the fraction of all points added which are &le; x. Points
* that are exactly equal get half credit (i.e. we use the mid-point
* rule)
*
* @param x The cutoff for the cdf.
* @return The fraction of all data which is less or equal to x.
*/
public double cdf(double x) {
ExponentialHistogram histogram = histogram();
long numValuesLess = ExponentialHistogramQuantile.estimateRank(histogram, x, false);
long numValuesLessOrEqual = ExponentialHistogramQuantile.estimateRank(histogram, x, true);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just noticed, these don't check if the value is outside min/max? You should do that separately, return 0 for < min and 1 for > max.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correctness wise, we already do this here.

So you are suggesting to do this for performance reasons?
I'm not sure if this is worth it, because it increases the complexity: We currently clamp the bucket POLRE to the histogram min / max. So if the POLRE was clamped, we need to return different values based on whether the requested rank was inclusive or exclusive equal values.

I don't think that this is a common enough case to justify this extra code

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why does this increase the complexity? Isn't this a simple condition to return early, that can be added at the top of ExponentialHistogramQuantile.estimateRank? It should be uncontroversial, in terms of correctness, and more efficient.

Btw, this is unrelated to this pr so not a blocker.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are totally right, I had this messed up in my head. I confused the < min() precondition with <= min(). The latter would increase complexity due to having to account for inclusivity, the first one is trivial.

long numValuesEqual = numValuesLessOrEqual - numValuesLess;
// Just like for t-digest, equal values get half credit
return (numValuesLess + numValuesEqual / 2.0) / histogram.valueCount();
}

/**
 * Returns an estimate of a cutoff such that a specified fraction of the data
 * added to this histogram would be less than or equal to the cutoff.
 *
 * @param q The desired fraction
 * @return The smallest value x such that cdf(x) &ge; q
 */
public double quantile(double q) {
    ExponentialHistogram merged = histogram();
    return ExponentialHistogramQuantile.getQuantile(merged, q);
}

/**
 * @return the mean values of the populated histogram buckets with their counts,
 *         ordered ascending by mean
 */
public Collection<Centroid> centroids() {
    List<Centroid> result = new ArrayList<>(centroidCount());
    addBucketCentersAsCentroids(result, histogram().negativeBuckets().iterator(), -1);
    // The negative buckets are iterated in decreasing value order; reverse so the
    // combined list ends up ascending from -INF towards +INF.
    Collections.reverse(result);
    if (histogram().zeroBucket().count() > 0) {
        result.add(new Centroid(0.0, histogram().zeroBucket().count()));
    }
    addBucketCentersAsCentroids(result, histogram().positiveBuckets().iterator(), 1);
    return result;
}

// Appends one centroid per bucket, positioned at the bucket's point of least
// relative error and mirrored into the negative range via the given sign.
private void addBucketCentersAsCentroids(List<Centroid> result, BucketIterator buckets, int sign) {
    for (; buckets.hasNext(); buckets.advance()) {
        long count = buckets.peekCount();
        double center = sign * ExponentialScaleUtils.getPointOfLeastRelativeError(buckets.peekIndex(), buckets.scale());
        result.add(new Centroid(center, count));
    }
}

/**
 * @return the length of the array returned by {@link #centroids()}.
 */
public int centroidCount() {
    ExponentialHistogram histogram = histogram();
    int total = histogram.negativeBuckets().bucketCount() + histogram.positiveBuckets().bucketCount();
    if (histogram.zeroBucket().count() > 0) {
        // The zero bucket contributes exactly one centroid at 0.0 when populated.
        total++;
    }
    return total;
}

/**
 * The minimum value of the histogram, or {@code Double.POSITIVE_INFINITY} if the histogram is empty.
 * @return the minimum
 */
public double getMin() {
    double min = histogram().min();
    if (Double.isNaN(min)) {
        // An empty histogram reports NaN for min(); map it to +INF.
        return Double.POSITIVE_INFINITY;
    }
    return min;
}

/**
 * The maximum value of the histogram, or {@code Double.NEGATIVE_INFINITY} if the histogram is empty.
 * @return the maximum
 */
public double getMax() {
    double max = histogram().max();
    if (Double.isNaN(max)) {
        // An empty histogram reports NaN for max(); map it to -INF.
        return Double.NEGATIVE_INFINITY;
    }
    return max;
}

public void write(StreamOutput out) throws IOException {
if (isEmpty()) {
out.writeByte(EMPTY_HISTOGRAM_MARKER_SCALE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,26 @@
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.exponentialhistogram.ExponentialHistogram;
import org.elasticsearch.exponentialhistogram.ExponentialHistogramBuilder;
import org.elasticsearch.exponentialhistogram.ExponentialHistogramCircuitBreaker;
import org.elasticsearch.exponentialhistogram.ExponentialHistogramTestUtils;
import org.elasticsearch.exponentialhistogram.ExponentialScaleUtils;
import org.elasticsearch.exponentialhistogram.ReleasableExponentialHistogram;
import org.elasticsearch.exponentialhistogram.ZeroBucket;
import org.elasticsearch.tdigest.Centroid;
import org.elasticsearch.test.ESTestCase;

import java.io.EOFException;
import java.io.IOException;
import java.util.Arrays;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.stream.IntStream;

import static org.elasticsearch.exponentialhistogram.ExponentialHistogram.MAX_SCALE;
import static org.hamcrest.Matchers.closeTo;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;

public class ExponentialHistogramStateTests extends ESTestCase {
Expand Down Expand Up @@ -115,35 +122,120 @@ public void testAdd() throws IOException {
Releasables.close(state);
}

private static ReleasableExponentialHistogram randomHistogram(int maxBuckets) {
int numPositiveValues = randomBoolean() ? 0 : randomIntBetween(1, 1000);
int numNegativeValues = randomBoolean() ? 0 : randomIntBetween(1, 1000);

double[] values = IntStream.concat(
IntStream.range(0, numPositiveValues).map(i -> 1),
IntStream.range(0, numNegativeValues).map(i -> -1)
).mapToDouble(sign -> sign * Math.pow(1_000_000_000, randomDouble())).toArray();
ReleasableExponentialHistogram histogram = ExponentialHistogram.create(
maxBuckets,
ExponentialHistogramCircuitBreaker.noop(),
values
);
// Verifies that a freshly created, empty state reports sane defaults from all accessors:
// no centroids, zero size, NaN quantiles, and +INF/-INF for min/max respectively.
public void testEmpty() {
try (ExponentialHistogramState state = ExponentialHistogramState.create(breaker())) {
assertThat(state.isEmpty(), equalTo(true));
assertThat(state.centroids(), empty());
assertThat(state.centroidCount(), equalTo(0));
assertThat(state.size(), equalTo(0L));
// Quantiles of an empty state are NaN at every requested fraction.
assertThat(state.quantile(0.0), equalTo(Double.NaN));
assertThat(state.quantile(0.5), equalTo(Double.NaN));
assertThat(state.quantile(1.0), equalTo(Double.NaN));
assertThat(state.getMin(), equalTo(Double.POSITIVE_INFINITY));
assertThat(state.getMax(), equalTo(Double.NEGATIVE_INFINITY));
}
}

// Checks quantile estimation on the evenly spaced values 10..20: with 11 values the
// q-th quantile should land (within a small tolerance) on the value at rank q * 10.
public void testQuantiles() {
try (ExponentialHistogramState state = ExponentialHistogramState.create(breaker())) {
ExponentialHistogram sample = ExponentialHistogram.create(
100,
ExponentialHistogramCircuitBreaker.noop(),
new double[] { 10.0, 11.0, 12.0, 13.0, 14.0, 15.0, 16.0, 17.0, 18.0, 19.0, 20.0 }
);
state.add(sample);

assertThat(state.quantile(0.0), closeTo(10.0, 0.0000001));
assertThat(state.quantile(0.1), closeTo(11.0, 0.0000001));
assertThat(state.quantile(0.5), closeTo(15.0, 0.0000001));
assertThat(state.quantile(0.9), closeTo(19.0, 0.0000001));
assertThat(state.quantile(1.0), closeTo(20.0, 0.0000001));
assertThat(state.getMin(), equalTo(10.0));
assertThat(state.getMax(), equalTo(20.0));
}
}

// Checks cdf estimation over the values 0..99: each query point sits just below
// a value (the -0.001 offset), so cdf should equal the fraction of values strictly below it.
public void testCDF() {
try (ExponentialHistogramState state = ExponentialHistogramState.create(breaker())) {
ExponentialHistogram sample = ExponentialHistogram.create(
100,
ExponentialHistogramCircuitBreaker.noop(),
IntStream.range(0, 100).mapToDouble(i -> i).toArray()
);
state.add(sample);

assertThat(state.cdf(0.0 - 0.001), closeTo(0.0, 0.0000001));
assertThat(state.cdf(10.0 - 0.001), closeTo(0.1, 0.0000001));
assertThat(state.cdf(50.0 - 0.001), closeTo(0.5, 0.0000001));
assertThat(state.cdf(90.0 - 0.001), closeTo(0.9, 0.0000001));
assertThat(state.cdf(100.0 - 0.001), closeTo(1.0, 0.0000001));
}
}

// Verifies that size/min/max of the state match the (non-empty) random sample added to it.
public void testSizeMinMax() {
try (ExponentialHistogramState state = ExponentialHistogramState.create(breaker())) {
ExponentialHistogram sample;
// Loop until a non-empty sample is generated, since min/max are NaN for empty histograms.
// NOTE(review): the ReleasableExponentialHistogram instances produced here are never
// closed; this appears harmless because randomHistogram uses a noop breaker — confirm.
do {
sample = randomHistogram(randomIntBetween(4, 100));
} while (sample.valueCount() == 0);
state.add(sample);

assertThat(state.size(), equalTo(sample.valueCount()));
assertThat(state.getMin(), equalTo(sample.min()));
assertThat(state.getMax(), equalTo(sample.max()));
}
}

public void testCentroids() {
try (ExponentialHistogramState state = ExponentialHistogramState.create(breaker())) {
List<Centroid> expectedCentroids = new ArrayList<>();
ExponentialHistogramBuilder builder = ExponentialHistogram.builder(0, ExponentialHistogramCircuitBreaker.noop());

if (randomBoolean()) {
double zeroThreshold = Arrays.stream(values).map(Math::abs).min().orElse(1.0) / 2.0;
long zeroBucketCount = randomIntBetween(0, 100);
ZeroBucket zeroBucket;
if (randomBoolean()) {
zeroBucket = ZeroBucket.create(zeroThreshold, zeroBucketCount);
} else {
// define the zero bucket using index and scale to verify serialization is exact
int scale = randomIntBetween(0, MAX_SCALE);
long index = ExponentialScaleUtils.computeIndex(zeroThreshold, scale) - 1;
zeroBucket = ZeroBucket.create(index, scale, zeroBucketCount);
builder.setNegativeBucket(-1, 11).setNegativeBucket(2, 22);
expectedCentroids.add(new Centroid(-ExponentialScaleUtils.getPointOfLeastRelativeError(2, 0), 22));
expectedCentroids.add(new Centroid(-ExponentialScaleUtils.getPointOfLeastRelativeError(-1, 0), 11));
}
if (randomBoolean()) {
builder.zeroBucket(ZeroBucket.create(0.0, 123));
expectedCentroids.add(new Centroid(0.0, 123));
}
if (randomBoolean()) {
builder.setPositiveBucket(-11, 40).setPositiveBucket(12, 41);
expectedCentroids.add(new Centroid(ExponentialScaleUtils.getPointOfLeastRelativeError(-11, 0), 40));
expectedCentroids.add(new Centroid(ExponentialScaleUtils.getPointOfLeastRelativeError(12, 0), 41));
}

state.add(builder.build());

Collection<Centroid> centroids = state.centroids();
assertThat(centroids.size(), equalTo(expectedCentroids.size()));
assertThat(state.centroidCount(), equalTo(expectedCentroids.size()));

Iterator<Centroid> actualIt = centroids.iterator();
Iterator<Centroid> expectedIt = expectedCentroids.iterator();
for (int i = 0; i < expectedCentroids.size(); i++) {
Centroid actual = actualIt.next();
Centroid expected = expectedIt.next();
assertThat(actual.mean(), closeTo(expected.mean(), 0.00001));
assertThat(actual.count(), equalTo(expected.count()));
}
histogram = ExponentialHistogram.builder(histogram, ExponentialHistogramCircuitBreaker.noop()).zeroBucket(zeroBucket).build();
}
return histogram;
}

// Verifies that the centroids of a random histogram are reported in ascending mean order.
public void testCentroidSorted() {
    try (ExponentialHistogramState state = ExponentialHistogramState.create(breaker())) {
        state.add(randomHistogram(randomIntBetween(4, 500)));

        List<Double> means = state.centroids().stream().map(Centroid::mean).toList();
        List<Double> expectedOrder = new ArrayList<>(means);
        Collections.sort(expectedOrder);
        assertThat(means, equalTo(expectedOrder));
    }
}

// Creates a random histogram with at most maxBuckets buckets; uses a noop circuit
// breaker, so the returned ReleasableExponentialHistogram tracks no real memory.
private static ReleasableExponentialHistogram randomHistogram(int maxBuckets) {
return ExponentialHistogramTestUtils.randomHistogram(maxBuckets, ExponentialHistogramCircuitBreaker.noop());
}

private CircuitBreaker breaker() {
Expand Down