2020import org .apache .lucene .store .IndexOutput ;
2121import org .apache .lucene .store .RateLimitedIndexOutput ;
2222import org .apache .lucene .store .RateLimiter ;
23+ import org .apache .lucene .util .SetOnce ;
2324import org .elasticsearch .common .logging .Loggers ;
2425import org .elasticsearch .common .util .concurrent .AbstractRunnable ;
2526import org .elasticsearch .core .TimeValue ;
2829import org .elasticsearch .index .merge .MergeStats ;
2930import org .elasticsearch .index .merge .OnGoingMerge ;
3031import org .elasticsearch .index .shard .ShardId ;
32+ import org .elasticsearch .threadpool .ThreadPool ;
3133
3234import java .io .IOException ;
35+ import java .util .HashSet ;
3336import java .util .Locale ;
3437import java .util .Set ;
35- import java .util .concurrent .ExecutorService ;
38+ import java .util .concurrent .ThreadPoolExecutor ;
3639import java .util .concurrent .TimeUnit ;
3740
38- public class ExecutorMergeScheduler extends MergeScheduler implements ElasticsearchMergeScheduler {
41+ public class ThreadPoolMergeScheduler extends MergeScheduler implements ElasticsearchMergeScheduler {
42+ /**
43+ * Floor for IO write rate limit (we will never go any lower than this)
44+ */
45+ private static final double MIN_MERGE_MB_PER_SEC = 5.0 ;
46+ /**
47+ * Ceiling for IO write rate limit (we will never go any higher than this)
48+ */
49+ private static final double MAX_MERGE_MB_PER_SEC = 10240.0 ;
50+ /**
51+ * Initial value for IO write rate limit when doAutoIOThrottle is true
52+ */
53+ private static final double START_MB_PER_SEC = 20.0 ;
54+ /**
55+ * Current IO write throttle rate, for all merge, across all merge schedulers (shards) on the node
56+ */
57+ private static volatile double targetMBPerSec = START_MB_PER_SEC ;
58+ /**
59+ * The set of all active merges, across all merge schedulers (i.e. across all shards), on the local node.
60+ * This is used to implement auto IO throttling that's the same across all merge schedulers.
61+ */
62+ private static final Set <MergeTask > activeThrottledMergeTasksAcrossSchedulersSet = new HashSet <>();
3963
4064 private final MergeSchedulerConfig config ;
4165 private final Logger logger ;
4266 private final MergeTracking mergeTracking ;
43- private final ExecutorService executorService ;
67+ private final ThreadPoolExecutor threadPoolExecutor ;
4468 private final ThreadLocal <MergeRateLimiter > onGoingMergeRateLimiter = new ThreadLocal <>();
4569
46- public ExecutorMergeScheduler (ShardId shardId , IndexSettings indexSettings , ExecutorService executorService ) {
70+ public ThreadPoolMergeScheduler (ShardId shardId , IndexSettings indexSettings , ThreadPool threadPool ) {
4771 this .config = indexSettings .getMergeSchedulerConfig ();
4872 this .logger = Loggers .getLogger (getClass (), shardId );
49- // TODO: use real IO rate here
50- this . mergeTracking = new MergeTracking ( logger , () -> Double . POSITIVE_INFINITY );
51- this .executorService = executorService ;
73+ this . mergeTracking = new MergeTracking ( logger , () -> this . config . isAutoThrottle () ? targetMBPerSec : Double . POSITIVE_INFINITY );
74+ // all merge schedulers must use the same thread pool
75+ this .threadPoolExecutor = ( ThreadPoolExecutor ) threadPool . executor ( ThreadPool . Names . MERGE ) ;
5276 }
5377
5478 @ Override
@@ -75,7 +99,7 @@ public void refreshConfig() {
7599 public void merge (MergeSource mergeSource , MergeTrigger trigger ) throws IOException {
76100 MergePolicy .OneMerge merge = mergeSource .getNextMerge ();
77101 if (merge != null ) {
78- submitNewMergeTask (mergeSource , merge );
102+ submitNewMergeTask (mergeSource , merge , trigger );
79103 }
80104 }
81105
@@ -89,7 +113,7 @@ protected void beforeMerge(OnGoingMerge merge) {}
89113 */
90114 protected void afterMerge (OnGoingMerge merge ) {}
91115
92- public synchronized int getMaxMergeCount () {
116+ public int getMaxMergeCount () {
93117 return config .getMaxMergeCount ();
94118 }
95119
@@ -100,39 +124,52 @@ public MergeScheduler clone() {
100124 return this ;
101125 }
102126
103- protected void handleMergeException (Throwable exc ) {
104- if (exc instanceof MergePolicy .MergeException mergeException ) {
105- throw mergeException ;
106- } else {
107- throw new MergePolicy .MergeException (exc );
108- }
127+ protected void handleMergeException (Throwable t ) {
128+ throw new MergePolicy .MergeException (t );
109129 }
110130
111- private void submitNewMergeTask (MergeSource mergeSource , MergePolicy .OneMerge merge ) {
112- MergeTask mergeTask = mergeTask (mergeSource , merge );
113- executorService .execute (mergeTask );
131+ private void submitNewMergeTask (MergeSource mergeSource , MergePolicy .OneMerge merge , MergeTrigger mergeTrigger ) {
132+ MergeTask mergeTask = newMergeTask (mergeSource , merge , mergeTrigger );
133+ if (mergeTask .isAutoThrottle ) {
134+ trackNewActiveThrottledMergeTask (mergeTask , threadPoolExecutor .getMaximumPoolSize ());
135+ }
136+ threadPoolExecutor .execute (mergeTask );
114137 }
115138
116- private MergeTask mergeTask (MergeSource mergeSource , MergePolicy .OneMerge merge ) {
117- return new MergeTask (mergeSource , merge , "TODO" );
139+ private static double maybeUpdateTargetMBPerSec (int poolSize ) {
140+ if (activeThrottledMergeTasksAcrossSchedulersSet .size () < poolSize * 2 && targetMBPerSec > MIN_MERGE_MB_PER_SEC ) {
141+ return Math .max (MIN_MERGE_MB_PER_SEC , targetMBPerSec / 1.1 );
142+ } else if (activeThrottledMergeTasksAcrossSchedulersSet .size () > poolSize * 4 && targetMBPerSec < MAX_MERGE_MB_PER_SEC ) {
143+ return Math .min (MAX_MERGE_MB_PER_SEC , targetMBPerSec * 1.1 );
144+ }
145+ return targetMBPerSec ;
118146 }
119147
120- @ Override
121- /** Overridden to route messages to our logger too, in addition to the {@link org.apache.lucene.util.InfoStream} that lucene uses. */
122- protected boolean verbose () {
123- if (logger .isTraceEnabled ()) {
148+ private synchronized static boolean trackNewActiveThrottledMergeTask (MergeTask newMergeTask , int poolSize ) {
149+ assert newMergeTask .isAutoThrottle : "only tracking throttled merge tasks" ;
150+ if (activeThrottledMergeTasksAcrossSchedulersSet .add (newMergeTask )) {
151+ double newTargetMBPerSec = maybeUpdateTargetMBPerSec (poolSize );
152+ if (newTargetMBPerSec != targetMBPerSec ) {
153+ targetMBPerSec = newTargetMBPerSec ;
154+ for (MergeTask mergeTask : activeThrottledMergeTasksAcrossSchedulersSet ) {
155+ mergeTask .rateLimiter .setMBPerSec (targetMBPerSec );
156+ }
157+ }
124158 return true ;
125159 }
126- return super . verbose () ;
160+ return false ;
127161 }
128162
129- @ Override
130- /** Overridden to route messages to our logger too, in addition to the {@link org.apache.lucene.util.InfoStream} that lucene uses. */
131- protected void message (String message ) {
132- if (logger .isTraceEnabled ()) {
133- logger .trace ("{}" , message );
134- }
135- super .message (message );
163+ private synchronized static boolean removeFromActiveThrottledMergeTasks (MergeTask doneMergeTask ) {
164+ assert doneMergeTask .isAutoThrottle : "only tracking throttled merge tasks" ;
165+ return activeThrottledMergeTasksAcrossSchedulersSet .remove (doneMergeTask );
166+ }
167+
168+ private MergeTask newMergeTask (MergeSource mergeSource , MergePolicy .OneMerge merge , MergeTrigger mergeTrigger ) {
169+ boolean isAutoThrottle = config .isAutoThrottle ()
170+ && mergeTrigger != MergeTrigger .CLOSING
171+ && merge .getStoreMergeInfo ().mergeMaxNumSegments () == -1 ; // i.e. is NOT a force merge
172+ return new MergeTask (mergeSource , merge , isAutoThrottle , "TODO" );
136173 }
137174
138175 /**
@@ -164,17 +201,26 @@ public IndexOutput createOutput(String name, IOContext context) throws IOExcepti
164201 };
165202 }
166203
167- protected class MergeTask extends AbstractRunnable implements Comparable <MergeTask > {
204+ final class MergeTask extends AbstractRunnable implements Comparable <MergeTask > {
205+ private final String name ;
206+ private final SetOnce <Long > mergeStartTimeNS ;
168207 private final MergeSource mergeSource ;
169208 private final OnGoingMerge onGoingMerge ;
170209 private final MergeRateLimiter rateLimiter ;
171- private final String name ;
210+ private final boolean isAutoThrottle ;
172211
173- public MergeTask (MergeSource mergeSource , MergePolicy .OneMerge merge , String name ) {
212+ MergeTask (MergeSource mergeSource , MergePolicy .OneMerge merge , boolean isAutoThrottle , String name ) {
213+ this .name = name ;
214+ this .mergeStartTimeNS = new SetOnce <>();
174215 this .mergeSource = mergeSource ;
175216 this .onGoingMerge = new OnGoingMerge (merge );
176217 this .rateLimiter = new MergeRateLimiter (merge .getMergeProgress ());
177- this .name = name ;
218+ this .isAutoThrottle = isAutoThrottle ;
219+ if (isAutoThrottle ) {
220+ this .rateLimiter .setMBPerSec (targetMBPerSec );
221+ } else {
222+ this .rateLimiter .setMBPerSec (Double .POSITIVE_INFINITY );
223+ }
178224 }
179225
180226 @ Override
@@ -184,8 +230,10 @@ public int compareTo(MergeTask other) {
184230 }
185231
186232 @ Override
187- public void doRun () {
188- final long startTimeNS = System .nanoTime ();
233+ public void doRun () throws Exception {
234+ assert isAutoThrottle == false || activeThrottledMergeTasksAcrossSchedulersSet .contains (this )
235+ : "a running throttled merge should already count as an 'active' merge" ;
236+ mergeStartTimeNS .set (System .nanoTime ());
189237 try {
190238 onGoingMergeRateLimiter .set (this .rateLimiter );
191239 beforeMerge (onGoingMerge );
@@ -204,57 +252,80 @@ public void doRun() {
204252 getSegmentName (onGoingMerge .getMerge ()),
205253 bytesToMB (onGoingMerge .getMerge ().estimatedMergeBytes ),
206254 bytesToMB (rateLimiter .getTotalBytesWritten ()),
207- nsToSec (System .nanoTime () - startTimeNS ),
255+ nsToSec (System .nanoTime () - mergeStartTimeNS . get () ),
208256 nsToSec (rateLimiter .getTotalStoppedNS ()),
209257 nsToSec (rateLimiter .getTotalPausedNS ()),
210258 rateToString (rateLimiter .getMBPerSec ())
211259 )
212260 );
213261 }
214- if (verbose ()) {
215- message (String .format (Locale .ROOT , "merge task %s end" , getName ()));
216- }
217- } catch (Throwable exc ) {
218- if (exc instanceof MergePolicy .MergeAbortedException ) {
262+ } catch (Throwable t ) {
263+ if (t instanceof MergePolicy .MergeAbortedException ) {
219264 // OK to ignore. This is what Lucene's ConcurrentMergeScheduler does
265+ } else if (t instanceof Exception == false ) {
266+ // onFailure and onAfter should better be called for Errors too
267+ throw new RuntimeException (t );
220268 } else {
221- handleMergeException (exc );
222- }
223- } finally {
224- try {
225- afterMerge (onGoingMerge );
226- } finally {
227- onGoingMergeRateLimiter .remove ();
228- long tookMS = TimeValue .nsecToMSec (System .nanoTime () - startTimeNS );
229- mergeTracking .mergeFinished (onGoingMerge .getMerge (), onGoingMerge , tookMS );
269+ throw t ;
230270 }
231271 }
232272 }
233273
234274 @ Override
235275 public void onAfter () {
236- MergePolicy .OneMerge nextMerge ;
276+ assert isAutoThrottle == false || activeThrottledMergeTasksAcrossSchedulersSet .contains (this )
277+ : "onAfter should always be invoked on active (and run) merges" ;
278+ assert this .mergeStartTimeNS .get () != null : "onAfter should always be invoked after doRun" ;
237279 try {
238- nextMerge = mergeSource .getNextMerge ();
239- } catch (IllegalStateException e ) {
240280 if (verbose ()) {
241- message ("merge task poll failed, likely that index writer is failed" );
281+ message (String .format (Locale .ROOT , "merge task %s end" , getName ()));
282+ }
283+ afterMerge (onGoingMerge );
284+ } finally {
285+ onGoingMergeRateLimiter .remove ();
286+ long tookMS = TimeValue .nsecToMSec (System .nanoTime () - mergeStartTimeNS .get ());
287+ try {
288+ mergeTracking .mergeFinished (onGoingMerge .getMerge (), onGoingMerge , tookMS );
289+ } finally {
290+ if (isAutoThrottle ) {
291+ removeFromActiveThrottledMergeTasks (this );
292+ }
293+ // kick-off next merge, if any
294+ MergePolicy .OneMerge nextMerge = null ;
295+ try {
296+ nextMerge = mergeSource .getNextMerge ();
297+ } catch (IllegalStateException e ) {
298+ if (verbose ()) {
299+ message ("merge task poll failed, likely that index writer is failed" );
300+ }
301+ // ignore exception, we expect the IW failure to be logged elsewhere
302+ }
303+ if (nextMerge != null ) {
304+ submitNewMergeTask (mergeSource , nextMerge , MergeTrigger .MERGE_FINISHED );
305+ }
242306 }
243- return ; // ignore exception, we expect the IW failure to be logged elsewhere
244- }
245- if (nextMerge != null ) {
246- submitNewMergeTask (mergeSource , nextMerge );
247307 }
248308 }
249309
250310 @ Override
251311 public void onFailure (Exception e ) {
252- // doRun already handles exceptions, this is just to be extra defensive from any future code modifications
312+ if (isAutoThrottle ) {
313+ removeFromActiveThrottledMergeTasks (this );
314+ }
315+ // most commonly the merge should've already be aborted by now,
316+ // plus the engine is probably going to be failed when any merge fails,
317+ // but keep this in case something believes calling `MergeTask#onFailure` is a sane way to abort a merge
318+ abortOnGoingMerge ();
253319 handleMergeException (e );
254320 }
255321
256322 @ Override
257323 public void onRejection (Exception e ) {
324+ assert isAutoThrottle == false || activeThrottledMergeTasksAcrossSchedulersSet .contains (this )
325+ : "only an 'active' merge can be rejected by the thread pool" ;
326+ if (isAutoThrottle ) {
327+ removeFromActiveThrottledMergeTasks (this );
328+ }
258329 if (verbose ()) {
259330 message (String .format (Locale .ROOT , "merge task [%s] rejected by thread pool, aborting" , onGoingMerge .getId ()));
260331 }
@@ -275,6 +346,24 @@ private String getName() {
275346 }
276347 }
277348
349+ @ Override
350+ /* Overridden to route messages to our logger too, in addition to the {@link org.apache.lucene.util.InfoStream} that lucene uses. */
351+ protected boolean verbose () {
352+ if (logger .isTraceEnabled ()) {
353+ return true ;
354+ }
355+ return super .verbose ();
356+ }
357+
358+ @ Override
359+ /* Overridden to route messages to our logger too, in addition to the {@link org.apache.lucene.util.InfoStream} that lucene uses. */
360+ protected void message (String message ) {
361+ if (logger .isTraceEnabled ()) {
362+ logger .trace ("{}" , message );
363+ }
364+ super .message (message );
365+ }
366+
278367 private static double nsToSec (long ns ) {
279368 return ns / (double ) TimeUnit .SECONDS .toNanos (1 );
280369 }
0 commit comments