4141import java .util .concurrent .ExecutorService ;
4242import java .util .concurrent .TimeUnit ;
4343import java .util .concurrent .atomic .AtomicBoolean ;
44+ import java .util .concurrent .atomic .AtomicLong ;
4445
4546public class ThreadPoolMergeScheduler extends MergeScheduler implements ElasticsearchMergeScheduler {
4647 /**
@@ -67,13 +68,20 @@ public class ThreadPoolMergeScheduler extends MergeScheduler implements Elastics
6768
6869 private final MergeSchedulerConfig config ;
6970 private final Logger logger ;
71+ // per-scheduler merge stats
7072 private final MergeTracking mergeTracking ;
73+ // this
7174 private final ExecutorService executorService ;
75+ // the size of the per-node
7276 private final int maxThreadPoolSize ;
77+ // used to communicate the IO rate limit to the {@IndexOutput} that's actually writing the merge
7378 private final ThreadLocal <MergeRateLimiter > onGoingMergeRateLimiter = new ThreadLocal <>();
7479 private final PriorityQueue <MergeTask > activeMergeTasksLocalSchedulerQueue = new PriorityQueue <>();
7580 private final List <MergeTask > activeMergeTasksExecutingOnLocalSchedulerList = new ArrayList <>();
81+ // set when incoming merges should be throttled
7682 private final AtomicBoolean isThrottling = new AtomicBoolean ();
83+ // how many {@link MergeTask}s have kicked off (this is used to name them).
84+ private final AtomicLong mergeTaskCount = new AtomicLong ();
7785
7886 public ThreadPoolMergeScheduler (ShardId shardId , IndexSettings indexSettings , ThreadPool threadPool ) {
7987 this .config = indexSettings .getMergeSchedulerConfig ();
@@ -101,7 +109,13 @@ public MergeScheduler getMergeScheduler() {
101109
102110 @ Override
103111 public void refreshConfig () {
104- // No-op
112+ // in case max merge count changed
113+ maybeActivateThrottling ();
114+ maybeDeactivateThrottling ();
115+ // in case max thread count changed (and more merges can be running simultaneously)
116+ while (maybeExecuteNextMerge ()) {}
117+ // the IO auto-throttled setting change is only honoured for new merges
118+ // (existing ones continue with the value of the setting when the merge created (queued))
105119 }
106120
107121 @ Override
@@ -157,20 +171,22 @@ private void mergeDone(MergeTask mergeTask) {
157171 maybeDeactivateThrottling ();
158172 }
159173
160- private void maybeExecuteNextMerge () {
174+ private boolean maybeExecuteNextMerge () {
161175 MergeTask mergeTask ;
162176 synchronized (this ) {
163177 if (activeMergeTasksExecutingOnLocalSchedulerList .size () >= config .getMaxThreadCount ()) {
164- return ;
178+ // enough concurrent merges per scheduler (per shard) are already running
179+ return false ;
165180 }
166181 mergeTask = activeMergeTasksLocalSchedulerQueue .poll ();
167182 if (mergeTask == null ) {
168183 // no more merges to execute
169- return ;
184+ return false ;
170185 }
171186 activeMergeTasksExecutingOnLocalSchedulerList .add (mergeTask );
172187 }
173188 executorService .execute (mergeTask );
189+ return true ;
174190 }
175191
176192 private void maybeActivateThrottling () {
@@ -227,7 +243,7 @@ private MergeTask newMergeTask(MergeSource mergeSource, MergePolicy.OneMerge mer
227243 boolean isAutoThrottle = config .isAutoThrottle ()
228244 && mergeTrigger != MergeTrigger .CLOSING
229245 && merge .getStoreMergeInfo ().mergeMaxNumSegments () == -1 ; // i.e. is NOT a force merge
230- return new MergeTask (mergeSource , merge , isAutoThrottle , "TODO" );
246+ return new MergeTask (mergeSource , merge , isAutoThrottle , "Lucene Merge #" + mergeTaskCount . incrementAndGet () );
231247 }
232248
233249 /**
0 commit comments