| 
13 | 13 | import org.apache.lucene.index.SortedDocValues;  | 
14 | 14 | import org.apache.lucene.index.SortedSetDocValues;  | 
15 | 15 | import org.apache.lucene.util.BytesRef;  | 
 | 16 | +import org.elasticsearch.common.util.IntArray;  | 
16 | 17 | import org.elasticsearch.common.util.LongArray;  | 
17 | 18 | import org.elasticsearch.common.util.ObjectArray;  | 
18 | 19 | import org.elasticsearch.core.Releasables;  | 
 | 
26 | 27 | import org.elasticsearch.search.aggregations.InternalOrder;  | 
27 | 28 | import org.elasticsearch.search.aggregations.LeafBucketCollector;  | 
28 | 29 | import org.elasticsearch.search.aggregations.LeafBucketCollectorBase;  | 
 | 30 | +import org.elasticsearch.search.aggregations.bucket.terms.BucketAndOrd;  | 
29 | 31 | import org.elasticsearch.search.aggregations.bucket.terms.BucketPriorityQueue;  | 
30 | 32 | import org.elasticsearch.search.aggregations.bucket.terms.BytesKeyedBucketOrds;  | 
31 | 33 | import org.elasticsearch.search.aggregations.bucket.terms.InternalTerms;  | 
 | 
38 | 40 | import java.util.Arrays;  | 
39 | 41 | import java.util.Map;  | 
40 | 42 | import java.util.function.BiConsumer;  | 
41 |  | -import java.util.function.Supplier;  | 
42 | 43 | 
 
  | 
