Skip to content

Commit 322c6c0

Browse files
Fix testUnavailableBudgetBlocksNewMergeTasksFromStartingExecution
1 parent 09e511b commit 322c6c0

File tree

2 files changed

+73
-48
lines changed

2 files changed

+73
-48
lines changed

server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorService.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -754,8 +754,8 @@ int getMergeTasksQueueLength() {
754754
}
755755

756756
// exposed for tests
757-
MergeTaskPriorityBlockingQueue getMergeTasksQueue() {
758-
return queuedMergeTasks;
757+
long getDiskSpaceAvailableForNewMergeTasks() {
758+
return queuedMergeTasks.getAvailableBudget();
759759
}
760760

761761
// exposed for tests and stats

server/src/test/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorServiceDiskSpaceTests.java

Lines changed: 71 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,9 @@
3535
import java.nio.file.attribute.FileAttributeView;
3636
import java.nio.file.attribute.FileStoreAttributeView;
3737
import java.nio.file.spi.FileSystemProvider;
38+
import java.util.ArrayList;
3839
import java.util.LinkedHashSet;
40+
import java.util.List;
3941
import java.util.concurrent.CountDownLatch;
4042
import java.util.concurrent.TimeUnit;
4143
import java.util.concurrent.atomic.AtomicLong;
@@ -47,6 +49,8 @@
4749
import static org.hamcrest.Matchers.is;
4850
import static org.mockito.Mockito.doAnswer;
4951
import static org.mockito.Mockito.mock;
52+
import static org.mockito.Mockito.times;
53+
import static org.mockito.Mockito.verify;
5054
import static org.mockito.Mockito.when;
5155

5256
public class ThreadPoolMergeExecutorServiceDiskSpaceTests extends ESTestCase {
@@ -324,15 +328,15 @@ public void testAvailableDiskSpaceMonitorSettingsUpdate() throws Exception {
324328
}
325329
}
326330

327-
public void testUnavailableBudgetBlocksEnqueuedMergeTasks() throws Exception {
331+
public void testUnavailableBudgetBlocksNewMergeTasksFromStartingExecution() throws Exception {
328332
int mergeExecutorThreadCount = randomIntBetween(5, 10);
329333
// fewer merge tasks than pool threads so that there's always a free thread available
330334
int submittedMergesCount = randomIntBetween(1, mergeExecutorThreadCount - 1);
331335
Settings settings = Settings.builder()
332336
.put(this.settings)
333337
.put(ThreadPoolMergeScheduler.USE_THREAD_POOL_MERGE_SCHEDULER_SETTING.getKey(), true)
334338
.put(EsExecutors.NODE_PROCESSORS_SETTING.getKey(), mergeExecutorThreadCount)
335-
.put(ThreadPoolMergeExecutorService.INDICES_MERGE_DISK_CHECK_INTERVAL_SETTING.getKey(), "100ms")
339+
.put(ThreadPoolMergeExecutorService.INDICES_MERGE_DISK_CHECK_INTERVAL_SETTING.getKey(), "50ms")
336340
.build();
337341
ClusterSettings clusterSettings = ClusterSettings.createBuiltInClusterSettings(settings);
338342
aFileStore.totalSpace = 150_000L;
@@ -347,7 +351,6 @@ public void testUnavailableBudgetBlocksEnqueuedMergeTasks() throws Exception {
347351
aFileStore.usableSpace = 90_000L;
348352
bFileStore.usableSpace = 110_000L;
349353
}
350-
long smallMergeTaskSize = 10L;
351354
try (TestThreadPool testThreadPool = new TestThreadPool("test", settings)) {
352355
try (NodeEnvironment nodeEnv = new NodeEnvironment(settings, TestEnvironment.newEnvironment(settings))) {
353356
try (
@@ -356,20 +359,20 @@ public void testUnavailableBudgetBlocksEnqueuedMergeTasks() throws Exception {
356359
) {
357360
assert threadPoolMergeExecutorService != null;
358361
// wait for the budget to be updated from the available disk space
359-
AtomicLong availableBudget = new AtomicLong();
362+
AtomicLong expectedAvailableBudget = new AtomicLong();
360363
assertBusy(() -> {
361364
if (aHasMoreSpace) {
362365
// 120_000L (available) - 5% (default flood stage level) * 150_000L (total)
363-
assertThat(threadPoolMergeExecutorService.getMergeTasksQueue().getAvailableBudget(), is(112_500L));
364-
availableBudget.set(112_500L);
366+
assertThat(threadPoolMergeExecutorService.getDiskSpaceAvailableForNewMergeTasks(), is(112_500L));
367+
expectedAvailableBudget.set(112_500L);
365368
} else {
366369
// 110_000L (available) - 5% (default flood stage level) * 140_000L (total)
367-
assertThat(threadPoolMergeExecutorService.getMergeTasksQueue().getAvailableBudget(), is(103_000L));
368-
availableBudget.set(103_000L);
370+
assertThat(threadPoolMergeExecutorService.getDiskSpaceAvailableForNewMergeTasks(), is(103_000L));
371+
expectedAvailableBudget.set(103_000L);
369372
}
370373
});
371-
// tasks that wait on the latch will hold up the budget
372-
CountDownLatch blockMergeTasks = new CountDownLatch(1);
374+
List<ThreadPoolMergeScheduler.MergeTask> runningOrAbortingMergeTasksList = new ArrayList<>();
375+
List<CountDownLatch> latchesBlockingMergeTasksList = new ArrayList<>();
373376
// submit merge tasks that don't finish, in order to deplete the available budget
374377
while (submittedMergesCount > 0) {
375378
ThreadPoolMergeScheduler.MergeTask mergeTask = mock(ThreadPoolMergeScheduler.MergeTask.class);
@@ -390,57 +393,79 @@ public void testUnavailableBudgetBlocksEnqueuedMergeTasks() throws Exception {
390393
// this task will NOT hold up any budget because it runs quickly (it is not blocked)
391394
when(mergeTask.estimatedRemainingMergeSize()).thenReturn(randomLongBetween(1_000L, 10_000L));
392395
} else {
393-
// make sure the available budget is expended
394-
if (submittedMergesCount == 1) {
395-
when(mergeTask.estimatedRemainingMergeSize()).thenReturn(availableBudget.get());
396-
} else {
397-
long taskBudget = randomLongBetween(smallMergeTaskSize + 1L, availableBudget.get());
398-
when(mergeTask.estimatedRemainingMergeSize()).thenReturn(taskBudget);
399-
availableBudget.set(availableBudget.get() - taskBudget);
400-
}
396+
CountDownLatch blockMergeTaskLatch = new CountDownLatch(1);
397+
long taskBudget = randomLongBetween(1L, expectedAvailableBudget.get());
398+
when(mergeTask.estimatedRemainingMergeSize()).thenReturn(taskBudget);
399+
expectedAvailableBudget.set(expectedAvailableBudget.get() - taskBudget);
401400
submittedMergesCount--;
402401
// this task will hold up budget because it blocks when it runs (to simulate it running for a long time)
403402
doAnswer(mock -> {
404403
// wait to be signalled before completing (this holds up budget)
405-
blockMergeTasks.await();
404+
blockMergeTaskLatch.await();
406405
return null;
407406
}).when(mergeTask).run();
408407
doAnswer(mock -> {
409408
// wait to be signalled before completing (this holds up budget)
410-
blockMergeTasks.await();
409+
blockMergeTaskLatch.await();
411410
return null;
412411
}).when(mergeTask).abort();
412+
runningOrAbortingMergeTasksList.add(mergeTask);
413+
latchesBlockingMergeTasksList.add(blockMergeTaskLatch);
413414
}
414415
threadPoolMergeExecutorService.submitMergeTask(mergeTask);
415416
}
416-
// running (or aborting) merge tasks have depleted the available budget
417-
assertBusy(() -> { assertThat(threadPoolMergeExecutorService.getMergeTasksQueue().getAvailableBudget(), is(0L)); });
418-
int moreMergeTasksCount = randomIntBetween(1, 10);
419-
// any new merge tasks will only be enqueued but not actually run, until more budget becomes available
420-
for (int i = 0; i < moreMergeTasksCount; i++) {
421-
ThreadPoolMergeScheduler.MergeTask mergeTask = mock(ThreadPoolMergeScheduler.MergeTask.class);
422-
when(mergeTask.supportsIOThrottling()).thenReturn(randomBoolean());
423-
// even "small" merge tasks cannot run because the available budget is "0"
424-
when(mergeTask.estimatedRemainingMergeSize()).thenReturn(randomLongBetween(1L, smallMergeTaskSize));
425-
when(mergeTask.schedule()).thenReturn(RUN);
426-
threadPoolMergeExecutorService.submitMergeTask(mergeTask);
427-
}
428-
assertBusy(() -> {
429-
assertThat(threadPoolMergeExecutorService.getMergeTasksQueue().getAvailableBudget(), is(0L));
430-
// all the newly submitted merge tasks have been enqueued, none executed
431-
assertThat(threadPoolMergeExecutorService.getMergeTasksQueueLength(), greaterThanOrEqualTo(moreMergeTasksCount));
432-
});
433-
// resume blocked merge tasks which should resume everything back
434-
blockMergeTasks.countDown();
435-
assertBusy(() -> {
436-
assertThat(threadPoolMergeExecutorService.getRunningMergeTasks().size(), is(0));
437-
assertThat(threadPoolMergeExecutorService.getMergeTasksQueueLength(), is(0));
438-
if (aHasMoreSpace) {
439-
assertThat(threadPoolMergeExecutorService.getMergeTasksQueue().getAvailableBudget(), is(112_500L));
417+
// currently running (or aborting) merge tasks have consumed some of the available budget
418+
assertBusy(
419+
() -> assertThat(
420+
threadPoolMergeExecutorService.getDiskSpaceAvailableForNewMergeTasks(),
421+
is(expectedAvailableBudget.get())
422+
)
423+
);
424+
while (runningOrAbortingMergeTasksList.isEmpty() == false) {
425+
assertBusy(
426+
() -> assertThat(
427+
threadPoolMergeExecutorService.getDiskSpaceAvailableForNewMergeTasks(),
428+
is(expectedAvailableBudget.get())
429+
)
430+
);
431+
ThreadPoolMergeScheduler.MergeTask mergeTask1 = mock(ThreadPoolMergeScheduler.MergeTask.class);
432+
when(mergeTask1.supportsIOThrottling()).thenReturn(randomBoolean());
433+
when(mergeTask1.schedule()).thenReturn(RUN);
434+
ThreadPoolMergeScheduler.MergeTask mergeTask2 = mock(ThreadPoolMergeScheduler.MergeTask.class);
435+
when(mergeTask2.supportsIOThrottling()).thenReturn(randomBoolean());
436+
when(mergeTask2.schedule()).thenReturn(RUN);
437+
boolean task1Runs = randomBoolean();
438+
long currentAvailableBudget = expectedAvailableBudget.get();
439+
long overBudget = randomLongBetween(currentAvailableBudget + 1L, currentAvailableBudget + 100L);
440+
long underBudget = randomLongBetween(0L, currentAvailableBudget);
441+
if (task1Runs) {
442+
// merge task 1 can run because it is under budget
443+
when(mergeTask1.estimatedRemainingMergeSize()).thenReturn(underBudget);
444+
// merge task 2 cannot run because it is over budget
445+
when(mergeTask2.estimatedRemainingMergeSize()).thenReturn(overBudget);
440446
} else {
441-
assertThat(threadPoolMergeExecutorService.getMergeTasksQueue().getAvailableBudget(), is(103_000L));
447+
// merge task 1 cannot run because it is over budget
448+
when(mergeTask1.estimatedRemainingMergeSize()).thenReturn(overBudget);
449+
// merge task 2 can run because it is under budget
450+
when(mergeTask2.estimatedRemainingMergeSize()).thenReturn(underBudget);
442451
}
443-
});
452+
threadPoolMergeExecutorService.submitMergeTask(mergeTask1);
453+
threadPoolMergeExecutorService.submitMergeTask(mergeTask2);
454+
assertBusy(() -> {
455+
if (task1Runs) {
456+
verify(mergeTask1).run();
457+
verify(mergeTask2, times(0)).run();
458+
} else {
459+
verify(mergeTask2).run();
460+
verify(mergeTask1, times(0)).run();
461+
}
462+
});
463+
// let one task from the bunch tasks, that are holding budget, finish running
464+
int index = randomIntBetween(0, runningOrAbortingMergeTasksList.size() - 1);
465+
latchesBlockingMergeTasksList.remove(index).countDown();
466+
ThreadPoolMergeScheduler.MergeTask completedMergeTask = runningOrAbortingMergeTasksList.remove(index);
467+
expectedAvailableBudget.set(expectedAvailableBudget.get() + completedMergeTask.estimatedRemainingMergeSize());
468+
}
444469
}
445470
}
446471
}

0 commit comments

Comments
 (0)