2727
2828import java .io .Closeable ;
2929import java .io .IOException ;
30+ import java .util .Arrays ;
3031import java .util .Comparator ;
3132import java .util .IdentityHashMap ;
3233import java .util .Iterator ;
@@ -150,7 +151,6 @@ public Iterator<Setting<?>> settings() {
150151 * Initial value for IO write rate limit of individual merge tasks when doAutoIOThrottle is true
151152 */
152153 static final ByteSizeValue START_IO_RATE = ByteSizeValue .ofMb (20L );
153- private static final Logger LOGGER = LogManager .getLogger (ThreadPoolMergeExecutorService .class );
154154 /**
155155 * Total number of submitted merge tasks that support IO auto throttling and that have not yet been run (or aborted).
156156 * This includes merge tasks that are currently running and that are backlogged (by their respective merge schedulers).
@@ -356,6 +356,7 @@ private void abortMergeTask(MergeTask mergeTask) {
356356 }
357357
358358 class DiskSpaceMonitor implements Runnable {
359+ private static final Logger LOGGER = LogManager .getLogger (ThreadPoolMergeExecutorService .DiskSpaceMonitor .class );
359360 private final RelativeByteSizeValue highStageWatermark ;
360361 private final ByteSizeValue highStageMaxHeadroom ;
361362 private final NodeEnvironment .DataPath [] dataPaths ;
@@ -372,17 +373,13 @@ class DiskSpaceMonitor implements Runnable {
372373
373374 @ Override
374375 public void run () {
375- FsInfo .Path leastAvailablePath = null ;
376+ FsInfo .Path mostAvailablePath = null ;
376377 IOException fsInfoException = null ;
377- if (dataPaths == null ) {
378- LOGGER .warn ("Cannot read filesystem info because data path is not set in the env" );
379- return ;
380- }
381378 for (NodeEnvironment .DataPath dataPath : dataPaths ) {
382379 try {
383380 FsInfo .Path fsInfo = getFSInfo (dataPath ); // uncached
384- if (leastAvailablePath == null || leastAvailablePath .getAvailable ().getBytes () > fsInfo .getAvailable ().getBytes ()) {
385- leastAvailablePath = fsInfo ;
381+ if (mostAvailablePath == null || mostAvailablePath .getAvailable ().getBytes () < fsInfo .getAvailable ().getBytes ()) {
382+ mostAvailablePath = fsInfo ;
386383 }
387384 } catch (IOException e ) {
388385 if (fsInfoException == null ) {
@@ -395,20 +392,15 @@ public void run() {
395392 if (fsInfoException != null ) {
396393 LOGGER .warn ("unexpected exception reading filesystem info" , fsInfoException );
397394 }
398- if (leastAvailablePath == null ) {
399- LOGGER .error ("Cannot read filesystem info" );
395+ if (mostAvailablePath == null ) {
396+ LOGGER .error ("Cannot read filesystem info for node data paths " + Arrays . toString ( dataPaths ) );
400397 return ;
401398 }
402- // subtract disk space that already running merges are expected to fill
403- long leastAvailableDiskSpaceBytes = leastAvailablePath .getAvailable ().getBytes ();
404- for (MergeTask mergeTask : runningMergeTasks ) {
405- leastAvailableDiskSpaceBytes -= mergeTask .estimatedRemainingMergeSize ();
406- }
407- // also subtract the configured free disk space threshold
408- leastAvailableDiskSpaceBytes -= getFreeBytesThreshold (leastAvailablePath .getTotal (), highStageWatermark , highStageMaxHeadroom )
399+ long mostAvailableDiskSpaceBytes = mostAvailablePath .getAvailable ().getBytes ();
400+ // subtract the configured free disk space threshold
401+ mostAvailableDiskSpaceBytes -= getFreeBytesThreshold (mostAvailablePath .getTotal (), highStageWatermark , highStageMaxHeadroom )
409402 .getBytes ();
410- // the rest is the maximum disk space available for a new merge task
411- long maxMergeSizeLimit = Math .max (0L , leastAvailableDiskSpaceBytes );
403+ long maxMergeSizeLimit = Math .max (0L , mostAvailableDiskSpaceBytes );
412404 queuedMergeTasks .updateAvailableBudget (maxMergeSizeLimit );
413405 }
414406
@@ -474,9 +466,9 @@ void updateAvailableBudget(long availableBudget) {
474466 lock .lock ();
475467 try {
476468 this .availableBudget = availableBudget ;
477- // also update budget per element
469+ // update the per- element budget
478470 unreleasedBudgetPerElement .replaceAll ((e , v ) -> budgetFunction .applyAsLong (e ));
479- // update available budget given the per element budget
471+ // update available budget given the per- element budget
480472 this .availableBudget -= unreleasedBudgetPerElement .values ().stream ().reduce (0L , Long ::sum );
481473 elementAvailable .signalAll ();
482474 } finally {
@@ -509,6 +501,7 @@ public void close() {
509501 final ReentrantLock lock = PriorityBlockingQueueWithBudget .this .lock ;
510502 lock .lock ();
511503 try {
504+ assert unreleasedBudgetPerElement .containsKey (element );
512505 availableBudget += unreleasedBudgetPerElement .remove (element );
513506 elementAvailable .signalAll ();
514507 } finally {
0 commit comments