Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 0 additions & 6 deletions muted-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -479,9 +479,6 @@ tests:
- class: org.elasticsearch.xpack.ml.integration.ClassificationIT
method: testWithDatastreams
issue: https://github.com/elastic/elasticsearch/issues/129457
- class: org.elasticsearch.index.engine.ThreadPoolMergeExecutorServiceDiskSpaceTests
method: testMergeTasksAreUnblockedWhenMoreDiskSpaceBecomesAvailable
issue: https://github.com/elastic/elasticsearch/issues/129296
- class: org.elasticsearch.xpack.profiling.action.GetStatusActionIT
method: testWaitsUntilResourcesAreCreated
issue: https://github.com/elastic/elasticsearch/issues/129486
Expand Down Expand Up @@ -534,9 +531,6 @@ tests:
- class: org.elasticsearch.compute.aggregation.TopIntAggregatorFunctionTests
method: testManyInitialManyPartialFinalRunnerThrowing
issue: https://github.com/elastic/elasticsearch/issues/130145
- class: org.elasticsearch.index.engine.ThreadPoolMergeExecutorServiceDiskSpaceTests
method: testUnavailableBudgetBlocksNewMergeTasksFromStartingExecution
issue: https://github.com/elastic/elasticsearch/issues/130205
- class: org.elasticsearch.index.codec.vectors.cluster.KMeansLocalTests
method: testKMeansNeighbors
issue: https://github.com/elastic/elasticsearch/issues/130258
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,7 @@
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Before;

import java.io.IOException;
import java.nio.file.FileStore;
Expand Down Expand Up @@ -59,8 +58,8 @@

public class ThreadPoolMergeExecutorServiceDiskSpaceTests extends ESTestCase {

private static TestMockFileStore aFileStore = new TestMockFileStore("mocka");
private static TestMockFileStore bFileStore = new TestMockFileStore("mockb");
private static TestMockFileStore aFileStore;
private static TestMockFileStore bFileStore;
private static String aPathPart;
private static String bPathPart;
private static int mergeExecutorThreadCount;
Expand All @@ -69,8 +68,10 @@ public class ThreadPoolMergeExecutorServiceDiskSpaceTests extends ESTestCase {
private static NodeEnvironment nodeEnvironment;
private static boolean setThreadPoolMergeSchedulerSetting;

@BeforeClass
public static void installMockUsableSpaceFS() throws Exception {
@Before
public void setupTestEnv() throws Exception {
aFileStore = new TestMockFileStore("mocka");
bFileStore = new TestMockFileStore("mockb");
FileSystem current = PathUtils.getDefaultFileSystem();
aPathPart = "a-" + randomUUID();
bPathPart = "b-" + randomUUID();
Expand All @@ -96,20 +97,21 @@ public static void installMockUsableSpaceFS() throws Exception {
nodeEnvironment = new NodeEnvironment(settings, TestEnvironment.newEnvironment(settings));
}

@AfterClass
public static void removeMockUsableSpaceFS() {
@After
public void removeMockUsableSpaceFS() {
if (setThreadPoolMergeSchedulerSetting) {
assertWarnings(
"[indices.merge.scheduler.use_thread_pool] setting was deprecated in Elasticsearch "
+ "and will be removed in a future release. See the breaking changes documentation for the next major version."
);
}
Comment on lines +102 to +107
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unrelated change that consolidates warning headers assertions.

PathUtilsForTesting.teardown();
aFileStore = null;
bFileStore = null;
testThreadPool.close();
nodeEnvironment.close();
}

@After
public void cleanupThreadPool() {
testThreadPool.scheduledTasks.clear();
}

static class TestCapturingThreadPool extends TestThreadPool {
final List<Tuple<TimeValue, Cancellable>> scheduledTasks = new ArrayList<>();

Expand Down Expand Up @@ -319,8 +321,6 @@ public void testDiskSpaceMonitorStartsAsDisabled() throws Exception {
)
);
}
aFileStore.throwIoException = false;
bFileStore.throwIoException = false;
}

public void testAvailableDiskSpaceMonitorWhenFileSystemStatErrors() throws Exception {
Expand Down Expand Up @@ -406,8 +406,6 @@ public void testAvailableDiskSpaceMonitorWhenFileSystemStatErrors() throws Excep
}
});
}
aFileStore.throwIoException = false;
bFileStore.throwIoException = false;
}

