Skip to content

Commit 75f4232

Browse files
authored
Check the real memory circuit breaker when building internal aggregations (#117019)
checks periodically the real memory circuit breaker when allocating objects.
1 parent 49a1bb8 commit 75f4232

File tree

16 files changed

+138
-146
lines changed

16 files changed

+138
-146
lines changed

modules/aggregations/src/main/java/org/elasticsearch/aggregations/bucket/adjacency/AdjacencyMatrixAggregator.java

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -188,17 +188,16 @@ public InternalAggregation[] buildAggregations(LongArray owningBucketOrds) throw
188188
}
189189
}
190190
try (LongArray bucketOrdsToBuild = bigArrays().newLongArray(totalBucketsToBuild)) {
191-
int builtBucketIndex = 0;
191+
int[] builtBucketIndex = new int[] { 0 };
192192
for (int ord = 0; ord < maxOrd; ord++) {
193193
if (bucketDocCount(ord) > 0) {
194-
bucketOrdsToBuild.set(builtBucketIndex++, ord);
194+
bucketOrdsToBuild.set(builtBucketIndex[0]++, ord);
195195
}
196196
}
197-
assert builtBucketIndex == totalBucketsToBuild;
198-
builtBucketIndex = 0;
197+
assert builtBucketIndex[0] == totalBucketsToBuild;
198+
builtBucketIndex[0] = 0;
199199
var bucketSubAggs = buildSubAggsForBuckets(bucketOrdsToBuild);
200-
InternalAggregation[] results = new InternalAggregation[Math.toIntExact(owningBucketOrds.size())];
201-
for (int owningBucketOrdIdx = 0; owningBucketOrdIdx < results.length; owningBucketOrdIdx++) {
200+
InternalAggregation[] aggregations = buildAggregations(Math.toIntExact(owningBucketOrds.size()), owningBucketOrdIdx -> {
202201
List<InternalAdjacencyMatrix.InternalBucket> buckets = new ArrayList<>(filters.length);
203202
for (int i = 0; i < keys.length; i++) {
204203
long bucketOrd = bucketOrd(owningBucketOrds.get(owningBucketOrdIdx), i);
@@ -207,10 +206,11 @@ public InternalAggregation[] buildAggregations(LongArray owningBucketOrds) throw
207206
// a date-histogram where we will look for transactions over time and can expect many
208207
// empty buckets.
209208
if (docCount > 0) {
209+
checkRealMemoryCBForInternalBucket();
210210
InternalAdjacencyMatrix.InternalBucket bucket = new InternalAdjacencyMatrix.InternalBucket(
211211
keys[i],
212212
docCount,
213-
bucketSubAggs.apply(builtBucketIndex++)
213+
bucketSubAggs.apply(builtBucketIndex[0]++)
214214
);
215215
buckets.add(bucket);
216216
}
@@ -226,17 +226,17 @@ public InternalAggregation[] buildAggregations(LongArray owningBucketOrds) throw
226226
InternalAdjacencyMatrix.InternalBucket bucket = new InternalAdjacencyMatrix.InternalBucket(
227227
intersectKey,
228228
docCount,
229-
bucketSubAggs.apply(builtBucketIndex++)
229+
bucketSubAggs.apply(builtBucketIndex[0]++)
230230
);
231231
buckets.add(bucket);
232232
}
233233
pos++;
234234
}
235235
}
236-
results[owningBucketOrdIdx] = new InternalAdjacencyMatrix(name, buckets, metadata());
237-
}
238-
assert builtBucketIndex == totalBucketsToBuild;
239-
return results;
236+
return new InternalAdjacencyMatrix(name, buckets, metadata());
237+
});
238+
assert builtBucketIndex[0] == totalBucketsToBuild;
239+
return aggregations;
240240
}
241241
}
242242

