|
13 | 13 | import org.apache.lucene.index.SortedDocValues; |
14 | 14 | import org.apache.lucene.index.SortedSetDocValues; |
15 | 15 | import org.apache.lucene.util.BytesRef; |
| 16 | +import org.elasticsearch.common.util.IntArray; |
16 | 17 | import org.elasticsearch.common.util.LongArray; |
17 | 18 | import org.elasticsearch.common.util.ObjectArray; |
18 | 19 | import org.elasticsearch.core.Releasables; |
|
26 | 27 | import org.elasticsearch.search.aggregations.InternalOrder; |
27 | 28 | import org.elasticsearch.search.aggregations.LeafBucketCollector; |
28 | 29 | import org.elasticsearch.search.aggregations.LeafBucketCollectorBase; |
| 30 | +import org.elasticsearch.search.aggregations.bucket.terms.BucketAndOrd; |
29 | 31 | import org.elasticsearch.search.aggregations.bucket.terms.BucketPriorityQueue; |
30 | 32 | import org.elasticsearch.search.aggregations.bucket.terms.BytesKeyedBucketOrds; |
31 | 33 | import org.elasticsearch.search.aggregations.bucket.terms.InternalTerms; |
|
38 | 40 | import java.util.Arrays; |
39 | 41 | import java.util.Map; |
40 | 42 | import java.util.function.BiConsumer; |
41 | | -import java.util.function.Supplier; |
42 | 43 |
|
43 | 44 | import static java.util.Collections.emptyList; |
44 | 45 | import static org.elasticsearch.search.aggregations.InternalOrder.isKeyOrder; |
@@ -115,51 +116,57 @@ public InternalAggregation[] buildAggregations(LongArray owningBucketOrds) throw |
115 | 116 | LongArray otherDocCounts = bigArrays().newLongArray(owningBucketOrds.size()); |
116 | 117 | ObjectArray<StringTerms.Bucket[]> topBucketsPerOrd = bigArrays().newObjectArray(owningBucketOrds.size()) |
117 | 118 | ) { |
118 | | - for (long ordIdx = 0; ordIdx < topBucketsPerOrd.size(); ordIdx++) { |
119 | | - int size = (int) Math.min(bucketOrds.size(), bucketCountThresholds.getShardSize()); |
120 | | - |
121 | | - // as users can't control sort order, in practice we'll always sort by doc count descending |
122 | | - try ( |
123 | | - BucketPriorityQueue<StringTerms.Bucket> ordered = new BucketPriorityQueue<>( |
124 | | - size, |
125 | | - bigArrays(), |
126 | | - partiallyBuiltBucketComparator |
127 | | - ) |
128 | | - ) { |
129 | | - StringTerms.Bucket spare = null; |
130 | | - BytesKeyedBucketOrds.BucketOrdsEnum ordsEnum = bucketOrds.ordsEnum(owningBucketOrds.get(ordIdx)); |
131 | | - Supplier<StringTerms.Bucket> emptyBucketBuilder = () -> new StringTerms.Bucket( |
132 | | - new BytesRef(), |
133 | | - 0, |
134 | | - null, |
135 | | - false, |
136 | | - 0, |
137 | | - format |
138 | | - ); |
139 | | - while (ordsEnum.next()) { |
140 | | - long docCount = bucketDocCount(ordsEnum.ord()); |
141 | | - otherDocCounts.increment(ordIdx, docCount); |
142 | | - if (spare == null) { |
143 | | - checkRealMemoryCBForInternalBucket(); |
144 | | - spare = emptyBucketBuilder.get(); |
| 119 | + try (IntArray bucketsToCollect = bigArrays().newIntArray(owningBucketOrds.size())) { |
| 120 | + // find how many buckets we are going to collect |
| 121 | + long ordsToCollect = 0; |
| 122 | + for (long ordIdx = 0; ordIdx < owningBucketOrds.size(); ordIdx++) { |
| 123 | + int size = (int) Math.min(bucketOrds.bucketsInOrd(owningBucketOrds.get(ordIdx)), bucketCountThresholds.getShardSize()); |
| 124 | + bucketsToCollect.set(ordIdx, size); |
| 125 | + ordsToCollect += size; |
| 126 | + } |
| 127 | + try (LongArray ordsArray = bigArrays().newLongArray(ordsToCollect)) { |
| 128 | + long ordsCollected = 0; |
| 129 | + for (long ordIdx = 0; ordIdx < owningBucketOrds.size(); ordIdx++) { |
| 130 | + // as users can't control sort order, in practice we'll always sort by doc count descending |
| 131 | + try ( |
| 132 | + BucketPriorityQueue<StringTerms.Bucket> ordered = new BucketPriorityQueue<>( |
| 133 | + bucketsToCollect.get(ordIdx), |
| 134 | + bigArrays(), |
| 135 | + order.partiallyBuiltBucketComparator(this) |
| 136 | + ) |
| 137 | + ) { |
| 138 | + BucketAndOrd<StringTerms.Bucket> spare = null; |
| 139 | + BytesKeyedBucketOrds.BucketOrdsEnum ordsEnum = bucketOrds.ordsEnum(owningBucketOrds.get(ordIdx)); |
| 140 | + while (ordsEnum.next()) { |
| 141 | + long docCount = bucketDocCount(ordsEnum.ord()); |
| 142 | + otherDocCounts.increment(ordIdx, docCount); |
| 143 | + if (spare == null) { |
| 144 | + checkRealMemoryCBForInternalBucket(); |
| 145 | + spare = new BucketAndOrd<>(new StringTerms.Bucket(new BytesRef(), 0, null, false, 0, format)); |
| 146 | + } |
| 147 | + ordsEnum.readValue(spare.bucket.getTermBytes()); |
| 148 | + spare.bucket.setDocCount(docCount); |
| 149 | + spare.ord = ordsEnum.ord(); |
| 150 | + spare = ordered.insertWithOverflow(spare); |
| 151 | + } |
| 152 | + final int orderedSize = (int) ordered.size(); |
| 153 | + final StringTerms.Bucket[] buckets = new StringTerms.Bucket[orderedSize]; |
| 154 | + for (int i = orderedSize - 1; i >= 0; --i) { |
| 155 | + BucketAndOrd<StringTerms.Bucket> bucketAndOrd = ordered.pop(); |
| 156 | + buckets[i] = bucketAndOrd.bucket; |
| 157 | + ordsArray.set(ordsCollected + i, bucketAndOrd.ord); |
| 158 | + otherDocCounts.increment(ordIdx, -bucketAndOrd.bucket.getDocCount()); |
| 159 | + bucketAndOrd.bucket.setTermBytes(BytesRef.deepCopyOf(bucketAndOrd.bucket.getTermBytes())); |
| 160 | + } |
| 161 | + topBucketsPerOrd.set(ordIdx, buckets); |
| 162 | + ordsCollected += orderedSize; |
145 | 163 | } |
146 | | - ordsEnum.readValue(spare.getTermBytes()); |
147 | | - spare.setDocCount(docCount); |
148 | | - spare.setBucketOrd(ordsEnum.ord()); |
149 | | - spare = ordered.insertWithOverflow(spare); |
150 | | - } |
151 | | - |
152 | | - topBucketsPerOrd.set(ordIdx, new StringTerms.Bucket[(int) ordered.size()]); |
153 | | - for (int i = (int) ordered.size() - 1; i >= 0; --i) { |
154 | | - topBucketsPerOrd.get(ordIdx)[i] = ordered.pop(); |
155 | | - otherDocCounts.increment(ordIdx, -topBucketsPerOrd.get(ordIdx)[i].getDocCount()); |
156 | | - topBucketsPerOrd.get(ordIdx)[i].setTermBytes(BytesRef.deepCopyOf(topBucketsPerOrd.get(ordIdx)[i].getTermBytes())); |
157 | 164 | } |
| 165 | + assert ordsCollected == ordsArray.size(); |
| 166 | + buildSubAggsForAllBuckets(topBucketsPerOrd, ordsArray, InternalTerms.Bucket::setAggregations); |
158 | 167 | } |
159 | 168 | } |
160 | 169 |
|
161 | | - buildSubAggsForAllBuckets(topBucketsPerOrd, InternalTerms.Bucket::getBucketOrd, InternalTerms.Bucket::setAggregations); |
162 | | - |
163 | 170 | return buildAggregations(Math.toIntExact(owningBucketOrds.size()), ordIdx -> { |
164 | 171 | final BucketOrder reduceOrder; |
165 | 172 | if (isKeyOrder(order) == false) { |
|
0 commit comments