diff --git a/muted-tests.yml b/muted-tests.yml index cb8a6fdf0d903..fb59fa19deddb 100644 --- a/muted-tests.yml +++ b/muted-tests.yml @@ -271,9 +271,6 @@ tests: - class: org.elasticsearch.xpack.test.rest.XPackRestIT method: test {p0=transform/transforms_start_stop/Test schedule_now on an already started transform} issue: https://github.com/elastic/elasticsearch/issues/120720 -- class: org.elasticsearch.index.engine.ThreadPoolMergeExecutorServiceTests - method: testIORateIsAdjustedForRunningMergeTasks - issue: https://github.com/elastic/elasticsearch/issues/125842 - class: org.elasticsearch.xpack.test.rest.XPackRestIT method: test {p0=transform/transforms_start_stop/Verify start transform creates destination index with appropriate mapping} issue: https://github.com/elastic/elasticsearch/issues/125854 diff --git a/server/src/test/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorServiceTests.java b/server/src/test/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorServiceTests.java index 9b74d68326108..7a3df11e6c7a5 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorServiceTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorServiceTests.java @@ -51,6 +51,7 @@ import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.hasItem; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.lessThan; import static org.hamcrest.Matchers.lessThanOrEqualTo; @@ -299,9 +300,9 @@ public void testTargetIORateChangesWhenSubmittingMergeTasks() throws Exception { } } - public void testIORateIsAdjustedForRunningMergeTasks() throws Exception { - int mergeExecutorThreadCount = randomIntBetween(1, 3); - int mergesStillToSubmit = randomIntBetween(1, 10); + public void testIORateIsAdjustedForAllRunningMergeTasks() throws Exception { + int mergeExecutorThreadCount = randomIntBetween(1, 5); + int mergesStillToSubmit = randomIntBetween(1, 20); int mergesStillToComplete = mergesStillToSubmit; Settings settings = Settings.builder() .put(ThreadPoolMergeScheduler.USE_THREAD_POOL_MERGE_SCHEDULER_SETTING.getKey(), true) @@ -320,6 +321,7 @@ public void testIORateIsAdjustedForRunningMergeTasks() throws Exception { ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) testThreadPool.executor(ThreadPool.Names.MERGE); Semaphore runMergeSemaphore = new Semaphore(0); Set currentlyRunningMergeTasksSet = ConcurrentCollections.newConcurrentSet(); + Set currentlyRunningOrAbortingMergeTasksSet = ConcurrentCollections.newConcurrentSet(); while (mergesStillToComplete > 0) { if (mergesStillToSubmit > 0 && (currentlyRunningMergeTasksSet.isEmpty() || randomBoolean())) { MergeTask mergeTask = mock(MergeTask.class); @@ -337,31 +339,37 @@ public void testIORateIsAdjustedForRunningMergeTasks() throws Exception { }).when(mergeTask).schedule(); doAnswer(mock -> { currentlyRunningMergeTasksSet.add(mergeTask); + currentlyRunningOrAbortingMergeTasksSet.add(mergeTask); // wait to be signalled before completing runMergeSemaphore.acquire(); + currentlyRunningOrAbortingMergeTasksSet.remove(mergeTask); currentlyRunningMergeTasksSet.remove(mergeTask); return null; }).when(mergeTask).run(); doAnswer(mock -> { + currentlyRunningOrAbortingMergeTasksSet.add(mergeTask); // wait to be signalled before completing runMergeSemaphore.acquire(); + currentlyRunningOrAbortingMergeTasksSet.remove(mergeTask); return null; }).when(mergeTask).abort(); - int activeMergeTasksCount = threadPoolExecutor.getActiveCount(); - threadPoolMergeExecutorService.submitMergeTask(mergeTask); - long newIORate = threadPoolMergeExecutorService.getTargetIORateBytesPerSec(); - // all currently running merge tasks must be IO throttled + assertThat(runMergeSemaphore.availablePermits(), is(0)); + boolean isAnyExecutorAvailable = currentlyRunningOrAbortingMergeTasksSet.size() < mergeExecutorThreadCount; + boolean mergeTaskSubmitted = threadPoolMergeExecutorService.submitMergeTask(mergeTask); + assertTrue(mergeTaskSubmitted); + if (isAnyExecutorAvailable) { + assertBusy(() -> assertThat(currentlyRunningOrAbortingMergeTasksSet, hasItem(mergeTask))); + } + long latestIORate = threadPoolMergeExecutorService.getTargetIORateBytesPerSec(); + // all currently running merge tasks must be IO throttled to the latest IO Rate assertBusy(() -> { - // await new merge to start executing - if (activeMergeTasksCount < mergeExecutorThreadCount) { - assertThat(threadPoolExecutor.getActiveCount(), is(activeMergeTasksCount + 1)); - } - // assert IO throttle is set on the running merge tasks + // assert IO throttle is set on ALL the running merge tasks for (MergeTask currentlyRunningMergeTask : currentlyRunningMergeTasksSet) { - var ioRateCaptor = ArgumentCaptor.forClass(Long.class); + verify(currentlyRunningMergeTask).run(); // only interested in the last invocation + var ioRateCaptor = ArgumentCaptor.forClass(Long.class); verify(currentlyRunningMergeTask, atLeastOnce()).setIORateLimit(ioRateCaptor.capture()); - assertThat(ioRateCaptor.getValue(), is(newIORate)); + assertThat(ioRateCaptor.getValue(), is(latestIORate)); } }); mergesStillToSubmit--; @@ -369,7 +377,10 @@ public void testIORateIsAdjustedForRunningMergeTasks() throws Exception { long completedMerges = threadPoolExecutor.getCompletedTaskCount(); runMergeSemaphore.release(); // await merge to finish - assertBusy(() -> assertThat(threadPoolExecutor.getCompletedTaskCount(), is(completedMerges + 1))); + assertBusy(() -> { + assertThat(threadPoolExecutor.getCompletedTaskCount(), is(completedMerges + 1)); + assertThat(runMergeSemaphore.availablePermits(), is(0)); + }); mergesStillToComplete--; } }