Skip to content

Commit 6eed252

Browse files
Slack merge throttling params for fewer merge tasks (#126016)
The intent here is to aim for fewer to-do merges enqueued for execution, and to unthrottle disk IO at a faster rate when the queue grows longer. Overall this results in less merge disk throttling. Relates elastic/elasticsearch-benchmarks#2437 #120869
1 parent a05d380 commit 6eed252

File tree

2 files changed

+23
-34
lines changed

2 files changed

+23
-34
lines changed

server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorService.java

Lines changed: 6 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -87,8 +87,10 @@ public class ThreadPoolMergeExecutorService {
8787
private ThreadPoolMergeExecutorService(ThreadPool threadPool) {
8888
this.executorService = threadPool.executor(ThreadPool.Names.MERGE);
8989
this.maxConcurrentMerges = threadPool.info(ThreadPool.Names.MERGE).getMax();
90-
this.concurrentMergesFloorLimitForThrottling = maxConcurrentMerges * 2;
91-
this.concurrentMergesCeilLimitForThrottling = maxConcurrentMerges * 4;
90+
// the intent here is to throttle down whenever we submit a task and no other task is running
91+
this.concurrentMergesFloorLimitForThrottling = 2;
92+
this.concurrentMergesCeilLimitForThrottling = maxConcurrentMerges * 2;
93+
assert concurrentMergesFloorLimitForThrottling <= concurrentMergesCeilLimitForThrottling;
9294
}
9395

9496
boolean submitMergeTask(MergeTask mergeTask) {
@@ -230,10 +232,10 @@ private static long newTargetIORateBytesPerSec(
230232
);
231233
} else if (currentlySubmittedIOThrottledMergeTasks > concurrentMergesCeilLimitForThrottling
232234
&& currentTargetIORateBytesPerSec < MAX_IO_RATE.getBytes()) {
233-
// increase target IO rate by 10% (capped)
235+
// increase target IO rate by 20% (capped)
234236
newTargetIORateBytesPerSec = Math.min(
235237
MAX_IO_RATE.getBytes(),
236-
currentTargetIORateBytesPerSec + currentTargetIORateBytesPerSec / 10L
238+
currentTargetIORateBytesPerSec + currentTargetIORateBytesPerSec / 5L
237239
);
238240
} else {
239241
newTargetIORateBytesPerSec = currentTargetIORateBytesPerSec;
@@ -295,14 +297,4 @@ long getTargetIORateBytesPerSec() {
295297
int getMaxConcurrentMerges() {
296298
return maxConcurrentMerges;
297299
}
298-
299-
// exposed for tests
300-
int getConcurrentMergesFloorLimitForThrottling() {
301-
return concurrentMergesFloorLimitForThrottling;
302-
}
303-
304-
// exposed for tests
305-
int getConcurrentMergesCeilLimitForThrottling() {
306-
return concurrentMergesCeilLimitForThrottling;
307-
}
308300
}

server/src/test/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorServiceTests.java

Lines changed: 17 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -219,18 +219,16 @@ public void testTargetIORateChangesWhenSubmittingMergeTasks() throws Exception {
219219
}
220220
long newIORate = threadPoolMergeExecutorService.getTargetIORateBytesPerSec();
221221
if (supportsIOThrottling) {
222-
if (submittedIOThrottledMergeTasks.get() < threadPoolMergeExecutorService
223-
.getConcurrentMergesFloorLimitForThrottling()) {
224-
// assert the IO rate decreases, with a floor limit, when there are few merge tasks enqueued
222+
if (submittedIOThrottledMergeTasks.get() < 2) {
223+
// assert the IO rate decreases, with a floor limit, when there is just a single merge task running
225224
assertThat(newIORate, either(is(MIN_IO_RATE.getBytes())).or(lessThan(currentIORate)));
226-
} else if (submittedIOThrottledMergeTasks.get() > threadPoolMergeExecutorService
227-
.getConcurrentMergesCeilLimitForThrottling()) {
228-
// assert the IO rate increases, with a ceiling limit, when there are many merge tasks enqueued
229-
assertThat(newIORate, either(is(MAX_IO_RATE.getBytes())).or(greaterThan(currentIORate)));
230-
} else {
231-
// assert the IO rate does NOT change when there are a couple of merge tasks enqueued
232-
assertThat(newIORate, equalTo(currentIORate));
233-
}
225+
} else if (submittedIOThrottledMergeTasks.get() > threadPoolMergeExecutorService.getMaxConcurrentMerges() * 2) {
226+
// assert the IO rate increases, with a ceiling limit, when there are many merge tasks enqueued
227+
assertThat(newIORate, either(is(MAX_IO_RATE.getBytes())).or(greaterThan(currentIORate)));
228+
} else {
229+
// assert the IO rate does NOT change when there are a couple of merge tasks enqueued
230+
assertThat(newIORate, equalTo(currentIORate));
231+
}
234232
} else {
235233
// assert the IO rate does not change, when the merge task doesn't support IO throttling
236234
assertThat(newIORate, equalTo(currentIORate));
@@ -375,17 +373,16 @@ private void testIORateAdjustedForSubmittedTasks(
375373
initialTasksCounter--;
376374
threadPoolMergeExecutorService.submitMergeTask(mergeTask);
377375
long newTargetIORateLimit = threadPoolMergeExecutorService.getTargetIORateBytesPerSec();
378-
if (currentlySubmittedMergeTaskCount.get() < threadPoolMergeExecutorService.getConcurrentMergesFloorLimitForThrottling()) {
376+
if (currentlySubmittedMergeTaskCount.get() < 2) {
379377
// assert the IO rate decreases, with a floor limit, when there are few merge tasks enqueued
380378
assertThat(newTargetIORateLimit, either(is(MIN_IO_RATE.getBytes())).or(lessThan(targetIORateLimit.get())));
381-
} else if (currentlySubmittedMergeTaskCount.get() > threadPoolMergeExecutorService
382-
.getConcurrentMergesCeilLimitForThrottling()) {
383-
// assert the IO rate increases, with a ceiling limit, when there are many merge tasks enqueued
384-
assertThat(newTargetIORateLimit, either(is(MAX_IO_RATE.getBytes())).or(greaterThan(targetIORateLimit.get())));
385-
} else {
386-
// assert the IO rate does change, when there are a couple of merge tasks enqueued
387-
assertThat(newTargetIORateLimit, equalTo(targetIORateLimit.get()));
388-
}
379+
} else if (currentlySubmittedMergeTaskCount.get() > threadPoolMergeExecutorService.getMaxConcurrentMerges() * 2) {
380+
// assert the IO rate increases, with a ceiling limit, when there are many merge tasks enqueued
381+
assertThat(newTargetIORateLimit, either(is(MAX_IO_RATE.getBytes())).or(greaterThan(targetIORateLimit.get())));
382+
} else {
383+
// assert the IO rate does not change, when there are a couple of merge tasks enqueued
384+
assertThat(newTargetIORateLimit, equalTo(targetIORateLimit.get()));
385+
}
389386
targetIORateLimit.set(newTargetIORateLimit);
390387
} else {
391388
// execute already submitted merge task

0 commit comments

Comments
 (0)