@@ -67,6 +67,14 @@ public class ThreadPoolMergeScheduler extends MergeScheduler implements Elastics
6767 private volatile boolean closed = false ;
6868 private final MergeMemoryEstimateProvider mergeMemoryEstimateProvider ;
6969
70+ /**
71+ * Creates a merge scheduler that executes its merge tasks on a thread pool.
72+ *
73+ * @param shardId the shard id associated with this merge scheduler
74+ * @param indexSettings used to obtain the {@link MergeSchedulerConfig}
75+ * @param threadPoolMergeExecutorService the executor service used to execute merge tasks from this scheduler
76+ * @param mergeMemoryEstimateProvider provides an estimate for how much memory a merge will take
77+ */
7078 public ThreadPoolMergeScheduler (
7179 ShardId shardId ,
7280 IndexSettings indexSettings ,
@@ -146,6 +154,16 @@ protected void beforeMerge(OnGoingMerge merge) {}
146154 */
147155 protected void afterMerge (OnGoingMerge merge ) {}
148156
157+ /**
158+ * A callback allowing for custom logic when a merge is queued; it is invoked right before the merge task is submitted for execution. Does nothing by default.
159+ */
160+ protected void mergeQueued (OnGoingMerge merge ) {}
161+
162+ /**
163+ * A callback allowing for custom logic after a merge is executed or aborted; it is invoked once the merge task has been counted as done. Does nothing by default.
164+ */
165+ protected void mergeExecutedOrAborted (OnGoingMerge merge ) {}
166+
149167 /**
150168 * A callback that's invoked when indexing should be throttled down in order to let merging catch up.
151169 */
@@ -157,6 +175,34 @@ protected void enableIndexingThrottling(int numRunningMerges, int numQueuedMerge
157175 */
158176 protected void disableIndexingThrottling (int numRunningMerges , int numQueuedMerges , int configuredMaxMergeCount ) {}
159177
178+ /**
179+ * Returns true if scheduled merges should be skipped (aborted); while it does, {@code schedule} answers {@code Schedule.ABORT} for merge tasks. Always false by default.
180+ */
181+ protected boolean shouldSkipMerge () {
182+ return false ;
183+ }
184+
185+ /**
186+ * Returns true if IO-throttling is enabled; by default this is the value reported by {@code config}.
187+ */
188+ protected boolean isAutoThrottle () {
189+ return config .isAutoThrottle ();
190+ }
191+
192+ /**
193+ * Returns the maximum number of active merges (both running and enqueued ones) before being throttled; by default this is the value reported by {@code config}.
194+ */
195+ protected int getMaxMergeCount () {
196+ return config .getMaxMergeCount ();
197+ }
198+
199+ /**
200+ * Returns the maximum number of merge tasks that may run at the same time ({@code schedule} only starts a task while fewer are running); by default this is the value reported by {@code config}.
201+ */
202+ protected int getMaxThreadCount () {
203+ return config .getMaxThreadCount ();
204+ }
205+
160206 /**
161207 * A callback for exceptions thrown while merging.
162208 */
@@ -168,6 +214,7 @@ protected void handleMergeException(Throwable t) {
168214 boolean submitNewMergeTask (MergeSource mergeSource , MergePolicy .OneMerge merge , MergeTrigger mergeTrigger ) {
169215 try {
170216 MergeTask mergeTask = newMergeTask (mergeSource , merge , mergeTrigger );
217+ mergeQueued (mergeTask .onGoingMerge );
171218 return threadPoolMergeExecutorService .submitMergeTask (mergeTask );
172219 } finally {
173220 checkMergeTaskThrottling ();
@@ -183,7 +230,7 @@ MergeTask newMergeTask(MergeSource mergeSource, MergePolicy.OneMerge merge, Merg
183230 return new MergeTask (
184231 mergeSource ,
185232 merge ,
186- isAutoThrottle && config . isAutoThrottle (),
233+ isAutoThrottle && isAutoThrottle (),
187234 "Lucene Merge Task #" + submittedMergeTaskCount .incrementAndGet () + " for shard " + shardId ,
188235 estimateMergeMemoryBytes
189236 );
@@ -193,7 +240,7 @@ private void checkMergeTaskThrottling() {
193240 long submittedMergesCount = submittedMergeTaskCount .get ();
194241 long doneMergesCount = doneMergeTaskCount .get ();
195242 int runningMergesCount = runningMergeTasks .size ();
196- int configuredMaxMergeCount = config . getMaxMergeCount ();
243+ int configuredMaxMergeCount = getMaxMergeCount ();
197244 // both currently running and enqueued merge tasks are considered "active" for throttling purposes
198245 int activeMerges = (int ) (submittedMergesCount - doneMergesCount );
199246 if (activeMerges > configuredMaxMergeCount
@@ -223,7 +270,12 @@ synchronized Schedule schedule(MergeTask mergeTask) {
223270 if (closed ) {
224271 // do not run or backlog tasks when closing the merge scheduler, instead abort them
225272 return Schedule .ABORT ;
226- } else if (runningMergeTasks .size () < config .getMaxThreadCount ()) {
273+ } else if (shouldSkipMerge ()) {
274+ if (verbose ()) {
275+ message (String .format (Locale .ROOT , "skipping merge task %s" , mergeTask ));
276+ }
277+ return Schedule .ABORT ;
278+ } else if (runningMergeTasks .size () < getMaxThreadCount ()) {
227279 boolean added = runningMergeTasks .put (mergeTask .onGoingMerge .getMerge (), mergeTask ) == null ;
228280 assert added : "starting merge task [" + mergeTask + "] registered as already running" ;
229281 return Schedule .RUN ;
@@ -243,8 +295,9 @@ synchronized void mergeTaskFinishedRunning(MergeTask mergeTask) {
243295 maybeSignalAllMergesDoneAfterClose ();
244296 }
245297
246- private void mergeTaskDone () {
298+ private void mergeTaskDone (OnGoingMerge merge ) {
247299 doneMergeTaskCount .incrementAndGet ();
300+ mergeExecutedOrAborted (merge ); // hook for subclasses; runs once the task has been counted as done
248301 checkMergeTaskThrottling ();
249302 }
250303
@@ -255,7 +308,7 @@ private synchronized void maybeSignalAllMergesDoneAfterClose() {
255308 }
256309
257310 private synchronized void enqueueBackloggedTasks () {
258- int maxBackloggedTasksToEnqueue = config . getMaxThreadCount () - runningMergeTasks .size ();
311+ int maxBackloggedTasksToEnqueue = getMaxThreadCount () - runningMergeTasks .size ();
259312 // enqueue all backlogged tasks when closing, as the queue expects all backlogged tasks to always be enqueued back
260313 while (closed || maxBackloggedTasksToEnqueue -- > 0 ) {
261314 MergeTask backloggedMergeTask = backloggedMergeTasks .poll ();
@@ -408,7 +461,7 @@ public void run() {
408461 try {
409462 mergeTaskFinishedRunning (this );
410463 } finally {
411- mergeTaskDone ();
464+ mergeTaskDone (onGoingMerge );
412465 }
413466 try {
414467 // kick-off any follow-up merge
@@ -452,7 +505,7 @@ void abort() {
452505 if (verbose ()) {
453506 message (String .format (Locale .ROOT , "merge task %s end abort" , this ));
454507 }
455- mergeTaskDone ();
508+ mergeTaskDone (onGoingMerge );
456509 }
457510 }
458511
0 commit comments