Skip to content

Commit 1ddd3d3

Browse files
Thrashy in-between
1 parent b9b26fa commit 1ddd3d3

File tree

5 files changed

+186
-9
lines changed

5 files changed

+186
-9
lines changed

server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -632,6 +632,7 @@ public void apply(Settings value, Settings current, Settings previous) {
632632
ThreadPoolMergeScheduler.USE_THREAD_POOL_MERGE_SCHEDULER_SETTING,
633633
MergeDiskSpaceMonitor.INDICES_MERGE_DISK_HIGH_WATERMARK_SETTING,
634634
MergeDiskSpaceMonitor.INDICES_MERGE_DISK_HIGH_MAX_HEADROOM_SETTING,
635+
MergeDiskSpaceMonitor.INDICES_MERGE_DISK_CHECK_INTERVAL_SETTING,
635636
TransportService.ENABLE_STACK_OVERFLOW_AVOIDANCE,
636637
DataStreamGlobalRetentionSettings.DATA_STREAMS_DEFAULT_RETENTION_SETTING,
637638
DataStreamGlobalRetentionSettings.DATA_STREAMS_MAX_RETENTION_SETTING,

server/src/main/java/org/elasticsearch/index/engine/MergeDiskSpaceMonitor.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import org.elasticsearch.common.settings.Setting;
1313
import org.elasticsearch.common.unit.ByteSizeValue;
1414
import org.elasticsearch.common.unit.RelativeByteSizeValue;
15+
import org.elasticsearch.core.TimeValue;
1516

1617
import java.util.Iterator;
1718
import java.util.List;
@@ -102,4 +103,12 @@ public Iterator<Setting<?>> settings() {
102103
Setting.Property.NodeScope
103104
);
104105

106+
/** How frequently we check disk usage (default: 5 seconds). */
107+
public static final Setting<TimeValue> INDICES_MERGE_DISK_CHECK_INTERVAL_SETTING = Setting.timeSetting(
108+
"indices.merge.disk.check_interval",
109+
TimeValue.timeValueSeconds(5),
110+
TimeValue.MINUS_ONE,
111+
Setting.Property.NodeScope
112+
);
113+
105114
}

server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorService.java

Lines changed: 168 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -9,14 +9,25 @@
99

1010
package org.elasticsearch.index.engine;
1111

12+
import org.elasticsearch.common.settings.Setting;
1213
import org.elasticsearch.common.settings.Settings;
1314
import org.elasticsearch.common.unit.ByteSizeValue;
15+
import org.elasticsearch.common.unit.RelativeByteSizeValue;
1416
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
1517
import org.elasticsearch.core.Nullable;
18+
import org.elasticsearch.core.TimeValue;
19+
import org.elasticsearch.env.NodeEnvironment;
1620
import org.elasticsearch.index.engine.ThreadPoolMergeScheduler.MergeTask;
21+
import org.elasticsearch.monitor.fs.FsInfo;
22+
import org.elasticsearch.threadpool.Scheduler;
1723
import org.elasticsearch.threadpool.ThreadPool;
1824

25+
import java.io.Closeable;
26+
import java.io.IOException;
1927
import java.util.Comparator;
28+
import java.util.Iterator;
29+
import java.util.List;
30+
import java.util.Map;
2031
import java.util.Set;
2132
import java.util.concurrent.ExecutorService;
2233
import java.util.concurrent.PriorityBlockingQueue;
@@ -28,8 +39,93 @@
2839
import static org.elasticsearch.index.engine.ThreadPoolMergeScheduler.Schedule.ABORT;
2940
import static org.elasticsearch.index.engine.ThreadPoolMergeScheduler.Schedule.BACKLOG;
3041
import static org.elasticsearch.index.engine.ThreadPoolMergeScheduler.Schedule.RUN;
42+
import static org.elasticsearch.index.engine.ThreadPoolMergeScheduler.USE_THREAD_POOL_MERGE_SCHEDULER_SETTING;
43+
import static org.elasticsearch.monitor.fs.FsProbe.getFSInfo;
3144

