Skip to content

Commit 8308411

Browse files
Fix ThreadPoolMergeExecutorServiceTests testIORateIsAdjustedForAllRunningMergeTasks (#130545)
The test submits merge tasks that support IO throttling, and asserts that all the currently running merge tasks are indeed IO throttled after the new one was submitted. The test erroneously tried to assert a property on the set of currently running merge tasks, which is very difficult to do since all merge tasks are possibly backlogged and re-enqueued asynchronously multiple times before they are run or aborted (so looking at the threadpool merge task queue there's no telling which merge task will execute first). Fixes #129531
1 parent c1b43d8 commit 8308411

File tree

2 files changed

+1
-16
lines changed

2 files changed

+1
-16
lines changed

muted-tests.yml

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -500,9 +500,6 @@ tests:
500500
- class: org.elasticsearch.xpack.security.PermissionsIT
501501
method: testWhenUserLimitedByOnlyAliasOfIndexCanWriteToIndexWhichWasRolledoverByILMPolicy
502502
issue: https://github.com/elastic/elasticsearch/issues/129481
503-
- class: org.elasticsearch.index.engine.ThreadPoolMergeExecutorServiceTests
504-
method: testIORateIsAdjustedForAllRunningMergeTasks
505-
issue: https://github.com/elastic/elasticsearch/issues/129531
506503
- class: org.elasticsearch.upgrades.IndexingIT
507504
method: testIndexing
508505
issue: https://github.com/elastic/elasticsearch/issues/129533

server/src/test/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorServiceTests.java

Lines changed: 1 addition & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,6 @@
5151
import static org.hamcrest.Matchers.equalTo;
5252
import static org.hamcrest.Matchers.greaterThan;
5353
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
54-
import static org.hamcrest.Matchers.hasItem;
5554
import static org.hamcrest.Matchers.is;
5655
import static org.hamcrest.Matchers.lessThan;
5756
import static org.hamcrest.Matchers.lessThanOrEqualTo;
@@ -329,7 +328,6 @@ public void testIORateIsAdjustedForAllRunningMergeTasks() throws Exception {
329328
ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) testThreadPool.executor(ThreadPool.Names.MERGE);
330329
Semaphore runMergeSemaphore = new Semaphore(0);
331330
Set<MergeTask> currentlyRunningMergeTasksSet = ConcurrentCollections.newConcurrentSet();
332-
Set<MergeTask> currentlyRunningOrAbortingMergeTasksSet = ConcurrentCollections.newConcurrentSet();
333331
while (mergesStillToComplete > 0) {
334332
if (mergesStillToSubmit > 0 && (currentlyRunningMergeTasksSet.isEmpty() || randomBoolean())) {
335333
MergeTask mergeTask = mock(MergeTask.class);
@@ -347,27 +345,17 @@ public void testIORateIsAdjustedForAllRunningMergeTasks() throws Exception {
347345
}).when(mergeTask).schedule();
348346
doAnswer(mock -> {
349347
currentlyRunningMergeTasksSet.add(mergeTask);
350-
currentlyRunningOrAbortingMergeTasksSet.add(mergeTask);
351348
// wait to be signalled before completing
352349
runMergeSemaphore.acquire();
353-
currentlyRunningOrAbortingMergeTasksSet.remove(mergeTask);
354350
currentlyRunningMergeTasksSet.remove(mergeTask);
355351
return null;
356352
}).when(mergeTask).run();
357353
doAnswer(mock -> {
358-
currentlyRunningOrAbortingMergeTasksSet.add(mergeTask);
359354
// wait to be signalled before completing
360355
runMergeSemaphore.acquire();
361-
currentlyRunningOrAbortingMergeTasksSet.remove(mergeTask);
362356
return null;
363357
}).when(mergeTask).abort();
364-
assertThat(runMergeSemaphore.availablePermits(), is(0));
365-
boolean isAnyExecutorAvailable = currentlyRunningOrAbortingMergeTasksSet.size() < mergeExecutorThreadCount;
366-
boolean mergeTaskSubmitted = threadPoolMergeExecutorService.submitMergeTask(mergeTask);
367-
assertTrue(mergeTaskSubmitted);
368-
if (isAnyExecutorAvailable) {
369-
assertBusy(() -> assertThat(currentlyRunningOrAbortingMergeTasksSet, hasItem(mergeTask)));
370-
}
358+
assertTrue(threadPoolMergeExecutorService.submitMergeTask(mergeTask));
371359
long latestIORate = threadPoolMergeExecutorService.getTargetIORateBytesPerSec();
372360
// all currently running merge tasks must be IO throttled to the latest IO Rate
373361
assertBusy(() -> {

0 commit comments

Comments
 (0)