Skip to content

Commit 03b705e

Browse files
testBackloggedMergeTasksDoNotHoldUpBudget
1 parent d1654e0 commit 03b705e

File tree

3 files changed

+151
-11
lines changed

3 files changed

+151
-11
lines changed

server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorService.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -236,7 +236,7 @@ private ThreadPoolMergeExecutorService(ThreadPool threadPool, ClusterSettings cl
236236
}
237237

238238
boolean submitMergeTask(MergeTask mergeTask) {
239-
assert mergeTask.isRunning() == false;
239+
assert mergeTask.hasStartedRunning() == false;
240240
// first enqueue the runnable that runs exactly one merge task (the smallest it can find)
241241
if (enqueueMergeTaskExecution() == false) {
242242
// if the thread pool cannot run the merge, just abort it
@@ -275,7 +275,7 @@ boolean submitMergeTask(MergeTask mergeTask) {
275275
}
276276

277277
void reEnqueueBackloggedMergeTask(MergeTask mergeTask) {
278-
assert mergeTask.isRunning() == false;
278+
assert mergeTask.hasStartedRunning() == false;
279279
enqueueMergeTask(mergeTask);
280280
}
281281

@@ -347,7 +347,7 @@ private boolean enqueueMergeTaskExecution() {
347347
}
348348

349349
private void runMergeTask(MergeTask mergeTask) {
350-
assert mergeTask.isRunning() == false;
350+
assert mergeTask.hasStartedRunning() == false;
351351
boolean added = runningMergeTasks.add(mergeTask);
352352
assert added : "starting merge task [" + mergeTask + "] registered as already running";
353353
try {
@@ -366,7 +366,7 @@ private void runMergeTask(MergeTask mergeTask) {
366366
}
367367

368368
private void abortMergeTask(MergeTask mergeTask) {
369-
assert mergeTask.isRunning() == false;
369+
assert mergeTask.hasStartedRunning() == false;
370370
assert runningMergeTasks.contains(mergeTask) == false;
371371
try {
372372
mergeTask.abort();

server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeScheduler.java

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -266,7 +266,7 @@ private void checkMergeTaskThrottling() {
266266
// exposed for tests
267267
// synchronized so that {@code #closed}, {@code #runningMergeTasks} and {@code #backloggedMergeTasks} are modified atomically
268268
synchronized Schedule schedule(MergeTask mergeTask) {
269-
assert mergeTask.isRunning() == false;
269+
assert mergeTask.hasStartedRunning() == false;
270270
if (closed) {
271271
// do not run or backlog tasks when closing the merge scheduler, instead abort them
272272
return Schedule.ABORT;
@@ -280,7 +280,7 @@ synchronized Schedule schedule(MergeTask mergeTask) {
280280
assert added : "starting merge task [" + mergeTask + "] registered as already running";
281281
return Schedule.RUN;
282282
} else {
283-
assert mergeTask.isRunning() == false;
283+
assert mergeTask.hasStartedRunning() == false;
284284
backloggedMergeTasks.add(mergeTask);
285285
return Schedule.BACKLOG;
286286
}
@@ -404,7 +404,11 @@ public void setIORateLimit(long ioRateLimitBytesPerSec) {
404404
this.rateLimiter.setMBPerSec(ByteSizeValue.ofBytes(ioRateLimitBytesPerSec).getMbFrac());
405405
}
406406

407-
public boolean isRunning() {
407+
/**
408+
* Returns {@code true} if this task is currently running, or was run in the past.
409+
* An aborted task (see {@link #abort()}) is considered NOT to have run.
410+
*/
411+
public boolean hasStartedRunning() {
408412
boolean isRunning = mergeStartTimeNS.get() > 0L;
409413
assert isRunning != false || rateLimiter.getTotalBytesWritten() == 0L;
410414
return isRunning;
@@ -418,7 +422,7 @@ public boolean isRunning() {
418422
*/
419423
@Override
420424
public void run() {
421-
assert isRunning() == false;
425+
assert hasStartedRunning() == false;
422426
assert ThreadPoolMergeScheduler.this.runningMergeTasks.containsKey(onGoingMerge.getMerge())
423427
: "runNowOrBacklog must be invoked before actually running the merge task";
424428
try {
@@ -483,7 +487,7 @@ public void run() {
483487
* (by the {@link org.apache.lucene.index.IndexWriter}) to any subsequent merges.
484488
*/
485489
void abort() {
486-
assert isRunning() == false;
490+
assert hasStartedRunning() == false;
487491
assert ThreadPoolMergeScheduler.this.runningMergeTasks.containsKey(onGoingMerge.getMerge()) == false
488492
: "cannot abort a merge task that's already running";
489493
if (verbose()) {
@@ -512,6 +516,12 @@ void abort() {
512516
}
513517
}
514518

519+
/**
520+
* Before the merge task started running, this returns the estimated required disk space for the merge to complete
521+
* (i.e. the estimated disk space size of the resulting segment following the merge).
522+
* While the merge is running, the returned estimation is updated to take into account the data that's already been written.
523+
* After the merge completes, the estimation returned here should ideally be close to "0".
524+
*/
515525
long estimatedRemainingMergeSize() {
516526
// TODO is it possible for `estimatedMergeBytes` to be `0` for correctly initialized merges,
517527
// or is it always the case that if `estimatedMergeBytes` is `0` that means that the merge has not yet been initialized?

server/src/test/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorServiceDiskSpaceTests.java

Lines changed: 132 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,9 +43,12 @@
4343
import java.util.concurrent.atomic.AtomicLong;
4444
import java.util.concurrent.atomic.AtomicReference;
4545

46+
import static org.elasticsearch.index.engine.ThreadPoolMergeScheduler.Schedule.ABORT;
4647
import static org.elasticsearch.index.engine.ThreadPoolMergeScheduler.Schedule.BACKLOG;
4748
import static org.elasticsearch.index.engine.ThreadPoolMergeScheduler.Schedule.RUN;
49+
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
4850
import static org.hamcrest.Matchers.is;
51+
import static org.mockito.Mockito.atLeastOnce;
4952
import static org.mockito.Mockito.doAnswer;
5053
import static org.mockito.Mockito.mock;
5154
import static org.mockito.Mockito.times;
@@ -210,6 +213,7 @@ public void testAvailableDiskSpaceMonitorSingleUpdateWithDefaultSettings() throw
210213
public void testAvailableDiskSpaceMonitorSettingsUpdate() throws Exception {
211214
Settings settings = Settings.builder()
212215
.put(this.settings)
216+
// the default of "5s" slows down testing
213217
.put(ThreadPoolMergeExecutorService.INDICES_MERGE_DISK_CHECK_INTERVAL_SETTING.getKey(), "50ms")
214218
.build();
215219
ClusterSettings clusterSettings = ClusterSettings.createBuiltInClusterSettings(settings);
@@ -327,6 +331,128 @@ public void testAvailableDiskSpaceMonitorSettingsUpdate() throws Exception {
327331
}
328332
}
329333

334+
public void testBackloggedMergeTasksDoNotHoldUpBudget() throws Exception {
335+
aFileStore.totalSpace = randomLongBetween(100L, 10_000_000L);
336+
bFileStore.totalSpace = randomLongBetween(100L, 10_000_000L);
337+
aFileStore.usableSpace = randomLongBetween(10L, aFileStore.totalSpace);
338+
bFileStore.usableSpace = randomLongBetween(10L, bFileStore.totalSpace);
339+
boolean aHasMoreSpace = aFileStore.usableSpace > bFileStore.usableSpace;
340+
Settings settings = Settings.builder()
341+
.put(this.settings)
342+
// the default of "5s" slows down testing
343+
.put(ThreadPoolMergeExecutorService.INDICES_MERGE_DISK_CHECK_INTERVAL_SETTING.getKey(), "30ms")
344+
.build();
345+
try (TestThreadPool testThreadPool = new TestThreadPool("test", settings)) {
346+
try (NodeEnvironment nodeEnv = new NodeEnvironment(settings, TestEnvironment.newEnvironment(settings))) {
347+
try (
348+
ThreadPoolMergeExecutorService threadPoolMergeExecutorService = ThreadPoolMergeExecutorService
349+
.maybeCreateThreadPoolMergeExecutorService(
350+
testThreadPool,
351+
ClusterSettings.createBuiltInClusterSettings(settings),
352+
nodeEnv
353+
)
354+
) {
355+
assert threadPoolMergeExecutorService != null;
356+
assertThat(threadPoolMergeExecutorService.getMaxConcurrentMerges(), greaterThanOrEqualTo(1));
357+
// assumes the 5% default value for the remaining space watermark
358+
final AtomicLong expectedAvailableBudget = new AtomicLong(
359+
aHasMoreSpace
360+
? aFileStore.usableSpace - aFileStore.totalSpace / 20
361+
: bFileStore.usableSpace - bFileStore.totalSpace / 20
362+
);
363+
assertBusy(
364+
() -> assertThat(
365+
threadPoolMergeExecutorService.getDiskSpaceAvailableForNewMergeTasks(),
366+
is(expectedAvailableBudget.get())
367+
)
368+
);
369+
long backloggedMergeTaskDiskSpaceBudget = randomLongBetween(1L, expectedAvailableBudget.get());
370+
CountDownLatch blockMergeTaskLatch = new CountDownLatch(1);
371+
// take care that there's still at least one thread available to run merges
372+
int maxBlockingTasksToSubmit = threadPoolMergeExecutorService.getMaxConcurrentMerges() - 1;
373+
// first maybe submit some running or aborting merge tasks that hold up some budget while running or aborting
374+
while (expectedAvailableBudget.get() - backloggedMergeTaskDiskSpaceBudget > 0L
375+
&& maxBlockingTasksToSubmit-- > 0
376+
&& randomBoolean()) {
377+
ThreadPoolMergeScheduler.MergeTask mergeTask = mock(ThreadPoolMergeScheduler.MergeTask.class);
378+
long taskBudget = randomLongBetween(1L, expectedAvailableBudget.get() - backloggedMergeTaskDiskSpaceBudget);
379+
when(mergeTask.estimatedRemainingMergeSize()).thenReturn(taskBudget);
380+
when(mergeTask.schedule()).thenReturn(randomFrom(RUN, ABORT));
381+
expectedAvailableBudget.set(expectedAvailableBudget.get() - taskBudget);
382+
// this task will hold up budget because it blocks when it runs (to simulate it running for a long time)
383+
doAnswer(mock -> {
384+
// wait to be signalled before completing (this holds up budget)
385+
blockMergeTaskLatch.await();
386+
return null;
387+
}).when(mergeTask).run();
388+
doAnswer(mock -> {
389+
// wait to be signalled before completing (this holds up budget)
390+
blockMergeTaskLatch.await();
391+
return null;
392+
}).when(mergeTask).abort();
393+
threadPoolMergeExecutorService.submitMergeTask(mergeTask);
394+
}
395+
assertBusy(
396+
() -> assertThat(
397+
threadPoolMergeExecutorService.getDiskSpaceAvailableForNewMergeTasks(),
398+
is(expectedAvailableBudget.get())
399+
)
400+
);
401+
// submit some backlogging merge tasks which should NOT hold up any budget
402+
List<ThreadPoolMergeScheduler.MergeTask> backloggingMergeTasks = new ArrayList<>();
403+
int backloggingTaskCount = randomIntBetween(1, 5);
404+
while (backloggingTaskCount-- > 0) {
405+
ThreadPoolMergeScheduler.MergeTask mergeTask = mock(ThreadPoolMergeScheduler.MergeTask.class);
406+
long taskBudget = randomLongBetween(1L, backloggedMergeTaskDiskSpaceBudget);
407+
when(mergeTask.estimatedRemainingMergeSize()).thenReturn(taskBudget);
408+
doAnswer(mock -> {
409+
// maybe re-enqueue backlogged merge task
410+
if (randomBoolean()) {
411+
testThreadPool.executor(ThreadPool.Names.GENERIC).execute(() -> {
412+
threadPoolMergeExecutorService.reEnqueueBackloggedMergeTask(mergeTask);
413+
});
414+
}
415+
// always backlog these merge tasks
416+
return BACKLOG;
417+
}).when(mergeTask).schedule();
418+
threadPoolMergeExecutorService.submitMergeTask(mergeTask);
419+
backloggingMergeTasks.add(mergeTask);
420+
}
421+
// assert all backlogging merge tasks have been scheduled and possibly re-enqueued, none run and none aborted,
422+
// AND the available budget is intact
423+
assertBusy(
424+
() -> {
425+
for (ThreadPoolMergeScheduler.MergeTask mergeTask : backloggingMergeTasks) {
426+
verify(mergeTask, atLeastOnce()).schedule();
427+
}
428+
for (ThreadPoolMergeScheduler.MergeTask mergeTask : backloggingMergeTasks) {
429+
verify(mergeTask, times(0)).run();
430+
verify(mergeTask, times(0)).abort();
431+
}
432+
// budget hasn't changed!
433+
assertThat(
434+
threadPoolMergeExecutorService.getDiskSpaceAvailableForNewMergeTasks(),
435+
is(expectedAvailableBudget.get())
436+
);
437+
}
438+
);
439+
// double check that submitting a runnable merge task under budget works correctly
440+
ThreadPoolMergeScheduler.MergeTask mergeTask = mock(ThreadPoolMergeScheduler.MergeTask.class);
441+
long taskBudget = randomLongBetween(1L, backloggedMergeTaskDiskSpaceBudget);
442+
when(mergeTask.estimatedRemainingMergeSize()).thenReturn(taskBudget);
443+
when(mergeTask.schedule()).thenReturn(RUN);
444+
threadPoolMergeExecutorService.submitMergeTask(mergeTask);
445+
assertBusy(() -> {
446+
verify(mergeTask).schedule();
447+
verify(mergeTask).run();
448+
});
449+
// let the test finish
450+
blockMergeTaskLatch.countDown();
451+
}
452+
}
453+
}
454+
}
455+
330456
public void testUnavailableBudgetBlocksNewMergeTasksFromStartingExecution() throws Exception {
331457
int mergeExecutorThreadCount = randomIntBetween(3, 7);
332458
// fewer merge tasks than pool threads so that there's always a free thread available
@@ -335,9 +461,9 @@ public void testUnavailableBudgetBlocksNewMergeTasksFromStartingExecution() thro
335461
.put(this.settings)
336462
.put(ThreadPoolMergeScheduler.USE_THREAD_POOL_MERGE_SCHEDULER_SETTING.getKey(), true)
337463
.put(EsExecutors.NODE_PROCESSORS_SETTING.getKey(), mergeExecutorThreadCount)
464+
// the default of "5s" slows down testing
338465
.put(ThreadPoolMergeExecutorService.INDICES_MERGE_DISK_CHECK_INTERVAL_SETTING.getKey(), "30ms")
339466
.build();
340-
ClusterSettings clusterSettings = ClusterSettings.createBuiltInClusterSettings(settings);
341467
aFileStore.totalSpace = 150_000L;
342468
bFileStore.totalSpace = 140_000L;
343469
boolean aHasMoreSpace = randomBoolean();
@@ -354,7 +480,11 @@ public void testUnavailableBudgetBlocksNewMergeTasksFromStartingExecution() thro
354480
try (NodeEnvironment nodeEnv = new NodeEnvironment(settings, TestEnvironment.newEnvironment(settings))) {
355481
try (
356482
ThreadPoolMergeExecutorService threadPoolMergeExecutorService = ThreadPoolMergeExecutorService
357-
.maybeCreateThreadPoolMergeExecutorService(testThreadPool, clusterSettings, nodeEnv)
483+
.maybeCreateThreadPoolMergeExecutorService(
484+
testThreadPool,
485+
ClusterSettings.createBuiltInClusterSettings(settings),
486+
nodeEnv
487+
)
358488
) {
359489
assert threadPoolMergeExecutorService != null;
360490
// wait for the budget to be updated from the available disk space

0 commit comments

Comments
 (0)