Skip to content

Commit ee6d444

Browse files
committed
Default accounting buffer to 1M, drop int boxing.
1 parent 23f3794 commit ee6d444

File tree

4 files changed

+42
-25
lines changed

4 files changed

+42
-25
lines changed

server/src/main/java/org/elasticsearch/common/MemoryAccountingBytesRefCounted.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -12,31 +12,32 @@
1212
import org.elasticsearch.common.breaker.CircuitBreaker;
1313
import org.elasticsearch.core.AbstractRefCounted;
1414

15+
import java.util.concurrent.atomic.AtomicInteger;
16+
1517
/**
1618
* A ref counted object that accounts for memory usage in bytes and releases the
1719
* accounted memory from the circuit breaker when the reference count reaches zero.
1820
*/
1921
public final class MemoryAccountingBytesRefCounted extends AbstractRefCounted {
2022

21-
private int bytes;
23+
private final AtomicInteger bytes = new AtomicInteger(0);
2224
private final CircuitBreaker breaker;
2325

24-
private MemoryAccountingBytesRefCounted(int bytes, CircuitBreaker breaker) {
25-
this.bytes = bytes;
26+
private MemoryAccountingBytesRefCounted(CircuitBreaker breaker) {
2627
this.breaker = breaker;
2728
}
2829

2930
public static MemoryAccountingBytesRefCounted create(CircuitBreaker breaker) {
30-
return new MemoryAccountingBytesRefCounted(0, breaker);
31+
return new MemoryAccountingBytesRefCounted(breaker);
3132
}
3233

3334
public void account(int bytes, String label) {
3435
breaker.addEstimateBytesAndMaybeBreak(bytes, label);
35-
this.bytes += bytes;
36+
this.bytes.addAndGet(bytes);
3637
}
3738

3839
@Override
3940
protected void closeInternal() {
40-
breaker.addWithoutBreaking(-bytes);
41+
breaker.addWithoutBreaking(-bytes.get());
4142
}
4243
}

server/src/main/java/org/elasticsearch/search/SearchService.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -281,7 +281,7 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
281281
*/
282282
public static final Setting<ByteSizeValue> MEMORY_ACCOUNTING_BUFFER_SIZE = Setting.byteSizeSetting(
283283
"search.memory_accounting_buffer_size",
284-
ByteSizeValue.of(32, ByteSizeUnit.KB),
284+
ByteSizeValue.of(1, ByteSizeUnit.MB),
285285
Property.Dynamic,
286286
Property.NodeScope
287287
);
@@ -323,7 +323,7 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
323323

324324
private volatile boolean enableRewriteAggsToFilterByFilter;
325325

326-
private volatile ByteSizeValue memoryAccountingBufferSize;
326+
private volatile long memoryAccountingBufferSize;
327327

328328
private final Cancellable keepAliveReaper;
329329

@@ -403,7 +403,7 @@ public SearchService(
403403
clusterService.getClusterSettings()
404404
.addSettingsUpdateConsumer(QUERY_PHASE_PARALLEL_COLLECTION_ENABLED, this::setEnableQueryPhaseParallelCollection);
405405

406-
memoryAccountingBufferSize = MEMORY_ACCOUNTING_BUFFER_SIZE.get(settings);
406+
memoryAccountingBufferSize = MEMORY_ACCOUNTING_BUFFER_SIZE.get(settings).getBytes();
407407
clusterService.getClusterSettings().addSettingsUpdateConsumer(MEMORY_ACCOUNTING_BUFFER_SIZE, this::setMemoryAccountingBufferSize);
408408
}
409409

@@ -420,7 +420,7 @@ private void setEnableQueryPhaseParallelCollection(boolean enableQueryPhaseParal
420420
}
421421

422422
private void setMemoryAccountingBufferSize(ByteSizeValue memoryAccountingBufferSize) {
423-
this.memoryAccountingBufferSize = memoryAccountingBufferSize;
423+
this.memoryAccountingBufferSize = memoryAccountingBufferSize.getBytes();
424424
}
425425

426426
private static void validateKeepAlives(TimeValue defaultKeepAlive, TimeValue maxKeepAlive) {
@@ -811,7 +811,7 @@ public void executeRankFeaturePhase(RankFeatureShardRequest request, SearchShard
811811
return searchContext.rankFeatureResult();
812812
}
813813
RankFeatureShardPhase.prepareForFetch(searchContext, request);
814-
fetchPhase.execute(searchContext, docIds, null, circuitBreaker, memoryAccountingBufferSize.getBytes());
814+
fetchPhase.execute(searchContext, docIds, null, circuitBreaker, memoryAccountingBufferSize);
815815
RankFeatureShardPhase.processFetch(searchContext);
816816
var rankFeatureResult = searchContext.rankFeatureResult();
817817
rankFeatureResult.incRef();
@@ -829,7 +829,7 @@ private QueryFetchSearchResult executeFetchPhase(ReaderContext reader, SearchCon
829829
Releasable scope = tracer.withScope(context.getTask());
830830
SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(context, true, afterQueryTime)
831831
) {
832-
fetchPhase.execute(context, shortcutDocIdsToLoad(context), null, circuitBreaker, memoryAccountingBufferSize.getBytes());
832+
fetchPhase.execute(context, shortcutDocIdsToLoad(context), null, circuitBreaker, memoryAccountingBufferSize);
833833
if (reader.singleSession()) {
834834
freeReaderContext(reader.id());
835835
}
@@ -1000,7 +1000,7 @@ public void executeFetchPhase(ShardFetchRequest request, CancellableTask task, A
10001000
request.docIds(),
10011001
request.getRankDocks(),
10021002
circuitBreaker,
1003-
memoryAccountingBufferSize.getBytes()
1003+
memoryAccountingBufferSize
10041004
);
10051005
if (readerContext.singleSession()) {
10061006
freeReaderContext(request.contextId());

server/src/main/java/org/elasticsearch/search/fetch/FetchPhase.java

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,8 @@
4949
import java.util.Collections;
5050
import java.util.List;
5151
import java.util.Map;
52-
import java.util.function.Consumer;
52+
import java.util.function.IntConsumer;
53+
import java.util.function.IntSupplier;
5354
import java.util.function.Supplier;
5455

5556
/**
@@ -163,12 +164,12 @@ private SearchHits buildSearchHits(
163164
int docsInLeaf;
164165
int processedDocs;
165166

166-
private final Supplier<Integer> getAndResetAccumulatedBytes = () -> {
167+
private final IntSupplier getAndResetAccumulatedBytes = () -> {
167168
int bytesToSubmit = this.accumulatedBytesInLeaf;
168169
this.accumulatedBytesInLeaf = 0;
169170
return bytesToSubmit;
170171
};
171-
private final Consumer<Integer> memoryUsageBytesAccumulator = (bytes) -> this.accumulatedBytesInLeaf += bytes;
172+
private final IntConsumer memoryUsageBytesAccumulator = (bytes) -> this.accumulatedBytesInLeaf += bytes;
172173

173174
@Override
174175
protected void setNextReader(LeafReaderContext ctx, int[] docsInLeaf) throws IOException {
@@ -282,8 +283,8 @@ private HitContext prepareHitContext(
282283
RankDoc rankDoc,
283284
CircuitBreaker circuitBreaker,
284285
boolean submitToCb,
285-
Consumer<Integer> memoryUsageBytesAccumulator,
286-
Supplier<Integer> accumulatedBytesInLeaf
286+
IntConsumer memoryUsageBytesAccumulator,
287+
IntSupplier accumulatedBytesInLeaf
287288
) throws IOException {
288289
if (nestedDocuments.advance(docId - subReaderContext.docBase) == null) {
289290
return prepareNonNestedHitContext(
@@ -332,8 +333,8 @@ private static HitContext prepareNonNestedHitContext(
332333
RankDoc rankDoc,
333334
CircuitBreaker circuitBreaker,
334335
boolean accountMemoryWithCircuitBreaker,
335-
Consumer<Integer> memoryUsageBytesAccumulator,
336-
Supplier<Integer> accumulatedBytesInLeaf
336+
IntConsumer memoryUsageBytesAccumulator,
337+
IntSupplier accumulatedBytesInLeaf
337338
) throws IOException {
338339
int subDocId = docId - subReaderContext.docBase;
339340

@@ -370,7 +371,7 @@ private static HitContext prepareNonNestedHitContext(
370371
source = sourceLoader.source(leafStoredFieldLoader, subDocId);
371372
memoryUsageBytesAccumulator.accept(source.internalSourceRef().length());
372373
if (accountMemoryWithCircuitBreaker) {
373-
memAccountingRefCounted.account(accumulatedBytesInLeaf.get(), "fetch phase source loader");
374+
memAccountingRefCounted.account(accumulatedBytesInLeaf.getAsInt(), "fetch phase source loader");
374375
}
375376
} catch (CircuitBreakingException e) {
376377
hit.decRef();
@@ -403,8 +404,8 @@ private static Supplier<Source> lazyStoredSourceLoader(
403404
int doc,
404405
MemoryAccountingBytesRefCounted memAccountingRefCounted,
405406
boolean submitToCB,
406-
Consumer<Integer> memoryUsageAccumulator,
407-
Supplier<Integer> accumulatedBytesInLeaf
407+
IntConsumer memoryUsageAccumulator,
408+
IntSupplier accumulatedBytesInLeaf
408409
) {
409410
return () -> {
410411
StoredFieldLoader rootLoader = profiler.storedFields(StoredFieldLoader.create(true, Collections.emptySet()));
@@ -414,7 +415,7 @@ private static Supplier<Source> lazyStoredSourceLoader(
414415
BytesReference source = leafRootLoader.source();
415416
memoryUsageAccumulator.accept(source.length());
416417
if (submitToCB) {
417-
memAccountingRefCounted.account(accumulatedBytesInLeaf.get(), "lazy fetch phase source loader");
418+
memAccountingRefCounted.account(accumulatedBytesInLeaf.getAsInt(), "lazy fetch phase source loader");
418419
}
419420
return Source.fromBytes(source);
420421
} catch (IOException e) {

server/src/test/java/org/elasticsearch/search/SearchServiceSingleNodeTests.java

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,7 @@
166166
import static org.elasticsearch.action.support.WriteRequest.RefreshPolicy.IMMEDIATE;
167167
import static org.elasticsearch.indices.cluster.IndicesClusterStateService.AllocatedIndices.IndexRemovalReason.DELETED;
168168
import static org.elasticsearch.search.SearchService.DEFAULT_SIZE;
169+
import static org.elasticsearch.search.SearchService.MEMORY_ACCOUNTING_BUFFER_SIZE;
169170
import static org.elasticsearch.search.SearchService.QUERY_PHASE_PARALLEL_COLLECTION_ENABLED;
170171
import static org.elasticsearch.search.SearchService.SEARCH_WORKER_THREADS_ENABLED;
171172
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
@@ -2940,8 +2941,16 @@ public void testFetchPhaseAccountsForSourceMemoryUsage() throws Exception {
29402941
}
29412942
if (randomBoolean()) {
29422943
// 1 segment test is also useful so we enable the batching branch of the memory accounting
2943-
// we do local accounting up to 32kb (by default) before submitting to cb
2944+
29442945
indicesAdmin().prepareForceMerge("index").setMaxNumSegments(1).get();
2946+
2947+
// let's do local accounting up to 32kb before submitting to cb
2948+
ClusterUpdateSettingsResponse response = client().admin()
2949+
.cluster()
2950+
.prepareUpdateSettings(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT)
2951+
.setPersistentSettings(Settings.builder().put(MEMORY_ACCOUNTING_BUFFER_SIZE.getKey(), "32k").build())
2952+
.get();
2953+
assertTrue(response.isAcknowledged());
29452954
}
29462955

29472956
SearchService service = getInstanceFromNode(SearchService.class);
@@ -2993,6 +3002,12 @@ public void testFetchPhaseAccountsForSourceMemoryUsage() throws Exception {
29933002
if (readerContext != null) {
29943003
service.freeReaderContext(readerContext.id());
29953004
}
3005+
// reset original default setting
3006+
client().admin()
3007+
.cluster()
3008+
.prepareUpdateSettings(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT)
3009+
.setPersistentSettings(Settings.builder().putNull(SearchService.MEMORY_ACCOUNTING_BUFFER_SIZE.getKey()).build())
3010+
.get();
29963011
if (fetchSearchResult != null) {
29973012
long usedBeforeResultDecRef = breaker.getUsed();
29983013
if (fetchSearchResult.decRef()) {

0 commit comments

Comments
 (0)