43 | 44 | import static java.util.Collections.emptyList;  | 
44 | 45 | import static org.apache.lucene.index.SortedSetDocValues.NO_MORE_ORDS;  | 
@@ -115,51 +116,57 @@ public InternalAggregation[] buildAggregations(LongArray owningBucketOrds) throw  | 
115 | 116 |             LongArray otherDocCounts = bigArrays().newLongArray(owningBucketOrds.size());  | 
116 | 117 |             ObjectArray<StringTerms.Bucket[]> topBucketsPerOrd = bigArrays().newObjectArray(owningBucketOrds.size())  | 
117 | 118 |         ) {  | 
118 |  | -            for (long ordIdx = 0; ordIdx < topBucketsPerOrd.size(); ordIdx++) {  | 
119 |  | -                int size = (int) Math.min(bucketOrds.size(), bucketCountThresholds.getShardSize());  | 
120 |  | - | 
121 |  | -                // as users can't control sort order, in practice we'll always sort by doc count descending  | 
122 |  | -                try (  | 
123 |  | -                    BucketPriorityQueue<StringTerms.Bucket> ordered = new BucketPriorityQueue<>(  | 
124 |  | -                        size,  | 
125 |  | -                        bigArrays(),  | 
126 |  | -                        partiallyBuiltBucketComparator  | 
127 |  | -                    )  | 
128 |  | -                ) {  | 
129 |  | -                    StringTerms.Bucket spare = null;  | 
130 |  | -                    BytesKeyedBucketOrds.BucketOrdsEnum ordsEnum = bucketOrds.ordsEnum(owningBucketOrds.get(ordIdx));  | 
131 |  | -                    Supplier<StringTerms.Bucket> emptyBucketBuilder = () -> new StringTerms.Bucket(  | 
132 |  | -                        new BytesRef(),  | 
133 |  | -                        0,  | 
134 |  | -                        null,  | 
135 |  | -                        false,  | 
136 |  | -                        0,  | 
137 |  | -                        format  | 
138 |  | -                    );  | 
139 |  | -                    while (ordsEnum.next()) {  | 
140 |  | -                        long docCount = bucketDocCount(ordsEnum.ord());  | 
141 |  | -                        otherDocCounts.increment(ordIdx, docCount);  | 
142 |  | -                        if (spare == null) {  | 
143 |  | -                            checkRealMemoryCBForInternalBucket();  | 
144 |  | -                            spare = emptyBucketBuilder.get();  | 
 | 119 | +            try (IntArray bucketsToCollect = bigArrays().newIntArray(owningBucketOrds.size())) {  | 
 | 120 | +                // find how many buckets we are going to collect  | 
 | 121 | +                long ordsToCollect = 0;  | 
 | 122 | +                for (long ordIdx = 0; ordIdx < owningBucketOrds.size(); ordIdx++) {  | 
 | 123 | +                    int size = (int) Math.min(bucketOrds.bucketsInOrd(owningBucketOrds.get(ordIdx)), bucketCountThresholds.getShardSize());  | 
 | 124 | +                    bucketsToCollect.set(ordIdx, size);  | 
 | 125 | +                    ordsToCollect += size;  | 
 | 126 | +                }  | 
 | 127 | +                try (LongArray ordsArray = bigArrays().newLongArray(ordsToCollect)) {  | 
 | 128 | +                    long ordsCollected = 0;  | 
 | 129 | +                    for (long ordIdx = 0; ordIdx < owningBucketOrds.size(); ordIdx++) {  | 
 | 130 | +                        // as users can't control sort order, in practice we'll always sort by doc count descending  | 
 | 131 | +                        try (  | 
 | 132 | +                            BucketPriorityQueue<StringTerms.Bucket> ordered = new BucketPriorityQueue<>(  | 
 | 133 | +                                bucketsToCollect.get(ordIdx),  | 
 | 134 | +                                bigArrays(),  | 
 | 135 | +                                order.partiallyBuiltBucketComparator(this)  | 
 | 136 | +                            )  | 
 | 137 | +                        ) {  | 
 | 138 | +                            BucketAndOrd<StringTerms.Bucket> spare = null;  | 
 | 139 | +                            BytesKeyedBucketOrds.BucketOrdsEnum ordsEnum = bucketOrds.ordsEnum(owningBucketOrds.get(ordIdx));  | 
 | 140 | +                            while (ordsEnum.next()) {  | 
 | 141 | +                                long docCount = bucketDocCount(ordsEnum.ord());  | 
 | 142 | +                                otherDocCounts.increment(ordIdx, docCount);  | 
 | 143 | +                                if (spare == null) {  | 
 | 144 | +                                    checkRealMemoryCBForInternalBucket();  | 
 | 145 | +                                    spare = new BucketAndOrd<>(new StringTerms.Bucket(new BytesRef(), 0, null, false, 0, format));  | 
 | 146 | +                                }  | 
 | 147 | +                                ordsEnum.readValue(spare.bucket.getTermBytes());  | 
 | 148 | +                                spare.bucket.setDocCount(docCount);  | 
 | 149 | +                                spare.ord = ordsEnum.ord();  | 
 | 150 | +                                spare = ordered.insertWithOverflow(spare);  | 
 | 151 | +                            }  | 
 | 152 | +                            final int orderedSize = (int) ordered.size();  | 
 | 153 | +                            final StringTerms.Bucket[] buckets = new StringTerms.Bucket[orderedSize];  | 
 | 154 | +                            for (int i = orderedSize - 1; i >= 0; --i) {  | 
 | 155 | +                                BucketAndOrd<StringTerms.Bucket> bucketAndOrd = ordered.pop();  | 
 | 156 | +                                buckets[i] = bucketAndOrd.bucket;  | 
 | 157 | +                                ordsArray.set(ordsCollected + i, bucketAndOrd.ord);  | 
 | 158 | +                                otherDocCounts.increment(ordIdx, -bucketAndOrd.bucket.getDocCount());  | 
 | 159 | +                                bucketAndOrd.bucket.setTermBytes(BytesRef.deepCopyOf(bucketAndOrd.bucket.getTermBytes()));  | 
 | 160 | +                            }  | 
 | 161 | +                            topBucketsPerOrd.set(ordIdx, buckets);  | 
 | 162 | +                            ordsCollected += orderedSize;  | 
145 | 163 |                         }  | 
146 |  | -                        ordsEnum.readValue(spare.getTermBytes());  | 
147 |  | -                        spare.setDocCount(docCount);  | 
148 |  | -                        spare.setBucketOrd(ordsEnum.ord());  | 
149 |  | -                        spare = ordered.insertWithOverflow(spare);  | 
150 |  | -                    }  | 
151 |  | - | 
152 |  | -                    topBucketsPerOrd.set(ordIdx, new StringTerms.Bucket[(int) ordered.size()]);  | 
153 |  | -                    for (int i = (int) ordered.size() - 1; i >= 0; --i) {  | 
154 |  | -                        topBucketsPerOrd.get(ordIdx)[i] = ordered.pop();  | 
155 |  | -                        otherDocCounts.increment(ordIdx, -topBucketsPerOrd.get(ordIdx)[i].getDocCount());  | 
156 |  | -                        topBucketsPerOrd.get(ordIdx)[i].setTermBytes(BytesRef.deepCopyOf(topBucketsPerOrd.get(ordIdx)[i].getTermBytes()));  | 
157 | 164 |                     }  | 
 | 165 | +                    assert ordsCollected == ordsArray.size();  | 
 | 166 | +                    buildSubAggsForAllBuckets(topBucketsPerOrd, ordsArray, InternalTerms.Bucket::setAggregations);  | 
158 | 167 |                 }  | 
159 | 168 |             }  | 
160 | 169 | 
 
  | 
161 |  | -            buildSubAggsForAllBuckets(topBucketsPerOrd, InternalTerms.Bucket::getBucketOrd, InternalTerms.Bucket::setAggregations);  | 
162 |  | - | 
163 | 170 |             return buildAggregations(Math.toIntExact(owningBucketOrds.size()), ordIdx -> {  | 
164 | 171 |                 final BucketOrder reduceOrder;  | 
165 | 172 |                 if (isKeyOrder(order) == false) {  | 
 | 
0 commit comments