Skip to content

Commit f0b72fe

Browse files
wrap for merge in the executor merge scheduler
1 parent f5a1a8d commit f0b72fe

File tree

1 file changed

+28
-0
lines changed

1 file changed

+28
-0
lines changed

server/src/main/java/org/elasticsearch/index/engine/ExecutorMergeScheduler.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,12 @@
1414
import org.apache.lucene.index.MergeRateLimiter;
1515
import org.apache.lucene.index.MergeScheduler;
1616
import org.apache.lucene.index.MergeTrigger;
17+
import org.apache.lucene.store.Directory;
18+
import org.apache.lucene.store.FilterDirectory;
19+
import org.apache.lucene.store.IOContext;
20+
import org.apache.lucene.store.IndexOutput;
21+
import org.apache.lucene.store.RateLimitedIndexOutput;
22+
import org.apache.lucene.store.RateLimiter;
1723
import org.elasticsearch.common.logging.Loggers;
1824
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
1925
import org.elasticsearch.core.TimeValue;
@@ -136,6 +142,28 @@ protected void doMerge(MergeSource mergeSource, MergePolicy.OneMerge merge) thro
136142
mergeSource.merge(merge);
137143
}
138144

145+
@Override
146+
public Directory wrapForMerge(MergePolicy.OneMerge merge, Directory in) {
147+
// Return a wrapped Directory which has rate-limited output.
148+
// Note: the rate limiter is only per thread. So, if there are multiple merge threads running
149+
// and throttling is required, each thread will be throttled independently.
150+
// The implication of this, is that the total IO rate could be higher than the target rate.
151+
RateLimiter rateLimiter = onGoingMergeRateLimiter.get();
152+
return new FilterDirectory(in) {
153+
@Override
154+
public IndexOutput createOutput(String name, IOContext context) throws IOException {
155+
ensureOpen();
156+
157+
// This Directory is only supposed to be used during merging,
158+
// so all writes should have MERGE context, else there is a bug
159+
// somewhere that is failing to pass down the right IOContext:
160+
assert context.context() == IOContext.Context.MERGE : "got context=" + context.context();
161+
162+
return new RateLimitedIndexOutput(rateLimiter, in.createOutput(name, context));
163+
}
164+
};
165+
}
166+
139167
protected class MergeTask extends AbstractRunnable implements Comparable<MergeTask> {
140168
private final MergeSource mergeSource;
141169
private final OnGoingMerge onGoingMerge;

0 commit comments

Comments
 (0)