diff --git a/libs/exponential-histogram/src/test/java/org/elasticsearch/exponentialhistogram/ExponentialHistogramTestUtils.java b/libs/exponential-histogram/src/test/java/org/elasticsearch/exponentialhistogram/ExponentialHistogramTestUtils.java index dce815ac7ae99..74bda6b0dfff4 100644 --- a/libs/exponential-histogram/src/test/java/org/elasticsearch/exponentialhistogram/ExponentialHistogramTestUtils.java +++ b/libs/exponential-histogram/src/test/java/org/elasticsearch/exponentialhistogram/ExponentialHistogramTestUtils.java @@ -24,6 +24,7 @@ import java.util.stream.DoubleStream; import java.util.stream.IntStream; +import static org.elasticsearch.exponentialhistogram.ExponentialHistogram.MAX_SCALE; import static org.elasticsearch.test.ESTestCase.randomBoolean; import static org.elasticsearch.test.ESTestCase.randomDouble; import static org.elasticsearch.test.ESTestCase.randomIntBetween; @@ -35,6 +36,10 @@ public static ExponentialHistogram randomHistogram() { } public static ReleasableExponentialHistogram randomHistogram(ExponentialHistogramCircuitBreaker breaker) { + return randomHistogram(randomIntBetween(4, 300), breaker); + } + + public static ReleasableExponentialHistogram randomHistogram(int numBuckets, ExponentialHistogramCircuitBreaker breaker) { boolean hasNegativeValues = randomBoolean(); boolean hasPositiveValues = randomBoolean(); boolean hasZeroValues = randomBoolean(); @@ -46,16 +51,30 @@ public static ReleasableExponentialHistogram randomHistogram(ExponentialHistogra hasZeroValues ? 
IntStream.range(0, randomIntBetween(1, 100)).map(i1 -> 0) : IntStream.empty() ).mapToDouble(sign -> sign * (Math.pow(1_000_000, randomDouble()))).toArray(); - int numBuckets = randomIntBetween(4, 300); ReleasableExponentialHistogram histo = ExponentialHistogram.create(numBuckets, breaker, rawValues); // Setup a proper zeroThreshold based on a random chance if (histo.zeroBucket().count() > 0 && randomBoolean()) { double smallestNonZeroValue = DoubleStream.of(rawValues).map(Math::abs).filter(val -> val != 0).min().orElse(0.0); double zeroThreshold = smallestNonZeroValue * randomDouble(); try (ReleasableExponentialHistogram releaseAfterCopy = histo) { - histo = ExponentialHistogram.builder(histo, breaker) - .zeroBucket(ZeroBucket.create(zeroThreshold, histo.zeroBucket().count())) - .build(); + ZeroBucket zeroBucket; + if (zeroThreshold == 0 || randomBoolean()) { + zeroBucket = ZeroBucket.create(zeroThreshold, histo.zeroBucket().count()); + } else { + // define the zero bucket using index and scale as it can have an impact on serialization + int scale = randomIntBetween(0, MAX_SCALE); + long index = ExponentialScaleUtils.computeIndex(zeroThreshold, scale) - 1; + zeroBucket = ZeroBucket.create(index, scale, histo.zeroBucket().count()); + } + ExponentialHistogramBuilder builder = ExponentialHistogram.builder(histo, breaker).zeroBucket(zeroBucket); + + if ((Double.isNaN(histo.min()) || histo.min() > -zeroThreshold)) { + builder.min(-zeroThreshold); + } + if ((Double.isNaN(histo.max()) || histo.max() < zeroThreshold)) { + builder.max(zeroThreshold); + } + histo = builder.build(); } } return histo; diff --git a/server/build.gradle b/server/build.gradle index 8897a9ae8c95f..7163aad57b390 100644 --- a/server/build.gradle +++ b/server/build.gradle @@ -78,6 +78,11 @@ dependencies { // tests use the locally compiled version of server exclude group: 'org.elasticsearch', module: 'server' } + testImplementation(testArtifact(project(":libs:exponential-histogram"))) { + // The test 
framework is manually included with excludes above + exclude group: 'org.elasticsearch.test', module: 'framework' + } + internalClusterTestImplementation(project(":test:framework")) { exclude group: 'org.elasticsearch', module: 'server' } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/ExponentialHistogramState.java b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/ExponentialHistogramState.java index e10974b47bc34..1250032121773 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/ExponentialHistogramState.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/ExponentialHistogramState.java @@ -20,10 +20,17 @@ import org.elasticsearch.exponentialhistogram.ExponentialHistogramBuilder; import org.elasticsearch.exponentialhistogram.ExponentialHistogramCircuitBreaker; import org.elasticsearch.exponentialhistogram.ExponentialHistogramMerger; +import org.elasticsearch.exponentialhistogram.ExponentialHistogramQuantile; +import org.elasticsearch.exponentialhistogram.ExponentialScaleUtils; import org.elasticsearch.exponentialhistogram.ReleasableExponentialHistogram; import org.elasticsearch.exponentialhistogram.ZeroBucket; +import org.elasticsearch.tdigest.Centroid; import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; import static org.elasticsearch.exponentialhistogram.ExponentialHistogram.MAX_SCALE; import static org.elasticsearch.exponentialhistogram.ExponentialHistogram.MIN_SCALE; @@ -117,6 +124,96 @@ public void add(ExponentialHistogram histogram) { mergedHistograms.add(histogram); } + /** + * Returns the number of scalar values added to this histogram, so the sum + * of {@link ExponentialHistogram#valueCount()} for all histograms added. 
+     * @return the number of values
+     */
+    public long size() {
+        return histogram().valueCount();
+    }
+
+    /**
+     * Returns the fraction of all points added which are ≤ x. Points
+     * that are exactly equal get half credit (i.e. we use the mid-point
+     * rule)
+     *
+     * @param x The cutoff for the cdf.
+     * @return The fraction of all data which is less than or equal to x.
+     */
+    public double cdf(double x) {
+        ExponentialHistogram histogram = histogram();
+        long numValuesLess = ExponentialHistogramQuantile.estimateRank(histogram, x, false);
+        long numValuesLessOrEqual = ExponentialHistogramQuantile.estimateRank(histogram, x, true);
+        long numValuesEqual = numValuesLessOrEqual - numValuesLess;
+        // Just like for t-digest, equal values get half credit
+        return (numValuesLess + numValuesEqual / 2.0) / histogram.valueCount();
+    }
+
+    /**
+     * Returns an estimate of a cutoff such that a specified fraction of the data
+     * added to this histogram would be less than or equal to the cutoff.
+     *
+     * @param q The desired fraction
+     * @return The smallest value x such that cdf(x) ≥ q
+     */
+    public double quantile(double q) {
+        return ExponentialHistogramQuantile.getQuantile(histogram(), q);
+    }
+
+    /**
+     * @return a collection of the mean values of the populated histogram buckets with their counts
+     */
+    public Collection<Centroid> centroids() {
+        List<Centroid> centroids = new ArrayList<>();
+        addBucketCentersAsCentroids(centroids, histogram().negativeBuckets().iterator(), -1);
+        // negative buckets are in decreasing order, we want increasing order, therefore reverse
+        Collections.reverse(centroids);
+        if (histogram().zeroBucket().count() > 0) {
+            centroids.add(new Centroid(0.0, histogram().zeroBucket().count()));
+        }
+        addBucketCentersAsCentroids(centroids, histogram().positiveBuckets().iterator(), 1);
+        return centroids;
+    }
+
+    private void addBucketCentersAsCentroids(List<Centroid> result, BucketIterator buckets, int sign) {
+        while (buckets.hasNext()) {
+            double center = sign * 
ExponentialScaleUtils.getPointOfLeastRelativeError(buckets.peekIndex(), buckets.scale()); + long count = buckets.peekCount(); + result.add(new Centroid(center, count)); + buckets.advance(); + } + } + + /** + * @return the length of the array returned by {@link #centroids()}. + */ + public int centroidCount() { + ExponentialHistogram histo = histogram(); + int count = histo.zeroBucket().count() > 0 ? 1 : 0; + count += histo.negativeBuckets().bucketCount(); + count += histo.positiveBuckets().bucketCount(); + return count; + } + + /** + * The minimum value of the histogram, or {@code Double.POSITIVE_INFINITY} if the histogram is empty. + * @return the minimum + */ + public double getMin() { + double min = histogram().min(); + return Double.isNaN(min) ? Double.POSITIVE_INFINITY : min; + } + + /** + * The maximum value of the histogram, or {@code Double.NEGATIVE_INFINITY} if the histogram is empty. + * @return the maximum + */ + public double getMax() { + double max = histogram().max(); + return Double.isNaN(max) ? 
Double.NEGATIVE_INFINITY : max; + } + public void write(StreamOutput out) throws IOException { if (isEmpty()) { out.writeByte(EMPTY_HISTOGRAM_MARKER_SCALE); diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/metrics/ExponentialHistogramStateTests.java b/server/src/test/java/org/elasticsearch/search/aggregations/metrics/ExponentialHistogramStateTests.java index ef167bf3dfc75..4166f7ae97b20 100644 --- a/server/src/test/java/org/elasticsearch/search/aggregations/metrics/ExponentialHistogramStateTests.java +++ b/server/src/test/java/org/elasticsearch/search/aggregations/metrics/ExponentialHistogramStateTests.java @@ -15,19 +15,26 @@ import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.core.Releasables; import org.elasticsearch.exponentialhistogram.ExponentialHistogram; +import org.elasticsearch.exponentialhistogram.ExponentialHistogramBuilder; import org.elasticsearch.exponentialhistogram.ExponentialHistogramCircuitBreaker; +import org.elasticsearch.exponentialhistogram.ExponentialHistogramTestUtils; import org.elasticsearch.exponentialhistogram.ExponentialScaleUtils; import org.elasticsearch.exponentialhistogram.ReleasableExponentialHistogram; import org.elasticsearch.exponentialhistogram.ZeroBucket; +import org.elasticsearch.tdigest.Centroid; import org.elasticsearch.test.ESTestCase; import java.io.EOFException; import java.io.IOException; -import java.util.Arrays; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; import java.util.List; import java.util.stream.IntStream; -import static org.elasticsearch.exponentialhistogram.ExponentialHistogram.MAX_SCALE; +import static org.hamcrest.Matchers.closeTo; +import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; public class ExponentialHistogramStateTests extends ESTestCase { @@ -115,35 +122,120 @@ public void testAdd() throws IOException { Releasables.close(state); } - private 
static ReleasableExponentialHistogram randomHistogram(int maxBuckets) { - int numPositiveValues = randomBoolean() ? 0 : randomIntBetween(1, 1000); - int numNegativeValues = randomBoolean() ? 0 : randomIntBetween(1, 1000); - - double[] values = IntStream.concat( - IntStream.range(0, numPositiveValues).map(i -> 1), - IntStream.range(0, numNegativeValues).map(i -> -1) - ).mapToDouble(sign -> sign * Math.pow(1_000_000_000, randomDouble())).toArray(); - ReleasableExponentialHistogram histogram = ExponentialHistogram.create( - maxBuckets, - ExponentialHistogramCircuitBreaker.noop(), - values - ); + public void testEmpty() { + try (ExponentialHistogramState state = ExponentialHistogramState.create(breaker())) { + assertThat(state.isEmpty(), equalTo(true)); + assertThat(state.centroids(), empty()); + assertThat(state.centroidCount(), equalTo(0)); + assertThat(state.size(), equalTo(0L)); + assertThat(state.quantile(0.0), equalTo(Double.NaN)); + assertThat(state.quantile(0.5), equalTo(Double.NaN)); + assertThat(state.quantile(1.0), equalTo(Double.NaN)); + assertThat(state.getMin(), equalTo(Double.POSITIVE_INFINITY)); + assertThat(state.getMax(), equalTo(Double.NEGATIVE_INFINITY)); + } + } + + public void testQuantiles() { + try (ExponentialHistogramState state = ExponentialHistogramState.create(breaker())) { + ExponentialHistogram sample = ExponentialHistogram.create( + 100, + ExponentialHistogramCircuitBreaker.noop(), + new double[] { 10.0, 11.0, 12.0, 13.0, 14.0, 15.0, 16.0, 17.0, 18.0, 19.0, 20.0 } + ); + state.add(sample); + + assertThat(state.quantile(0.0), closeTo(10.0, 0.0000001)); + assertThat(state.quantile(0.1), closeTo(11.0, 0.0000001)); + assertThat(state.quantile(0.5), closeTo(15.0, 0.0000001)); + assertThat(state.quantile(0.9), closeTo(19.0, 0.0000001)); + assertThat(state.quantile(1.0), closeTo(20.0, 0.0000001)); + assertThat(state.getMin(), equalTo(10.0)); + assertThat(state.getMax(), equalTo(20.0)); + } + } + + public void testCDF() { + try 
(ExponentialHistogramState state = ExponentialHistogramState.create(breaker())) {
+            ExponentialHistogram sample = ExponentialHistogram.create(
+                100,
+                ExponentialHistogramCircuitBreaker.noop(),
+                IntStream.range(0, 100).mapToDouble(i -> i).toArray()
+            );
+            state.add(sample);
+
+            assertThat(state.cdf(0.0 - 0.001), closeTo(0.0, 0.0000001));
+            assertThat(state.cdf(10.0 - 0.001), closeTo(0.1, 0.0000001));
+            assertThat(state.cdf(50.0 - 0.001), closeTo(0.5, 0.0000001));
+            assertThat(state.cdf(90.0 - 0.001), closeTo(0.9, 0.0000001));
+            assertThat(state.cdf(100.0 - 0.001), closeTo(1.0, 0.0000001));
+        }
+    }
+
+    public void testSizeMinMax() {
+        try (ExponentialHistogramState state = ExponentialHistogramState.create(breaker())) {
+            ExponentialHistogram sample;
+            do {
+                sample = randomHistogram(randomIntBetween(4, 100));
+            } while (sample.valueCount() == 0);
+            state.add(sample);
+
+            assertThat(state.size(), equalTo(sample.valueCount()));
+            assertThat(state.getMin(), equalTo(sample.min()));
+            assertThat(state.getMax(), equalTo(sample.max()));
+        }
+    }
+
+    public void testCentroids() {
+        try (ExponentialHistogramState state = ExponentialHistogramState.create(breaker())) {
+            List<Centroid> expectedCentroids = new ArrayList<>();
+            ExponentialHistogramBuilder builder = ExponentialHistogram.builder(0, ExponentialHistogramCircuitBreaker.noop());
-        if (randomBoolean()) {
-            double zeroThreshold = Arrays.stream(values).map(Math::abs).min().orElse(1.0) / 2.0;
-            long zeroBucketCount = randomIntBetween(0, 100);
-            ZeroBucket zeroBucket;
             if (randomBoolean()) {
-                zeroBucket = ZeroBucket.create(zeroThreshold, zeroBucketCount);
-            } else {
-                // define the zero bucket using index and scale to verify serialization is exact
-                int scale = randomIntBetween(0, MAX_SCALE);
-                long index = ExponentialScaleUtils.computeIndex(zeroThreshold, scale) - 1;
-                zeroBucket = ZeroBucket.create(index, scale, zeroBucketCount);
+                builder.setNegativeBucket(-1, 11).setNegativeBucket(2, 22);
+                expectedCentroids.add(new 
Centroid(-ExponentialScaleUtils.getPointOfLeastRelativeError(2, 0), 22));
+                expectedCentroids.add(new Centroid(-ExponentialScaleUtils.getPointOfLeastRelativeError(-1, 0), 11));
+            }
+            if (randomBoolean()) {
+                builder.zeroBucket(ZeroBucket.create(0.0, 123));
+                expectedCentroids.add(new Centroid(0.0, 123));
+            }
+            if (randomBoolean()) {
+                builder.setPositiveBucket(-11, 40).setPositiveBucket(12, 41);
+                expectedCentroids.add(new Centroid(ExponentialScaleUtils.getPointOfLeastRelativeError(-11, 0), 40));
+                expectedCentroids.add(new Centroid(ExponentialScaleUtils.getPointOfLeastRelativeError(12, 0), 41));
+            }
+
+            state.add(builder.build());
+
+            Collection<Centroid> centroids = state.centroids();
+            assertThat(centroids.size(), equalTo(expectedCentroids.size()));
+            assertThat(state.centroidCount(), equalTo(expectedCentroids.size()));
+
+            Iterator<Centroid> actualIt = centroids.iterator();
+            Iterator<Centroid> expectedIt = expectedCentroids.iterator();
+            for (int i = 0; i < expectedCentroids.size(); i++) {
+                Centroid actual = actualIt.next();
+                Centroid expected = expectedIt.next();
+                assertThat(actual.mean(), closeTo(expected.mean(), 0.00001));
+                assertThat(actual.count(), equalTo(expected.count()));
             }
-            histogram = ExponentialHistogram.builder(histogram, ExponentialHistogramCircuitBreaker.noop()).zeroBucket(zeroBucket).build();
         }
-        return histogram;
+    }
+
+    public void testCentroidSorted() {
+        try (ExponentialHistogramState state = ExponentialHistogramState.create(breaker())) {
+            state.add(randomHistogram(randomIntBetween(4, 500)));
+
+            List<Double> actualCentroidMeans = state.centroids().stream().map(Centroid::mean).toList();
+            List<Double> sortedMeans = new ArrayList<>(actualCentroidMeans);
+            Collections.sort(sortedMeans);
+            assertThat(actualCentroidMeans, equalTo(sortedMeans));
+        }
+    }
+
+    private static ReleasableExponentialHistogram randomHistogram(int maxBuckets) {
+        return ExponentialHistogramTestUtils.randomHistogram(maxBuckets, ExponentialHistogramCircuitBreaker.noop());
     }
 
     private CircuitBreaker breaker() {