From 24ecd52b6f1742285d777d04fd0102682c9f0562 Mon Sep 17 00:00:00 2001 From: Albert Zaharovits Date: Tue, 1 Apr 2025 20:34:23 +0300 Subject: [PATCH 1/5] Fix ThreadPoolMergeExecutorServiceTests testIORateIsAdjustedForRunningMergeTasks --- muted-tests.yml | 3 --- .../index/engine/ThreadPoolMergeExecutorServiceTests.java | 7 +++++-- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/muted-tests.yml b/muted-tests.yml index 12a2f39617afe..dbe6ef0ac5a13 100644 --- a/muted-tests.yml +++ b/muted-tests.yml @@ -353,9 +353,6 @@ tests: - class: org.elasticsearch.xpack.test.rest.XPackRestIT method: test {p0=transform/transforms_start_stop/Test schedule_now on an already started transform} issue: https://github.com/elastic/elasticsearch/issues/120720 -- class: org.elasticsearch.index.engine.ThreadPoolMergeExecutorServiceTests - method: testIORateIsAdjustedForRunningMergeTasks - issue: https://github.com/elastic/elasticsearch/issues/125842 - class: org.elasticsearch.xpack.test.rest.XPackRestIT method: test {p0=transform/transforms_start_stop/Verify start transform creates destination index with appropriate mapping} issue: https://github.com/elastic/elasticsearch/issues/125854 diff --git a/server/src/test/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorServiceTests.java b/server/src/test/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorServiceTests.java index 0a99c5002d5ad..b8c6dcc35b2ef 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorServiceTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorServiceTests.java @@ -281,8 +281,11 @@ public void testIORateIsAdjustedForRunningMergeTasks() throws Exception { doAnswer(mock -> { currentlyRunningMergeTasksSet.add(mergeTask); // wait to be signalled before completing - runMergeSemaphore.acquire(); - currentlyRunningMergeTasksSet.remove(mergeTask); + try { + runMergeSemaphore.acquire(); + } finally { + currentlyRunningMergeTasksSet.remove(mergeTask); + } return null; }).when(mergeTask).run(); doAnswer(mock -> { From f0e6243fafbe80d8dd14633f9171894d0055d9c5 Mon Sep 17 00:00:00 2001 From: Albert Zaharovits Date: Thu, 12 Jun 2025 20:07:24 +0300 Subject: [PATCH 2/5] Fix take-2 --- .../ThreadPoolMergeExecutorServiceTests.java | 41 ++++++++++--------- 1 file changed, 22 insertions(+), 19 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorServiceTests.java b/server/src/test/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorServiceTests.java index 00d62d1e18785..fe860d2e58ca7 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorServiceTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorServiceTests.java @@ -51,6 +51,7 @@ import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.hasItem; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.lessThan; import static org.hamcrest.Matchers.lessThanOrEqualTo; @@ -299,9 +300,9 @@ public void testTargetIORateChangesWhenSubmittingMergeTasks() throws Exception { } } - public void testIORateIsAdjustedForRunningMergeTasks() throws Exception { - int mergeExecutorThreadCount = randomIntBetween(1, 3); - int mergesStillToSubmit = randomIntBetween(1, 10); + public void testIORateIsAdjustedForAllRunningMergeTasks() throws Exception { + int mergeExecutorThreadCount = randomIntBetween(1, 5); + int mergesStillToSubmit = randomIntBetween(1, 20); int mergesStillToComplete = mergesStillToSubmit; Settings settings = Settings.builder() .put(ThreadPoolMergeScheduler.USE_THREAD_POOL_MERGE_SCHEDULER_SETTING.getKey(), true) @@ -320,6 +321,7 @@ public void testIORateIsAdjustedForRunningMergeTasks() throws Exception { ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) testThreadPool.executor(ThreadPool.Names.MERGE); Semaphore runMergeSemaphore = new Semaphore(0); Set currentlyRunningMergeTasksSet = ConcurrentCollections.newConcurrentSet(); + Set currentlyRunningOrAbortingMergeTasksSet = ConcurrentCollections.newConcurrentSet(); while (mergesStillToComplete > 0) { if (mergesStillToSubmit > 0 && (currentlyRunningMergeTasksSet.isEmpty() || randomBoolean())) { MergeTask mergeTask = mock(MergeTask.class); @@ -337,34 +339,35 @@ public void testIORateIsAdjustedForRunningMergeTasks() throws Exception { }).when(mergeTask).schedule(); doAnswer(mock -> { currentlyRunningMergeTasksSet.add(mergeTask); + currentlyRunningOrAbortingMergeTasksSet.add(mergeTask); // wait to be signalled before completing - try { - runMergeSemaphore.acquire(); - } finally { - currentlyRunningMergeTasksSet.remove(mergeTask); - } + runMergeSemaphore.acquire(); + currentlyRunningOrAbortingMergeTasksSet.remove(mergeTask); + currentlyRunningMergeTasksSet.remove(mergeTask); return null; }).when(mergeTask).run(); doAnswer(mock -> { + currentlyRunningOrAbortingMergeTasksSet.add(mergeTask); // wait to be signalled before completing runMergeSemaphore.acquire(); + currentlyRunningOrAbortingMergeTasksSet.remove(mergeTask); return null; }).when(mergeTask).abort(); - int activeMergeTasksCount = threadPoolExecutor.getActiveCount(); - threadPoolMergeExecutorService.submitMergeTask(mergeTask); - long newIORate = threadPoolMergeExecutorService.getTargetIORateBytesPerSec(); - // all currently running merge tasks must be IO throttled + boolean mergeTaskSubmitted = threadPoolMergeExecutorService.submitMergeTask(mergeTask); + assertTrue(mergeTaskSubmitted); assertBusy(() -> { - // await new merge to start executing - if (activeMergeTasksCount < mergeExecutorThreadCount) { - assertThat(threadPoolExecutor.getActiveCount(), is(activeMergeTasksCount + 1)); - } - // assert IO throttle is set on the running merge tasks + assertThat(currentlyRunningOrAbortingMergeTasksSet, hasItem(mergeTask)); + }); + long latestIORate = threadPoolMergeExecutorService.getTargetIORateBytesPerSec(); + // all currently running merge tasks must be IO throttled to the latest IO Rate + assertBusy(() -> { + // assert IO throttle is set on ALL the running merge tasks for (MergeTask currentlyRunningMergeTask : currentlyRunningMergeTasksSet) { - var ioRateCaptor = ArgumentCaptor.forClass(Long.class); + verify(currentlyRunningMergeTask).run(); // only interested in the last invocation + var ioRateCaptor = ArgumentCaptor.forClass(Long.class); verify(currentlyRunningMergeTask, atLeastOnce()).setIORateLimit(ioRateCaptor.capture()); - assertThat(ioRateCaptor.getValue(), is(newIORate)); + assertThat(ioRateCaptor.getValue(), is(latestIORate)); } }); mergesStillToSubmit--; From 75ac58b12779aa544e7c361f8a554a7b0e567aa3 Mon Sep 17 00:00:00 2001 From: elasticsearchmachine Date: Thu, 12 Jun 2025 17:20:17 +0000 Subject: [PATCH 3/5] [CI] Auto commit changes from spotless --- .../index/engine/ThreadPoolMergeExecutorServiceTests.java | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorServiceTests.java b/server/src/test/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorServiceTests.java index fe860d2e58ca7..d5923d6058871 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorServiceTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorServiceTests.java @@ -355,9 +355,7 @@ public void testIORateIsAdjustedForAllRunningMergeTasks() throws Exception { }).when(mergeTask).abort(); boolean mergeTaskSubmitted = threadPoolMergeExecutorService.submitMergeTask(mergeTask); assertTrue(mergeTaskSubmitted); - assertBusy(() -> { - assertThat(currentlyRunningOrAbortingMergeTasksSet, hasItem(mergeTask)); - }); + assertBusy(() -> { assertThat(currentlyRunningOrAbortingMergeTasksSet, hasItem(mergeTask)); }); long latestIORate = threadPoolMergeExecutorService.getTargetIORateBytesPerSec(); // all currently running merge tasks must be IO throttled to the latest IO Rate assertBusy(() -> { From 5db6465dbf0db37361dad686f5730bb375e6d711 Mon Sep 17 00:00:00 2001 From: Albert Zaharovits Date: Fri, 13 Jun 2025 11:04:11 +0300 Subject: [PATCH 4/5] Assert semaphore count --- .../index/engine/ThreadPoolMergeExecutorServiceTests.java | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorServiceTests.java b/server/src/test/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorServiceTests.java index d5923d6058871..1b01f6c6a978d 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorServiceTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorServiceTests.java @@ -353,9 +353,10 @@ public void testIORateIsAdjustedForAllRunningMergeTasks() throws Exception { currentlyRunningOrAbortingMergeTasksSet.remove(mergeTask); return null; }).when(mergeTask).abort(); + assertThat(runMergeSemaphore.availablePermits(), is(0)); boolean mergeTaskSubmitted = threadPoolMergeExecutorService.submitMergeTask(mergeTask); assertTrue(mergeTaskSubmitted); - assertBusy(() -> { assertThat(currentlyRunningOrAbortingMergeTasksSet, hasItem(mergeTask)); }); + assertBusy(() -> assertThat(currentlyRunningOrAbortingMergeTasksSet, hasItem(mergeTask))); long latestIORate = threadPoolMergeExecutorService.getTargetIORateBytesPerSec(); // all currently running merge tasks must be IO throttled to the latest IO Rate assertBusy(() -> { @@ -373,7 +374,10 @@ public void testIORateIsAdjustedForAllRunningMergeTasks() throws Exception { long completedMerges = threadPoolExecutor.getCompletedTaskCount(); runMergeSemaphore.release(); // await merge to finish - assertBusy(() -> assertThat(threadPoolExecutor.getCompletedTaskCount(), is(completedMerges + 1))); + assertBusy(() -> { + assertThat(threadPoolExecutor.getCompletedTaskCount(), is(completedMerges + 1)); + assertThat(runMergeSemaphore.availablePermits(), is(0)); + }); mergesStillToComplete--; } } From e349abb114fb5d91d97e29d46bb579b0d6407471 Mon Sep 17 00:00:00 2001 From: Albert Zaharovits Date: Mon, 16 Jun 2025 10:48:47 +0300 Subject: [PATCH 5/5] Fix test failure --- .../index/engine/ThreadPoolMergeExecutorServiceTests.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/server/src/test/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorServiceTests.java b/server/src/test/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorServiceTests.java index 1b01f6c6a978d..7a3df11e6c7a5 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorServiceTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorServiceTests.java @@ -354,9 +354,12 @@ public void testIORateIsAdjustedForAllRunningMergeTasks() throws Exception { return null; }).when(mergeTask).abort(); assertThat(runMergeSemaphore.availablePermits(), is(0)); + boolean isAnyExecutorAvailable = currentlyRunningOrAbortingMergeTasksSet.size() < mergeExecutorThreadCount; boolean mergeTaskSubmitted = threadPoolMergeExecutorService.submitMergeTask(mergeTask); assertTrue(mergeTaskSubmitted); - assertBusy(() -> assertThat(currentlyRunningOrAbortingMergeTasksSet, hasItem(mergeTask))); + if (isAnyExecutorAvailable) { + assertBusy(() -> assertThat(currentlyRunningOrAbortingMergeTasksSet, hasItem(mergeTask))); + } long latestIORate = threadPoolMergeExecutorService.getTargetIORateBytesPerSec(); // all currently running merge tasks must be IO throttled to the latest IO Rate assertBusy(() -> {