Skip to content

Commit a8e842a

Browse files
committed
Unify the in consumer and supplier in one interface
1 parent ee6d444 commit a8e842a

File tree

1 file changed

+38
-38
lines changed

1 file changed

+38
-38
lines changed

server/src/main/java/org/elasticsearch/search/fetch/FetchPhase.java

Lines changed: 38 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -49,8 +49,6 @@
4949
import java.util.Collections;
5050
import java.util.List;
5151
import java.util.Map;
52-
import java.util.function.IntConsumer;
53-
import java.util.function.IntSupplier;
5452
import java.util.function.Supplier;
5553

5654
/**
@@ -164,13 +162,6 @@ private SearchHits buildSearchHits(
164162
int docsInLeaf;
165163
int processedDocs;
166164

167-
private final IntSupplier getAndResetAccumulatedBytes = () -> {
168-
int bytesToSubmit = this.accumulatedBytesInLeaf;
169-
this.accumulatedBytesInLeaf = 0;
170-
return bytesToSubmit;
171-
};
172-
private final IntConsumer memoryUsageBytesAccumulator = (bytes) -> this.accumulatedBytesInLeaf += bytes;
173-
174165
@Override
175166
protected void setNextReader(LeafReaderContext ctx, int[] docsInLeaf) throws IOException {
176167
Timer timer = profiler.startNextReader();
@@ -215,8 +206,7 @@ protected SearchHit nextDoc(int doc) throws IOException {
215206
rankDocs == null ? null : rankDocs.get(doc),
216207
circuitBreaker,
217208
enoughBytesOrLastDocInLeaf,
218-
memoryUsageBytesAccumulator,
219-
getAndResetAccumulatedBytes
209+
this::accumulateLocally
220210
);
221211
boolean success = false;
222212
try {
@@ -233,6 +223,24 @@ protected SearchHit nextDoc(int doc) throws IOException {
233223
}
234224
}
235225
}
226+
227+
/**
228+
* Accumulates the given bytes in the leaf and returns the accumulated bytes in the leaf.
229+
* @param bytes the amount of bytes to accumulate locally
230+
* @param resetLocallyAccumulatedBytes if the local counter for the accumulated bytes should be reset. Normally this will be true
231+
* when submitting the accumulated bytes to the circuit breaker.
232+
* @return the currently accumulated bytes in the leaf, including the provided bytes
233+
*/
234+
private int accumulateLocally(int bytes, boolean resetLocallyAccumulatedBytes) {
235+
this.accumulatedBytesInLeaf += bytes;
236+
if (resetLocallyAccumulatedBytes) {
237+
int bytesToSubmit = this.accumulatedBytesInLeaf;
238+
this.accumulatedBytesInLeaf = 0;
239+
return bytesToSubmit;
240+
} else {
241+
return this.accumulatedBytesInLeaf;
242+
}
243+
}
236244
};
237245