modules/aggregations/src/main/java/org/elasticsearch/aggregations/bucket/timeseries/TimeSeriesAggregator.java

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@ public InternalAggregation[] buildAggregations(LongArray owningBucketOrds) throw
7979
while (ordsEnum.next()) {
8080
long docCount = bucketDocCount(ordsEnum.ord());
8181
ordsEnum.readValue(spare);
82+
checkRealMemoryCBForInternalBucket();
8283
InternalTimeSeries.InternalBucket bucket = new InternalTimeSeries.InternalBucket(
8384
BytesRef.deepCopyOf(spare), // Closing bucketOrds will corrupt the bytes ref, so need to make a deep copy here.
8485
docCount,
@@ -101,11 +102,7 @@ public InternalAggregation[] buildAggregations(LongArray owningBucketOrds) throw
101102
}
102103
buildSubAggsForAllBuckets(allBucketsPerOrd, b -> b.bucketOrd, (b, a) -> b.aggregations = a);
103104

104-
InternalAggregation[] result = new InternalAggregation[Math.toIntExact(allBucketsPerOrd.size())];
105-
for (int ordIdx = 0; ordIdx < result.length; ordIdx++) {
106-
result[ordIdx] = buildResult(allBucketsPerOrd.get(ordIdx));
107-
}
108-
return result;
105+
return buildAggregations(Math.toIntExact(allBucketsPerOrd.size()), ordIdx -> buildResult(allBucketsPerOrd.get(ordIdx)));
109106
}
110107
}
111108

