Skip to content

Commit 71da92a

Browse files
Review
1 parent ea6c186 commit 71da92a

File tree

2 files changed

+8
-19
lines changed

2 files changed

+8
-19
lines changed

server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorService.java

Lines changed: 3 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -87,8 +87,10 @@ public class ThreadPoolMergeExecutorService {
8787
private ThreadPoolMergeExecutorService(ThreadPool threadPool) {
8888
this.executorService = threadPool.executor(ThreadPool.Names.MERGE);
8989
this.maxConcurrentMerges = threadPool.info(ThreadPool.Names.MERGE).getMax();
90+
// the intent here is to throttle down whenever we submit a task and no other task is running
9091
this.concurrentMergesFloorLimitForThrottling = 2;
9192
this.concurrentMergesCeilLimitForThrottling = maxConcurrentMerges * 2;
93+
assert concurrentMergesFloorLimitForThrottling <= concurrentMergesCeilLimitForThrottling;
9294
}
9395

9496
boolean submitMergeTask(MergeTask mergeTask) {
@@ -233,7 +235,7 @@ private static long newTargetIORateBytesPerSec(
233235
// increase target IO rate by 20% (capped)
234236
newTargetIORateBytesPerSec = Math.min(
235237
MAX_IO_RATE.getBytes(),
236-
currentTargetIORateBytesPerSec + currentTargetIORateBytesPerSec / 20L
238+
currentTargetIORateBytesPerSec + currentTargetIORateBytesPerSec / 5L
237239
);
238240
} else {
239241
newTargetIORateBytesPerSec = currentTargetIORateBytesPerSec;
@@ -295,14 +297,4 @@ long getTargetIORateBytesPerSec() {
295297
int getMaxConcurrentMerges() {
296298
return maxConcurrentMerges;
297299
}
298-
299-
// exposed for tests
300-
int getConcurrentMergesFloorLimitForThrottling() {
301-
return concurrentMergesFloorLimitForThrottling;
302-
}
303-
304-
// exposed for tests
305-
int getConcurrentMergesCeilLimitForThrottling() {
306-
return concurrentMergesCeilLimitForThrottling;
307-
}
308300
}

server/src/test/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorServiceTests.java

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -219,12 +219,10 @@ public void testTargetIORateChangesWhenSubmittingMergeTasks() throws Exception {
219219
}
220220
long newIORate = threadPoolMergeExecutorService.getTargetIORateBytesPerSec();
221221
if (supportsIOThrottling) {
222-
if (submittedIOThrottledMergeTasks.get() < threadPoolMergeExecutorService
223-
.getConcurrentMergesFloorLimitForThrottling()) {
224-
// assert the IO rate decreases, with a floor limit, when there are few merge tasks enqueued
222+
if (submittedIOThrottledMergeTasks.get() < 2) {
223+
// assert the IO rate decreases, with a floor limit, when there is just a single merge task running
225224
assertThat(newIORate, either(is(MIN_IO_RATE.getBytes())).or(lessThan(currentIORate)));
226-
} else if (submittedIOThrottledMergeTasks.get() > threadPoolMergeExecutorService
227-
.getConcurrentMergesCeilLimitForThrottling()) {
225+
} else if (submittedIOThrottledMergeTasks.get() > threadPoolMergeExecutorService.getMaxConcurrentMerges() * 2) {
228226
// assert the IO rate increases, with a ceiling limit, when there are many merge tasks enqueued
229227
assertThat(newIORate, either(is(MAX_IO_RATE.getBytes())).or(greaterThan(currentIORate)));
230228
} else {
@@ -375,11 +373,10 @@ private void testIORateAdjustedForSubmittedTasks(
375373
initialTasksCounter--;
376374
threadPoolMergeExecutorService.submitMergeTask(mergeTask);
377375
long newTargetIORateLimit = threadPoolMergeExecutorService.getTargetIORateBytesPerSec();
378-
if (currentlySubmittedMergeTaskCount.get() < threadPoolMergeExecutorService.getConcurrentMergesFloorLimitForThrottling()) {
376+
if (currentlySubmittedMergeTaskCount.get() < 2) {
379377
// assert the IO rate decreases, with a floor limit, when there are few merge tasks enqueued
380378
assertThat(newTargetIORateLimit, either(is(MIN_IO_RATE.getBytes())).or(lessThan(targetIORateLimit.get())));
381-
} else if (currentlySubmittedMergeTaskCount.get() > threadPoolMergeExecutorService
382-
.getConcurrentMergesCeilLimitForThrottling()) {
379+
} else if (currentlySubmittedMergeTaskCount.get() > threadPoolMergeExecutorService.getMaxConcurrentMerges() * 2) {
383380
// assert the IO rate increases, with a ceiling limit, when there are many merge tasks enqueued
384381
assertThat(newTargetIORateLimit, either(is(MAX_IO_RATE.getBytes())).or(greaterThan(targetIORateLimit.get())));
385382
} else {

0 commit comments

Comments
 (0)