public void testAvailableDiskSpaceMonitorSettingsUpdate() throws Exception {
Expand Down Expand Up @@ -516,12 +514,6 @@ public void testAvailableDiskSpaceMonitorSettingsUpdate() throws Exception {
}
}, 5, TimeUnit.SECONDS);
}
if (setThreadPoolMergeSchedulerSetting) {
assertWarnings(
"[indices.merge.scheduler.use_thread_pool] setting was deprecated in Elasticsearch "
+ "and will be removed in a future release. See the breaking changes documentation for the next major version."
);
}
}

public void testAbortingOrRunningMergeTaskHoldsUpBudget() throws Exception {
Expand Down Expand Up @@ -564,7 +556,7 @@ public void testAbortingOrRunningMergeTaskHoldsUpBudget() throws Exception {
testDoneLatch.await();
return null;
}).when(stallingMergeTask).abort();
threadPoolMergeExecutorService.submitMergeTask(stallingMergeTask);
assertTrue(threadPoolMergeExecutorService.submitMergeTask(stallingMergeTask));
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unrelated change, just to make sure nothing unexpectedly fails when submitting a merge task.

// assert the merge task is holding up disk space budget
expectedAvailableBudget.set(expectedAvailableBudget.get() - taskBudget);
assertBusy(
Expand All @@ -574,7 +566,7 @@ public void testAbortingOrRunningMergeTaskHoldsUpBudget() throws Exception {
ThreadPoolMergeScheduler.MergeTask mergeTask = mock(ThreadPoolMergeScheduler.MergeTask.class);
when(mergeTask.estimatedRemainingMergeSize()).thenReturn(randomLongBetween(0L, expectedAvailableBudget.get()));
when(mergeTask.schedule()).thenReturn(RUN);
threadPoolMergeExecutorService.submitMergeTask(mergeTask);
assertTrue(threadPoolMergeExecutorService.submitMergeTask(mergeTask));
assertBusy(() -> {
verify(mergeTask).schedule();
verify(mergeTask).run();
Expand All @@ -595,12 +587,6 @@ public void testAbortingOrRunningMergeTaskHoldsUpBudget() throws Exception {
assertThat(threadPoolMergeExecutorService.allDone(), is(true));
});
}
if (setThreadPoolMergeSchedulerSetting) {
assertWarnings(
"[indices.merge.scheduler.use_thread_pool] setting was deprecated in Elasticsearch "
+ "and will be removed in a future release. See the breaking changes documentation for the next major version."
);
}
}

public void testBackloggedMergeTasksDoNotHoldUpBudget() throws Exception {
Expand Down Expand Up @@ -654,7 +640,7 @@ && randomBoolean()) {
testDoneLatch.await();
return null;
}).when(mergeTask).abort();
threadPoolMergeExecutorService.submitMergeTask(mergeTask);
assertTrue(threadPoolMergeExecutorService.submitMergeTask(mergeTask));
if (mergeTask.schedule() == RUN) {
runningMergeTasks.add(mergeTask);
} else {
Expand All @@ -679,7 +665,7 @@ && randomBoolean()) {
return RUN;
}
}).when(mergeTask).schedule();
threadPoolMergeExecutorService.submitMergeTask(mergeTask);
assertTrue(threadPoolMergeExecutorService.submitMergeTask(mergeTask));
backloggingMergeTasksScheduleCountMap.put(mergeTask, 1);
}
int checkRounds = randomIntBetween(1, 10);
Expand Down Expand Up @@ -712,7 +698,7 @@ && randomBoolean()) {
long taskBudget = randomLongBetween(1L, backloggedMergeTaskDiskSpaceBudget);
when(mergeTask.estimatedRemainingMergeSize()).thenReturn(taskBudget);
when(mergeTask.schedule()).thenReturn(RUN);
threadPoolMergeExecutorService.submitMergeTask(mergeTask);
assertTrue(threadPoolMergeExecutorService.submitMergeTask(mergeTask));
assertBusy(() -> {
verify(mergeTask).schedule();
verify(mergeTask).run();
Expand All @@ -739,12 +725,6 @@ && randomBoolean()) {
assertThat(threadPoolMergeExecutorService.allDone(), is(true));
});
}
if (setThreadPoolMergeSchedulerSetting) {
assertWarnings(
"[indices.merge.scheduler.use_thread_pool] setting was deprecated in Elasticsearch "
+ "and will be removed in a future release. See the breaking changes documentation for the next major version."
);
}
}