server/src/main/java/org/elasticsearch/search/aggregations/AggregatorBase.java

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@
1313
import org.apache.lucene.search.MatchAllDocsQuery;
1414
import org.apache.lucene.search.Query;
1515
import org.apache.lucene.search.ScoreMode;
16+
import org.elasticsearch.common.CheckedIntFunction;
17+
import org.elasticsearch.common.breaker.CircuitBreaker;
1618
import org.elasticsearch.common.breaker.CircuitBreakingException;
1719
import org.elasticsearch.common.util.BigArrays;
1820
import org.elasticsearch.common.util.Maps;
@@ -48,6 +50,8 @@ public abstract class AggregatorBase extends Aggregator {
4850

4951
private Map<String, Aggregator> subAggregatorbyName;
5052
private long requestBytesUsed;
53+
private final CircuitBreaker breaker;
54+
private int callCount;
5155

5256
/**
5357
* Constructs a new Aggregator.
@@ -72,6 +76,7 @@ protected AggregatorBase(
7276
this.metadata = metadata;
7377
this.parent = parent;
7478
this.context = context;
79+
this.breaker = context.breaker();
7580
assert factories != null : "sub-factories provided to BucketAggregator must not be null, use AggragatorFactories.EMPTY instead";
7681
this.subAggregators = factories.createSubAggregators(this, subAggregatorCardinality);
7782
context.addReleasable(this);
@@ -327,6 +332,30 @@ protected final InternalAggregations buildEmptySubAggregations() {
327332
return InternalAggregations.from(aggs);
328333
}
329334

335+
/**
336+
* Builds the aggregations array with the provided size and populates it using the provided function.
337+
*/
338+
protected final InternalAggregation[] buildAggregations(int size, CheckedIntFunction<InternalAggregation, IOException> aggFunction)
339+
throws IOException {
340+
final InternalAggregation[] results = new InternalAggregation[size];
341+
for (int i = 0; i < results.length; i++) {
342+
checkRealMemoryCB("internal_aggregation");
343+
results[i] = aggFunction.apply(i);
344+
}
345+
return results;
346+
}
347+
348+
/**
349+
* This method calls the circuit breaker from time to time in order to give it a chance to check available
350+
* memory in the parent breaker (Which should be a real memory breaker) and break the execution if we are running out.
351+
* To achieve that, we are passing 0 as the estimated bytes every 1024 calls
352+
*/
353+
protected final void checkRealMemoryCB(String label) {
354+
if ((++callCount & 0x3FF) == 0) {
355+
breaker.addEstimateBytesAndMaybeBreak(0, label);
356+
}
357+
}
358+
330359
@Override
331360
public String toString() {
332361
return name;

server/src/main/java/org/elasticsearch/search/aggregations/NonCollectingAggregator.java

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -41,10 +41,6 @@ public final LeafBucketCollector getLeafCollector(AggregationExecutionContext ag
4141

4242
@Override
4343
public InternalAggregation[] buildAggregations(LongArray owningBucketOrds) throws IOException {
44-
InternalAggregation[] results = new InternalAggregation[Math.toIntExact(owningBucketOrds.size())];
45-
for (int ordIdx = 0; ordIdx < results.length; ordIdx++) {
46-
results[ordIdx] = buildEmptyAggregation();
47-
}
48-
return results;
44+
return buildAggregations(Math.toIntExact(owningBucketOrds.size()), ordIdx -> buildEmptyAggregation());
4945
}
5046
}

server/src/main/java/org/elasticsearch/search/aggregations/bucket/BucketsAggregator.java

Lines changed: 30 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99
package org.elasticsearch.search.aggregations.bucket;
1010

1111
import org.apache.lucene.index.LeafReaderContext;
12-
import org.elasticsearch.common.breaker.CircuitBreaker;
1312
import org.elasticsearch.common.util.IntArray;
1413
import org.elasticsearch.common.util.LongArray;
1514
import org.elasticsearch.common.util.ObjectArray;
@@ -42,10 +41,9 @@
4241
import java.util.function.ToLongFunction;
4342

4443
public abstract class BucketsAggregator extends AggregatorBase {
45-
private final CircuitBreaker breaker;
44+
4645
private LongArray docCounts;
4746
protected final DocCountProvider docCountProvider;
48-
private int callCount;
4947

5048
@SuppressWarnings("this-escape")
5149
public BucketsAggregator(
@@ -57,7 +55,6 @@ public BucketsAggregator(
5755
Map<String, Object> metadata
5856
) throws IOException {
5957
super(name, factories, aggCtx, parent, bucketCardinality, metadata);
60-
breaker = aggCtx.breaker();
6158
docCounts = bigArrays().newLongArray(1, true);
6259
docCountProvider = new DocCountProvider();
6360
}
@@ -83,7 +80,7 @@ public final void collectBucket(LeafBucketCollector subCollector, int doc, long
8380
grow(bucketOrd + 1);
8481
int docCount = docCountProvider.getDocCount(doc);
8582
if (docCounts.increment(bucketOrd, docCount) == docCount) {
86-
updateCircuitBreaker("allocated_buckets");
83+
checkRealMemoryCB("allocated_buckets");
8784
}
8885
subCollector.collect(doc, bucketOrd);
8986
}
@@ -176,7 +173,7 @@ protected final IntFunction<InternalAggregations> buildSubAggsForBuckets(LongArr
176173
prepareSubAggs(bucketOrdsToCollect);
177174
InternalAggregation[][] aggregations = new InternalAggregation[subAggregators.length][];
178175
for (int i = 0; i < subAggregators.length; i++) {
179-
updateCircuitBreaker("building_sub_aggregation");
176+
checkRealMemoryCB("building_sub_aggregation");
180177
aggregations[i] = subAggregators[i].buildAggregations(bucketOrdsToCollect);
181178
}
182179
return subAggsForBucketFunction(aggregations);
@@ -247,31 +244,30 @@ protected final <B> InternalAggregation[] buildAggregationsForFixedBucketCount(
247244
Function<List<B>, InternalAggregation> resultBuilder
248245
) throws IOException {
249246
try (LongArray bucketOrdsToCollect = bigArrays().newLongArray(owningBucketOrds.size() * bucketsPerOwningBucketOrd)) {
250-
int bucketOrdIdx = 0;
247+
final int[] bucketOrdIdx = new int[] { 0 };
251248
for (long i = 0; i < owningBucketOrds.size(); i++) {
252249
long ord = owningBucketOrds.get(i) * bucketsPerOwningBucketOrd;
253250
for (int offsetInOwningOrd = 0; offsetInOwningOrd < bucketsPerOwningBucketOrd; offsetInOwningOrd++) {
254-
bucketOrdsToCollect.set(bucketOrdIdx++, ord++);
251+
bucketOrdsToCollect.set(bucketOrdIdx[0]++, ord++);
255252
}
256253
}
257-
bucketOrdIdx = 0;
254+
bucketOrdIdx[0] = 0;
258255
var subAggregationResults = buildSubAggsForBuckets(bucketOrdsToCollect);
259256

260-
InternalAggregation[] results = new InternalAggregation[Math.toIntExact(owningBucketOrds.size())];
261-
for (int owningOrdIdx = 0; owningOrdIdx < results.length; owningOrdIdx++) {
257+
return buildAggregations(Math.toIntExact(owningBucketOrds.size()), ordIdx -> {
262258
List<B> buckets = new ArrayList<>(bucketsPerOwningBucketOrd);
263259
for (int offsetInOwningOrd = 0; offsetInOwningOrd < bucketsPerOwningBucketOrd; offsetInOwningOrd++) {
260+
checkRealMemoryCBForInternalBucket();
264261
buckets.add(
265262
bucketBuilder.build(
266263
offsetInOwningOrd,
267-
bucketDocCount(bucketOrdsToCollect.get(bucketOrdIdx)),
268-
subAggregationResults.apply(bucketOrdIdx++)
264+
bucketDocCount(bucketOrdsToCollect.get(bucketOrdIdx[0])),
265+
subAggregationResults.apply(bucketOrdIdx[0]++)
269266
)
270267
);
271268
}
272-
results[owningOrdIdx] = resultBuilder.apply(buckets);
273-
}
274-
return results;
269+
return resultBuilder.apply(buckets);
270+
});
275271
}
276272
}
277273

@@ -295,11 +291,10 @@ protected final InternalAggregation[] buildAggregationsForSingleBucket(
295291
* here but we don't because single bucket aggs never have.
296292
*/
297293
var subAggregationResults = buildSubAggsForBuckets(owningBucketOrds);
298-
InternalAggregation[] results = new InternalAggregation[Math.toIntExact(owningBucketOrds.size())];
299-
for (int ordIdx = 0; ordIdx < results.length; ordIdx++) {
300-
results[ordIdx] = resultBuilder.build(owningBucketOrds.get(ordIdx), subAggregationResults.apply(ordIdx));
301-
}
302-
return results;
294+
return buildAggregations(
295+
Math.toIntExact(owningBucketOrds.size()),
296+
ordIdx -> resultBuilder.build(owningBucketOrds.get(ordIdx), subAggregationResults.apply(ordIdx))
297+
);
303298
}
304299

305300
@FunctionalInterface
@@ -335,37 +330,36 @@ protected final <B> InternalAggregation[] buildAggregationsForVariableBuckets(
335330
);
336331
}
337332
try (LongArray bucketOrdsToCollect = bigArrays().newLongArray(totalOrdsToCollect)) {
338-
int b = 0;
333+
final int[] b = new int[] { 0 };
339334
for (long i = 0; i < owningBucketOrds.size(); i++) {
340335
LongKeyedBucketOrds.BucketOrdsEnum ordsEnum = bucketOrds.ordsEnum(owningBucketOrds.get(i));
341336
while (ordsEnum.next()) {
342-
bucketOrdsToCollect.set(b++, ordsEnum.ord());
337+
bucketOrdsToCollect.set(b[0]++, ordsEnum.ord());
343338
}
344339
}
345340
var subAggregationResults = buildSubAggsForBuckets(bucketOrdsToCollect);
346341

347-
InternalAggregation[] results = new InternalAggregation[Math.toIntExact(owningBucketOrds.size())];
348-
b = 0;
349-
for (int ordIdx = 0; ordIdx < results.length; ordIdx++) {
342+
b[0] = 0;
343+
return buildAggregations(Math.toIntExact(owningBucketOrds.size()), ordIdx -> {
350344
final long owningBucketOrd = owningBucketOrds.get(ordIdx);
351345
List<B> buckets = new ArrayList<>(bucketsInOrd.get(ordIdx));
352346
LongKeyedBucketOrds.BucketOrdsEnum ordsEnum = bucketOrds.ordsEnum(owningBucketOrd);
353347
while (ordsEnum.next()) {
354-
if (bucketOrdsToCollect.get(b) != ordsEnum.ord()) {
348+
if (bucketOrdsToCollect.get(b[0]) != ordsEnum.ord()) {
355349
// If we hit this, something has gone horribly wrong and we need to investigate
356350
throw AggregationErrors.iterationOrderChangedWithoutMutating(
357351
bucketOrds.toString(),
358352
ordsEnum.ord(),
359-
bucketOrdsToCollect.get(b)
353+
bucketOrdsToCollect.get(b[0])
360354
);
361355
}
356+
checkRealMemoryCBForInternalBucket();
362357
buckets.add(
363-
bucketBuilder.build(ordsEnum.value(), bucketDocCount(ordsEnum.ord()), subAggregationResults.apply(b++))
358+
bucketBuilder.build(ordsEnum.value(), bucketDocCount(ordsEnum.ord()), subAggregationResults.apply(b[0]++))
364359
);
365360
}
366-
results[ordIdx] = resultBuilder.build(owningBucketOrd, buckets);
367-
}
368-
return results;
361+
return resultBuilder.build(owningBucketOrd, buckets);
362+
});
369363
}
370364
}
371365
}
@@ -425,14 +419,9 @@ protected void preGetSubLeafCollectors(LeafReaderContext ctx) throws IOException
425419
docCountProvider.setLeafReaderContext(ctx);
426420
}
427421

428-
/**
429-
* This method calls the circuit breaker from time to time in order to give it a chance to check available
430-
* memory in the parent breaker (Which should be a real memory breaker) and break the execution if we are running out.
431-
* To achieve that, we are passing 0 as the estimated bytes every 1024 calls
432-
*/
433-
private void updateCircuitBreaker(String label) {
434-
if ((++callCount & 0x3FF) == 0) {
435-
breaker.addEstimateBytesAndMaybeBreak(0, label);
436-
}
422+
/** This method should be called whenever a new bucket object is created. It will check the real memory
423+
* circuit breaker in a sampling fashion. See {@link #checkRealMemoryCB(String)} */
424+
protected final void checkRealMemoryCBForInternalBucket() {
425+
checkRealMemoryCB("internal_bucket");
437426
}
438427
}

server/src/main/java/org/elasticsearch/search/aggregations/bucket/countedterms/CountedTermsAggregator.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,7 @@ public InternalAggregation[] buildAggregations(LongArray owningBucketOrds) throw
140140
long docCount = bucketDocCount(ordsEnum.ord());
141141
otherDocCounts.increment(ordIdx, docCount);
142142
if (spare == null) {
143+
checkRealMemoryCBForInternalBucket();
143144
spare = emptyBucketBuilder.get();
144145
}
145146
ordsEnum.readValue(spare.getTermBytes());
@@ -158,16 +159,16 @@ public InternalAggregation[] buildAggregations(LongArray owningBucketOrds) throw
158159
}
159160

160161
buildSubAggsForAllBuckets(topBucketsPerOrd, InternalTerms.Bucket::getBucketOrd, InternalTerms.Bucket::setAggregations);
161-
InternalAggregation[] result = new InternalAggregation[Math.toIntExact(topBucketsPerOrd.size())];
162-
for (int ordIdx = 0; ordIdx < result.length; ordIdx++) {
162+
163+
return buildAggregations(Math.toIntExact(owningBucketOrds.size()), ordIdx -> {
163164
final BucketOrder reduceOrder;
164165
if (isKeyOrder(order) == false) {
165166
reduceOrder = InternalOrder.key(true);
166167
Arrays.sort(topBucketsPerOrd.get(ordIdx), reduceOrder.comparator());
167168
} else {
168169
reduceOrder = order;
169170
}
170-
result[ordIdx] = new StringTerms(
171+
return new StringTerms(
171172
name,
172173
reduceOrder,
173174
order,
@@ -181,8 +182,7 @@ public InternalAggregation[] buildAggregations(LongArray owningBucketOrds) throw
181182
Arrays.asList(topBucketsPerOrd.get(ordIdx)),
182183
null
183184
);
184-
}
185-
return result;
185+
});
186186
}
187187
}
188188

