Skip to content

Commit b1c2d7a

Browse files
Almost
1 parent cedf89f commit b1c2d7a

File tree

2 files changed

+96
-30
lines changed

2 files changed

+96
-30
lines changed

server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorService.java

Lines changed: 90 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111

1212
import org.apache.logging.log4j.LogManager;
1313
import org.apache.logging.log4j.Logger;
14+
import org.elasticsearch.common.settings.ClusterSettings;
1415
import org.elasticsearch.common.settings.Setting;
1516
import org.elasticsearch.common.settings.Setting.Property;
1617
import org.elasticsearch.common.settings.Settings;
@@ -43,6 +44,7 @@
4344
import java.util.concurrent.atomic.AtomicLong;
4445
import java.util.concurrent.locks.Condition;
4546
import java.util.concurrent.locks.ReentrantLock;
47+
import java.util.function.Consumer;
4648
import java.util.function.LongUnaryOperator;
4749
import java.util.function.ToLongFunction;
4850

@@ -182,37 +184,54 @@ public Iterator<Setting<?>> settings() {
182184
private final int maxConcurrentMerges;
183185
private final int concurrentMergesFloorLimitForThrottling;
184186
private final int concurrentMergesCeilLimitForThrottling;
185-
private final Scheduler.Cancellable diskSpaceMonitor;
187+
private final AvailableDiskSpacePeriodicMonitor availableDiskSpacePeriodicMonitor;
186188

187189
private final List<MergeEventListener> mergeEventListeners = new CopyOnWriteArrayList<>();
188190

189191
public static @Nullable ThreadPoolMergeExecutorService maybeCreateThreadPoolMergeExecutorService(
190192
ThreadPool threadPool,
191193
Settings settings,
194+
ClusterSettings clusterSettings,
192195
NodeEnvironment nodeEnvironment
193196
) {
194197
if (ThreadPoolMergeScheduler.USE_THREAD_POOL_MERGE_SCHEDULER_SETTING.get(settings)) {
195-
return new ThreadPoolMergeExecutorService(threadPool, settings, nodeEnvironment);
198+
return new ThreadPoolMergeExecutorService(threadPool, settings, clusterSettings, nodeEnvironment);
196199
} else {
197200
return null;
198201
}
199202
}
200203

201-
private ThreadPoolMergeExecutorService(ThreadPool threadPool, Settings settings, NodeEnvironment nodeEnvironment) {
204+
private ThreadPoolMergeExecutorService(
205+
ThreadPool threadPool,
206+
Settings settings,
207+
ClusterSettings clusterSettings,
208+
NodeEnvironment nodeEnvironment
209+
) {
202210
this.executorService = threadPool.executor(ThreadPool.Names.MERGE);
203211
this.maxConcurrentMerges = threadPool.info(ThreadPool.Names.MERGE).getMax();
204212
// the intent here is to throttle down whenever we submit a task and no other task is running
205213
this.concurrentMergesFloorLimitForThrottling = 2;
206214
this.concurrentMergesCeilLimitForThrottling = maxConcurrentMerges * 2;
207215
assert concurrentMergesFloorLimitForThrottling <= concurrentMergesCeilLimitForThrottling;
208-
this.diskSpaceMonitor = threadPool.scheduleWithFixedDelay(
209-
new DiskSpaceMonitor(
216+
this.availableDiskSpacePeriodicMonitor = new AvailableDiskSpacePeriodicMonitor(
217+
nodeEnvironment.dataPaths(),
218+
threadPool,
210219
INDICES_MERGE_DISK_HIGH_WATERMARK_SETTING.get(settings),
211220
INDICES_MERGE_DISK_HIGH_MAX_HEADROOM_SETTING.get(settings),
212-
nodeEnvironment.dataPaths()
213-
),
214-
INDICES_MERGE_DISK_CHECK_INTERVAL_SETTING.get(settings),
215-
threadPool.generic()
221+
INDICES_MERGE_DISK_CHECK_INTERVAL_SETTING.get(settings),
222+
(availableDiskSpaceByteSize) -> queuedMergeTasks.updateAvailableBudget(availableDiskSpaceByteSize.getBytes())
223+
);
224+
clusterSettings.addSettingsUpdateConsumer(
225+
INDICES_MERGE_DISK_HIGH_WATERMARK_SETTING,
226+
this.availableDiskSpacePeriodicMonitor::setHighStageWatermark
227+
);
228+
clusterSettings.addSettingsUpdateConsumer(
229+
INDICES_MERGE_DISK_HIGH_MAX_HEADROOM_SETTING,
230+
this.availableDiskSpacePeriodicMonitor::setHighStageMaxHeadroom
231+
);
232+
clusterSettings.addSettingsUpdateConsumer(
233+
INDICES_MERGE_DISK_CHECK_INTERVAL_SETTING,
234+
this.availableDiskSpacePeriodicMonitor::setCheckInterval
216235
);
217236
}
218237

@@ -354,24 +373,69 @@ private void abortMergeTask(MergeTask mergeTask) {
354373
}
355374
}
356375

357-
class DiskSpaceMonitor implements Runnable {
358-
private static final Logger LOGGER = LogManager.getLogger(ThreadPoolMergeExecutorService.DiskSpaceMonitor.class);
359-
private final RelativeByteSizeValue highStageWatermark;
360-
private final ByteSizeValue highStageMaxHeadroom;
376+
static class AvailableDiskSpacePeriodicMonitor implements Closeable {
377+
private static final Logger LOGGER = LogManager.getLogger(AvailableDiskSpacePeriodicMonitor.class);
361378
private final NodeEnvironment.DataPath[] dataPaths;
362-
363-
DiskSpaceMonitor(
379+
private final ThreadPool threadPool;
380+
private volatile RelativeByteSizeValue highStageWatermark;
381+
private volatile ByteSizeValue highStageMaxHeadroom;
382+
private volatile TimeValue checkInterval;
383+
private final Consumer<ByteSizeValue> updateConsumer;
384+
private volatile boolean closed;
385+
private volatile Scheduler.Cancellable monitor;
386+
387+
AvailableDiskSpacePeriodicMonitor(
388+
NodeEnvironment.DataPath[] dataPaths,
389+
ThreadPool threadPool,
364390
RelativeByteSizeValue highStageWatermark,
365391
ByteSizeValue highStageMaxHeadroom,
366-
NodeEnvironment.DataPath[] dataPaths
392+
TimeValue checkInterval,
393+
Consumer<ByteSizeValue> updateConsumer
367394
) {
395+
this.dataPaths = dataPaths;
396+
this.threadPool = threadPool;
368397
this.highStageWatermark = highStageWatermark;
369398
this.highStageMaxHeadroom = highStageMaxHeadroom;
370-
this.dataPaths = dataPaths;
399+
this.checkInterval = checkInterval;
400+
this.updateConsumer = updateConsumer;
401+
this.closed = false;
402+
reschedule();
403+
// early monitor run in the constructor
404+
run();
405+
}
406+
407+
public void setCheckInterval(TimeValue checkInterval) {
408+
this.checkInterval = checkInterval;
409+
reschedule();
410+
}
411+
412+
public void setHighStageWatermark(RelativeByteSizeValue highStageWatermark) {
413+
this.highStageWatermark = highStageWatermark;
414+
}
415+
416+
public void setHighStageMaxHeadroom(ByteSizeValue highStageMaxHeadroom) {
417+
this.highStageMaxHeadroom = highStageMaxHeadroom;
418+
}
419+
420+
private synchronized void reschedule() {
421+
if (monitor != null) {
422+
monitor.cancel();
423+
}
424+
if (closed == false && checkInterval.duration() > 0) {
425+
monitor = threadPool.scheduleWithFixedDelay(this::run, checkInterval, threadPool.generic());
426+
}
371427
}
372428

373429
@Override
374-
public void run() {
430+
public void close() throws IOException {
431+
closed = true;
432+
reschedule();
433+
}
434+
435+
private void run() {
436+
if (closed) {
437+
return;
438+
}
375439
FsInfo.Path mostAvailablePath = null;
376440
IOException fsInfoException = null;
377441
for (NodeEnvironment.DataPath dataPath : dataPaths) {
@@ -399,17 +463,18 @@ public void run() {
399463
// subtract the configured free disk space threshold
400464
mostAvailableDiskSpaceBytes -= getFreeBytesThreshold(mostAvailablePath.getTotal(), highStageWatermark, highStageMaxHeadroom)
401465
.getBytes();
466+
// clamp available space to 0
402467
long maxMergeSizeLimit = Math.max(0L, mostAvailableDiskSpaceBytes);
403-
queuedMergeTasks.updateAvailableBudget(maxMergeSizeLimit);
468+
updateConsumer.accept(ByteSizeValue.ofBytes(maxMergeSizeLimit));
404469
}
405470

406471
private static ByteSizeValue getFreeBytesThreshold(
407-
ByteSizeValue total,
408-
RelativeByteSizeValue watermark,
409-
ByteSizeValue maxHeadroom
472+
ByteSizeValue total,
473+
RelativeByteSizeValue watermark,
474+
ByteSizeValue maxHeadroom
410475
) {
411-
// If bytes are given, they can be readily returned as free bytes. If percentages are given, we need to calculate the free
412-
// bytes.
476+
// If bytes are given, they can be readily returned as free bytes.
477+
// If percentages are given, we need to calculate the free bytes.
413478
if (watermark.isAbsolute()) {
414479
return watermark.getAbsolute();
415480
}
@@ -517,7 +582,7 @@ public E element() {
517582

518583
@Override
519584
public void close() throws IOException {
520-
diskSpaceMonitor.cancel();
585+
availableDiskSpacePeriodicMonitor.close();
521586
}
522587

523588
private static long newTargetIORateBytesPerSec(

server/src/main/java/org/elasticsearch/indices/IndicesService.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -295,11 +295,6 @@ protected void doStart() {
295295
this.threadPool = builder.threadPool;
296296
this.pluginsService = builder.pluginsService;
297297
this.nodeEnv = builder.nodeEnv;
298-
this.threadPoolMergeExecutorService = ThreadPoolMergeExecutorService.maybeCreateThreadPoolMergeExecutorService(
299-
threadPool,
300-
settings,
301-
nodeEnv
302-
);
303298
this.parserConfig = XContentParserConfiguration.EMPTY.withDeprecationHandler(LoggingDeprecationHandler.INSTANCE)
304299
.withRegistry(builder.xContentRegistry);
305300
this.valuesSourceRegistry = builder.valuesSourceRegistry;
@@ -320,6 +315,12 @@ protected void doStart() {
320315
this.bigArrays = builder.bigArrays;
321316
this.scriptService = builder.scriptService;
322317
this.clusterService = builder.clusterService;
318+
this.threadPoolMergeExecutorService = ThreadPoolMergeExecutorService.maybeCreateThreadPoolMergeExecutorService(
319+
threadPool,
320+
settings,
321+
clusterService.getClusterSettings(),
322+
nodeEnv
323+
);
323324
this.projectResolver = builder.projectResolver;
324325
this.client = builder.client;
325326
this.idFieldDataEnabled = INDICES_ID_FIELD_DATA_ENABLED_SETTING.get(clusterService.getSettings());

0 commit comments

Comments
 (0)