|
30 | 30 | import java.io.IOException; |
31 | 31 | import java.util.ArrayList; |
32 | 32 | import java.util.List; |
| 33 | +import java.util.concurrent.CountDownLatch; |
33 | 34 | import java.util.concurrent.Semaphore; |
34 | 35 | import java.util.concurrent.ThreadPoolExecutor; |
35 | 36 | import java.util.concurrent.atomic.AtomicBoolean; |
36 | 37 | import java.util.concurrent.atomic.AtomicInteger; |
37 | 38 |
|
| 39 | +import static org.hamcrest.Matchers.contains; |
38 | 40 | import static org.hamcrest.Matchers.equalTo; |
39 | 41 | import static org.hamcrest.Matchers.is; |
40 | 42 | import static org.hamcrest.Matchers.lessThanOrEqualTo; |
41 | 43 | import static org.mockito.ArgumentMatchers.any; |
42 | 44 | import static org.mockito.Mockito.doAnswer; |
43 | 45 | import static org.mockito.Mockito.mock; |
44 | 46 | import static org.mockito.Mockito.verify; |
| 47 | +import static org.mockito.Mockito.verifyNoInteractions; |
45 | 48 | import static org.mockito.Mockito.when; |
46 | 49 |
|
47 | 50 | public class ThreadPoolMergeSchedulerTests extends ESTestCase { |
@@ -262,6 +265,72 @@ public void testMergesRunConcurrently() throws Exception { |
262 | 265 | } |
263 | 266 | } |
264 | 267 |
|
| 268 | + public void testSchedulerCloseWaitsForRunningMerge() throws Exception { |
| 269 | + int mergeSchedulerMaxThreadCount = randomIntBetween(1, 3); |
| 270 | + int mergeExecutorThreadCount = randomIntBetween(1, 3); |
| 271 | + Settings settings = Settings.builder() |
| 272 | + .put(settingsWithMergeScheduler) |
| 273 | + .put(EsExecutors.NODE_PROCESSORS_SETTING.getKey(), mergeExecutorThreadCount) |
| 274 | + .put(MergeSchedulerConfig.MAX_THREAD_COUNT_SETTING.getKey(), mergeSchedulerMaxThreadCount) |
| 275 | + .build(); |
| 276 | + try (TestThreadPool testThreadPool = new TestThreadPool("test", settings)) { |
| 277 | + ThreadPoolMergeExecutorService threadPoolMergeExecutorService = ThreadPoolMergeExecutorService |
| 278 | + .maybeCreateThreadPoolMergeExecutorService(testThreadPool, settings); |
| 279 | + assertNotNull(threadPoolMergeExecutorService); |
| 280 | + assertThat(threadPoolMergeExecutorService.getMaxConcurrentMerges(), equalTo(mergeExecutorThreadCount)); |
| 281 | + ThreadPoolMergeScheduler threadPoolMergeScheduler = new ThreadPoolMergeScheduler( |
| 282 | + new ShardId("index", "_na_", 1), |
| 283 | + IndexSettingsModule.newIndexSettings("index", settings), |
| 284 | + threadPoolMergeExecutorService |
| 285 | + ); |
| 286 | + CountDownLatch mergeDoneLatch = new CountDownLatch(1); |
| 287 | + MergeSource mergeSource = mock(MergeSource.class); |
| 288 | + OneMerge oneMerge = mock(OneMerge.class); |
| 289 | + when(oneMerge.getStoreMergeInfo()).thenReturn(getNewMergeInfo(randomLongBetween(1L, 10L))); |
| 290 | + when(oneMerge.getMergeProgress()).thenReturn(new MergePolicy.OneMergeProgress()); |
| 291 | + when(mergeSource.getNextMerge()).thenReturn(oneMerge, (OneMerge) null); |
| 292 | + doAnswer(invocation -> { |
| 293 | + OneMerge merge = (OneMerge) invocation.getArguments()[0]; |
| 294 | + assertFalse(merge.isAborted()); |
| 295 | + // wait to be signalled before completing the merge |
| 296 | + mergeDoneLatch.await(); |
| 297 | + return null; |
| 298 | + }).when(mergeSource).merge(any(OneMerge.class)); |
| 299 | + threadPoolMergeScheduler.merge(mergeSource, randomFrom(MergeTrigger.values())); |
| 300 | + Thread t = new Thread(() -> { |
| 301 | + try { |
| 302 | + threadPoolMergeScheduler.close(); |
| 303 | + } catch (IOException e) { |
| 304 | + fail(e); |
| 305 | + } |
| 306 | + }); |
| 307 | + t.start(); |
| 308 | + try { |
| 309 | + assertTrue(t.isAlive()); |
| 310 | + // ensure the merge scheduler is effectively "closed" |
| 311 | + assertBusy(() -> { |
| 312 | + MergeSource mergeSource2 = mock(MergeSource.class); |
| 313 | + threadPoolMergeScheduler.merge(mergeSource2, randomFrom(MergeTrigger.values())); |
| 314 | + // when the merge scheduler is closed it won't pull in any new merges from the merge source |
| 315 | + verifyNoInteractions(mergeSource2); |
| 316 | + }); |
| 317 | + // assert the merge still shows up as "running" |
| 318 | + assertThat(threadPoolMergeScheduler.getCurrentlyRunningMergeTasks().keySet(), contains(oneMerge)); |
| 319 | + assertThat(threadPoolMergeScheduler.getBackloggedMergeTasks().size(), is(0)); |
| 320 | + assertTrue(t.isAlive()); |
| 321 | + // signal the merge to finish |
| 322 | + mergeDoneLatch.countDown(); |
| 323 | + } finally { |
| 324 | + t.join(); |
| 325 | + } |
| 326 | + assertBusy(() -> { |
| 327 | + assertThat(threadPoolMergeScheduler.getCurrentlyRunningMergeTasks().size(), is(0)); |
| 328 | + assertThat(threadPoolMergeScheduler.getBackloggedMergeTasks().size(), is(0)); |
| 329 | + assertTrue(threadPoolMergeExecutorService.allDone()); |
| 330 | + }); |
| 331 | + } |
| 332 | + } |
| 333 | + |
265 | 334 | public void testAutoIOThrottleForMergeTasksWhenSchedulerDisablesIt() throws Exception { |
266 | 335 | // merge scheduler configured with auto IO throttle disabled |
267 | 336 | Settings settings = Settings.builder() |
|
0 commit comments