Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,10 @@ public class ThreadPoolMergeExecutorService {
private ThreadPoolMergeExecutorService(ThreadPool threadPool) {
this.executorService = threadPool.executor(ThreadPool.Names.MERGE);
this.maxConcurrentMerges = threadPool.info(ThreadPool.Names.MERGE).getMax();
this.concurrentMergesFloorLimitForThrottling = maxConcurrentMerges * 2;
this.concurrentMergesCeilLimitForThrottling = maxConcurrentMerges * 4;
// the intent here is to throttle down whenever we submit a task and no other task is running
this.concurrentMergesFloorLimitForThrottling = 2;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we put a comment here that this means to only decrease throttle rate when we submit a task and no other tasks are running?

this.concurrentMergesCeilLimitForThrottling = maxConcurrentMerges * 2;
assert concurrentMergesFloorLimitForThrottling <= concurrentMergesCeilLimitForThrottling;
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps assert that concurrentMergesCeilLimitForThrottling >= concurrentMergesFloorLimitForThrottling - I feel like it adds readabililty.


boolean submitMergeTask(MergeTask mergeTask) {
Expand Down Expand Up @@ -230,10 +232,10 @@ private static long newTargetIORateBytesPerSec(
);
} else if (currentlySubmittedIOThrottledMergeTasks > concurrentMergesCeilLimitForThrottling
&& currentTargetIORateBytesPerSec < MAX_IO_RATE.getBytes()) {
// increase target IO rate by 10% (capped)
// increase target IO rate by 20% (capped)
newTargetIORateBytesPerSec = Math.min(
MAX_IO_RATE.getBytes(),
currentTargetIORateBytesPerSec + currentTargetIORateBytesPerSec / 10L
currentTargetIORateBytesPerSec + currentTargetIORateBytesPerSec / 5L
);
} else {
newTargetIORateBytesPerSec = currentTargetIORateBytesPerSec;
Expand Down Expand Up @@ -295,14 +297,4 @@ long getTargetIORateBytesPerSec() {
int getMaxConcurrentMerges() {
return maxConcurrentMerges;
}

// exposed for tests
int getConcurrentMergesFloorLimitForThrottling() {
return concurrentMergesFloorLimitForThrottling;
}

// exposed for tests
int getConcurrentMergesCeilLimitForThrottling() {
return concurrentMergesCeilLimitForThrottling;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -219,18 +219,16 @@ public void testTargetIORateChangesWhenSubmittingMergeTasks() throws Exception {
}
long newIORate = threadPoolMergeExecutorService.getTargetIORateBytesPerSec();
if (supportsIOThrottling) {
if (submittedIOThrottledMergeTasks.get() < threadPoolMergeExecutorService
.getConcurrentMergesFloorLimitForThrottling()) {
// assert the IO rate decreases, with a floor limit, when there are few merge tasks enqueued
if (submittedIOThrottledMergeTasks.get() < 2) {
// assert the IO rate decreases, with a floor limit, when there is just a single merge task running
assertThat(newIORate, either(is(MIN_IO_RATE.getBytes())).or(lessThan(currentIORate)));
} else if (submittedIOThrottledMergeTasks.get() > threadPoolMergeExecutorService
.getConcurrentMergesCeilLimitForThrottling()) {
// assert the IO rate increases, with a ceiling limit, when there are many merge tasks enqueued
assertThat(newIORate, either(is(MAX_IO_RATE.getBytes())).or(greaterThan(currentIORate)));
} else {
// assert the IO rate does NOT change when there are a couple of merge tasks enqueued
assertThat(newIORate, equalTo(currentIORate));
}
} else if (submittedIOThrottledMergeTasks.get() > threadPoolMergeExecutorService.getMaxConcurrentMerges() * 2) {
// assert the IO rate increases, with a ceiling limit, when there are many merge tasks enqueued
assertThat(newIORate, either(is(MAX_IO_RATE.getBytes())).or(greaterThan(currentIORate)));
} else {
// assert the IO rate does NOT change when there are a couple of merge tasks enqueued
assertThat(newIORate, equalTo(currentIORate));
}
} else {
// assert the IO rate does not change, when the merge task doesn't support IO throttling
assertThat(newIORate, equalTo(currentIORate));
Expand Down Expand Up @@ -375,17 +373,16 @@ private void testIORateAdjustedForSubmittedTasks(
initialTasksCounter--;
threadPoolMergeExecutorService.submitMergeTask(mergeTask);
long newTargetIORateLimit = threadPoolMergeExecutorService.getTargetIORateBytesPerSec();
if (currentlySubmittedMergeTaskCount.get() < threadPoolMergeExecutorService.getConcurrentMergesFloorLimitForThrottling()) {
if (currentlySubmittedMergeTaskCount.get() < 2) {
// assert the IO rate decreases, with a floor limit, when there are few merge tasks enqueued
assertThat(newTargetIORateLimit, either(is(MIN_IO_RATE.getBytes())).or(lessThan(targetIORateLimit.get())));
} else if (currentlySubmittedMergeTaskCount.get() > threadPoolMergeExecutorService
.getConcurrentMergesCeilLimitForThrottling()) {
// assert the IO rate increases, with a ceiling limit, when there are many merge tasks enqueued
assertThat(newTargetIORateLimit, either(is(MAX_IO_RATE.getBytes())).or(greaterThan(targetIORateLimit.get())));
} else {
// assert the IO rate does change, when there are a couple of merge tasks enqueued
assertThat(newTargetIORateLimit, equalTo(targetIORateLimit.get()));
}
} else if (currentlySubmittedMergeTaskCount.get() > threadPoolMergeExecutorService.getMaxConcurrentMerges() * 2) {
// assert the IO rate increases, with a ceiling limit, when there are many merge tasks enqueued
assertThat(newTargetIORateLimit, either(is(MAX_IO_RATE.getBytes())).or(greaterThan(targetIORateLimit.get())));
} else {
// assert the IO rate does not change, when there are a couple of merge tasks enqueued
assertThat(newTargetIORateLimit, equalTo(targetIORateLimit.get()));
}
targetIORateLimit.set(newTargetIORateLimit);
} else {
// execute already submitted merge task
Expand Down