Skip to content

Commit aa7e08b

Browse files
super(MergeTask::estimatedRemainingMergeSize, 0L)
1 parent a2c3cc2 commit aa7e08b

File tree

3 files changed

+87
-8
lines changed

3 files changed

+87
-8
lines changed

server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorService.java

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -408,6 +408,11 @@ static AvailableDiskSpacePeriodicMonitor startDiskSpaceMonitoring(
408408
}
409409
}
410410
);
411+
if (availableDiskSpacePeriodicMonitor.isScheduled() == false) {
412+
// in case the disk space monitor starts off as disabled, then make sure that merging is NOT blocked
413+
// (in the other case, merging IS blocked until the first update for the available disk space)
414+
availableDiskSpaceUpdateConsumer.accept(ByteSizeValue.ofBytes(Long.MAX_VALUE));
415+
}
411416
clusterSettings.addSettingsUpdateConsumer(
412417
INDICES_MERGE_DISK_HIGH_WATERMARK_SETTING,
413418
availableDiskSpacePeriodicMonitor::setHighStageWatermark
@@ -470,10 +475,19 @@ private synchronized void reschedule() {
470475
monitor.cancel();
471476
}
472477
if (closed == false && checkInterval.duration() > 0) {
478+
// do an eager run,
479+
// in order to increase responsiveness in case the period is long and something blocks waiting for the first update
480+
threadPool.generic().execute(this::run);
473481
monitor = threadPool.scheduleWithFixedDelay(this::run, checkInterval, threadPool.generic());
482+
} else {
483+
monitor = null;
474484
}
475485
}
476486

