Skip to content

Commit 0ee4599

Browse files
testUnavailableBudgetBlocksEnqueuedMergeTasks
1 parent 8f30793 commit 0ee4599

File tree

2 files changed

+146
-0
lines changed

2 files changed

+146
-0
lines changed

server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorService.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -745,6 +745,11 @@ int getMergeTasksQueueLength() {
745745
return queuedMergeTasks.queueSize();
746746
}
747747

748+
// exposed for tests
// Package-private accessor that returns the queuedMergeTasks queue instance itself (not a copy and
// not just its size), so that tests can inspect the queue's state directly, e.g. by calling
// getAvailableBudget() on the returned queue.
749+
MergeTaskPriorityBlockingQueue getMergeTasksQueue() {
750+
return queuedMergeTasks;
751+
}
752+
748753
// exposed for tests and stats
749754
long getTargetIORateBytesPerSec() {
750755
return targetIORateBytesPerSec.get();

server/src/test/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorServiceDiskSpaceTests.java

Lines changed: 141 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,13 +14,17 @@
1414
import org.elasticsearch.common.settings.ClusterSettings;
1515
import org.elasticsearch.common.settings.Settings;
1616
import org.elasticsearch.common.unit.ByteSizeValue;
17+
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
18+
import org.elasticsearch.common.util.concurrent.EsExecutors;
1719
import org.elasticsearch.core.PathUtils;
1820
import org.elasticsearch.core.PathUtilsForTesting;
1921
import org.elasticsearch.env.Environment;
2022
import org.elasticsearch.env.NodeEnvironment;
2123
import org.elasticsearch.env.TestEnvironment;
24+
import org.elasticsearch.index.engine.ThreadPoolMergeScheduler.Schedule;
2225
import org.elasticsearch.test.ESTestCase;
2326
import org.elasticsearch.threadpool.TestThreadPool;
27+
import org.elasticsearch.threadpool.ThreadPool;
2428
import org.junit.AfterClass;
2529
import org.junit.Before;
2630
import org.junit.BeforeClass;
@@ -33,12 +37,25 @@
3337
import java.nio.file.attribute.FileStoreAttributeView;
3438
import java.nio.file.spi.FileSystemProvider;
3539
import java.util.LinkedHashSet;
40+
import java.util.Set;
3641
import java.util.concurrent.CountDownLatch;
42+
import java.util.concurrent.Semaphore;
43+
import java.util.concurrent.ThreadPoolExecutor;
3744
import java.util.concurrent.TimeUnit;
45+
import java.util.concurrent.atomic.AtomicInteger;
46+
import java.util.concurrent.atomic.AtomicLong;
3847
import java.util.concurrent.atomic.AtomicReference;
3948

49+
import static org.elasticsearch.index.engine.ThreadPoolMergeExecutorServiceTests.getThreadPoolMergeExecutorService;
50+
import static org.elasticsearch.index.engine.ThreadPoolMergeScheduler.Schedule.ABORT;
51+
import static org.elasticsearch.index.engine.ThreadPoolMergeScheduler.Schedule.BACKLOG;
52+
import static org.elasticsearch.index.engine.ThreadPoolMergeScheduler.Schedule.RUN;
53+
import static org.hamcrest.Matchers.equalTo;
54+
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
4055
import static org.hamcrest.Matchers.is;
56+
import static org.mockito.Mockito.doAnswer;
4157
import static org.mockito.Mockito.mock;
58+
import static org.mockito.Mockito.when;
4259

4360
public class ThreadPoolMergeExecutorServiceDiskSpaceTests extends ESTestCase {
4461

@@ -314,4 +331,128 @@ public void testAvailableDiskSpaceMonitorSettingsUpdate() throws Exception {
314331
}
315332
}
316333
}
334+
335+
public void testUnavailableBudgetBlocksEnqueuedMergeTasks() throws Exception {
336+
int mergeExecutorThreadCount = randomIntBetween(5, 10);
337+
// fewer merge tasks than pool threads so that there's always a free thread available
338+
int submittedMergesCount = randomIntBetween(1, mergeExecutorThreadCount - 1);
339+
Settings settings = Settings.builder()
340+
.put(this.settings)
341+
.put(ThreadPoolMergeScheduler.USE_THREAD_POOL_MERGE_SCHEDULER_SETTING.getKey(), true)
342+
.put(EsExecutors.NODE_PROCESSORS_SETTING.getKey(), mergeExecutorThreadCount)
343+
.put(ThreadPoolMergeExecutorService.INDICES_MERGE_DISK_CHECK_INTERVAL_SETTING.getKey(), "100ms")
344+
.build();
345+
ClusterSettings clusterSettings = ClusterSettings.createBuiltInClusterSettings(settings);
346+
aFileStore.totalSpace = 150_000L;
347+
bFileStore.totalSpace = 140_000L;
348+
boolean aHasMoreSpace = randomBoolean();
349+
if (aHasMoreSpace) {
350+
// "a" has more available space
351+
aFileStore.usableSpace = 120_000L;
352+
bFileStore.usableSpace = 100_000L;
353+
} else {
354+
// "b" has more available space
355+
aFileStore.usableSpace = 90_000L;
356+
bFileStore.usableSpace = 110_000L;
357+
}
358+
long smallMergeTaskSize = 10L;
359+
try (TestThreadPool testThreadPool = new TestThreadPool("test", settings)) {
360+
try (NodeEnvironment nodeEnv = new NodeEnvironment(settings, TestEnvironment.newEnvironment(settings))) {
361+
try (
362+
ThreadPoolMergeExecutorService threadPoolMergeExecutorService = ThreadPoolMergeExecutorService
363+
.maybeCreateThreadPoolMergeExecutorService(testThreadPool, clusterSettings, nodeEnv)
364+
) {
365+
assert threadPoolMergeExecutorService != null;
366+
// wait for the budget to be updated from the available disk space
367+
AtomicLong availableBudget = new AtomicLong();
368+
assertBusy(() -> {
369+
if (aHasMoreSpace) {
370+
// 120_000L (available) - 5% (default flood stage level) * 150_000L (total)
371+
assertThat(threadPoolMergeExecutorService.getMergeTasksQueue().getAvailableBudget(), is(112_500L));
372+
availableBudget.set(112_500L);
373+
} else {
374+
// 110_000L (available) - 5% (default flood stage level) * 140_000L (total)
375+
assertThat(threadPoolMergeExecutorService.getMergeTasksQueue().getAvailableBudget(), is(103_000L));
376+
availableBudget.set(103_000L);
377+
}
378+
});
379+
// tasks that wait on the latch will hold up the budget
380+
CountDownLatch blockMergeTasks = new CountDownLatch(1);
381+
// submit merge tasks that don't finish, in order to deplete the available budget
382+
while (submittedMergesCount > 0) {
383+
ThreadPoolMergeScheduler.MergeTask mergeTask = mock(ThreadPoolMergeScheduler.MergeTask.class);
384+
when(mergeTask.supportsIOThrottling()).thenReturn(randomBoolean());
385+
doAnswer(mock -> {
386+
Schedule schedule = randomFrom(Schedule.values());
387+
if (schedule == BACKLOG) {
388+
testThreadPool.executor(ThreadPool.Names.GENERIC).execute(() -> {
389+
// re-enqueue backlogged merge task
390+
threadPoolMergeExecutorService.reEnqueueBackloggedMergeTask(mergeTask);
391+
});
392+
}
393+
return schedule;
394+
}).when(mergeTask).schedule();
395+
// let some task complete, which will NOT hold up any budget
396+
boolean letTaskComplete = randomBoolean();
397+
if (letTaskComplete) {
398+
// this task will NOT hold up any budget because it runs quickly (it is not blocked)
399+
when(mergeTask.estimatedRemainingMergeSize()).thenReturn(randomLongBetween(1_000L, 10_000L));
400+
} else {
401+
// make sure the available budget is expended
402+
if (submittedMergesCount == 1) {
403+
when(mergeTask.estimatedRemainingMergeSize()).thenReturn(availableBudget.get());
404+
} else {
405+
long taskBudget = randomLongBetween(smallMergeTaskSize + 1L, availableBudget.get());
406+
when(mergeTask.estimatedRemainingMergeSize()).thenReturn(taskBudget);
407+
availableBudget.set(availableBudget.get() - taskBudget);
408+
}
409+
submittedMergesCount--;
410+
// this task will hold up budget because it blocks when it runs (to simulate it running for a long time)
411+
doAnswer(mock -> {
412+
// wait to be signalled before completing (this holds up budget)
413+
blockMergeTasks.await();
414+
return null;
415+
}).when(mergeTask).run();
416+
doAnswer(mock -> {
417+
// wait to be signalled before completing (this holds up budget)
418+
blockMergeTasks.await();
419+
return null;
420+
}).when(mergeTask).abort();
421+
}
422+
threadPoolMergeExecutorService.submitMergeTask(mergeTask);
423+
}
424+
// running (or aborting) merge tasks have depleted the available budget
425+
assertBusy(() -> {
426+
assertThat(threadPoolMergeExecutorService.getMergeTasksQueue().getAvailableBudget(), is(0L));
427+
});
428+
int moreMergeTasksCount = randomIntBetween(1, 10);
429+
// any new merge tasks will only be enqueued but not actually run, until more budget becomes available
430+
for (int i = 0; i < moreMergeTasksCount; i++) {
431+
ThreadPoolMergeScheduler.MergeTask mergeTask = mock(ThreadPoolMergeScheduler.MergeTask.class);
432+
when(mergeTask.supportsIOThrottling()).thenReturn(randomBoolean());
433+
// even "small" merge tasks cannot run because the available budget is "0"
434+
when(mergeTask.estimatedRemainingMergeSize()).thenReturn(randomLongBetween(1L, smallMergeTaskSize));
435+
when(mergeTask.schedule()).thenReturn(RUN);
436+
threadPoolMergeExecutorService.submitMergeTask(mergeTask);
437+
}
438+
assertBusy(() -> {
439+
assertThat(threadPoolMergeExecutorService.getMergeTasksQueue().getAvailableBudget(), is(0L));
440+
// all the newly submitted merge tasks have been enqueued, none executed
441+
assertThat(threadPoolMergeExecutorService.getMergeTasksQueueLength(), greaterThanOrEqualTo(moreMergeTasksCount));
442+
});
443+
// resume blocked merge tasks which should resume everything back
444+
blockMergeTasks.countDown();
445+
assertBusy(() -> {
446+
assertThat(threadPoolMergeExecutorService.getRunningMergeTasks().size(), is(0));
447+
assertThat(threadPoolMergeExecutorService.getMergeTasksQueueLength(), is(0));
448+
if (aHasMoreSpace) {
449+
assertThat(threadPoolMergeExecutorService.getMergeTasksQueue().getAvailableBudget(), is(112_500L));
450+
} else {
451+
assertThat(threadPoolMergeExecutorService.getMergeTasksQueue().getAvailableBudget(), is(103_000L));
452+
}
453+
});
454+
}
455+
}
456+
}
457+
}
317458
}

0 commit comments

Comments
 (0)