From aeea9fb7a19bfa9ca61f738bcd94c03733274394 Mon Sep 17 00:00:00 2001 From: Albert Zaharovits Date: Tue, 1 Apr 2025 10:33:23 +0300 Subject: [PATCH 1/4] params adjust --- .../index/engine/ThreadPoolMergeExecutorService.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorService.java b/server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorService.java index 7c78698ac6f66..8ec08f7efc26d 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorService.java +++ b/server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorService.java @@ -87,8 +87,8 @@ public class ThreadPoolMergeExecutorService { private ThreadPoolMergeExecutorService(ThreadPool threadPool) { this.executorService = threadPool.executor(ThreadPool.Names.MERGE); this.maxConcurrentMerges = threadPool.info(ThreadPool.Names.MERGE).getMax(); - this.concurrentMergesFloorLimitForThrottling = maxConcurrentMerges * 2; - this.concurrentMergesCeilLimitForThrottling = maxConcurrentMerges * 4; + this.concurrentMergesFloorLimitForThrottling = 2; + this.concurrentMergesCeilLimitForThrottling = maxConcurrentMerges * 2; } boolean submitMergeTask(MergeTask mergeTask) { @@ -230,10 +230,10 @@ private static long newTargetIORateBytesPerSec( ); } else if (currentlySubmittedIOThrottledMergeTasks > concurrentMergesCeilLimitForThrottling && currentTargetIORateBytesPerSec < MAX_IO_RATE.getBytes()) { - // increase target IO rate by 10% (capped) + // increase target IO rate by 20% (capped) newTargetIORateBytesPerSec = Math.min( MAX_IO_RATE.getBytes(), - currentTargetIORateBytesPerSec + currentTargetIORateBytesPerSec / 10L + currentTargetIORateBytesPerSec + currentTargetIORateBytesPerSec / 20L ); } else { newTargetIORateBytesPerSec = currentTargetIORateBytesPerSec; From 71da92a42a7706d2bd04d4b458dc06646e39c3d0 Mon Sep 17 00:00:00 2001 From: Albert Zaharovits Date: Tue, 1 Apr 2025 19:58:56 +0300 Subject: [PATCH 2/4] Review --- .../engine/ThreadPoolMergeExecutorService.java | 14 +++----------- .../ThreadPoolMergeExecutorServiceTests.java | 13 +++++-------- 2 files changed, 8 insertions(+), 19 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorService.java b/server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorService.java index 8ec08f7efc26d..7e41fffdd5357 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorService.java +++ b/server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorService.java @@ -87,8 +87,10 @@ public class ThreadPoolMergeExecutorService { private ThreadPoolMergeExecutorService(ThreadPool threadPool) { this.executorService = threadPool.executor(ThreadPool.Names.MERGE); this.maxConcurrentMerges = threadPool.info(ThreadPool.Names.MERGE).getMax(); + // the intent here is to throttle down whenever we submit a task and no other task is running this.concurrentMergesFloorLimitForThrottling = 2; this.concurrentMergesCeilLimitForThrottling = maxConcurrentMerges * 2; + assert concurrentMergesFloorLimitForThrottling <= concurrentMergesCeilLimitForThrottling; } boolean submitMergeTask(MergeTask mergeTask) { @@ -233,7 +235,7 @@ private static long newTargetIORateBytesPerSec( // increase target IO rate by 20% (capped) newTargetIORateBytesPerSec = Math.min( MAX_IO_RATE.getBytes(), - currentTargetIORateBytesPerSec + currentTargetIORateBytesPerSec / 20L + currentTargetIORateBytesPerSec + currentTargetIORateBytesPerSec / 5L ); } else { newTargetIORateBytesPerSec = currentTargetIORateBytesPerSec; @@ -295,14 +297,4 @@ long getTargetIORateBytesPerSec() { int getMaxConcurrentMerges() { return maxConcurrentMerges; } - - // exposed for tests - int getConcurrentMergesFloorLimitForThrottling() { - return concurrentMergesFloorLimitForThrottling; - } - - // exposed for tests - int getConcurrentMergesCeilLimitForThrottling() { - return concurrentMergesCeilLimitForThrottling; - } } diff --git a/server/src/test/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorServiceTests.java b/server/src/test/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorServiceTests.java index 0a99c5002d5ad..805a2b3eb9c51 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorServiceTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorServiceTests.java @@ -219,12 +219,10 @@ public void testTargetIORateChangesWhenSubmittingMergeTasks() throws Exception { } long newIORate = threadPoolMergeExecutorService.getTargetIORateBytesPerSec(); if (supportsIOThrottling) { - if (submittedIOThrottledMergeTasks.get() < threadPoolMergeExecutorService - .getConcurrentMergesFloorLimitForThrottling()) { - // assert the IO rate decreases, with a floor limit, when there are few merge tasks enqueued + if (submittedIOThrottledMergeTasks.get() < 2) { + // assert the IO rate decreases, with a floor limit, when there is just a single merge task running assertThat(newIORate, either(is(MIN_IO_RATE.getBytes())).or(lessThan(currentIORate))); - } else if (submittedIOThrottledMergeTasks.get() > threadPoolMergeExecutorService - .getConcurrentMergesCeilLimitForThrottling()) { + } else if (submittedIOThrottledMergeTasks.get() > threadPoolMergeExecutorService.getMaxConcurrentMerges() * 2) { // assert the IO rate increases, with a ceiling limit, when there are many merge tasks enqueued assertThat(newIORate, either(is(MAX_IO_RATE.getBytes())).or(greaterThan(currentIORate))); } else { @@ -375,11 +373,10 @@ private void testIORateAdjustedForSubmittedTasks( initialTasksCounter--; threadPoolMergeExecutorService.submitMergeTask(mergeTask); long newTargetIORateLimit = threadPoolMergeExecutorService.getTargetIORateBytesPerSec(); - if (currentlySubmittedMergeTaskCount.get() < threadPoolMergeExecutorService.getConcurrentMergesFloorLimitForThrottling()) { + if (currentlySubmittedMergeTaskCount.get() < 2) { // assert the IO rate decreases, with a floor limit, when there are few merge tasks enqueued assertThat(newTargetIORateLimit, either(is(MIN_IO_RATE.getBytes())).or(lessThan(targetIORateLimit.get()))); - } else if (currentlySubmittedMergeTaskCount.get() > threadPoolMergeExecutorService - .getConcurrentMergesCeilLimitForThrottling()) { + } else if (currentlySubmittedMergeTaskCount.get() > threadPoolMergeExecutorService.getMaxConcurrentMerges() * 2) { // assert the IO rate increases, with a ceiling limit, when there are many merge tasks enqueued assertThat(newTargetIORateLimit, either(is(MAX_IO_RATE.getBytes())).or(greaterThan(targetIORateLimit.get()))); } else { From 0b538e56051999b1dc96e00b02eb7e1daccdf8b4 Mon Sep 17 00:00:00 2001 From: elasticsearchmachine Date: Tue, 1 Apr 2025 17:08:31 +0000 Subject: [PATCH 3/4] [CI] Auto commit changes from spotless --- .../ThreadPoolMergeExecutorServiceTests.java | 24 +++++++++---------- 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorServiceTests.java b/server/src/test/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorServiceTests.java index 805a2b3eb9c51..90669cf87c108 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorServiceTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorServiceTests.java @@ -223,12 +223,12 @@ public void testTargetIORateChangesWhenSubmittingMergeTasks() throws Exception { // assert the IO rate decreases, with a floor limit, when there is just a single merge task running assertThat(newIORate, either(is(MIN_IO_RATE.getBytes())).or(lessThan(currentIORate))); } else if (submittedIOThrottledMergeTasks.get() > threadPoolMergeExecutorService.getMaxConcurrentMerges() * 2) { - // assert the IO rate increases, with a ceiling limit, when there are many merge tasks enqueued - assertThat(newIORate, either(is(MAX_IO_RATE.getBytes())).or(greaterThan(currentIORate))); - } else { - // assert the IO rate does NOT change when there are a couple of merge tasks enqueued - assertThat(newIORate, equalTo(currentIORate)); - } + // assert the IO rate increases, with a ceiling limit, when there are many merge tasks enqueued + assertThat(newIORate, either(is(MAX_IO_RATE.getBytes())).or(greaterThan(currentIORate))); + } else { + // assert the IO rate does NOT change when there are a couple of merge tasks enqueued + assertThat(newIORate, equalTo(currentIORate)); + } } else { // assert the IO rate does not change, when the merge task doesn't support IO throttling assertThat(newIORate, equalTo(currentIORate)); @@ -377,12 +377,12 @@ private void testIORateAdjustedForSubmittedTasks( // assert the IO rate decreases, with a floor limit, when there are few merge tasks enqueued assertThat(newTargetIORateLimit, either(is(MIN_IO_RATE.getBytes())).or(lessThan(targetIORateLimit.get()))); } else if (currentlySubmittedMergeTaskCount.get() > threadPoolMergeExecutorService.getMaxConcurrentMerges() * 2) { - // assert the IO rate increases, with a ceiling limit, when there are many merge tasks enqueued - assertThat(newTargetIORateLimit, either(is(MAX_IO_RATE.getBytes())).or(greaterThan(targetIORateLimit.get()))); - } else { - // assert the IO rate does change, when there are a couple of merge tasks enqueued - assertThat(newTargetIORateLimit, equalTo(targetIORateLimit.get())); - } + // assert the IO rate increases, with a ceiling limit, when there are many merge tasks enqueued + assertThat(newTargetIORateLimit, either(is(MAX_IO_RATE.getBytes())).or(greaterThan(targetIORateLimit.get()))); + } else { + // assert the IO rate does change, when there are a couple of merge tasks enqueued + assertThat(newTargetIORateLimit, equalTo(targetIORateLimit.get())); + } targetIORateLimit.set(newTargetIORateLimit); } else { // execute already submitted merge task From d1aa26cab700cb34ef6de0245c7da8256a9246cf Mon Sep 17 00:00:00 2001 From: Albert Zaharovits Date: Wed, 2 Apr 2025 11:06:41 +0300 Subject: [PATCH 4/4] Update server/src/test/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorServiceTests.java Co-authored-by: Henning Andersen <33268011+henningandersen@users.noreply.github.com> --- .../index/engine/ThreadPoolMergeExecutorServiceTests.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/test/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorServiceTests.java b/server/src/test/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorServiceTests.java index 90669cf87c108..8ce1645148337 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorServiceTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorServiceTests.java @@ -380,7 +380,7 @@ private void testIORateAdjustedForSubmittedTasks( // assert the IO rate increases, with a ceiling limit, when there are many merge tasks enqueued assertThat(newTargetIORateLimit, either(is(MAX_IO_RATE.getBytes())).or(greaterThan(targetIORateLimit.get()))); } else { - // assert the IO rate does change, when there are a couple of merge tasks enqueued + // assert the IO rate does not change, when there are a couple of merge tasks enqueued assertThat(newTargetIORateLimit, equalTo(targetIORateLimit.get())); } targetIORateLimit.set(newTargetIORateLimit);