@@ -61,22 +61,24 @@ private ThreadPoolMergeQueue(ThreadPool threadPool) {
6161
6262 void submitMergeTask (MergeTask mergeTask ) {
6363 enqueueMergeTask (mergeTask );
64+ if (mergeTask .supportsIOThrottling ()) {
65+ // count submitted merge tasks that support IO auto throttling
66+ activeIOThrottledMergeTasksCount .incrementAndGet ();
67+ maybeUpdateTargetMBPerSec ();
68+ }
6469 executeSmallestMergeTask ();
6570 }
6671
6772 void enqueueMergeTask (MergeTask mergeTask ) {
6873 queuedMergeTasks .add (mergeTask );
69- if (mergeTask .supportsIOThrottling ()) {
70- activeIOThrottledMergeTasksCount .incrementAndGet ();
71- maybeUpdateTargetMBPerSec ();
72- }
7374 }
7475
7576 private void executeSmallestMergeTask () {
7677 final AtomicReference <MergeTask > smallestMergeTask = new AtomicReference <>();
7778 try {
7879 executorService .execute (() -> {
79- // one such task always executes a SINGLE merge task; this is important for merge queue statistics
80+ // one such runnable always executes a SINGLE merge task from the queue
81+ // this is important for merge queue statistics, i.e. the executor's queue size equals the merge tasks' queue size
8082 while (true ) {
8183 try {
8284 // will block if there are backlogged merges until they're enqueued again
@@ -85,11 +87,14 @@ private void executeSmallestMergeTask() {
8587 Thread .currentThread ().interrupt ();
8688 return ;
8789 }
90+ // the merge task's scheduler might backlog rather than execute the task
91+ // it's then the duty of the said merge scheduler to re-enqueue the backlogged merge task
8892 if (smallestMergeTask .get ().runNowOrBacklog ()) {
8993 runMergeTask (smallestMergeTask .get ());
9094 // one runnable one merge task
9195 return ;
9296 }
97+ // the merge task is backlogged by the merge scheduler,
9398 }
9499 });
95100 } catch (Exception e ) {
0 commit comments