Skip to content

Commit d07b32b

Browse files
updateConsumer.accept(ByteSizeValue.MINUS_ONE)
1 parent f4c962e commit d07b32b

File tree

2 files changed

+144
-14
lines changed

2 files changed

+144
-14
lines changed

server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorService.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import java.util.concurrent.CopyOnWriteArrayList;
4040
import java.util.concurrent.ExecutorService;
4141
import java.util.concurrent.RejectedExecutionException;
42+
import java.util.concurrent.atomic.AtomicBoolean;
4243
import java.util.concurrent.atomic.AtomicInteger;
4344
import java.util.concurrent.atomic.AtomicLong;
4445
import java.util.concurrent.locks.Condition;
@@ -390,15 +391,23 @@ static AvailableDiskSpacePeriodicMonitor startDiskSpaceMonitoring(
390391
ThreadPool threadPool,
391392
NodeEnvironment.DataPath[] dataPaths,
392393
ClusterSettings clusterSettings,
393-
Consumer<ByteSizeValue> updateConsumer
394+
Consumer<ByteSizeValue> availableDiskSpaceUpdateConsumer
394395
) {
395396
AvailableDiskSpacePeriodicMonitor availableDiskSpacePeriodicMonitor = new AvailableDiskSpacePeriodicMonitor(
396397
dataPaths,
397398
threadPool,
398399
clusterSettings.get(INDICES_MERGE_DISK_HIGH_WATERMARK_SETTING),
399400
clusterSettings.get(INDICES_MERGE_DISK_HIGH_MAX_HEADROOM_SETTING),
400401
clusterSettings.get(INDICES_MERGE_DISK_CHECK_INTERVAL_SETTING),
401-
updateConsumer
402+
availableDiskSpaceByteSize -> {
403+
if (availableDiskSpaceByteSize.equals(ByteSizeValue.MINUS_ONE)) {
404+
// The merge executor is currently unaware of the available disk space because of an error.
405+
// Merges are NOT blocked if the available disk space is insufficient.
406+
availableDiskSpaceUpdateConsumer.accept(ByteSizeValue.ofBytes(Long.MAX_VALUE));
407+
} else {
408+
availableDiskSpaceUpdateConsumer.accept(availableDiskSpaceByteSize);
409+
}
410+
}
402411
);
403412
clusterSettings.addSettingsUpdateConsumer(
404413
INDICES_MERGE_DISK_HIGH_WATERMARK_SETTING,
@@ -497,6 +506,7 @@ private void run() {
497506
}
498507
if (mostAvailablePath == null) {
499508
LOGGER.error("Cannot read filesystem info for node data paths " + Arrays.toString(dataPaths));
509+
updateConsumer.accept(ByteSizeValue.MINUS_ONE);
500510
return;
501511
}
502512
long mostAvailableDiskSpaceBytes = mostAvailablePath.getAvailable().getBytes();

server/src/test/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorServiceDiskSpaceTests.java

Lines changed: 132 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.junit.AfterClass;
2828
import org.junit.BeforeClass;
2929

30+
import java.io.IOException;
3031
import java.nio.file.FileStore;
3132
import java.nio.file.FileSystem;
3233
import java.nio.file.Path;
@@ -40,7 +41,6 @@
4041
import java.util.concurrent.CountDownLatch;
4142
import java.util.concurrent.TimeUnit;
4243
import java.util.concurrent.atomic.AtomicLong;
43-
import java.util.concurrent.atomic.AtomicReference;
4444

4545
import static org.elasticsearch.index.engine.ThreadPoolMergeScheduler.Schedule.ABORT;
4646
import static org.elasticsearch.index.engine.ThreadPoolMergeScheduler.Schedule.BACKLOG;
@@ -121,6 +121,7 @@ static class TestMockFileStore extends FileStore {
121121
public volatile long totalSpace;
122122
public volatile long freeSpace;
123123
public volatile long usableSpace;
124+
public volatile boolean throwIoException;
124125

125126
private final String desc;
126127

@@ -149,17 +150,26 @@ public boolean isReadOnly() {
149150
}
150151

151152
@Override
152-
public long getTotalSpace() {
153+
public long getTotalSpace() throws IOException {
154+
if (throwIoException) {
155+
throw new IOException("Test IO Exception");
156+
}
153157
return totalSpace;
154158
}
155159

156160
@Override
157-
public long getUnallocatedSpace() {
161+
public long getUnallocatedSpace() throws IOException {
162+
if (throwIoException) {
163+
throw new IOException("Test IO Exception");
164+
}
158165
return freeSpace;
159166
}
160167

161168
@Override
162-
public long getUsableSpace() {
169+
public long getUsableSpace() throws IOException {
170+
if (throwIoException) {
171+
throw new IOException("Test IO Exception");
172+
}
163173
return usableSpace;
164174
}
165175

@@ -190,24 +200,134 @@ public void testAvailableDiskSpaceMonitorWithDefaultSettings() throws Exception
190200
aFileStore.totalSpace = aFileStore.usableSpace * 2;
191201
bFileStore.usableSpace = 1_000L;
192202
bFileStore.totalSpace = bFileStore.usableSpace * 2;
193-
AtomicReference<ByteSizeValue> availableDiskSpaceForMerging = new AtomicReference<>();
194-
CountDownLatch diskSpaceMonitor = new CountDownLatch(1);
203+
LinkedHashSet<ByteSizeValue> availableDiskSpaceUpdates = new LinkedHashSet<>();
204+
try (
205+
var diskSpacePeriodicMonitor = ThreadPoolMergeExecutorService.startDiskSpaceMonitoring(
206+
testThreadPool,
207+
nodeEnvironment.dataPaths(),
208+
ClusterSettings.createBuiltInClusterSettings(settings),
209+
(availableDiskSpace) -> {
210+
synchronized (availableDiskSpaceUpdates) {
211+
availableDiskSpaceUpdates.add(availableDiskSpace);
212+
}
213+
}
214+
)
215+
) {
216+
assertBusy(() -> {
217+
synchronized (availableDiskSpaceUpdates) {
218+
assertThat(availableDiskSpaceUpdates.size(), is(1));
219+
// 100_000 (available) - 5% (default flood stage level) * 200_000 (total space)
220+
assertThat(availableDiskSpaceUpdates.getLast().getBytes(), is(90_000L));
221+
}
222+
});
223+
// "b" now has more available space
224+
bFileStore.usableSpace = 110_000L;
225+
bFileStore.totalSpace = 130_000L;
226+
assertBusy(() -> {
227+
synchronized (availableDiskSpaceUpdates) {
228+
assertThat(availableDiskSpaceUpdates.size(), is(2));
229+
// 110_000 (available) - 5% (default flood stage level) * 130_000 (total space)
230+
assertThat(availableDiskSpaceUpdates.getLast().getBytes(), is(103_500L));
231+
}
232+
});
233+
// available space for "a" and "b" is below the limit => it's clamped down to "0"
234+
aFileStore.usableSpace = 100L;
235+
bFileStore.usableSpace = 1_000L;
236+
assertBusy(() -> {
237+
synchronized (availableDiskSpaceUpdates) {
238+
assertThat(availableDiskSpaceUpdates.size(), is(3));
239+
// 1_000 (available) - 5% (default flood stage level) * 130_000 (total space) < 0
240+
assertThat(availableDiskSpaceUpdates.getLast().getBytes(), is(0L));
241+
}
242+
});
243+
}
244+
}
245+
246+
public void testAvailableDiskSpaceMonitorWhenFileSystemStatErrors() throws Exception {
247+
aFileStore.usableSpace = randomLongBetween(1L, 100L);
248+
aFileStore.totalSpace = randomLongBetween(1L, 100L);
249+
bFileStore.usableSpace = randomLongBetween(1L, 100L);
250+
bFileStore.totalSpace = randomLongBetween(1L, 100L);
251+
boolean aErrorsFirst = randomBoolean();
252+
if (aErrorsFirst) {
253+
// the "a" file system will error when collecting stats
254+
aFileStore.throwIoException = true;
255+
bFileStore.throwIoException = false;
256+
} else {
257+
aFileStore.throwIoException = false;
258+
bFileStore.throwIoException = true;
259+
}
260+
LinkedHashSet<ByteSizeValue> availableDiskSpaceUpdates = new LinkedHashSet<>();
195261
try (
196262
var diskSpacePeriodicMonitor = ThreadPoolMergeExecutorService.startDiskSpaceMonitoring(
197263
testThreadPool,
198264
nodeEnvironment.dataPaths(),
199265
ClusterSettings.createBuiltInClusterSettings(settings),
200266
(availableDiskSpace) -> {
201-
availableDiskSpaceForMerging.set(availableDiskSpace);
202-
diskSpaceMonitor.countDown();
267+
synchronized (availableDiskSpaceUpdates) {
268+
availableDiskSpaceUpdates.add(availableDiskSpace);
269+
}
203270
}
204271
)
205272
) {
206-
// wait for the disk space monitor to do a first run
207-
safeAwait(diskSpaceMonitor);
273+
assertBusy(() -> {
274+
synchronized (availableDiskSpaceUpdates) {
275+
assertThat(availableDiskSpaceUpdates.size(), is(1));
276+
if (aErrorsFirst) {
277+
// uses the stats from "b"
278+
assertThat(
279+
availableDiskSpaceUpdates.getLast().getBytes(),
280+
// the default 5% (same as flood stage level)
281+
is(Math.max(bFileStore.usableSpace - bFileStore.totalSpace / 20, 0L))
282+
);
283+
} else {
284+
// uses the stats from "a"
285+
assertThat(
286+
availableDiskSpaceUpdates.getLast().getBytes(),
287+
// the default 5% (same as flood stage level)
288+
is(Math.max(aFileStore.usableSpace - aFileStore.totalSpace / 20, 0L))
289+
);
290+
}
291+
}
292+
});
293+
if (aErrorsFirst) {
294+
// the "b" file system will also now error when collecting stats
295+
bFileStore.throwIoException = true;
296+
}
297+
assertBusy(() -> {
298+
synchronized (availableDiskSpaceUpdates) {
299+
assertThat(availableDiskSpaceUpdates.size(), is(2));
300+
// consider the available disk space as unlimited when no fs stats can be collected
301+
assertThat(availableDiskSpaceUpdates.getLast().getBytes(), is(Long.MAX_VALUE));
302+
}
303+
});
304+
if (aErrorsFirst) {
305+
// "a" fs stats collection recovered
306+
aFileStore.throwIoException = false;
307+
}
308+
assertBusy(() -> {
309+
synchronized (availableDiskSpaceUpdates) {
310+
assertThat(availableDiskSpaceUpdates.size(), is(3));
311+
if (aErrorsFirst) {
312+
// uses the stats from "a"
313+
assertThat(
314+
availableDiskSpaceUpdates.getLast().getBytes(),
315+
// the default 5% (same as flood stage level)
316+
is(Math.max(aFileStore.usableSpace - aFileStore.totalSpace / 20, 0L))
317+
);
318+
} else {
319+
// uses the stats from "b"
320+
assertThat(
321+
availableDiskSpaceUpdates.getLast().getBytes(),
322+
// the default 5% (same as flood stage level)
323+
is(Math.max(bFileStore.usableSpace - bFileStore.totalSpace / 20, 0L))
324+
);
325+
}
326+
}
327+
});
208328
}
209-
// 100_000 (available) - 5% (default flood stage level) * 200_000 (total space)
210-
assertThat(availableDiskSpaceForMerging.get().getBytes(), is(90_000L));
329+
aFileStore.throwIoException = false;
330+
bFileStore.throwIoException = false;
211331
}
212332

213333
public void testAvailableDiskSpaceMonitorSettingsUpdate() throws Exception {

0 commit comments

Comments
 (0)