diff --git a/CHANGELOG.md b/CHANGELOG.md index 7b442c66670e6..4d6ca57f7053d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,6 +17,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Introduced internal API for retrieving metadata about requested indices from transport actions ([#18523](https://github.com/opensearch-project/OpenSearch/pull/18523)) ### Changed +- Combine filter rewrite and skip list to optimize sub aggregations ([#19573](https://github.com/opensearch-project/OpenSearch/pull/19573)) - Faster `terms` query creation for `keyword` field with index and docValues enabled ([#19350](https://github.com/opensearch-project/OpenSearch/pull/19350)) - Refactor to move prepareIndex and prepareDelete methods to Engine class ([#19551](https://github.com/opensearch-project/OpenSearch/pull/19551)) - Omit maxScoreCollector in SimpleTopDocsCollectorContext when concurrent segment search enabled ([#19584](https://github.com/opensearch-project/OpenSearch/pull/19584)) diff --git a/server/src/main/java/org/opensearch/search/aggregations/BitSetDocIdStream.java b/server/src/main/java/org/opensearch/search/aggregations/BitSetDocIdStream.java new file mode 100644 index 0000000000000..a33b1dbf79180 --- /dev/null +++ b/server/src/main/java/org/opensearch/search/aggregations/BitSetDocIdStream.java @@ -0,0 +1,76 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.search.aggregations; + +import org.apache.lucene.search.CheckedIntConsumer; +import org.apache.lucene.search.DocIdStream; +import org.apache.lucene.util.FixedBitSet; +import org.apache.lucene.util.MathUtil; + +import java.io.IOException; + +/** + * DocIdStream implementation using FixedBitSet. This is a duplicate of the implementation in Lucene + * and should ideally eventually be removed.
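+ * Bit {@code i} of the wrapped bit set corresponds to doc ID {@code offset + i}.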
+ * + * @opensearch.internal + */ +public final class BitSetDocIdStream extends DocIdStream { + + private final FixedBitSet bitSet; + private final int offset, max; + private int upTo; + + public BitSetDocIdStream(FixedBitSet bitSet, int offset) { + this.bitSet = bitSet; + this.offset = offset; + upTo = offset; + max = MathUtil.unsignedMin(Integer.MAX_VALUE, offset + bitSet.length()); + } + + @Override + public boolean mayHaveRemaining() { + return upTo < max; + } + + @Override + public void forEach(int upTo, CheckedIntConsumer<IOException> consumer) throws IOException { + if (upTo > this.upTo) { + upTo = Math.min(upTo, max); + bitSet.forEach(this.upTo - offset, upTo - offset, offset, consumer); + this.upTo = upTo; + } + } + + @Override + public int count(int upTo) throws IOException { + if (upTo > this.upTo) { + upTo = Math.min(upTo, max); + int count = bitSet.cardinality(this.upTo - offset, upTo - offset); + this.upTo = upTo; + return count; + } else { + return 0; + } + } + + @Override + public int intoArray(int upTo, int[] array) { + if (upTo > this.upTo) { + upTo = Math.min(upTo, max); + int count = bitSet.intoArray(this.upTo - offset, upTo - offset, offset, array); + if (count == array.length) { // The whole range of doc IDs may not have been copied + upTo = array[array.length - 1] + 1; + } + this.upTo = upTo; + return count; + } + return 0; + } +} diff --git a/server/src/main/java/org/opensearch/search/aggregations/LeafBucketCollector.java b/server/src/main/java/org/opensearch/search/aggregations/LeafBucketCollector.java index 0b34ffc78853a..4b7e4d36ce937 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/LeafBucketCollector.java +++ b/server/src/main/java/org/opensearch/search/aggregations/LeafBucketCollector.java @@ -32,8 +32,10 @@ package org.opensearch.search.aggregations; +import org.apache.lucene.search.DocIdStream; import org.apache.lucene.search.LeafCollector; import org.apache.lucene.search.Scorable; +import org.opensearch.common.annotation.ExperimentalApi; import org.opensearch.search.aggregations.bucket.terms.LongKeyedBucketOrds; import java.io.IOException; @@ -123,6 +125,34 @@ public void collect(int doc) throws IOException { collect(doc, 0); } + /** + * Bulk-collect doc IDs within owningBucketOrd. + * + *
<p>
Note: The provided {@link DocIdStream} may be reused across calls and should be consumed + * immediately. + * + *
<p>
Note: The provided {@link DocIdStream} typically holds all the docIds for the corresponding + * owningBucketOrd. This method may be called multiple times per segment (but once per owningBucketOrd). + * + *
<p>
While the {@link DocIdStream} for each owningBucketOrd is sorted by docIds, it is NOT GUARANTEED + * that doc IDs arrive in order across invocations for different owningBucketOrd. + * + *
<p>
It is NOT LEGAL for callers to mix calls to {@link #collect(DocIdStream, long)} and {@link + * #collect(int, long)}. + * + *
<p>
The default implementation calls {@code stream.forEach(doc -> collect(doc, owningBucketOrd))}. + */ + @ExperimentalApi + public void collect(DocIdStream stream, long owningBucketOrd) throws IOException { + stream.forEach(doc -> collect(doc, owningBucketOrd)); + } + + public void collect(int[] docIds, long owningBucketOrd) throws IOException { + for (int doc : docIds) { + collect(doc, owningBucketOrd); + } + } + @Override public void setScorer(Scorable scorer) throws IOException { // no-op by default diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/HistogramSkiplistLeafCollector.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/HistogramSkiplistLeafCollector.java new file mode 100644 index 0000000000000..2b51940ae54c1 --- /dev/null +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/HistogramSkiplistLeafCollector.java @@ -0,0 +1,171 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.search.aggregations.bucket; + +import org.apache.lucene.index.DocValuesSkipper; +import org.apache.lucene.index.NumericDocValues; +import org.apache.lucene.search.DocIdStream; +import org.apache.lucene.search.Scorable; +import org.opensearch.common.Rounding; +import org.opensearch.search.aggregations.LeafBucketCollector; +import org.opensearch.search.aggregations.bucket.terms.LongKeyedBucketOrds; + +import java.io.IOException; + +/** + * Histogram collection logic using the doc-values skip list. + * + * @opensearch.internal + */ +public class HistogramSkiplistLeafCollector extends LeafBucketCollector { + + private final NumericDocValues values; + private final DocValuesSkipper skipper; + private final Rounding.Prepared preparedRounding; + private final LongKeyedBucketOrds bucketOrds; + private final LeafBucketCollector sub; + private final BucketsAggregator aggregator; + + /** + * Max doc ID (inclusive) up to which all docs' values may map to the same bucket. + */ + private int upToInclusive = -1; + + /** + * Whether the values of all docs up to {@link #upToInclusive} map to the same bucket. + */ + private boolean upToSameBucket; + + /** + * Index in bucketOrds for docs up to {@link #upToInclusive}. + */ + private long upToBucketIndex; + + public HistogramSkiplistLeafCollector( + NumericDocValues values, + DocValuesSkipper skipper, + Rounding.Prepared preparedRounding, + LongKeyedBucketOrds bucketOrds, + LeafBucketCollector sub, + BucketsAggregator aggregator + ) { + this.values = values; + this.skipper = skipper; + this.preparedRounding = preparedRounding; + this.bucketOrds = bucketOrds; + this.sub = sub; + this.aggregator = aggregator; + } + + @Override + public void setScorer(Scorable scorer) throws IOException { + if (sub != null) { + sub.setScorer(scorer); + } + } + + private void advanceSkipper(int doc, long owningBucketOrd) throws IOException { + if (doc > skipper.maxDocID(0)) { + skipper.advance(doc); + } + upToSameBucket = false; + + if (skipper.minDocID(0) > doc) { + // Corner case which happens if `doc` doesn't have a value and is between two intervals of + // the doc-value skip index. + upToInclusive = skipper.minDocID(0) - 1; + return; + } + + upToInclusive = skipper.maxDocID(0); + + // Now find the highest level where all docs map to the same bucket.
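+ // A level is usable only if every doc in its doc-ID range has a value (its docCount equals + // the range size) and the level's min and max values round to the same bucket; the loop + // stops at the first level that fails either condition.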
+ for (int level = 0; level < skipper.numLevels(); ++level) { + int totalDocsAtLevel = skipper.maxDocID(level) - skipper.minDocID(level) + 1; + long minBucket = preparedRounding.round(skipper.minValue(level)); + long maxBucket = preparedRounding.round(skipper.maxValue(level)); + + if (skipper.docCount(level) == totalDocsAtLevel && minBucket == maxBucket) { + // All docs at this level have a value, and all values map to the same bucket. + upToInclusive = skipper.maxDocID(level); + upToSameBucket = true; + upToBucketIndex = bucketOrds.add(owningBucketOrd, maxBucket); + if (upToBucketIndex < 0) { + upToBucketIndex = -1 - upToBucketIndex; + } + } else { + break; + } + } + } + + @Override + public void collect(int doc, long owningBucketOrd) throws IOException { + if (doc > upToInclusive) { + advanceSkipper(doc, owningBucketOrd); + } + + if (upToSameBucket) { + aggregator.incrementBucketDocCount(upToBucketIndex, 1L); + sub.collect(doc, upToBucketIndex); + } else if (values.advanceExact(doc)) { + final long value = values.longValue(); + long bucketIndex = bucketOrds.add(owningBucketOrd, preparedRounding.round(value)); + if (bucketIndex < 0) { + bucketIndex = -1 - bucketIndex; + aggregator.collectExistingBucket(sub, doc, bucketIndex); + } else { + aggregator.collectBucket(sub, doc, bucketIndex); + } + } + } + + @Override + public void collect(DocIdStream stream) throws IOException { + // This will only be called if it's the top agg + collect(stream, 0); + } + + @Override + public void collect(DocIdStream stream, long owningBucketOrd) throws IOException { + // Called directly when this is a sub aggregation, or via collect(DocIdStream) for the top agg + for (;;) { + int upToExclusive = upToInclusive + 1; + if (upToExclusive < 0) { // overflow + upToExclusive = Integer.MAX_VALUE; + } + + if (upToSameBucket) { + if (sub == NO_OP_COLLECTOR) { + // stream.count may be faster when we don't need to handle sub-aggs + long count = stream.count(upToExclusive); + aggregator.incrementBucketDocCount(upToBucketIndex, count); + } else { + int count = 0; + int[] docBuffer = new int[64]; + int cnt = Integer.MAX_VALUE; + while (cnt != 0) { + cnt = stream.intoArray(upToExclusive, docBuffer); + // Only hand full buffers to the bulk int[] hook; a partial fill would expose + // stale doc IDs in the tail of the buffer. + if (cnt == docBuffer.length) { + sub.collect(docBuffer, upToBucketIndex); + } else { + for (int i = 0; i < cnt; ++i) { + sub.collect(docBuffer[i], upToBucketIndex); + } + } + count += cnt; + } + aggregator.incrementBucketDocCount(upToBucketIndex, count); + } + } else { + stream.forEach(upToExclusive, doc -> collect(doc, owningBucketOrd)); + } + + if (stream.mayHaveRemaining()) { + advanceSkipper(upToExclusive, owningBucketOrd); + } else { + break; + } + } + } +} diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/filterrewrite/rangecollector/SubAggRangeCollector.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/filterrewrite/rangecollector/SubAggRangeCollector.java index 5c1f21b22e646..b0be76cf645c4 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/filterrewrite/rangecollector/SubAggRangeCollector.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/filterrewrite/rangecollector/SubAggRangeCollector.java @@ -14,6 +14,7 @@ import org.apache.lucene.search.DocIdSetIterator; import org.apache.lucene.util.BitDocIdSet; import org.apache.lucene.util.FixedBitSet; +import org.opensearch.search.aggregations.BitSetDocIdStream; import org.opensearch.search.aggregations.BucketCollector; import org.opensearch.search.aggregations.LeafBucketCollector; import org.opensearch.search.aggregations.bucket.filterrewrite.FilterRewriteOptimizationContext; @@ -23,8 +24,6 @@ import java.util.function.BiConsumer; import
java.util.function.Function; -import static org.apache.lucene.search.DocIdSetIterator.NO_MORE_DOCS; - /** * Range collector implementation that supports sub-aggregations by collecting doc IDs. */ @@ -85,10 +84,7 @@ public void finalizePreviousRange() { DocIdSetIterator iterator = bitDocIdSet.iterator(); // build a new leaf collector for each bucket LeafBucketCollector sub = collectableSubAggregators.getLeafCollector(leafCtx); - while (iterator.nextDoc() != NO_MORE_DOCS) { - int currentDoc = iterator.docID(); - sub.collect(currentDoc, bucketOrd); - } + sub.collect(new BitSetDocIdStream(bitSet, 0), bucketOrd); logger.trace("collected sub aggregation for bucket {}", bucketOrd); } catch (IOException e) { throw new RuntimeException(e); diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregator.java index 8fa9c61821fd8..7aa0fead4162f 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregator.java @@ -31,7 +31,10 @@ package org.opensearch.search.aggregations.bucket.histogram; +import org.apache.lucene.index.DocValues; +import org.apache.lucene.index.DocValuesSkipper; import org.apache.lucene.index.LeafReaderContext; +import org.apache.lucene.index.NumericDocValues; import org.apache.lucene.index.SortedNumericDocValues; import org.apache.lucene.search.ScoreMode; import org.apache.lucene.util.CollectionUtil; @@ -51,6 +54,7 @@ import org.opensearch.search.aggregations.LeafBucketCollectorBase; import org.opensearch.search.aggregations.bucket.DeferableBucketAggregator; import org.opensearch.search.aggregations.bucket.DeferringBucketCollector; +import org.opensearch.search.aggregations.bucket.HistogramSkiplistLeafCollector; import org.opensearch.search.aggregations.bucket.MergingBucketsDeferringCollector; import org.opensearch.search.aggregations.bucket.filterrewrite.DateHistogramAggregatorBridge; import org.opensearch.search.aggregations.bucket.filterrewrite.FilterRewriteOptimizationContext; @@ -135,6 +139,7 @@ static AutoDateHistogramAggregator build( protected int roundingIdx; protected Rounding.Prepared preparedRounding; + private final String fieldName; private final FilterRewriteOptimizationContext filterRewriteOptimizationContext; private AutoDateHistogramAggregator( @@ -218,6 +223,10 @@ protected Function bucketOrdProducer() { return (key) -> getBucketOrds().add(0, preparedRounding.round(key)); } }; + + this.fieldName = (valuesSource instanceof ValuesSource.Numeric.FieldData) + ? 
((ValuesSource.Numeric.FieldData) valuesSource).getIndexFieldName() + : null; filterRewriteOptimizationContext = new FilterRewriteOptimizationContext(bridge, parent, subAggregators.length, context); } @@ -260,7 +269,21 @@ public final LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBuc return LeafBucketCollector.NO_OP_COLLECTOR; } + DocValuesSkipper skipper = null; + if (this.fieldName != null) { + skipper = ctx.reader().getDocValuesSkipper(this.fieldName); + } final SortedNumericDocValues values = valuesSource.longValues(ctx); + final NumericDocValues singleton = DocValues.unwrapSingleton(values); + + if (skipper != null && singleton != null) { + // TODO: add hard bounds support + // TODO: HistogramSkiplistLeafCollector should be used if the getLeafCollector invocation is from + // filterRewriteOptimizationContext when parent != null. Removing the check to collect + // performance numbers for now + return new HistogramSkiplistLeafCollector(singleton, skipper, preparedRounding, getBucketOrds(), sub, this); + } + final LeafBucketCollector iteratingCollector = getLeafCollector(values, sub); return new LeafBucketCollectorBase(sub, values) { @Override diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregator.java index 7ba939f64dbbf..7d785dec6d6a1 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregator.java @@ -38,8 +38,6 @@ import org.apache.lucene.index.LeafReaderContext; import org.apache.lucene.index.NumericDocValues; import org.apache.lucene.index.SortedNumericDocValues; -import org.apache.lucene.search.DocIdStream; -import org.apache.lucene.search.Scorable; import org.apache.lucene.search.ScoreMode; import org.apache.lucene.util.CollectionUtil; import org.opensearch.common.Nullable; @@ -63,6 +61,7 @@ import org.opensearch.search.aggregations.StarTreeBucketCollector; import org.opensearch.search.aggregations.StarTreePreComputeCollector; import org.opensearch.search.aggregations.bucket.BucketsAggregator; +import org.opensearch.search.aggregations.bucket.HistogramSkiplistLeafCollector; import org.opensearch.search.aggregations.bucket.filterrewrite.DateHistogramAggregatorBridge; import org.opensearch.search.aggregations.bucket.filterrewrite.FilterRewriteOptimizationContext; import org.opensearch.search.aggregations.bucket.terms.LongKeyedBucketOrds; @@ -234,7 +233,10 @@ public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucketCol if (skipper != null && singleton != null) { // TODO: add hard bounds support - if (hardBounds == null && parent == null) { + // TODO: HistogramSkiplistLeafCollector should be used if the getLeafCollector invocation is from + // filterRewriteOptimizationContext when parent != null.
Removing the check to collect + // performance numbers for now + if (hardBounds == null) { skipListCollectorsUsed++; return new HistogramSkiplistLeafCollector(singleton, skipper, preparedRounding, bucketOrds, sub, this); } @@ -434,149 +436,4 @@ public double bucketSize(long bucket, Rounding.DateTimeUnit unitSize) { return 1.0; } } - - private static class HistogramSkiplistLeafCollector extends LeafBucketCollector { - - private final NumericDocValues values; - private final DocValuesSkipper skipper; - private final Rounding.Prepared preparedRounding; - private final LongKeyedBucketOrds bucketOrds; - private final LeafBucketCollector sub; - private final BucketsAggregator aggregator; - - /** - * Max doc ID (inclusive) up to which all docs values may map to the same bucket. - */ - private int upToInclusive = -1; - - /** - * Whether all docs up to {@link #upToInclusive} values map to the same bucket. - */ - private boolean upToSameBucket; - - /** - * Index in bucketOrds for docs up to {@link #upToInclusive}. - */ - private long upToBucketIndex; - - HistogramSkiplistLeafCollector( - NumericDocValues values, - DocValuesSkipper skipper, - Rounding.Prepared preparedRounding, - LongKeyedBucketOrds bucketOrds, - LeafBucketCollector sub, - BucketsAggregator aggregator - ) { - this.values = values; - this.skipper = skipper; - this.preparedRounding = preparedRounding; - this.bucketOrds = bucketOrds; - this.sub = sub; - this.aggregator = aggregator; - } - - @Override - public void setScorer(Scorable scorer) throws IOException { - if (sub != null) { - sub.setScorer(scorer); - } - } - - private void advanceSkipper(int doc, long owningBucketOrd) throws IOException { - if (doc > skipper.maxDocID(0)) { - skipper.advance(doc); - } - upToSameBucket = false; - - if (skipper.minDocID(0) > doc) { - // Corner case which happens if `doc` doesn't have a value and is between two intervals of - // the doc-value skip index. - upToInclusive = skipper.minDocID(0) - 1; - return; - } - - upToInclusive = skipper.maxDocID(0); - - // Now find the highest level where all docs map to the same bucket. - for (int level = 0; level < skipper.numLevels(); ++level) { - int totalDocsAtLevel = skipper.maxDocID(level) - skipper.minDocID(level) + 1; - long minBucket = preparedRounding.round(skipper.minValue(level)); - long maxBucket = preparedRounding.round(skipper.maxValue(level)); - - if (skipper.docCount(level) == totalDocsAtLevel && minBucket == maxBucket) { - // All docs at this level have a value, and all values map to the same bucket. 
- upToInclusive = skipper.maxDocID(level); - upToSameBucket = true; - upToBucketIndex = bucketOrds.add(owningBucketOrd, maxBucket); - if (upToBucketIndex < 0) { - upToBucketIndex = -1 - upToBucketIndex; - } - } else { - break; - } - } - } - - @Override - public void collect(int doc, long owningBucketOrd) throws IOException { - if (doc > upToInclusive) { - advanceSkipper(doc, owningBucketOrd); - } - - if (upToSameBucket) { - aggregator.incrementBucketDocCount(upToBucketIndex, 1L); - sub.collect(doc, upToBucketIndex); - } else if (values.advanceExact(doc)) { - final long value = values.longValue(); - long bucketIndex = bucketOrds.add(owningBucketOrd, preparedRounding.round(value)); - if (bucketIndex < 0) { - bucketIndex = -1 - bucketIndex; - aggregator.collectExistingBucket(sub, doc, bucketIndex); - } else { - aggregator.collectBucket(sub, doc, bucketIndex); - } - } - } - - @Override - public void collect(int doc) throws IOException { - collect(doc, 0); - } - - @Override - public void collect(DocIdStream stream) throws IOException { - // This will only be called if its the top agg - for (;;) { - int upToExclusive = upToInclusive + 1; - if (upToExclusive < 0) { // overflow - upToExclusive = Integer.MAX_VALUE; - } - - if (upToSameBucket) { - if (sub == NO_OP_COLLECTOR) { - // stream.count maybe faster when we don't need to handle sub-aggs - long count = stream.count(upToExclusive); - aggregator.incrementBucketDocCount(upToBucketIndex, count); - } else { - final int[] count = { 0 }; - stream.forEach(upToExclusive, doc -> { - sub.collect(doc, upToBucketIndex); - count[0]++; - }); - aggregator.incrementBucketDocCount(upToBucketIndex, count[0]); - } - - } else { - stream.forEach(upToExclusive, this::collect); - } - - if (stream.mayHaveRemaining()) { - advanceSkipper(upToExclusive, 0); - } else { - break; - } - } - } - - } } diff --git a/server/src/main/java/org/opensearch/search/aggregations/metrics/AvgAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/metrics/AvgAggregator.java index 5f99a9cc05558..7d36620eb8c08 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/metrics/AvgAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/metrics/AvgAggregator.java @@ -131,30 +131,39 @@ public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, final LeafBuc final CompensatedSum kahanSummation = new CompensatedSum(0, 0); return new LeafBucketCollectorBase(sub, values) { + int[] docBuffer = new int[1]; + @Override public void collect(int doc, long bucket) throws IOException { + docBuffer[0] = doc; + collect(docBuffer, bucket); + } + + @Override + public void collect(int[] docBuffer, long bucket) throws IOException { counts = bigArrays.grow(counts, bucket + 1); sums = bigArrays.grow(sums, bucket + 1); compensations = bigArrays.grow(compensations, bucket + 1); - if (values.advanceExact(doc)) { - final int valueCount = values.docValueCount(); - counts.increment(bucket, valueCount); - // Compute the sum of double values with Kahan summation algorithm which is more - // accurate than naive summation. 
- double sum = sums.get(bucket); - double compensation = compensations.get(bucket); - - kahanSummation.reset(sum, compensation); - - for (int i = 0; i < valueCount; i++) { - double value = values.nextValue(); - kahanSummation.add(value); + double sum = sums.get(bucket); + double compensation = compensations.get(bucket); + kahanSummation.reset(sum, compensation); + int count = 0; + for (int doc : docBuffer) { + if (values.advanceExact(doc)) { + final int valueCount = values.docValueCount(); + count += valueCount; + // Compute the sum of double values with Kahan summation algorithm which is more + // accurate than naive summation. + for (int i = 0; i < valueCount; i++) { + double value = values.nextValue(); + kahanSummation.add(value); + } } - - sums.set(bucket, kahanSummation.value()); - compensations.set(bucket, kahanSummation.delta()); } + counts.increment(bucket, count); + sums.set(bucket, kahanSummation.value()); + compensations.set(bucket, kahanSummation.delta()); } }; } diff --git a/server/src/main/java/org/opensearch/search/aggregations/metrics/MaxAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/metrics/MaxAggregator.java index fbba20d8a6d7d..9f7cbfd4c9ff2 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/metrics/MaxAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/metrics/MaxAggregator.java @@ -156,22 +156,30 @@ public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, final LeafBuc final SortedNumericDoubleValues allValues = valuesSource.doubleValues(ctx); final NumericDoubleValues values = MultiValueMode.MAX.select(allValues); return new LeafBucketCollectorBase(sub, allValues) { + int[] docBuffer = new int[1]; @Override public void collect(int doc, long bucket) throws IOException { + docBuffer[0] = doc; + collect(docBuffer, bucket); + } + + @Override + public void collect(int[] docBuffer, long bucket) throws IOException { if (bucket >= maxes.size()) { long from = maxes.size(); maxes = bigArrays.grow(maxes, bucket + 1); maxes.fill(from, maxes.size(), Double.NEGATIVE_INFINITY); } - if (values.advanceExact(doc)) { - final double value = values.doubleValue(); - double max = maxes.get(bucket); - max = Math.max(max, value); - maxes.set(bucket, max); + + double max = maxes.get(bucket); + for (int doc : docBuffer) { + if (values.advanceExact(doc)) { + max = Math.max(max, values.doubleValue()); + } } + maxes.set(bucket, max); } - }; } diff --git a/server/src/main/java/org/opensearch/search/aggregations/metrics/MinAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/metrics/MinAggregator.java index 5c2ed2b240a09..304b91eca5fe3 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/metrics/MinAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/metrics/MinAggregator.java @@ -156,20 +156,29 @@ public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, final LeafBuc final SortedNumericDoubleValues allValues = valuesSource.doubleValues(ctx); final NumericDoubleValues values = MultiValueMode.MIN.select(allValues); return new LeafBucketCollectorBase(sub, allValues) { + int[] docBuffer = new int[1]; @Override public void collect(int doc, long bucket) throws IOException { + docBuffer[0] = doc; + collect(docBuffer, bucket); + } + + @Override + public void collect(int[] docBuffer, long bucket) throws IOException { if (bucket >= mins.size()) { long from = mins.size(); mins = bigArrays.grow(mins, bucket + 1); mins.fill(from, mins.size(), 
Double.POSITIVE_INFINITY); } - if (values.advanceExact(doc)) { - final double value = values.doubleValue(); - double min = mins.get(bucket); - min = Math.min(min, value); - mins.set(bucket, min); + + double min = mins.get(bucket); + for (int doc : docBuffer) { + if (values.advanceExact(doc)) { + min = Math.min(min, values.doubleValue()); + } } + mins.set(bucket, min); } }; } diff --git a/server/src/main/java/org/opensearch/search/aggregations/metrics/SumAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/metrics/SumAggregator.java index ba32592f75ea1..bfdda1a0dcef3 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/metrics/SumAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/metrics/SumAggregator.java @@ -120,27 +120,34 @@ public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, final LeafBuc final SortedNumericDoubleValues values = valuesSource.doubleValues(ctx); final CompensatedSum kahanSummation = new CompensatedSum(0, 0); return new LeafBucketCollectorBase(sub, values) { + int[] docBuffer = new int[1]; + @Override public void collect(int doc, long bucket) throws IOException { + docBuffer[0] = doc; + collect(docBuffer, bucket); + } + + @Override + public void collect(int[] docBuffer, long bucket) throws IOException { sums = bigArrays.grow(sums, bucket + 1); compensations = bigArrays.grow(compensations, bucket + 1); - if (values.advanceExact(doc)) { - final int valuesCount = values.docValueCount(); - // Compute the sum of double values with Kahan summation algorithm which is more - // accurate than naive summation. - double sum = sums.get(bucket); - double compensation = compensations.get(bucket); - kahanSummation.reset(sum, compensation); - - for (int i = 0; i < valuesCount; i++) { - double value = values.nextValue(); - kahanSummation.add(value); + // Compute the sum of double values with Kahan summation algorithm which is more + // accurate than naive summation. + double sum = sums.get(bucket); + double compensation = compensations.get(bucket); + kahanSummation.reset(sum, compensation); + for (int doc : docBuffer) { + if (values.advanceExact(doc)) { + for (int i = 0; i < values.docValueCount(); i++) { + double value = values.nextValue(); + kahanSummation.add(value); + } } - - compensations.set(bucket, kahanSummation.delta()); - sums.set(bucket, kahanSummation.value()); } + compensations.set(bucket, kahanSummation.delta()); + sums.set(bucket, kahanSummation.value()); } }; }
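Reviewer note: below is a minimal, illustrative sketch (not part of this diff) of how a LeafBucketCollector implementation can opt into the new bulk hooks. The class name CountingLeafCollector and its bare-array bookkeeping are hypothetical; real OpenSearch aggregators track per-bucket state in BigArrays, as the Avg/Min/Max/Sum changes above do.

import org.apache.lucene.search.DocIdStream;
import org.opensearch.search.aggregations.LeafBucketCollector;

import java.io.IOException;
import java.util.Arrays;

/** Illustrative only: counts docs per owning bucket via the bulk collection hooks. */
final class CountingLeafCollector extends LeafBucketCollector {

    private long[] counts = new long[8];

    @Override
    public void collect(int doc, long owningBucketOrd) throws IOException {
        grow(owningBucketOrd);
        counts[(int) owningBucketOrd]++;
    }

    @Override
    public void collect(DocIdStream stream, long owningBucketOrd) throws IOException {
        // Per the new contract, the stream belongs to a single owningBucketOrd and must be
        // consumed before this method returns, since the caller may reuse it.
        grow(owningBucketOrd);
        final int bucket = (int) owningBucketOrd;
        stream.forEach(doc -> counts[bucket]++);
    }

    @Override
    public void collect(int[] docIds, long owningBucketOrd) throws IOException {
        // Buffered variant; HistogramSkiplistLeafCollector passes fully filled buffers only.
        grow(owningBucketOrd);
        counts[(int) owningBucketOrd] += docIds.length;
    }

    private void grow(long bucket) {
        if (bucket >= counts.length) {
            counts = Arrays.copyOf(counts, Math.max(counts.length * 2, (int) bucket + 1));
        }
    }
}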