4040import java .util .Set ;
4141import java .util .concurrent .ExecutorService ;
4242import java .util .concurrent .TimeUnit ;
43+ import java .util .concurrent .atomic .AtomicBoolean ;
4344
4445public class ThreadPoolMergeScheduler extends MergeScheduler implements ElasticsearchMergeScheduler {
4546 /**
@@ -72,6 +73,7 @@ public class ThreadPoolMergeScheduler extends MergeScheduler implements Elastics
7273 private final ThreadLocal <MergeRateLimiter > onGoingMergeRateLimiter = new ThreadLocal <>();
7374 private final PriorityQueue <MergeTask > activeMergeTasksLocalSchedulerQueue = new PriorityQueue <>();
7475 private final List <MergeTask > activeMergeTasksExecutingOnLocalSchedulerList = new ArrayList <>();
76+ private final AtomicBoolean isThrottling = new AtomicBoolean ();
7577
7678 public ThreadPoolMergeScheduler (ShardId shardId , IndexSettings indexSettings , ThreadPool threadPool ) {
7779 this .config = indexSettings .getMergeSchedulerConfig ();
@@ -120,6 +122,10 @@ protected void beforeMerge(OnGoingMerge merge) {}
120122 */
121123 protected void afterMerge (OnGoingMerge merge ) {}
122124
125+ protected void activateThrottling (int numActiveMerges ) {}
126+
127+ protected void deactivateThrottling (int numActiveMerges ) {}
128+
123129 public int getMaxMergeCount () {
124130 return config .getMaxMergeCount ();
125131 }
@@ -144,13 +150,15 @@ private void submitNewMergeTask(MergeSource mergeSource, MergePolicy.OneMerge me
144150 activeMergeTasksLocalSchedulerQueue .add (mergeTask );
145151 }
146152 maybeExecuteNextMerge ();
153+ maybeActivateThrottling ();
147154 }
148155
149156 private void mergeDone (MergeTask mergeTask ) {
150157 synchronized (this ) {
151158 activeMergeTasksExecutingOnLocalSchedulerList .remove (mergeTask );
152159 }
153160 maybeExecuteNextMerge ();
161+ maybeDeactivateThrottling ();
154162 }
155163
156164 private void maybeExecuteNextMerge () {
@@ -169,6 +177,20 @@ private void maybeExecuteNextMerge() {
169177 executorService .execute (mergeTask );
170178 }
171179
180+ private void maybeActivateThrottling () {
181+ int numActiveMerges = activeMergeTasksExecutingOnLocalSchedulerList .size ();
182+ if (numActiveMerges > config .getMaxMergeCount () && isThrottling .getAndSet (true ) == false ) {
183+ activateThrottling (numActiveMerges );
184+ }
185+ }
186+
187+ private void maybeDeactivateThrottling () {
188+ int numActiveMerges = activeMergeTasksExecutingOnLocalSchedulerList .size ();
189+ if (numActiveMerges <= config .getMaxMergeCount () && isThrottling .getAndSet (false ) == true ) {
190+ deactivateThrottling (numActiveMerges );
191+ }
192+ }
193+
172194 private static double maybeUpdateTargetMBPerSec (int poolSize ) {
173195 if (activeThrottledMergeTasksAcrossSchedulersSet .size () < poolSize * 2 && targetMBPerSec > MIN_MERGE_MB_PER_SEC ) {
174196 return Math .max (MIN_MERGE_MB_PER_SEC , targetMBPerSec / 1.1 );
0 commit comments