Skip to content

Commit 822501b

Browse files
testBackloggedMergeTasksAreAllExecutedExactlyOnce
1 parent 79ccc74 commit 822501b

File tree

3 files changed

+96
-4
lines changed

3 files changed

+96
-4
lines changed

server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorService.java

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,17 +36,30 @@ public class ThreadPoolMergeExecutorService {
3636
* Initial value for IO write rate limit of individual merge tasks when doAutoIOThrottle is true
3737
*/
3838
private static final ByteSizeValue START_IO_RATE = ByteSizeValue.ofMb(20L);
39+
/**
40+
* Total number of submitted merge tasks that support IO auto throttling and that have not yet been executed.
41+
* This includes merge tasks that are currently running and that are backlogged (by their respective merge schedulers).
42+
*/
3943
private final AtomicInteger currentlySubmittedIOThrottledMergeTasksCount = new AtomicInteger();
44+
/**
45+
* The merge tasks that are waiting execution. This does NOT include backlogged or currently executing merge tasks.
46+
* For instance, this can be empty while there are backlogged merge tasks awaiting re-enqueuing.
47+
*/
4048
private final PriorityBlockingQueue<MergeTask> queuedMergeTasks = new PriorityBlockingQueue<>();
41-
// the set of all merge tasks currently being executed by merge threads from the pool,
42-
// in order to be able to update the IO throttle rate of merge tasks also after they have started (while executing)
49+
/**
50+
* The set of all merge tasks currently being executed by merge threads from the pool.
51+
* These are tracked notably in order to be able to update their disk IO throttle rate, after they have started, while executing.
52+
*/
4353
private final Set<MergeTask> currentlyRunningMergeTasks = ConcurrentCollections.newConcurrentSet();
4454
/**
4555
* Current IO write throttle rate, in bytes per sec, that's in effect for all currently running merge tasks,
4656
* across all {@link ThreadPoolMergeScheduler}s that use this instance of the queue.
4757
*/
4858
private final AtomicLong targetIORateBytesPerSec = new AtomicLong(START_IO_RATE.getBytes());
4959
private final ExecutorService executorService;
60+
/**
61+
* The maximum number of concurrently running merges, given the number of threads in the pool.
62+
*/
5063
private final int maxConcurrentMerges;
5164

5265
public static @Nullable ThreadPoolMergeExecutorService maybeCreateThreadPoolMergeExecutorService(
@@ -202,6 +215,11 @@ private static long newTargetIORateBytesPerSec(long currentTargetIORateBytesPerS
202215
return ByteSizeValue.ofBytes(targetIORateBytesPerSec.get()).getMbFrac();
203216
}
204217

218+
// exposed for tests
219+
int getMaxConcurrentMerges() {
220+
return maxConcurrentMerges;
221+
}
222+
205223
public boolean allDone() {
206224
return queuedMergeTasks.isEmpty()
207225
&& currentlyRunningMergeTasks.isEmpty()

server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeScheduler.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -311,7 +311,7 @@ boolean runNowOrBacklog() {
311311
@Override
312312
public int compareTo(MergeTask other) {
313313
// sort smaller merges first, so they are executed before larger ones
314-
return Long.compare(onGoingMerge.getMerge().estimatedMergeBytes, other.onGoingMerge.getMerge().estimatedMergeBytes);
314+
return Long.compare(estimatedMergeSize(), other.estimatedMergeSize());
315315
}
316316

317317
public boolean supportsIOThrottling() {
@@ -396,6 +396,10 @@ void abortOnGoingMerge() {
396396
doMerge(mergeSource, onGoingMerge.getMerge());
397397
}
398398

399+
long estimatedMergeSize() {
400+
return onGoingMerge.getMerge().estimatedMergeBytes;
401+
}
402+
399403
@Override
400404
public String toString() {
401405
return name + (onGoingMerge.getMerge().isAborted() ? " (aborted)" : "");

server/src/test/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorServiceTests.java

Lines changed: 71 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,14 +10,22 @@
1010
package org.elasticsearch.index.engine;
1111

1212
import org.elasticsearch.common.settings.Settings;
13+
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
1314
import org.elasticsearch.common.util.concurrent.DeterministicTaskQueue;
15+
import org.elasticsearch.common.util.concurrent.EsExecutors;
1416
import org.elasticsearch.index.IndexSettings;
1517
import org.elasticsearch.test.ESTestCase;
1618
import org.elasticsearch.test.IndexSettingsModule;
1719
import org.elasticsearch.threadpool.TestThreadPool;
1820
import org.elasticsearch.threadpool.ThreadPool;
1921
import org.junit.Before;
2022

23+
import java.util.Collection;
24+
import java.util.concurrent.CountDownLatch;
25+
26+
import static org.hamcrest.Matchers.equalTo;
27+
import static org.mockito.ArgumentMatchers.anyDouble;
28+
import static org.mockito.Mockito.doAnswer;
2129
import static org.mockito.Mockito.mock;
2230
import static org.mockito.Mockito.times;
2331
import static org.mockito.Mockito.verify;
@@ -37,7 +45,7 @@ public void setUpThreadPool() {
3745
}
3846

3947
public void testMergeTasksAreAbortedWhenThreadPoolIsShutdown() {
40-
final TestThreadPool testThreadPool = new TestThreadPool("test");
48+
TestThreadPool testThreadPool = new TestThreadPool("test");
4149
ThreadPoolMergeExecutorService threadPoolMergeExecutorService = ThreadPoolMergeExecutorService
4250
.maybeCreateThreadPoolMergeExecutorService(
4351
testThreadPool,
@@ -57,4 +65,66 @@ public void testMergeTasksAreAbortedWhenThreadPoolIsShutdown() {
5765
verify(mergeTask, times(0)).run();
5866
assertTrue(threadPoolMergeExecutorService.allDone());
5967
}
68+
69+
public void testBackloggedMergeTasksAreAllExecutedExactlyOnce() throws Exception {
70+
int mergeExecutorThreadCount = randomIntBetween(1, 2);
71+
Settings settings = Settings.builder()
72+
.put(ThreadPoolMergeScheduler.USE_THREAD_POOL_MERGE_SCHEDULER_SETTING.getKey(), true)
73+
// results in few merge threads, in order to increase contention
74+
.put(EsExecutors.NODE_PROCESSORS_SETTING.getKey(), mergeExecutorThreadCount)
75+
.build();
76+
try (TestThreadPool testThreadPool = new TestThreadPool("test", settings)) {
77+
ThreadPoolMergeExecutorService threadPoolMergeExecutorService = ThreadPoolMergeExecutorService
78+
.maybeCreateThreadPoolMergeExecutorService(testThreadPool, settings);
79+
assertNotNull(threadPoolMergeExecutorService);
80+
assertThat(threadPoolMergeExecutorService.getMaxConcurrentMerges(), equalTo(mergeExecutorThreadCount));
81+
int mergeTaskCount = randomIntBetween(3, 30);
82+
CountDownLatch mergeTasksDoneLatch = new CountDownLatch(mergeTaskCount);
83+
CountDownLatch mergeTasksReadyLatch = new CountDownLatch(mergeTaskCount);
84+
CountDownLatch submitTaskLatch = new CountDownLatch(1);
85+
Collection<ThreadPoolMergeScheduler.MergeTask> generatedMergeTasks = ConcurrentCollections.newConcurrentSet();
86+
for (int i = 0; i < mergeTaskCount; i++) {
87+
new Thread(()-> {
88+
ThreadPoolMergeScheduler.MergeTask mergeTask = mock(ThreadPoolMergeScheduler.MergeTask.class);
89+
when(mergeTask.isRunning()).thenReturn(false);
90+
boolean supportsIOThrottling = randomBoolean();
91+
when(mergeTask.supportsIOThrottling()).thenReturn(supportsIOThrottling);
92+
long mergeSize = randomNonNegativeLong();
93+
when(mergeTask.estimatedMergeSize()).thenReturn(mergeSize);
94+
doAnswer(mock -> {
95+
// each individual merge task can either "run" or be "backlogged"
96+
boolean runNowOrBacklog = randomBoolean();
97+
if (runNowOrBacklog) {
98+
mergeTasksDoneLatch.countDown();
99+
} else {
100+
testThreadPool.executor(ThreadPool.Names.GENERIC).execute(() -> {
101+
// reenqueue backlogged merge task
102+
threadPoolMergeExecutorService.reEnqueueBackloggedMergeTask(mergeTask);
103+
});
104+
}
105+
return runNowOrBacklog;
106+
}).when(mergeTask).runNowOrBacklog();
107+
generatedMergeTasks.add(mergeTask);
108+
mergeTasksReadyLatch.countDown();
109+
// make all threads submit merge tasks at once
110+
safeAwait(submitTaskLatch);
111+
threadPoolMergeExecutorService.submitMergeTask(mergeTask);
112+
}).start();
113+
}
114+
safeAwait(mergeTasksReadyLatch);
115+
submitTaskLatch.countDown();
116+
safeAwait(mergeTasksDoneLatch);
117+
assertBusy(() -> {
118+
for (ThreadPoolMergeScheduler.MergeTask mergeTask : generatedMergeTasks) {
119+
verify(mergeTask, times(1)).run();
120+
if (mergeTask.supportsIOThrottling()) {
121+
verify(mergeTask).setIORateLimit(anyDouble());
122+
} else {
123+
verify(mergeTask, times(0)).setIORateLimit(anyDouble());
124+
}
125+
}
126+
threadPoolMergeExecutorService.allDone();
127+
});
128+
}
129+
}
60130
}

0 commit comments

Comments
 (0)