|
9 | 9 |
|
10 | 10 | package org.elasticsearch.index.engine; |
11 | 11 |
|
| 12 | +import org.apache.logging.log4j.LogManager; |
| 13 | +import org.apache.logging.log4j.Logger; |
12 | 14 | import org.elasticsearch.common.settings.Setting; |
13 | 15 | import org.elasticsearch.common.settings.Settings; |
14 | 16 | import org.elasticsearch.common.unit.ByteSizeValue; |
@@ -138,6 +140,7 @@ public Iterator<Setting<?>> settings() { |
138 | 140 | * Initial value for IO write rate limit of individual merge tasks when doAutoIOThrottle is true |
139 | 141 | */ |
140 | 142 | static final ByteSizeValue START_IO_RATE = ByteSizeValue.ofMb(20L); |
| 143 | + private static final Logger LOGGER = LogManager.getLogger(ThreadPoolMergeExecutorService.class); |
141 | 144 | /** |
142 | 145 | * Total number of submitted merge tasks that support IO auto throttling and that have not yet been run (or aborted). |
143 | 146 | * This includes merge tasks that are currently running and that are backlogged (by their respective merge schedulers). |
@@ -195,7 +198,7 @@ private ThreadPoolMergeExecutorService(ThreadPool threadPool, Settings settings, |
195 | 198 | this.dataPaths = nodeEnvironment.dataPaths(); |
196 | 199 | this.diskSpaceMonitor = threadPool.scheduleWithFixedDelay( |
197 | 200 | new DiskSpaceMonitor(INDICES_MERGE_DISK_HIGH_WATERMARK_SETTING.get(settings), |
198 | | - INDICES_MERGE_DISK_HIGH_MAX_HEADROOM_SETTING.get(settings)), |
| 201 | + INDICES_MERGE_DISK_HIGH_MAX_HEADROOM_SETTING.get(settings), nodeEnvironment.dataPaths()), |
199 | 202 | INDICES_MERGE_DISK_CHECK_INTERVAL_SETTING.get(settings), |
200 | 203 | threadPool.generic() |
201 | 204 | ); |
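
Aside: the wiring above is a plain fixed-delay loop; the monitor is a Runnable that is re-run at a configurable interval on the generic thread pool. A minimal standalone sketch of the same pattern using only the JDK's ScheduledExecutorService (the interval and class name are illustrative, not Elasticsearch API):

    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.ScheduledFuture;
    import java.util.concurrent.TimeUnit;

    class DiskSpaceMonitorDemo {
        public static void main(String[] args) throws InterruptedException {
            ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
            // fixed *delay* semantics: the next run starts a fixed time after the
            // previous run finishes, so a slow filesystem check never overlaps itself
            ScheduledFuture<?> handle = scheduler.scheduleWithFixedDelay(
                () -> System.out.println("checking available disk space..."),
                0, 5, TimeUnit.SECONDS
            );
            Thread.sleep(12_000);  // let the check run a few times
            handle.cancel(false);  // analogous to cancelling the service's diskSpaceMonitor
            scheduler.shutdown();
        }
    }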
@@ -324,46 +327,68 @@ private void abortMergeTask(MergeTask mergeTask) { |
324 | 327 | } |
325 | 328 | } |
326 | 329 |
|
327 | | - private class DiskSpaceMonitor implements Runnable { |
| 330 | + class DiskSpaceMonitor implements Runnable { |
328 | 331 |
|
329 | 332 | private final RelativeByteSizeValue highStageWatermark; |
330 | 333 | private final ByteSizeValue highStageMaxHeadroom; |
| 334 | + private final NodeEnvironment.DataPath[] dataPaths; |
331 | 335 |
|
332 | | - DiskSpaceMonitor(RelativeByteSizeValue highStageWatermark, ByteSizeValue highStageMaxHeadroom) { |
| 336 | + DiskSpaceMonitor( |
| 337 | + RelativeByteSizeValue highStageWatermark, |
| 338 | + ByteSizeValue highStageMaxHeadroom, |
| 339 | + NodeEnvironment.DataPath[] dataPaths |
| 340 | + ) { |
333 | 341 | this.highStageWatermark = highStageWatermark; |
334 | 342 | this.highStageMaxHeadroom = highStageMaxHeadroom; |
| 343 | + this.dataPaths = dataPaths; |
335 | 344 | } |
336 | 345 |
|
337 | 346 | @Override |
338 | 347 | public void run() { |
339 | 348 | FsInfo.Path leastAvailablePath = null; |
340 | | - for (int i = 0; i < ThreadPoolMergeExecutorService.this.dataPaths.length; i++) { |
| 349 | + IOException fsInfoException = null; |
| 350 | + for (NodeEnvironment.DataPath dataPath : dataPaths) { |
341 | 351 | try { |
342 | | - FsInfo.Path fsInfo = getFSInfo(ThreadPoolMergeExecutorService.this.dataPaths[i]); // uncached |
| 352 | + FsInfo.Path fsInfo = getFSInfo(dataPath); // uncached |
343 | 353 | if (leastAvailablePath == null || leastAvailablePath.getAvailable().getBytes() > fsInfo.getAvailable().getBytes()) { |
344 | 354 | leastAvailablePath = fsInfo; |
345 | 355 | } |
346 | 356 | } catch (IOException e) { |
347 | | - // TODO log |
348 | | - throw new RuntimeException(e); |
| 357 | + if (fsInfoException == null) { |
| 358 | + fsInfoException = e; |
| 359 | + } else { |
| 360 | + fsInfoException.addSuppressed(e); |
| 361 | + } |
349 | 362 | } |
350 | 363 | } |
351 | | - // TODO log if leastAvailablePath is null |
352 | | - // subtract disk space that's already "reserved" for running merges |
| 364 | + if (fsInfoException != null) { |
| 365 | + LOGGER.warn("unexpected exception reading filesystem info", fsInfoException); |
| 366 | + } |
| 367 | + if (leastAvailablePath == null) { |
| 368 | + LOGGER.error("cannot read filesystem info for any data path"); |
| 369 | + return; |
| 370 | + } |
| 371 | + // subtract disk space that currently running merges are expected to fill |
353 | 372 | long leastAvailableDiskSpaceBytes = leastAvailablePath.getAvailable().getBytes(); |
354 | 373 | for (MergeTask mergeTask : runningMergeTasks) { |
355 | 374 | leastAvailableDiskSpaceBytes -= mergeTask.estimatedRemainingMergeSize(); |
356 | 375 | } |
357 | | - // subtract the headroom space |
| 376 | + // also subtract the configured headroom space |
358 | 377 | leastAvailableDiskSpaceBytes -= getFreeBytesThreshold(leastAvailablePath.getTotal(), highStageWatermark, highStageMaxHeadroom) |
359 | 378 | .getBytes(); |
| 379 | + // this is the maximum disk space available for a new merge task |
360 | 380 | leastAvailableDiskSpaceBytes = Math.max(0L, leastAvailableDiskSpaceBytes); |
361 | | - // the maximum disk space a new merge can use |
| 381 | + // TODO update the priority queue |
362 | 382 | ThreadPoolMergeExecutorService.this.leastAvailableDiskSpaceBytes.set(leastAvailableDiskSpaceBytes); |
363 | 383 | } |
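
The catch block above keeps the first IOException as the primary failure and chains any later ones onto it with addSuppressed, so a single warn log reports every data path that failed. A self-contained illustration of that idiom (the paths and failures are fabricated for the demo):

    import java.io.IOException;
    import java.util.List;

    class SuppressedExceptionDemo {
        public static void main(String[] args) {
            IOException first = null;
            for (String path : List.of("/data/a", "/data/b", "/data/c")) {
                try {
                    throw new IOException("cannot stat " + path); // every path fails in this demo
                } catch (IOException e) {
                    if (first == null) {
                        first = e;               // keep the first failure as the primary exception
                    } else {
                        first.addSuppressed(e);  // attach later failures to it
                    }
                }
            }
            first.printStackTrace(); // one report now carries all three failures
        }
    }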
364 | 384 |
|
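To make the budget arithmetic in run() concrete, here is a worked example with made-up numbers (none of them come from the actual service): 100 GiB free on the fullest data path, running merges expected to write another 30 GiB, and a 20 GiB free-bytes threshold leave 50 GiB for a new merge task.

    class MergeDiskBudgetExample {
        public static void main(String[] args) {
            long leastAvailable = 100L << 30;         // 100 GiB free on the fullest data path
            long reservedByRunningMerges = 30L << 30; // sum of estimatedRemainingMergeSize()
            long headroomThreshold = 20L << 30;       // result of getFreeBytesThreshold(...)
            // clamp at zero so an overcommitted disk reports no budget rather than a negative one
            long budget = Math.max(0L, leastAvailable - reservedByRunningMerges - headroomThreshold);
            System.out.println(budget >> 30); // prints 50, the GiB available to a new merge task
        }
    }
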
365 | | - private static ByteSizeValue getFreeBytesThreshold(ByteSizeValue total, RelativeByteSizeValue watermark, ByteSizeValue maxHeadroom) { |
366 | | - // If bytes are given, they can be readily returned as free bytes. If percentages are given, we need to calculate the free bytes. |
| 385 | + private static ByteSizeValue getFreeBytesThreshold( |
| 386 | + ByteSizeValue total, |
| 387 | + RelativeByteSizeValue watermark, |
| 388 | + ByteSizeValue maxHeadroom |
| 389 | + ) { |
| 390 | + // If the watermark is an absolute value, it can be returned directly as the free bytes threshold. If it is a |
| 391 | + // percentage, the free bytes threshold must be computed from the total disk size. |
367 | 392 | if (watermark.isAbsolute()) { |
368 | 393 | return watermark.getAbsolute(); |
369 | 394 | } |
|
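The diff cuts off before the percentage branch of getFreeBytesThreshold. Below is a rough, self-contained sketch of what that branch typically computes, assuming Elasticsearch's usual watermark semantics (a 90% used watermark means 10% of the disk must stay free, with the max headroom capping that requirement on very large disks). The types and signature are simplified stand-ins, not the real RelativeByteSizeValue API:

    class FreeBytesThresholdSketch {
        // simplified stand-in: the watermark is either an absolute number of free
        // bytes, or a "percent used" ratio of the total disk size
        static long freeBytesThreshold(long totalBytes, Double percentUsedWatermark,
                                       Long absoluteWatermarkBytes, long maxHeadroomBytes) {
            if (absoluteWatermarkBytes != null) {
                return absoluteWatermarkBytes; // already expressed as required free bytes
            }
            // a 0.90 "used" watermark requires 10% of the disk to remain free...
            long required = (long) (totalBytes * (1.0 - percentUsedWatermark));
            // ...capped by the max headroom so huge disks are not forced to keep terabytes free
            return Math.min(required, maxHeadroomBytes);
        }

        public static void main(String[] args) {
            long tenTiB = 10L << 40;
            // a 90% watermark on 10 TiB would demand 1 TiB free; a 100 GiB max headroom caps it
            System.out.println(freeBytesThreshold(tenTiB, 0.90, null, 100L << 30) >> 30); // 100
        }
    }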