server/src/main/java/org/elasticsearch/search/aggregations/bucket/geogrid/GeoGridAggregator.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,7 @@ public InternalAggregation[] buildAggregations(LongArray owningBucketOrds) throw
144144
LongKeyedBucketOrds.BucketOrdsEnum ordsEnum = bucketOrds.ordsEnum(owningBucketOrds.get(ordIdx));
145145
while (ordsEnum.next()) {
146146
if (spare == null) {
147+
checkRealMemoryCBForInternalBucket();
147148
spare = newEmptyBucket();
148149
}
149150

@@ -162,11 +163,10 @@ public InternalAggregation[] buildAggregations(LongArray owningBucketOrds) throw
162163
}
163164
}
164165
buildSubAggsForAllBuckets(topBucketsPerOrd, b -> b.bucketOrd, (b, aggs) -> b.aggregations = aggs);
165-
InternalAggregation[] results = new InternalAggregation[Math.toIntExact(topBucketsPerOrd.size())];
166-
for (int ordIdx = 0; ordIdx < results.length; ordIdx++) {
167-
results[ordIdx] = buildAggregation(name, requiredSize, Arrays.asList(topBucketsPerOrd.get(ordIdx)), metadata());
168-
}
169-
return results;
166+
return buildAggregations(
167+
Math.toIntExact(owningBucketOrds.size()),
168+
ordIdx -> buildAggregation(name, requiredSize, Arrays.asList(topBucketsPerOrd.get(ordIdx)), metadata())
169+
);
170170
}
171171
}
172172

0 commit comments

Comments
 (0)