1010package org .elasticsearch .index .engine ;
1111
1212import org .elasticsearch .common .settings .Settings ;
13+ import org .elasticsearch .common .unit .ByteSizeValue ;
1314import org .elasticsearch .common .util .concurrent .ConcurrentCollections ;
1415import org .elasticsearch .core .Nullable ;
1516import org .elasticsearch .index .engine .ThreadPoolMergeScheduler .MergeTask ;
1920import java .util .concurrent .ExecutorService ;
2021import java .util .concurrent .PriorityBlockingQueue ;
2122import java .util .concurrent .atomic .AtomicInteger ;
23+ import java .util .concurrent .atomic .AtomicLong ;
2224import java .util .concurrent .atomic .AtomicReference ;
2325
2426public class ThreadPoolMergeQueue {
2527 /**
26- * Floor for IO write rate limit (we will never go any lower than this)
28+ * Floor for IO write rate limit of individual merge tasks (we will never go any lower than this)
2729 */
28- private static final double MIN_MERGE_MB_PER_SEC = 5.0 ;
30+ private static final ByteSizeValue MIN_IO_RATE = ByteSizeValue . ofMb ( 5L ) ;
2931 /**
30- * Ceiling for IO write rate limit (we will never go any higher than this)
32+ * Ceiling for IO write rate limit of individual merge tasks (we will never go any higher than this)
3133 */
32- private static final double MAX_MERGE_MB_PER_SEC = 10240.0 ;
34+ private static final ByteSizeValue MAX_IO_RATE = ByteSizeValue . ofMb ( 10240L ) ;
3335 /**
34- * Initial value for IO write rate limit when doAutoIOThrottle is true
36+ * Initial value for IO write rate limit of individual merge tasks when doAutoIOThrottle is true
3537 */
36- private static final double START_MB_PER_SEC = 20.0 ;
38+ private static final ByteSizeValue START_IO_RATE = ByteSizeValue . ofMb ( 20L ) ;
3739 private final AtomicInteger activeIOThrottledMergeTasksCount = new AtomicInteger ();
3840 private final PriorityBlockingQueue <MergeTask > queuedMergeTasks = new PriorityBlockingQueue <>();
3941 // the set of all merge tasks currently being executed by merge threads from the pool,
4042 // in order to be able to update the IO throttle rate of merge tasks also after they have started (while executing)
4143 private final Set <MergeTask > currentlyRunningMergeTasks = ConcurrentCollections .newConcurrentSet ();
4244 /**
43- * Current IO write throttle rate, for all merge, across all merge schedulers (shards) on the node
45+ * Current IO write throttle rate, in bytes per sec, that's in effect for all currently running merge tasks,
46+ * across all {@link ThreadPoolMergeScheduler}s that use this instance of the queue.
4447 */
45- private final AtomicReference < Double > targetMBPerSec = new AtomicReference <>( START_MB_PER_SEC );
48+ private final AtomicLong targetIORateBytesPerSec = new AtomicLong ( START_IO_RATE . getBytes () );
4649 private final ExecutorService executorService ;
4750 private final int maxConcurrentMerges ;
4851
@@ -110,7 +113,7 @@ private void runMergeTask(MergeTask mergeTask) {
110113 assert added : "starting merge task [" + mergeTask + "] registered as already running" ;
111114 try {
112115 if (mergeTask .supportsIOThrottling ()) {
113- mergeTask .setIORateLimit (targetMBPerSec . get ());
116+ mergeTask .setIORateLimit (ByteSizeValue . ofBytes ( targetIORateBytesPerSec . get ()). getMbFrac ());
114117 }
115118 mergeTask .run ();
116119 } finally {
@@ -124,35 +127,48 @@ private void runMergeTask(MergeTask mergeTask) {
124127 }
125128
126129 private void maybeUpdateTargetMBPerSec () {
127- double currentTargetMBPerSec = targetMBPerSec .get ();
128- double newTargetMBPerSec = newTargetMBPerSec (currentTargetMBPerSec , activeIOThrottledMergeTasksCount .get (), maxConcurrentMerges );
129- if (currentTargetMBPerSec != newTargetMBPerSec && targetMBPerSec .compareAndSet (currentTargetMBPerSec , newTargetMBPerSec )) {
130+ long currentTargetIORateBytesPerSec = targetIORateBytesPerSec .get ();
131+ long newTargetIORateBytesPerSec = newTargetIORateBytesPerSec (
132+ currentTargetIORateBytesPerSec ,
133+ activeIOThrottledMergeTasksCount .get (),
134+ maxConcurrentMerges
135+ );
136+ if (currentTargetIORateBytesPerSec != newTargetIORateBytesPerSec
137+ && targetIORateBytesPerSec .compareAndSet (currentTargetIORateBytesPerSec , newTargetIORateBytesPerSec )) {
130138 // it's OK to have this method update merge tasks concurrently, with different targetMBPerSec values,
131139 // as it's not important that all merge tasks are throttled to the same IO rate at all time.
132140 // For performance reasons, we don't synchronize the updates to targetMBPerSec values with the update of running merges.
133141 currentlyRunningMergeTasks .forEach (mergeTask -> {
134142 if (mergeTask .supportsIOThrottling ()) {
135- mergeTask .setIORateLimit (newTargetMBPerSec );
143+ mergeTask .setIORateLimit (ByteSizeValue . ofBytes ( newTargetIORateBytesPerSec ). getMbFrac () );
136144 }
137145 });
138146 }
139147 }
140148
141- private static double newTargetMBPerSec (double currentTargetMBPerSec , int activeTasksCount , int maxConcurrentMerges ) {
142- final double newTargetMBPerSec ;
143- if (activeTasksCount < maxConcurrentMerges * 2 && currentTargetMBPerSec > MIN_MERGE_MB_PER_SEC ) {
144- newTargetMBPerSec = Math .max (MIN_MERGE_MB_PER_SEC , currentTargetMBPerSec / 1.1 );
145- } else if (activeTasksCount > maxConcurrentMerges * 4 && currentTargetMBPerSec < MAX_MERGE_MB_PER_SEC ) {
146- newTargetMBPerSec = Math .min (MAX_MERGE_MB_PER_SEC , currentTargetMBPerSec * 1.1 );
149+ private static long newTargetIORateBytesPerSec (long currentTargetIORateBytesPerSec , int activeTasksCount , int maxConcurrentMerges ) {
150+ final long newTargetIORateBytesPerSec ;
151+ if (activeTasksCount < maxConcurrentMerges * 2 && currentTargetIORateBytesPerSec > MIN_IO_RATE .getBytes ()) {
152+ // decrease target IO rate by 10% (capped)
153+ newTargetIORateBytesPerSec = Math .max (
154+ MIN_IO_RATE .getBytes (),
155+ currentTargetIORateBytesPerSec - currentTargetIORateBytesPerSec / 10L
156+ );
157+ } else if (activeTasksCount > maxConcurrentMerges * 4 && currentTargetIORateBytesPerSec < MAX_IO_RATE .getBytes ()) {
158+ // increase target IO rate by 10% (capped)
159+ newTargetIORateBytesPerSec = Math .min (
160+ MAX_IO_RATE .getBytes (),
161+ currentTargetIORateBytesPerSec + currentTargetIORateBytesPerSec / 10L
162+ );
147163 } else {
148- newTargetMBPerSec = currentTargetMBPerSec ;
164+ newTargetIORateBytesPerSec = currentTargetIORateBytesPerSec ;
149165 }
150- return newTargetMBPerSec ;
166+ return newTargetIORateBytesPerSec ;
151167 }
152168
153169 // exposed for stats
154170 double getTargetMBPerSec () {
155- return targetMBPerSec . get ();
171+ return ByteSizeValue . ofBytes ( targetIORateBytesPerSec . get ()). getMbFrac ();
156172 }
157173
158174 public boolean isEmpty () {
0 commit comments