Skip to content
3 changes: 0 additions & 3 deletions muted-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -271,9 +271,6 @@ tests:
- class: org.elasticsearch.xpack.test.rest.XPackRestIT
method: test {p0=transform/transforms_start_stop/Test schedule_now on an already started transform}
issue: https://github.com/elastic/elasticsearch/issues/120720
- class: org.elasticsearch.index.engine.ThreadPoolMergeExecutorServiceTests
method: testIORateIsAdjustedForRunningMergeTasks
issue: https://github.com/elastic/elasticsearch/issues/125842
- class: org.elasticsearch.xpack.test.rest.XPackRestIT
method: test {p0=transform/transforms_start_stop/Verify start transform creates destination index with appropriate mapping}
issue: https://github.com/elastic/elasticsearch/issues/125854
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.lessThan;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
Expand Down Expand Up @@ -299,9 +300,9 @@ public void testTargetIORateChangesWhenSubmittingMergeTasks() throws Exception {
}
}

public void testIORateIsAdjustedForRunningMergeTasks() throws Exception {
int mergeExecutorThreadCount = randomIntBetween(1, 3);
int mergesStillToSubmit = randomIntBetween(1, 10);
public void testIORateIsAdjustedForAllRunningMergeTasks() throws Exception {
int mergeExecutorThreadCount = randomIntBetween(1, 5);
int mergesStillToSubmit = randomIntBetween(1, 20);
int mergesStillToComplete = mergesStillToSubmit;
Settings settings = Settings.builder()
.put(ThreadPoolMergeScheduler.USE_THREAD_POOL_MERGE_SCHEDULER_SETTING.getKey(), true)
Expand All @@ -320,6 +321,7 @@ public void testIORateIsAdjustedForRunningMergeTasks() throws Exception {
ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) testThreadPool.executor(ThreadPool.Names.MERGE);
Semaphore runMergeSemaphore = new Semaphore(0);
Set<MergeTask> currentlyRunningMergeTasksSet = ConcurrentCollections.newConcurrentSet();
Set<MergeTask> currentlyRunningOrAbortingMergeTasksSet = ConcurrentCollections.newConcurrentSet();
while (mergesStillToComplete > 0) {
if (mergesStillToSubmit > 0 && (currentlyRunningMergeTasksSet.isEmpty() || randomBoolean())) {
MergeTask mergeTask = mock(MergeTask.class);
Expand All @@ -337,39 +339,48 @@ public void testIORateIsAdjustedForRunningMergeTasks() throws Exception {
}).when(mergeTask).schedule();
doAnswer(mock -> {
currentlyRunningMergeTasksSet.add(mergeTask);
currentlyRunningOrAbortingMergeTasksSet.add(mergeTask);
// wait to be signalled before completing
runMergeSemaphore.acquire();
currentlyRunningOrAbortingMergeTasksSet.remove(mergeTask);
currentlyRunningMergeTasksSet.remove(mergeTask);
return null;
}).when(mergeTask).run();
doAnswer(mock -> {
currentlyRunningOrAbortingMergeTasksSet.add(mergeTask);
// wait to be signalled before completing
runMergeSemaphore.acquire();
currentlyRunningOrAbortingMergeTasksSet.remove(mergeTask);
return null;
}).when(mergeTask).abort();
int activeMergeTasksCount = threadPoolExecutor.getActiveCount();
threadPoolMergeExecutorService.submitMergeTask(mergeTask);
long newIORate = threadPoolMergeExecutorService.getTargetIORateBytesPerSec();
// all currently running merge tasks must be IO throttled
assertThat(runMergeSemaphore.availablePermits(), is(0));
boolean isAnyExecutorAvailable = currentlyRunningOrAbortingMergeTasksSet.size() < mergeExecutorThreadCount;
boolean mergeTaskSubmitted = threadPoolMergeExecutorService.submitMergeTask(mergeTask);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we assert that the runSemaphore has no permits here? I think that is an assumption.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pushed 5db6465

assertTrue(mergeTaskSubmitted);
if (isAnyExecutorAvailable) {
assertBusy(() -> assertThat(currentlyRunningOrAbortingMergeTasksSet, hasItem(mergeTask)));
}
long latestIORate = threadPoolMergeExecutorService.getTargetIORateBytesPerSec();
// all currently running merge tasks must be IO throttled to the latest IO Rate
assertBusy(() -> {
// await new merge to start executing
if (activeMergeTasksCount < mergeExecutorThreadCount) {
assertThat(threadPoolExecutor.getActiveCount(), is(activeMergeTasksCount + 1));
}
// assert IO throttle is set on the running merge tasks
// assert IO throttle is set on ALL the running merge tasks
for (MergeTask currentlyRunningMergeTask : currentlyRunningMergeTasksSet) {
var ioRateCaptor = ArgumentCaptor.forClass(Long.class);
verify(currentlyRunningMergeTask).run();
// only interested in the last invocation
var ioRateCaptor = ArgumentCaptor.forClass(Long.class);
verify(currentlyRunningMergeTask, atLeastOnce()).setIORateLimit(ioRateCaptor.capture());
assertThat(ioRateCaptor.getValue(), is(newIORate));
assertThat(ioRateCaptor.getValue(), is(latestIORate));
}
});
mergesStillToSubmit--;
} else {
long completedMerges = threadPoolExecutor.getCompletedTaskCount();
runMergeSemaphore.release();
// await merge to finish
assertBusy(() -> assertThat(threadPoolExecutor.getCompletedTaskCount(), is(completedMerges + 1)));
assertBusy(() -> {
assertThat(threadPoolExecutor.getCompletedTaskCount(), is(completedMerges + 1));
assertThat(runMergeSemaphore.availablePermits(), is(0));
});
mergesStillToComplete--;
}
}
Expand Down