Commit c0da9b4

Add TEST MergeWithLowDiskSpaceIT testForceMergeIsBlockedThenUnblocked (#130189)
This adds a new IT for the case where forced merges are blocked (and then unblocked) because of insufficient disk space. The test was suggested in #127613 (review).
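
In outline, the new test drives the following sequence. This is a condensed sketch assembled from the test body in the diff below; all names are helpers the test itself uses, and the watermark arithmetic is simplified (usedDiskSpace stands in for the measured store size):

    // Condensed sketch of the scenario; see the full diff below for the real test.
    String node = internalCluster().startNode();
    // Shrink the node's reported total disk so the next merge cannot satisfy the
    // merge disk high watermark.
    setTotalSpace(node, usedDiskSpace + MERGE_DISK_HIGH_WATERMARK_BYTES - 1);
    // A force merge with a max-num-segments argument is a blocking call, so run it async.
    ActionFuture<BroadcastResponse> forceMergeFuture =
        indicesAdmin().prepareForceMerge(indexName).setMaxNumSegments(1).execute();
    // The merge executor reports the merge as blocked (enqueued, not running).
    assertTrue(threadPoolMergeExecutorService.isMergingBlockedDueToInsufficientDiskSpace());
    // Freeing disk space (or lowering the watermark setting to "0b") unblocks it.
    setTotalSpace(node, Long.MAX_VALUE);
    safeGet(forceMergeFuture); // now returns, leaving a single segment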
1 parent a48a7f9 commit c0da9b4

File tree

1 file changed (+121 lines, -1 line)


server/src/internalClusterTest/java/org/elasticsearch/index/engine/MergeWithLowDiskSpaceIT.java

Lines changed: 121 additions & 1 deletion
@@ -9,26 +9,36 @@
 
 package org.elasticsearch.index.engine;
 
+import org.elasticsearch.action.ActionFuture;
 import org.elasticsearch.action.admin.cluster.node.stats.NodeStats;
 import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse;
+import org.elasticsearch.action.admin.indices.segments.ShardSegments;
 import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
+import org.elasticsearch.action.support.broadcast.BroadcastResponse;
 import org.elasticsearch.cluster.DiskUsageIntegTestCase;
 import org.elasticsearch.cluster.metadata.IndexMetadata;
 import org.elasticsearch.cluster.routing.allocation.DiskThresholdSettings;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.util.concurrent.EsExecutors;
 import org.elasticsearch.index.IndexNotFoundException;
 import org.elasticsearch.indices.IndicesService;
+import org.elasticsearch.plugins.Plugin;
+import org.elasticsearch.plugins.PluginsService;
+import org.elasticsearch.telemetry.TestTelemetryPlugin;
 import org.elasticsearch.test.ESIntegTestCase;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.junit.BeforeClass;
 
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
 import java.util.Locale;
 import java.util.stream.IntStream;
 
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.greaterThan;
 import static org.hamcrest.Matchers.lessThan;
 
 @ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0)
@@ -40,7 +50,14 @@ public static void setAvailableDiskSpaceBufferLimit() {
         // this has to be big in order to potentially accommodate the disk space for a few 100s of docs and a few merges,
         // because of the latency to process used disk space updates, and also because we cannot reliably separate indexing from merging
         // operations at this high abstraction level (merging is triggered more or less automatically in the background)
-        MERGE_DISK_HIGH_WATERMARK_BYTES = randomLongBetween(1_000_000L, 2_000_000L);
+        MERGE_DISK_HIGH_WATERMARK_BYTES = randomLongBetween(10_000_000L, 20_000_000L);
+    }
+
+    @Override
+    protected Collection<Class<? extends Plugin>> nodePlugins() {
+        List<Class<? extends Plugin>> nodePluginsList = new ArrayList<>(super.nodePlugins());
+        nodePluginsList.add(TestTelemetryPlugin.class);
+        return nodePluginsList;
     }
 
     @Override
@@ -155,8 +172,111 @@ public void testShardCloseWhenDiskSpaceInsufficient() throws Exception {
         });
     }
 
+    public void testForceMergeIsBlockedThenUnblocked() throws Exception {
+        String node = internalCluster().startNode();
+        ensureStableCluster(1);
+        setTotalSpace(node, Long.MAX_VALUE);
+        ThreadPoolMergeExecutorService threadPoolMergeExecutorService = internalCluster().getInstance(IndicesService.class, node)
+            .getThreadPoolMergeExecutorService();
+        TestTelemetryPlugin testTelemetryPlugin = getTelemetryPlugin(node);
+        // create some index
+        final String indexName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
+        createIndex(
+            indexName,
+            Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0).put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).build()
+        );
+        // get current disk space usage (for all indices on the node)
+        IndicesStatsResponse stats = indicesAdmin().prepareStats().clear().setStore(true).get();
+        long usedDiskSpaceAfterIndexing = stats.getTotal().getStore().sizeInBytes();
+        // restrict the total disk space such that the next merge does not have sufficient disk space
+        long insufficientTotalDiskSpace = usedDiskSpaceAfterIndexing + MERGE_DISK_HIGH_WATERMARK_BYTES - randomLongBetween(1L, 10L);
+        setTotalSpace(node, insufficientTotalDiskSpace);
+        // node stats' FS stats should report that there is insufficient disk space available
+        assertBusy(() -> {
+            NodesStatsResponse nodesStatsResponse = client().admin().cluster().prepareNodesStats().setFs(true).get();
+            assertThat(nodesStatsResponse.getNodes().size(), equalTo(1));
+            NodeStats nodeStats = nodesStatsResponse.getNodes().get(0);
+            assertThat(nodeStats.getFs().getTotal().getTotal().getBytes(), equalTo(insufficientTotalDiskSpace));
+            assertThat(nodeStats.getFs().getTotal().getAvailable().getBytes(), lessThan(MERGE_DISK_HIGH_WATERMARK_BYTES));
+        });
+        int indexingRounds = randomIntBetween(2, 5);
+        while (indexingRounds-- > 0) {
+            indexRandom(
+                true,
+                true,
+                true,
+                false,
+                IntStream.range(1, randomIntBetween(2, 5))
+                    .mapToObj(i -> prepareIndex(indexName).setSource("field", randomAlphaOfLength(50)))
+                    .toList()
+            );
+        }
+        // the max segments argument makes it a blocking call
+        ActionFuture<BroadcastResponse> forceMergeFuture = indicesAdmin().prepareForceMerge(indexName).setMaxNumSegments(1).execute();
+        assertBusy(() -> {
+            // merge executor says merging is blocked due to insufficient disk space while there is a single merge task enqueued
+            assertThat(threadPoolMergeExecutorService.getMergeTasksQueueLength(), equalTo(1));
+            assertTrue(threadPoolMergeExecutorService.isMergingBlockedDueToInsufficientDiskSpace());
+            // telemetry says that there are indeed some segments enqueued to be merged
+            testTelemetryPlugin.collect();
+            assertThat(
+                testTelemetryPlugin.getLongGaugeMeasurement(MergeMetrics.MERGE_SEGMENTS_QUEUED_USAGE).getLast().getLong(),
+                greaterThan(0L)
+            );
+            // but still no merges are currently running
+            assertThat(
+                testTelemetryPlugin.getLongGaugeMeasurement(MergeMetrics.MERGE_SEGMENTS_RUNNING_USAGE).getLast().getLong(),
+                equalTo(0L)
+            );
+            // indices stats also says that no merge is currently running (blocked merges are NOT considered as "running")
+            IndicesStatsResponse indicesStatsResponse = client().admin().indices().prepareStats(indexName).setMerge(true).get();
+            long currentMergeCount = indicesStatsResponse.getIndices().get(indexName).getPrimaries().merge.getCurrent();
+            assertThat(currentMergeCount, equalTo(0L));
+        });
+        // the force merge call is still blocked
+        assertFalse(forceMergeFuture.isCancelled());
+        assertFalse(forceMergeFuture.isDone());
+        // merge executor still confirms merging is blocked due to insufficient disk space
+        assertTrue(threadPoolMergeExecutorService.isMergingBlockedDueToInsufficientDiskSpace());
+        // make disk space available in order to unblock the merge
+        if (randomBoolean()) {
+            setTotalSpace(node, Long.MAX_VALUE);
+        } else {
+            updateClusterSettings(
+                Settings.builder().put(ThreadPoolMergeExecutorService.INDICES_MERGE_DISK_HIGH_WATERMARK_SETTING.getKey(), "0b")
+            );
+        }
+        // wait for the merge call to return
+        safeGet(forceMergeFuture);
+        IndicesStatsResponse indicesStatsResponse = indicesAdmin().prepareStats(indexName).setMerge(true).get();
+        testTelemetryPlugin.collect();
+        // assert index stats and telemetry report no merging in progress (after force merge returned)
+        long currentMergeCount = indicesStatsResponse.getIndices().get(indexName).getPrimaries().merge.getCurrent();
+        assertThat(currentMergeCount, equalTo(0L));
+        assertThat(testTelemetryPlugin.getLongGaugeMeasurement(MergeMetrics.MERGE_SEGMENTS_QUEUED_USAGE).getLast().getLong(), equalTo(0L));
+        assertThat(testTelemetryPlugin.getLongGaugeMeasurement(MergeMetrics.MERGE_SEGMENTS_RUNNING_USAGE).getLast().getLong(), equalTo(0L));
+        // but some merging took place (there might have been other merges automatically triggered before the force merge call)
+        long totalMergeCount = indicesStatsResponse.getIndices().get(indexName).getPrimaries().merge.getTotal();
+        assertThat(totalMergeCount, greaterThan(0L));
+        assertThat(testTelemetryPlugin.getLongCounterMeasurement(MergeMetrics.MERGE_DOCS_TOTAL).getLast().getLong(), greaterThan(0L));
+        // assert there's a single segment after the force merge
+        List<ShardSegments> shardSegments = getShardSegments(indexName);
+        assertThat(shardSegments.size(), equalTo(1));
+        assertThat(shardSegments.get(0).getSegments().size(), equalTo(1));
+        assertAcked(indicesAdmin().prepareDelete(indexName).get());
+    }
+
     public void setTotalSpace(String dataNodeName, long totalSpace) {
         getTestFileStore(dataNodeName).setTotalSpace(totalSpace);
         refreshClusterInfo();
     }
+
+    private TestTelemetryPlugin getTelemetryPlugin(String dataNodeName) {
+        var plugin = internalCluster().getInstance(PluginsService.class, dataNodeName)
+            .filterPlugins(TestTelemetryPlugin.class)
+            .findFirst()
+            .orElseThrow();
+        plugin.resetMeter();
+        return plugin;
+    }
 }
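
To run just this test locally, Gradle's test filtering should work along these lines (the task path :server:internalClusterTest is an assumption based on the file living under server/src/internalClusterTest):

    ./gradlew :server:internalClusterTest --tests "org.elasticsearch.index.engine.MergeWithLowDiskSpaceIT.testForceMergeIsBlockedThenUnblocked"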
