diff --git a/server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorService.java b/server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorService.java index 7c78698ac6f66..7e41fffdd5357 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorService.java +++ b/server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorService.java @@ -87,8 +87,10 @@ public class ThreadPoolMergeExecutorService { private ThreadPoolMergeExecutorService(ThreadPool threadPool) { this.executorService = threadPool.executor(ThreadPool.Names.MERGE); this.maxConcurrentMerges = threadPool.info(ThreadPool.Names.MERGE).getMax(); - this.concurrentMergesFloorLimitForThrottling = maxConcurrentMerges * 2; - this.concurrentMergesCeilLimitForThrottling = maxConcurrentMerges * 4; + // the intent here is to throttle down whenever we submit a task and no other task is running + this.concurrentMergesFloorLimitForThrottling = 2; + this.concurrentMergesCeilLimitForThrottling = maxConcurrentMerges * 2; + assert concurrentMergesFloorLimitForThrottling <= concurrentMergesCeilLimitForThrottling; } boolean submitMergeTask(MergeTask mergeTask) { @@ -230,10 +232,10 @@ private static long newTargetIORateBytesPerSec( ); } else if (currentlySubmittedIOThrottledMergeTasks > concurrentMergesCeilLimitForThrottling && currentTargetIORateBytesPerSec < MAX_IO_RATE.getBytes()) { - // increase target IO rate by 10% (capped) + // increase target IO rate by 20% (capped) newTargetIORateBytesPerSec = Math.min( MAX_IO_RATE.getBytes(), - currentTargetIORateBytesPerSec + currentTargetIORateBytesPerSec / 10L + currentTargetIORateBytesPerSec + currentTargetIORateBytesPerSec / 5L ); } else { newTargetIORateBytesPerSec = currentTargetIORateBytesPerSec; @@ -295,14 +297,4 @@ long getTargetIORateBytesPerSec() { int getMaxConcurrentMerges() { return maxConcurrentMerges; } - - // exposed for tests - int getConcurrentMergesFloorLimitForThrottling() { - return concurrentMergesFloorLimitForThrottling; - } - - // exposed for tests - int getConcurrentMergesCeilLimitForThrottling() { - return concurrentMergesCeilLimitForThrottling; - } } diff --git a/server/src/test/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorServiceTests.java b/server/src/test/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorServiceTests.java index 0a99c5002d5ad..8ce1645148337 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorServiceTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorServiceTests.java @@ -219,18 +219,16 @@ public void testTargetIORateChangesWhenSubmittingMergeTasks() throws Exception { } long newIORate = threadPoolMergeExecutorService.getTargetIORateBytesPerSec(); if (supportsIOThrottling) { - if (submittedIOThrottledMergeTasks.get() < threadPoolMergeExecutorService - .getConcurrentMergesFloorLimitForThrottling()) { - // assert the IO rate decreases, with a floor limit, when there are few merge tasks enqueued + if (submittedIOThrottledMergeTasks.get() < 2) { + // assert the IO rate decreases, with a floor limit, when there is just a single merge task running assertThat(newIORate, either(is(MIN_IO_RATE.getBytes())).or(lessThan(currentIORate))); - } else if (submittedIOThrottledMergeTasks.get() > threadPoolMergeExecutorService - .getConcurrentMergesCeilLimitForThrottling()) { - // assert the IO rate increases, with a ceiling limit, when there are many merge tasks enqueued - assertThat(newIORate, either(is(MAX_IO_RATE.getBytes())).or(greaterThan(currentIORate))); - } else { - // assert the IO rate does NOT change when there are a couple of merge tasks enqueued - assertThat(newIORate, equalTo(currentIORate)); - } + } else if (submittedIOThrottledMergeTasks.get() > threadPoolMergeExecutorService.getMaxConcurrentMerges() * 2) { + // assert the IO rate increases, with a ceiling limit, when there are many merge tasks enqueued + assertThat(newIORate, either(is(MAX_IO_RATE.getBytes())).or(greaterThan(currentIORate))); + } else { + // assert the IO rate does NOT change when there are a couple of merge tasks enqueued + assertThat(newIORate, equalTo(currentIORate)); + } } else { // assert the IO rate does not change, when the merge task doesn't support IO throttling assertThat(newIORate, equalTo(currentIORate)); @@ -375,17 +373,16 @@ private void testIORateAdjustedForSubmittedTasks( initialTasksCounter--; threadPoolMergeExecutorService.submitMergeTask(mergeTask); long newTargetIORateLimit = threadPoolMergeExecutorService.getTargetIORateBytesPerSec(); - if (currentlySubmittedMergeTaskCount.get() < threadPoolMergeExecutorService.getConcurrentMergesFloorLimitForThrottling()) { + if (currentlySubmittedMergeTaskCount.get() < 2) { // assert the IO rate decreases, with a floor limit, when there are few merge tasks enqueued assertThat(newTargetIORateLimit, either(is(MIN_IO_RATE.getBytes())).or(lessThan(targetIORateLimit.get()))); - } else if (currentlySubmittedMergeTaskCount.get() > threadPoolMergeExecutorService - .getConcurrentMergesCeilLimitForThrottling()) { - // assert the IO rate increases, with a ceiling limit, when there are many merge tasks enqueued - assertThat(newTargetIORateLimit, either(is(MAX_IO_RATE.getBytes())).or(greaterThan(targetIORateLimit.get()))); - } else { - // assert the IO rate does change, when there are a couple of merge tasks enqueued - assertThat(newTargetIORateLimit, equalTo(targetIORateLimit.get())); - } + } else if (currentlySubmittedMergeTaskCount.get() > threadPoolMergeExecutorService.getMaxConcurrentMerges() * 2) { + // assert the IO rate increases, with a ceiling limit, when there are many merge tasks enqueued + assertThat(newTargetIORateLimit, either(is(MAX_IO_RATE.getBytes())).or(greaterThan(targetIORateLimit.get()))); + } else { + // assert the IO rate does not change, when there are a couple of merge tasks enqueued + assertThat(newTargetIORateLimit, equalTo(targetIORateLimit.get())); + } targetIORateLimit.set(newTargetIORateLimit); } else { // execute already submitted merge task