32-
public class ThreadPoolMergeExecutorService {
45+
public class ThreadPoolMergeExecutorService implements Closeable {
46+
/** How frequently we check disk usage (default: 5 seconds). */
47+
public static final Setting<TimeValue> INDICES_MERGE_DISK_CHECK_INTERVAL_SETTING = Setting.timeSetting(
48+
"indices.merge.disk.check_interval",
49+
TimeValue.timeValueSeconds(5),
50+
TimeValue.MINUS_ONE,
51+
Setting.Property.NodeScope
52+
);
53+
public static final Setting<RelativeByteSizeValue> INDICES_MERGE_DISK_HIGH_WATERMARK_SETTING = new Setting<>(
54+
"indices.merge.disk.watermark.high",
55+
"96%",
56+
(s) -> RelativeByteSizeValue.parseRelativeByteSizeValue(s, "indices.merge.disk.watermark.high"),
57+
new Setting.Validator<>() {
58+
@Override
59+
public void validate(RelativeByteSizeValue value) {}
60+
61+
@Override
62+
public void validate(RelativeByteSizeValue value, Map<Setting<?>, Object> settings, boolean isPresent) {
63+
if (isPresent && settings.get(USE_THREAD_POOL_MERGE_SCHEDULER_SETTING).equals(Boolean.FALSE)) {
64+
throw new IllegalArgumentException(
65+
"indices merge watermark setting is only effective when ["
66+
+ USE_THREAD_POOL_MERGE_SCHEDULER_SETTING.getKey()
67+
+ "] is set to [true]"
68+
);
69+
}
70+
}
71+
72+
@Override
73+
public Iterator<Setting<?>> settings() {
74+
List<Setting<?>> res = List.of(INDICES_MERGE_DISK_HIGH_WATERMARK_SETTING, USE_THREAD_POOL_MERGE_SCHEDULER_SETTING);
75+
return res.iterator();
76+
}
77+
},
78+
Setting.Property.NodeScope
79+
);
80+
public static final Setting<ByteSizeValue> INDICES_MERGE_DISK_HIGH_MAX_HEADROOM_SETTING = new Setting<>(
81+
"indices.merge.disk.watermark.high.max_headroom",
82+
(settings) -> {
83+
if (INDICES_MERGE_DISK_HIGH_WATERMARK_SETTING.exists(settings)) {
84+
return "-1";
85+
} else {
86+
return "40GB";
87+
}
88+
},
89+
(s) -> ByteSizeValue.parseBytesSizeValue(s, "indices.merge.disk.watermark.high.max_headroom"),
90+
new Setting.Validator<>() {
91+
@Override
92+
public void validate(ByteSizeValue value) {}
93+
94+
@Override
95+
public void validate(final ByteSizeValue value, final Map<Setting<?>, Object> settings, boolean isPresent) {
96+
if (isPresent) {
97+
if (value.equals(ByteSizeValue.MINUS_ONE)) {
98+
throw new IllegalArgumentException("setting a headroom value to less than 0 is not supported");
99+
}
100+
if (settings.get(USE_THREAD_POOL_MERGE_SCHEDULER_SETTING).equals(Boolean.FALSE)) {
101+
throw new IllegalArgumentException(
102+
"indices merge max headroom setting is only effective when ["
103+
+ USE_THREAD_POOL_MERGE_SCHEDULER_SETTING.getKey()
104+
+ "] is set to [true]"
105+
);
106+
}
107+
}
108+
final ByteSizeValue highHeadroom = (ByteSizeValue) settings.get(INDICES_MERGE_DISK_HIGH_MAX_HEADROOM_SETTING);
109+
final RelativeByteSizeValue highWatermark = (RelativeByteSizeValue) settings.get(INDICES_MERGE_DISK_HIGH_WATERMARK_SETTING);
110+
if (highWatermark.isAbsolute() && highHeadroom.equals(ByteSizeValue.MINUS_ONE) == false) {
111+
throw new IllegalArgumentException(
112+
"indices merge max headroom setting is set, but disk watermark value is not a relative value"
113+
);
114+
}
115+
}
116+
117+
@Override
118+
public Iterator<Setting<?>> settings() {
119+
List<Setting<?>> res = List.of(
120+
INDICES_MERGE_DISK_HIGH_WATERMARK_SETTING,
121+
INDICES_MERGE_DISK_HIGH_MAX_HEADROOM_SETTING,
122+
USE_THREAD_POOL_MERGE_SCHEDULER_SETTING
123+
);
124+
return res.iterator();
125+
}
126+
},
127+
Setting.Property.NodeScope
128+
);
33129
/**
34130
* Floor for IO write rate limit of individual merge tasks (we will never go any lower than this)
35131
*/
@@ -72,25 +168,37 @@ public class ThreadPoolMergeExecutorService {
72168
private final int maxConcurrentMerges;
73169
private final int concurrentMergesFloorLimitForThrottling;
74170
private final int concurrentMergesCeilLimitForThrottling;
171+
private final AtomicLong leastAvailableDiskSpaceBytes;
172+
private final NodeEnvironment.DataPath[] dataPaths;
173+
private final Scheduler.Cancellable diskSpaceMonitor;
75174

76175
public static @Nullable ThreadPoolMergeExecutorService maybeCreateThreadPoolMergeExecutorService(
77176
ThreadPool threadPool,
78-
Settings settings
177+
Settings settings,
178+
NodeEnvironment nodeEnvironment
79179
) {
80180
if (ThreadPoolMergeScheduler.USE_THREAD_POOL_MERGE_SCHEDULER_SETTING.get(settings)) {
81-
return new ThreadPoolMergeExecutorService(threadPool);
181+
return new ThreadPoolMergeExecutorService(threadPool, settings, nodeEnvironment);
82182
} else {
83183
return null;
84184
}
85185
}
86186

87-
private ThreadPoolMergeExecutorService(ThreadPool threadPool) {
187+
private ThreadPoolMergeExecutorService(ThreadPool threadPool, Settings settings, NodeEnvironment nodeEnvironment) {
88188
this.executorService = threadPool.executor(ThreadPool.Names.MERGE);
89189
this.maxConcurrentMerges = threadPool.info(ThreadPool.Names.MERGE).getMax();
90190
// the intent here is to throttle down whenever we submit a task and no other task is running
91191
this.concurrentMergesFloorLimitForThrottling = 2;
92192
this.concurrentMergesCeilLimitForThrottling = maxConcurrentMerges * 2;
93193
assert concurrentMergesFloorLimitForThrottling <= concurrentMergesCeilLimitForThrottling;
194+
this.leastAvailableDiskSpaceBytes = new AtomicLong();
195+
this.dataPaths = nodeEnvironment.dataPaths();
196+
this.diskSpaceMonitor = threadPool.scheduleWithFixedDelay(
197+
new DiskSpaceMonitor(INDICES_MERGE_DISK_HIGH_WATERMARK_SETTING.get(settings),
198+
INDICES_MERGE_DISK_HIGH_MAX_HEADROOM_SETTING.get(settings)),
199+
INDICES_MERGE_DISK_CHECK_INTERVAL_SETTING.get(settings),
200+
threadPool.generic()
201+
);
94202
}
95203

96204
boolean submitMergeTask(MergeTask mergeTask) {
@@ -216,6 +324,62 @@ private void abortMergeTask(MergeTask mergeTask) {
216324
}
217325
}
218326

327+
private class DiskSpaceMonitor implements Runnable {
328+
329+
private final RelativeByteSizeValue highStageWatermark;
330+
private final ByteSizeValue highStageMaxHeadroom;
331+
332+
DiskSpaceMonitor(RelativeByteSizeValue highStageWatermark, ByteSizeValue highStageMaxHeadroom) {
333+
this.highStageWatermark = highStageWatermark;
334+
this.highStageMaxHeadroom = highStageMaxHeadroom;
335+
}
336+
337+
@Override
338+
public void run() {
339+
FsInfo.Path leastAvailablePath = null;
340+
for (int i = 0; i < ThreadPoolMergeExecutorService.this.dataPaths.length; i++) {
341+
try {
342+
FsInfo.Path fsInfo = getFSInfo(ThreadPoolMergeExecutorService.this.dataPaths[i]); // uncached
343+
if (leastAvailablePath == null || leastAvailablePath.getAvailable().getBytes() > fsInfo.getAvailable().getBytes()) {
344+
leastAvailablePath = fsInfo;
345+
}
346+
} catch (IOException e) {
347+
// TODO log
348+
throw new RuntimeException(e);
349+
}
350+
}
351+
// TODO log if leastAvailablePath is null
352+
// subtract disk space that's already "reserved" for running merges
353+
long leastAvailableDiskSpaceBytes = leastAvailablePath.getAvailable().getBytes();
354+
for (MergeTask mergeTask : runningMergeTasks) {
355+
leastAvailableDiskSpaceBytes -= mergeTask.estimatedRemainingMergeSize();
356+
}
357+
// subtract the headroom space
358+
leastAvailableDiskSpaceBytes -= getFreeBytesThreshold(leastAvailablePath.getTotal(), highStageWatermark, highStageMaxHeadroom)
359+
.getBytes();
360+
leastAvailableDiskSpaceBytes = Math.max(0L, leastAvailableDiskSpaceBytes);
361+
// the maximum disk space a new merge can use
362+
ThreadPoolMergeExecutorService.this.leastAvailableDiskSpaceBytes.set(leastAvailableDiskSpaceBytes);
363+
}
364+
365+
private static ByteSizeValue getFreeBytesThreshold(ByteSizeValue total, RelativeByteSizeValue watermark, ByteSizeValue maxHeadroom) {
366+
// If bytes are given, they can be readily returned as free bytes. If percentages are given, we need to calculate the free bytes.
367+
if (watermark.isAbsolute()) {
368+
return watermark.getAbsolute();
369+
}
370+
return ByteSizeValue.subtract(total, watermark.calculateValue(total, maxHeadroom));
371+
}
372+
}
373+
374+
@Override
375+
public void close() throws IOException {
376+
diskSpaceMonitor.cancel();
377+
}
378+
379+
public boolean usingMaxTargetIORateBytesPerSec() {
380+
return MAX_IO_RATE.getBytes() == targetIORateBytesPerSec.get();
381+
}
382+
219383
private static long newTargetIORateBytesPerSec(
220384
long currentTargetIORateBytesPerSec,
221385
int currentlySubmittedIOThrottledMergeTasks,
@@ -274,10 +438,6 @@ interface UpdateConsumer {
274438
}
275439
}
276440

277-
public boolean usingMaxTargetIORateBytesPerSec() {
278-
return MAX_IO_RATE.getBytes() == targetIORateBytesPerSec.get();
279-
}
280-
281441
// exposed for tests
282442
Set<MergeTask> getRunningMergeTasks() {
283443
return runningMergeTasks;

server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeScheduler.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -443,12 +443,18 @@ void abort() {
443443
}
444444
}
445445

446+
// TODO javadoc
446447
long estimatedMergeSize() {
447448
// TODO is it possible that `estimatedMergeBytes` be `0` for correctly initialize merges,
448449
// or is it always the case that if `estimatedMergeBytes` is `0` that means that the merge has not yet been initialized?
449450
return onGoingMerge.getMerge().getStoreMergeInfo().estimatedMergeBytes();
450451
}
451452

453+
// TODO javadoc
454+
long estimatedRemainingMergeSize() {
455+
return Math.max(0L, estimatedMergeSize() - rateLimiter.getTotalBytesWritten());
456+
}
457+
452458
@Override
453459
public String toString() {
454460
return name + (onGoingMerge.getMerge().isAborted() ? " (aborted)" : "");

server/src/main/java/org/elasticsearch/indices/IndicesService.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -366,7 +366,8 @@ public void onRemoval(ShardId shardId, String fieldName, boolean wasEvicted, lon
366366
indicesFieldDataCache,
367367
cacheCleaner,
368368
indicesRequestCache,
369-
indicesQueryCache
369+
indicesQueryCache,
370+
threadPoolMergeExecutorService
370371
);
371372
} catch (IOException e) {
372373
throw new UncheckedIOException(e);

0 commit comments

Comments
 (0)