|
11 | 11 | import org.elasticsearch.common.io.stream.StreamOutput;
|
12 | 12 | import org.elasticsearch.common.io.stream.Writeable;
|
13 | 13 | import org.elasticsearch.common.util.LongObjectPagedHashMap;
|
| 14 | +import org.elasticsearch.core.Releasables; |
14 | 15 | import org.elasticsearch.search.aggregations.AggregationReduceContext;
|
15 | 16 | import org.elasticsearch.search.aggregations.InternalAggregation;
|
16 | 17 | import org.elasticsearch.search.aggregations.InternalAggregations;
|
@@ -78,30 +79,35 @@ public List<InternalGeoGridBucket> getBuckets() {
|
78 | 79 | @Override
|
79 | 80 | public InternalGeoGrid<B> reduce(List<InternalAggregation> aggregations, AggregationReduceContext reduceContext) {
|
80 | 81 | LongObjectPagedHashMap<List<InternalGeoGridBucket>> buckets = null;
|
81 |
| - for (InternalAggregation aggregation : aggregations) { |
82 |
| - @SuppressWarnings("unchecked") |
83 |
| - InternalGeoGrid<B> grid = (InternalGeoGrid<B>) aggregation; |
84 |
| - if (buckets == null) { |
85 |
| - buckets = new LongObjectPagedHashMap<>(grid.buckets.size(), reduceContext.bigArrays()); |
86 |
| - } |
87 |
| - for (Object obj : grid.buckets) { |
88 |
| - InternalGeoGridBucket bucket = (InternalGeoGridBucket) obj; |
89 |
| - List<InternalGeoGridBucket> existingBuckets = buckets.get(bucket.hashAsLong()); |
90 |
| - if (existingBuckets == null) { |
91 |
| - existingBuckets = new ArrayList<>(aggregations.size()); |
92 |
| - buckets.put(bucket.hashAsLong(), existingBuckets); |
| 82 | + final BucketPriorityQueue<InternalGeoGridBucket> ordered; |
| 83 | + try { |
| 84 | + for (InternalAggregation aggregation : aggregations) { |
| 85 | + @SuppressWarnings("unchecked") |
| 86 | + InternalGeoGrid<B> grid = (InternalGeoGrid<B>) aggregation; |
| 87 | + if (buckets == null) { |
| 88 | + buckets = new LongObjectPagedHashMap<>(grid.buckets.size(), reduceContext.bigArrays()); |
| 89 | + } |
| 90 | + for (InternalGeoGridBucket bucket : grid.buckets) { |
| 91 | + List<InternalGeoGridBucket> existingBuckets = buckets.get(bucket.hashAsLong()); |
| 92 | + if (existingBuckets == null) { |
| 93 | + existingBuckets = new ArrayList<>(aggregations.size()); |
| 94 | + buckets.put(bucket.hashAsLong(), existingBuckets); |
| 95 | + } |
| 96 | + existingBuckets.add(bucket); |
93 | 97 | }
|
94 |
| - existingBuckets.add(bucket); |
95 | 98 | }
|
96 |
| - } |
97 | 99 |
|
98 |
| - final int size = Math.toIntExact(reduceContext.isFinalReduce() == false ? buckets.size() : Math.min(requiredSize, buckets.size())); |
99 |
| - BucketPriorityQueue<InternalGeoGridBucket> ordered = new BucketPriorityQueue<>(size); |
100 |
| - for (LongObjectPagedHashMap.Cursor<List<InternalGeoGridBucket>> cursor : buckets) { |
101 |
| - List<InternalGeoGridBucket> sameCellBuckets = cursor.value; |
102 |
| - ordered.insertWithOverflow(reduceBucket(sameCellBuckets, reduceContext)); |
| 100 | + final int size = Math.toIntExact( |
| 101 | + reduceContext.isFinalReduce() == false ? buckets.size() : Math.min(requiredSize, buckets.size()) |
| 102 | + ); |
| 103 | + ordered = new BucketPriorityQueue<>(size); |
| 104 | + for (LongObjectPagedHashMap.Cursor<List<InternalGeoGridBucket>> cursor : buckets) { |
| 105 | + List<InternalGeoGridBucket> sameCellBuckets = cursor.value; |
| 106 | + ordered.insertWithOverflow(reduceBucket(sameCellBuckets, reduceContext)); |
| 107 | + } |
| 108 | + } finally { |
| 109 | + Releasables.close(buckets); |
103 | 110 | }
|
104 |
| - buckets.close(); |
105 | 111 | InternalGeoGridBucket[] list = new InternalGeoGridBucket[ordered.size()];
|
106 | 112 | for (int i = ordered.size() - 1; i >= 0; i--) {
|
107 | 113 | list[i] = ordered.pop();
|
|
0 commit comments