487+
boolean isScheduled() {
488+
return monitor != null && closed == false;
489+
}
490+
477491
@Override
478492
public void close() throws IOException {
479493
closed = true;
@@ -533,10 +547,10 @@ private static ByteSizeValue getFreeBytesThreshold(
533547

534548
static class MergeTaskPriorityBlockingQueue extends PriorityBlockingQueueWithBudget<MergeTask> {
535549
MergeTaskPriorityBlockingQueue() {
536-
// start with unlimited budget (so this will behave like a regular priority queue until {@link #updateBudget} is invoked)
537-
// use the "remaining" merge size as the budget function so that the "budget" of taken elements is updated according
538-
// to the remaining disk space requirements of currently running merge tasks
539-
super(MergeTask::estimatedRemainingMergeSize, Long.MAX_VALUE);
550+
// start with 0 budget (so takes on this queue will always block until {@link #updateBudget} is invoked)
551+
// use the estimated *remaining* merge size as the budget function so that the disk space budget of taken (in-use) elements is
552+
// updated according to the remaining disk space requirements of the currently running merge tasks
553+
super(MergeTask::estimatedRemainingMergeSize, 0L);
540554
}
541555

542556
// exposed for tests

server/src/test/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorServiceDiskSpaceTests.java

Lines changed: 67 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717
import org.elasticsearch.common.util.concurrent.EsExecutors;
1818
import org.elasticsearch.core.PathUtils;
1919
import org.elasticsearch.core.PathUtilsForTesting;
20+
import org.elasticsearch.core.TimeValue;
21+
import org.elasticsearch.core.Tuple;
2022
import org.elasticsearch.env.Environment;
2123
import org.elasticsearch.env.NodeEnvironment;
2224
import org.elasticsearch.env.TestEnvironment;
@@ -34,11 +36,14 @@
3436
import java.nio.file.attribute.FileAttributeView;
3537
import java.nio.file.attribute.FileStoreAttributeView;
3638
import java.nio.file.spi.FileSystemProvider;
39+
import java.util.ArrayDeque;
3740
import java.util.ArrayList;
41+
import java.util.Deque;
3842
import java.util.IdentityHashMap;
3943
import java.util.LinkedHashSet;
4044
import java.util.List;
4145
import java.util.concurrent.CountDownLatch;
46+
import java.util.concurrent.Executor;
4247
import java.util.concurrent.TimeUnit;
4348
import java.util.concurrent.atomic.AtomicLong;
4449

@@ -61,7 +66,7 @@ public class ThreadPoolMergeExecutorServiceDiskSpaceTests extends ESTestCase {
6166
private static String bPathPart;
6267
private static int mergeExecutorThreadCount;
6368
private static Settings settings;
64-
private static TestThreadPool testThreadPool;
69+
private static CapturingThreadPool testThreadPool;
6570
private static NodeEnvironment nodeEnvironment;
6671

6772
@BeforeClass
@@ -86,7 +91,7 @@ public static void installMockUsableSpaceFS() throws Exception {
8691
settingsBuilder.put(ThreadPoolMergeScheduler.USE_THREAD_POOL_MERGE_SCHEDULER_SETTING.getKey(), true);
8792
}
8893
settings = settingsBuilder.build();
89-
testThreadPool = new TestThreadPool("test", settings);
94+
testThreadPool = new CapturingThreadPool("test", settings);
9095
nodeEnvironment = new NodeEnvironment(settings, TestEnvironment.newEnvironment(settings));
9196
}
9297

@@ -99,6 +104,21 @@ public static void removeMockUsableSpaceFS() {
99104
nodeEnvironment.close();
100105
}
101106

107+
static class CapturingThreadPool extends TestThreadPool {
108+
final List<Tuple<TimeValue, Cancellable>> scheduledTasks = new ArrayList<>();
109+
110+
CapturingThreadPool(String name, Settings settings) {
111+
super(name, settings);
112+
}
113+
114+
@Override
115+
public Cancellable scheduleWithFixedDelay(Runnable command, TimeValue interval, Executor executor) {
116+
Cancellable cancellable = super.scheduleWithFixedDelay(command, interval, executor);
117+
scheduledTasks.add(new Tuple<>(interval, cancellable));
118+
return cancellable;
119+
}
120+
}
121+
102122
static class TestMockUsableSpaceFileSystemProvider extends FilterFileSystemProvider {
103123

104124
TestMockUsableSpaceFileSystemProvider(FileSystem inner) {
@@ -243,6 +263,51 @@ public void testAvailableDiskSpaceMonitorWithDefaultSettings() throws Exception
243263
}
244264
}
245265

266+
public void testDiskSpaceMonitorStartsAsDisabled() throws Exception {
267+
aFileStore.usableSpace = randomLongBetween(1L, 100L);
268+
aFileStore.totalSpace = randomLongBetween(1L, 100L);
269+
aFileStore.throwIoException = randomBoolean();
270+
bFileStore.usableSpace = randomLongBetween(1L, 100L);
271+
bFileStore.totalSpace = randomLongBetween(1L, 100L);
272+
bFileStore.throwIoException = randomBoolean();
273+
Settings.Builder settingsBuilder = Settings.builder().put(settings);
274+
if (randomBoolean()) {
275+
settingsBuilder.put(ThreadPoolMergeExecutorService.INDICES_MERGE_DISK_CHECK_INTERVAL_SETTING.getKey(), "0");
276+
} else {
277+
settingsBuilder.put(ThreadPoolMergeExecutorService.INDICES_MERGE_DISK_CHECK_INTERVAL_SETTING.getKey(), "0s");
278+
}
279+
Settings settings = settingsBuilder.build();
280+
ClusterSettings clusterSettings = ClusterSettings.createBuiltInClusterSettings(settings);
281+
LinkedHashSet<ByteSizeValue> availableDiskSpaceUpdates = new LinkedHashSet<>();
282+
try (
283+
var diskSpacePeriodicMonitor = ThreadPoolMergeExecutorService.startDiskSpaceMonitoring(
284+
testThreadPool,
285+
nodeEnvironment.dataPaths(),
286+
clusterSettings,
287+
(availableDiskSpace) -> {
288+
synchronized (availableDiskSpaceUpdates) {
289+
availableDiskSpaceUpdates.add(availableDiskSpace);
290+
}
291+
}
292+
)
293+
) {
294+
assertThat(diskSpacePeriodicMonitor.isScheduled(), is(false));
295+
assertThat(availableDiskSpaceUpdates.size(), is(1));
296+
assertThat(availableDiskSpaceUpdates.getLast().getBytes(), is(Long.MAX_VALUE));
297+
// updating monitoring interval should enable the monitor
298+
String intervalSettingValue = randomFrom("1s", "123ms", "5ns", "2h");
299+
clusterSettings.applySettings(
300+
Settings.builder()
301+
.put(ThreadPoolMergeExecutorService.INDICES_MERGE_DISK_CHECK_INTERVAL_SETTING.getKey(), intervalSettingValue)
302+
.build()
303+
);
304+
assertThat(diskSpacePeriodicMonitor.isScheduled(), is(true));
305+
assertThat(testThreadPool.scheduledTasks.size(), is(1));
306+
}
307+
aFileStore.throwIoException = false;
308+
bFileStore.throwIoException = false;
309+
}
310+
246311
public void testAvailableDiskSpaceMonitorWhenFileSystemStatErrors() throws Exception {
247312
aFileStore.usableSpace = randomLongBetween(1L, 100L);
248313
aFileStore.totalSpace = randomLongBetween(1L, 100L);

server/src/test/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorServiceTests.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -764,7 +764,7 @@ public void testMergeTasksExecuteInSizeOrder() throws IOException {
764764

765765
public void testMergeTaskQueueAvailableBudgetTracking() throws Exception {
766766
MergeTaskPriorityBlockingQueue mergeTaskPriorityBlockingQueue = new MergeTaskPriorityBlockingQueue();
767-
assertThat(mergeTaskPriorityBlockingQueue.getAvailableBudget(), is(Long.MAX_VALUE));
767+
assertThat(mergeTaskPriorityBlockingQueue.getAvailableBudget(), is(0L));
768768
long availableBudget = randomLongBetween(1, 10);
769769
mergeTaskPriorityBlockingQueue.updateBudget(availableBudget);
770770
assertThat(mergeTaskPriorityBlockingQueue.getAvailableBudget(), is(availableBudget));
@@ -819,7 +819,7 @@ public void testMergeTaskQueueAvailableBudgetTracking() throws Exception {
819819

820820
public void testMergeTaskQueueBudgetTrackingWhenEstimatedRemainingMergeSizeChanges() throws Exception {
821821
MergeTaskPriorityBlockingQueue mergeTaskPriorityBlockingQueue = new MergeTaskPriorityBlockingQueue();
822-
assertThat(mergeTaskPriorityBlockingQueue.getAvailableBudget(), is(Long.MAX_VALUE));
822+
assertThat(mergeTaskPriorityBlockingQueue.getAvailableBudget(), is(0L));
823823
// plenty of available budget (this should be fixed for this test)
824824
final long availableBudget = randomLongBetween(1000L, 2000L);
825825
mergeTaskPriorityBlockingQueue.updateBudget(availableBudget);

0 commit comments

Comments
 (0)