Skip to content

Commit 3c635b9

Browse files
Done
1 parent 32d0546 commit 3c635b9

File tree

2 files changed

+23
-9
lines changed

2 files changed

+23
-9
lines changed

server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeScheduler.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -244,8 +244,8 @@ private void checkMergeTaskThrottling() {
244244
// both currently running and enqueued merge tasks are considered "active" for throttling purposes
245245
int activeMerges = (int) (submittedMergesCount - doneMergesCount);
246246
if (activeMerges > configuredMaxMergeCount
247-
// only throttle indexing if disk IO is un-throttled, and we still can't keep up with the merge load
248-
&& threadPoolMergeExecutorService.usingMaxTargetIORateBytesPerSec()
247+
// only throttle indexing if disk IO is un-throttled (if enabled), and we still can't keep up with the merge load
248+
&& (config.isAutoThrottle() == false || threadPoolMergeExecutorService.usingMaxTargetIORateBytesPerSec())
249249
&& shouldThrottleIncomingMerges.get() == false) {
250250
// maybe enable merge task throttling
251251
synchronized (shouldThrottleIncomingMerges) {

server/src/test/java/org/elasticsearch/index/engine/ThreadPoolMergeSchedulerTests.java

Lines changed: 21 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -196,7 +196,15 @@ public void testSimpleMergeTaskReEnqueueingBySize() {
196196
}
197197
}
198198

199-
public void testIndexingThrottlingWhenSubmittingMerges() {
199+
public void testIndexingThrottlingWhenSubmittingMergesWithDiskIOThrottlingEnabled() {
200+
testIndexingThrottlingWhenSubmittingMerges(true);
201+
}
202+
203+
public void testIndexingThrottlingWhenSubmittingMergesWithDiskIOThrottlingDisabled() {
204+
testIndexingThrottlingWhenSubmittingMerges(false);
205+
}
206+
207+
private void testIndexingThrottlingWhenSubmittingMerges(boolean withDiskIOThrottlingEnabled) {
200208
final int maxThreadCount = randomIntBetween(1, 5);
201209
// settings validation requires maxMergeCount >= maxThreadCount
202210
final int maxMergeCount = maxThreadCount + randomIntBetween(0, 5);
@@ -209,6 +217,7 @@ public void testIndexingThrottlingWhenSubmittingMerges() {
209217
Settings mergeSchedulerSettings = Settings.builder()
210218
.put(MergeSchedulerConfig.MAX_THREAD_COUNT_SETTING.getKey(), maxThreadCount)
211219
.put(MergeSchedulerConfig.MAX_MERGE_COUNT_SETTING.getKey(), maxMergeCount)
220+
.put(MergeSchedulerConfig.AUTO_THROTTLE_SETTING.getKey(), withDiskIOThrottlingEnabled)
212221
.build();
213222
TestThreadPoolMergeScheduler threadPoolMergeScheduler = new TestThreadPoolMergeScheduler(
214223
new ShardId("index", "_na_", 1),
@@ -224,20 +233,20 @@ public void testIndexingThrottlingWhenSubmittingMerges() {
224233
while (submittedMerges < mergesToSubmit - 1) {
225234
isUsingMaxTargetIORate.set(randomBoolean());
226235
if (submittedMergeTasks.isEmpty() == false && randomBoolean()) {
227-
// maybe schedule one submitted merge
236+
// maybe schedule one of the submitted merges (but still it's not run)
228237
MergeTask mergeTask = randomFrom(submittedMergeTasks);
229238
submittedMergeTasks.remove(mergeTask);
230239
mergeTask.schedule();
231240
} else {
232-
// submit one merge
241+
// submit one new merge
233242
MergeSource mergeSource = mock(MergeSource.class);
234243
OneMerge oneMerge = mock(OneMerge.class);
235244
when(oneMerge.getStoreMergeInfo()).thenReturn(getNewMergeInfo(randomLongBetween(1L, 10L)));
236245
when(oneMerge.getMergeProgress()).thenReturn(new MergePolicy.OneMergeProgress());
237246
when(mergeSource.getNextMerge()).thenReturn(oneMerge, (OneMerge) null);
238247
threadPoolMergeScheduler.merge(mergeSource, randomFrom(MergeTrigger.values()));
239248
submittedMerges++;
240-
if (isUsingMaxTargetIORate.get() && submittedMerges > maxMergeCount) {
249+
if ((isUsingMaxTargetIORate.get() || withDiskIOThrottlingEnabled == false) && submittedMerges > maxMergeCount) {
241250
expectIndexThrottling = true;
242251
} else if (submittedMerges <= maxMergeCount) {
243252
expectIndexThrottling = false;
@@ -246,15 +255,20 @@ public void testIndexingThrottlingWhenSubmittingMerges() {
246255
// assert IO throttle state
247256
assertThat(threadPoolMergeScheduler.isIndexingThrottlingEnabled(), is(expectIndexThrottling));
248257
}
249-
// submit one last merge when IO throttling is at max value
250-
isUsingMaxTargetIORate.set(true);
258+
if (withDiskIOThrottlingEnabled) {
259+
// submit one last merge when IO throttling is at max value
260+
isUsingMaxTargetIORate.set(true);
261+
} else {
262+
// but if disk IO throttling is not enabled, indexing throttling should still be triggered
263+
isUsingMaxTargetIORate.set(randomBoolean());
264+
}
251265
MergeSource mergeSource = mock(MergeSource.class);
252266
OneMerge oneMerge = mock(OneMerge.class);
253267
when(oneMerge.getStoreMergeInfo()).thenReturn(getNewMergeInfo(randomLongBetween(1L, 10L)));
254268
when(oneMerge.getMergeProgress()).thenReturn(new MergePolicy.OneMergeProgress());
255269
when(mergeSource.getNextMerge()).thenReturn(oneMerge, (OneMerge) null);
256270
threadPoolMergeScheduler.merge(mergeSource, randomFrom(MergeTrigger.values()));
257-
// assert index throttling because IO throttling is at max value
271+
// assert indexing throttling state because IO throttling is at max value OR disk IO throttling is disabled
258272
assertThat(threadPoolMergeScheduler.isIndexingThrottlingEnabled(), is(true));
259273
}
260274

0 commit comments

Comments
 (0)