|
17 | 17 | import org.elasticsearch.common.settings.Settings; |
18 | 18 | import org.elasticsearch.common.util.concurrent.DeterministicTaskQueue; |
19 | 19 | import org.elasticsearch.common.util.concurrent.EsExecutors; |
| 20 | +import org.elasticsearch.env.NodeEnvironment; |
20 | 21 | import org.elasticsearch.index.IndexSettings; |
21 | 22 | import org.elasticsearch.index.MergeSchedulerConfig; |
22 | 23 | import org.elasticsearch.index.engine.ThreadPoolMergeScheduler.MergeTask; |
|
26 | 27 | import org.elasticsearch.test.IndexSettingsModule; |
27 | 28 | import org.elasticsearch.threadpool.TestThreadPool; |
28 | 29 | import org.elasticsearch.threadpool.ThreadPool; |
| 30 | +import org.junit.After; |
29 | 31 | import org.mockito.ArgumentCaptor; |
30 | 32 |
|
31 | 33 | import java.io.IOException; |
|
53 | 55 |
|
54 | 56 | public class ThreadPoolMergeSchedulerTests extends ESTestCase { |
55 | 57 |
|
| 58 | + private NodeEnvironment nodeEnvironment; |
| 59 | + |
| 60 | + @After |
| 61 | + public void closeNodeEnv() { |
| 62 | + if (nodeEnvironment != null) { |
| 63 | + nodeEnvironment.close(); |
| 64 | + nodeEnvironment = null; |
| 65 | + } |
| 66 | + } |
| 67 | + |
56 | 68 | public void testMergesExecuteInSizeOrder() throws IOException { |
57 | 69 | DeterministicTaskQueue threadPoolTaskQueue = new DeterministicTaskQueue(); |
| 70 | + Settings settings = Settings.builder() |
| 71 | + .put(ThreadPoolMergeExecutorService.INDICES_MERGE_DISK_CHECK_INTERVAL_SETTING.getKey(), "0") |
| 72 | + .build(); |
| 73 | + nodeEnvironment = newNodeEnvironment(settings); |
58 | 74 | ThreadPoolMergeExecutorService threadPoolMergeExecutorService = ThreadPoolMergeExecutorServiceTests |
59 | | - .getThreadPoolMergeExecutorService(threadPoolTaskQueue.getThreadPool(), Settings.EMPTY, newNodeEnvironment(Settings.EMPTY)); |
| 75 | + .getThreadPoolMergeExecutorService(threadPoolTaskQueue.getThreadPool(), settings, nodeEnvironment); |
60 | 76 | try ( |
61 | 77 | ThreadPoolMergeScheduler threadPoolMergeScheduler = new ThreadPoolMergeScheduler( |
62 | 78 | new ShardId("index", "_na_", 1), |
@@ -345,9 +361,10 @@ public void testMergeSourceWithFollowUpMergesRunSequentially() throws Exception |
345 | 361 | .put(EsExecutors.NODE_PROCESSORS_SETTING.getKey(), mergeExecutorThreadCount) |
346 | 362 | .put(MergeSchedulerConfig.MAX_THREAD_COUNT_SETTING.getKey(), mergeExecutorThreadCount) |
347 | 363 | .build(); |
| 364 | + nodeEnvironment = newNodeEnvironment(settings); |
348 | 365 | try (TestThreadPool testThreadPool = new TestThreadPool("test", settings)) { |
349 | 366 | ThreadPoolMergeExecutorService threadPoolMergeExecutorService = ThreadPoolMergeExecutorServiceTests |
350 | | - .getThreadPoolMergeExecutorService(testThreadPool, settings, newNodeEnvironment(settings)); |
| 367 | + .getThreadPoolMergeExecutorService(testThreadPool, settings, nodeEnvironment); |
351 | 368 | assertThat(threadPoolMergeExecutorService.getMaxConcurrentMerges(), equalTo(mergeExecutorThreadCount)); |
352 | 369 | try ( |
353 | 370 | ThreadPoolMergeScheduler threadPoolMergeScheduler = new ThreadPoolMergeScheduler( |
@@ -418,9 +435,10 @@ public void testMergesRunConcurrently() throws Exception { |
418 | 435 | .put(EsExecutors.NODE_PROCESSORS_SETTING.getKey(), mergeExecutorThreadCount) |
419 | 436 | .put(MergeSchedulerConfig.MAX_THREAD_COUNT_SETTING.getKey(), mergeSchedulerMaxThreadCount) |
420 | 437 | .build(); |
| 438 | + nodeEnvironment = newNodeEnvironment(settings); |
421 | 439 | try (TestThreadPool testThreadPool = new TestThreadPool("test", settings)) { |
422 | 440 | ThreadPoolMergeExecutorService threadPoolMergeExecutorService = ThreadPoolMergeExecutorServiceTests |
423 | | - .getThreadPoolMergeExecutorService(testThreadPool, settings, newNodeEnvironment(settings)); |
| 441 | + .getThreadPoolMergeExecutorService(testThreadPool, settings, nodeEnvironment); |
424 | 442 | assertThat(threadPoolMergeExecutorService.getMaxConcurrentMerges(), equalTo(mergeExecutorThreadCount)); |
425 | 443 | ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) testThreadPool.executor(ThreadPool.Names.MERGE); |
426 | 444 | try ( |
@@ -504,9 +522,10 @@ public void testSchedulerCloseWaitsForRunningMerge() throws Exception { |
504 | 522 | .put(EsExecutors.NODE_PROCESSORS_SETTING.getKey(), mergeExecutorThreadCount) |
505 | 523 | .put(MergeSchedulerConfig.MAX_THREAD_COUNT_SETTING.getKey(), mergeSchedulerMaxThreadCount) |
506 | 524 | .build(); |
| 525 | + nodeEnvironment = newNodeEnvironment(settings); |
507 | 526 | try (TestThreadPool testThreadPool = new TestThreadPool("test", settings)) { |
508 | 527 | ThreadPoolMergeExecutorService threadPoolMergeExecutorService = ThreadPoolMergeExecutorServiceTests |
509 | | - .getThreadPoolMergeExecutorService(testThreadPool, settings, newNodeEnvironment(settings)); |
| 528 | + .getThreadPoolMergeExecutorService(testThreadPool, settings, nodeEnvironment); |
510 | 529 | assertThat(threadPoolMergeExecutorService.getMaxConcurrentMerges(), equalTo(mergeExecutorThreadCount)); |
511 | 530 | try ( |
512 | 531 | ThreadPoolMergeScheduler threadPoolMergeScheduler = new ThreadPoolMergeScheduler( |
|
0 commit comments