238246
SearchHit[] hits = docsIterator.iterate(
@@ -283,8 +291,7 @@ private HitContext prepareHitContext(
283291
RankDoc rankDoc,
284292
CircuitBreaker circuitBreaker,
285293
boolean submitToCb,
286-
IntConsumer memoryUsageBytesAccumulator,
287-
IntSupplier accumulatedBytesInLeaf
294+
IntBooleanFunction memoryUsageAccumulator
288295
) throws IOException {
289296
if (nestedDocuments.advance(docId - subReaderContext.docBase) == null) {
290297
return prepareNonNestedHitContext(
@@ -298,8 +305,7 @@ private HitContext prepareHitContext(
298305
rankDoc,
299306
circuitBreaker,
300307
submitToCb,
301-
memoryUsageBytesAccumulator,
302-
accumulatedBytesInLeaf
308+
memoryUsageAccumulator
303309
);
304310
} else {
305311
return prepareNestedHitContext(
@@ -332,17 +338,16 @@ private static HitContext prepareNonNestedHitContext(
332338
IdLoader.Leaf idLoader,
333339
RankDoc rankDoc,
334340
CircuitBreaker circuitBreaker,
335-
boolean accountMemoryWithCircuitBreaker,
336-
IntConsumer memoryUsageBytesAccumulator,
337-
IntSupplier accumulatedBytesInLeaf
341+
boolean submitToCB,
342+
IntBooleanFunction memoryUsageAccumulator
338343
) throws IOException {
339344
int subDocId = docId - subReaderContext.docBase;
340345

341346
leafStoredFieldLoader.advanceTo(subDocId);
342347

343348
MemoryAccountingBytesRefCounted memAccountingRefCounted = null;
344349
RefCounted refCountedHit = null;
345-
if (accountMemoryWithCircuitBreaker) {
350+
if (submitToCB) {
346351
memAccountingRefCounted = MemoryAccountingBytesRefCounted.create(circuitBreaker);
347352
refCountedHit = LeakTracker.wrap(memAccountingRefCounted);
348353
}
@@ -351,15 +356,7 @@ private static HitContext prepareNonNestedHitContext(
351356
SearchHit hit = new SearchHit(docId, null, null, refCountedHit);
352357
// TODO: can we use real pooled buffers here as well?
353358
Source source = Source.lazy(
354-
lazyStoredSourceLoader(
355-
profiler,
356-
subReaderContext,
357-
subDocId,
358-
memAccountingRefCounted,
359-
accountMemoryWithCircuitBreaker,
360-
memoryUsageBytesAccumulator,
361-
accumulatedBytesInLeaf
362-
)
359+
lazyStoredSourceLoader(profiler, subReaderContext, subDocId, memAccountingRefCounted, submitToCB, memoryUsageAccumulator)
363360
);
364361
return new HitContext(hit, subReaderContext, subDocId, Map.of(), source, rankDoc);
365362
} else {
@@ -369,9 +366,9 @@ private static HitContext prepareNonNestedHitContext(
369366
Timer timer = profiler.startLoadingSource();
370367
try {
371368
source = sourceLoader.source(leafStoredFieldLoader, subDocId);
372-
memoryUsageBytesAccumulator.accept(source.internalSourceRef().length());
373-
if (accountMemoryWithCircuitBreaker) {
374-
memAccountingRefCounted.account(accumulatedBytesInLeaf.getAsInt(), "fetch phase source loader");
369+
int accumulatedInLeaf = memoryUsageAccumulator.apply(source.internalSourceRef().length(), submitToCB);
370+
if (submitToCB) {
371+
memAccountingRefCounted.account(accumulatedInLeaf, "fetch phase source loader");
375372
}
376373
} catch (CircuitBreakingException e) {
377374
hit.decRef();
@@ -388,9 +385,8 @@ private static HitContext prepareNonNestedHitContext(
388385
subReaderContext,
389386
subDocId,
390387
memAccountingRefCounted,
391-
accountMemoryWithCircuitBreaker,
392-
memoryUsageBytesAccumulator,
393-
accumulatedBytesInLeaf
388+
submitToCB,
389+
memoryUsageAccumulator
394390
)
395391
);
396392
}
@@ -404,18 +400,17 @@ private static Supplier<Source> lazyStoredSourceLoader(
404400
int doc,
405401
MemoryAccountingBytesRefCounted memAccountingRefCounted,
406402
boolean submitToCB,
407-
IntConsumer memoryUsageAccumulator,
408-
IntSupplier accumulatedBytesInLeaf
403+
IntBooleanFunction memoryUsageAccumulator
409404
) {
410405
return () -> {
411406
StoredFieldLoader rootLoader = profiler.storedFields(StoredFieldLoader.create(true, Collections.emptySet()));
412407
try {
413408
LeafStoredFieldLoader leafRootLoader = rootLoader.getLoader(ctx, null);
414409
leafRootLoader.advanceTo(doc);
415410
BytesReference source = leafRootLoader.source();
416-
memoryUsageAccumulator.accept(source.length());
411+
int accumulatedInLeaf = memoryUsageAccumulator.apply(source.length(), submitToCB);
417412
if (submitToCB) {
418-
memAccountingRefCounted.account(accumulatedBytesInLeaf.getAsInt(), "lazy fetch phase source loader");
413+
memAccountingRefCounted.account(accumulatedInLeaf, "lazy fetch phase source loader");
419414
}
420415
return Source.fromBytes(source);
421416
} catch (IOException e) {
@@ -518,4 +513,9 @@ public String toString() {
518513
}
519514
};
520515
}
516+
517+
@FunctionalInterface
518+
private interface IntBooleanFunction {
519+
int apply(int i, boolean b);
520+
}
521521
}

0 commit comments

Comments
 (0)