public void testUnavailableBudgetBlocksNewMergeTasksFromStartingExecution() throws Exception {
Expand Down Expand Up @@ -823,7 +803,7 @@ public void testUnavailableBudgetBlocksNewMergeTasksFromStartingExecution() thro
runningOrAbortingMergeTasksList.add(mergeTask);
latchesBlockingMergeTasksList.add(blockMergeTaskLatch);
}
threadPoolMergeExecutorService.submitMergeTask(mergeTask);
assertTrue(threadPoolMergeExecutorService.submitMergeTask(mergeTask));
}
// currently running (or aborting) merge tasks have consumed some of the available budget
while (runningOrAbortingMergeTasksList.isEmpty() == false) {
Expand Down Expand Up @@ -855,8 +835,8 @@ public void testUnavailableBudgetBlocksNewMergeTasksFromStartingExecution() thro
// merge task 2 can run because it is under budget
when(mergeTask2.estimatedRemainingMergeSize()).thenReturn(underBudget);
}
threadPoolMergeExecutorService.submitMergeTask(mergeTask1);
threadPoolMergeExecutorService.submitMergeTask(mergeTask2);
assertTrue(threadPoolMergeExecutorService.submitMergeTask(mergeTask1));
assertTrue(threadPoolMergeExecutorService.submitMergeTask(mergeTask2));
assertBusy(() -> {
if (task1Runs) {
verify(mergeTask1).schedule();
Expand Down Expand Up @@ -890,12 +870,6 @@ public void testUnavailableBudgetBlocksNewMergeTasksFromStartingExecution() thro
bFileStore.usableSpace = Long.MAX_VALUE;
assertBusy(() -> assertThat(threadPoolMergeExecutorService.allDone(), is(true)));
}
if (setThreadPoolMergeSchedulerSetting) {
assertWarnings(
"[indices.merge.scheduler.use_thread_pool] setting was deprecated in Elasticsearch "
+ "and will be removed in a future release. See the breaking changes documentation for the next major version."
);
}
}

public void testEnqueuedMergeTasksAreUnblockedWhenEstimatedMergeSizeChanges() throws Exception {
Expand Down Expand Up @@ -990,12 +964,6 @@ public void testEnqueuedMergeTasksAreUnblockedWhenEstimatedMergeSizeChanges() th
}
});
}
if (setThreadPoolMergeSchedulerSetting) {
assertWarnings(
"[indices.merge.scheduler.use_thread_pool] setting was deprecated in Elasticsearch "
+ "and will be removed in a future release. See the breaking changes documentation for the next major version."
);
}
}

public void testMergeTasksAreUnblockedWhenMoreDiskSpaceBecomesAvailable() throws Exception {
Expand Down Expand Up @@ -1058,7 +1026,7 @@ public void testMergeTasksAreUnblockedWhenMoreDiskSpaceBecomesAvailable() throws
testDoneLatch.await();
return null;
}).when(mergeTask).abort();
threadPoolMergeExecutorService.submitMergeTask(mergeTask);
assertTrue(threadPoolMergeExecutorService.submitMergeTask(mergeTask));
if (mergeTask.schedule() == RUN) {
runningMergeTasks.add(mergeTask);
} else {
Expand All @@ -1083,7 +1051,7 @@ public void testMergeTasksAreUnblockedWhenMoreDiskSpaceBecomesAvailable() throws
when(mergeTask.estimatedRemainingMergeSize()).thenReturn(taskBudget);
Schedule schedule = randomFrom(RUN, ABORT);
when(mergeTask.schedule()).thenReturn(schedule);
threadPoolMergeExecutorService.submitMergeTask(mergeTask);
assertTrue(threadPoolMergeExecutorService.submitMergeTask(mergeTask));
if (schedule == RUN) {
overBudgetTasksToRunList.add(mergeTask);
} else {
Expand Down Expand Up @@ -1150,11 +1118,5 @@ public void testMergeTasksAreUnblockedWhenMoreDiskSpaceBecomesAvailable() throws
);
});
}
if (setThreadPoolMergeSchedulerSetting) {
assertWarnings(
"[indices.merge.scheduler.use_thread_pool] setting was deprecated in Elasticsearch "
+ "and will be removed in a future release. See the breaking changes documentation for the next major version."
);
}
}
}