Skip to content

Commit 9883d3e

Browse files
[8.19] Add TEST MergeWithLowDiskSpaceIT testRelocationWhileForceMerging (#131767) (#131779)
* Add TEST MergeWithLowDiskSpaceIT testRelocationWhileForceMerging (#131767) This adds a test that covers relocation for shards that are running a force merge. Relates #93503 * Fix MergeWithLowDiskSpaceIT testRelocationWhileForceMerging (#131806) The index settings are randomized in the test, but this test suite doesn't work when indices have a custom data path. * Fix compilation
1 parent d7d2be4 commit 9883d3e

File tree

1 file changed

+110
-0
lines changed

1 file changed

+110
-0
lines changed

server/src/internalClusterTest/java/org/elasticsearch/index/engine/MergeWithLowDiskSpaceIT.java

Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,16 +10,22 @@
1010
package org.elasticsearch.index.engine;
1111

1212
import org.elasticsearch.action.ActionFuture;
13+
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
1314
import org.elasticsearch.action.admin.cluster.node.stats.NodeStats;
1415
import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse;
16+
import org.elasticsearch.action.admin.cluster.reroute.ClusterRerouteUtils;
17+
import org.elasticsearch.action.admin.indices.segments.IndicesSegmentResponse;
1518
import org.elasticsearch.action.admin.indices.segments.ShardSegments;
1619
import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
1720
import org.elasticsearch.action.support.broadcast.BroadcastResponse;
1821
import org.elasticsearch.cluster.DiskUsageIntegTestCase;
1922
import org.elasticsearch.cluster.metadata.IndexMetadata;
2023
import org.elasticsearch.cluster.routing.allocation.DiskThresholdSettings;
24+
import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationCommand;
25+
import org.elasticsearch.common.Priority;
2126
import org.elasticsearch.common.settings.Settings;
2227
import org.elasticsearch.common.util.concurrent.EsExecutors;
28+
import org.elasticsearch.core.TimeValue;
2329
import org.elasticsearch.index.IndexNotFoundException;
2430
import org.elasticsearch.indices.IndicesService;
2531
import org.elasticsearch.test.ESIntegTestCase;
@@ -28,16 +34,20 @@
2834

2935
import java.util.List;
3036
import java.util.Locale;
37+
import java.util.concurrent.TimeUnit;
3138
import java.util.stream.IntStream;
3239

3340
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
3441
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;
3542
import static org.hamcrest.Matchers.equalTo;
3643
import static org.hamcrest.Matchers.greaterThan;
44+
import static org.hamcrest.Matchers.iterableWithSize;
3745
import static org.hamcrest.Matchers.lessThan;
46+
import static org.hamcrest.Matchers.not;
3847

3948
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0)
4049
public class MergeWithLowDiskSpaceIT extends DiskUsageIntegTestCase {
50+
private final TimeValue ACCEPTABLE_RELOCATION_TIME = new TimeValue(5, TimeUnit.MINUTES);
4151
protected static long MERGE_DISK_HIGH_WATERMARK_BYTES;
4252

4353
@BeforeClass
@@ -235,6 +245,106 @@ public void testForceMergeIsBlockedThenUnblocked() throws Exception {
235245
assertAcked(indicesAdmin().prepareDelete(indexName).get());
236246
}
237247

248+
public void testRelocationWhileForceMerging() throws Exception {
249+
final String node1 = internalCluster().startNode();
250+
ensureStableCluster(1);
251+
setTotalSpace(node1, Long.MAX_VALUE);
252+
String indexName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
253+
createIndex(
254+
indexName,
255+
Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0).put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).build()
256+
);
257+
// get current disk space usage (for all indices on the node)
258+
IndicesStatsResponse stats = indicesAdmin().prepareStats().clear().setStore(true).get();
259+
long usedDiskSpaceAfterIndexing = stats.getTotal().getStore().sizeInBytes();
260+
// restrict the total disk space such that the next merge does not have sufficient disk space
261+
long insufficientTotalDiskSpace = usedDiskSpaceAfterIndexing + MERGE_DISK_HIGH_WATERMARK_BYTES - randomLongBetween(1L, 10L);
262+
setTotalSpace(node1, insufficientTotalDiskSpace);
263+
// node stats' FS stats should report that there is insufficient disk space available
264+
assertBusy(() -> {
265+
NodesStatsResponse nodesStatsResponse = client().admin().cluster().prepareNodesStats().setFs(true).get();
266+
assertThat(nodesStatsResponse.getNodes().size(), equalTo(1));
267+
NodeStats nodeStats = nodesStatsResponse.getNodes().get(0);
268+
assertThat(nodeStats.getFs().getTotal().getTotal().getBytes(), equalTo(insufficientTotalDiskSpace));
269+
assertThat(nodeStats.getFs().getTotal().getAvailable().getBytes(), lessThan(MERGE_DISK_HIGH_WATERMARK_BYTES));
270+
});
271+
int indexingRounds = randomIntBetween(5, 10);
272+
while (indexingRounds-- > 0) {
273+
indexRandom(
274+
true,
275+
true,
276+
true,
277+
false,
278+
IntStream.range(1, randomIntBetween(5, 10))
279+
.mapToObj(i -> prepareIndex(indexName).setSource("field", randomAlphaOfLength(50)))
280+
.toList()
281+
);
282+
}
283+
// the max segments argument makes it a blocking call
284+
ActionFuture<BroadcastResponse> forceMergeBeforeRelocationFuture = indicesAdmin().prepareForceMerge(indexName)
285+
.setMaxNumSegments(1)
286+
.execute();
287+
ThreadPoolMergeExecutorService threadPoolMergeExecutorService = internalCluster().getInstance(IndicesService.class, node1)
288+
.getThreadPoolMergeExecutorService();
289+
assertBusy(() -> {
290+
// merge executor says merging is blocked due to insufficient disk space while there is a single merge task enqueued
291+
assertThat(threadPoolMergeExecutorService.getMergeTasksQueueLength(), equalTo(1));
292+
assertTrue(threadPoolMergeExecutorService.isMergingBlockedDueToInsufficientDiskSpace());
293+
// indices stats also says that no merge is currently running (blocked merges are NOT considered as "running")
294+
IndicesStatsResponse indicesStatsResponse = client().admin().indices().prepareStats(indexName).setMerge(true).get();
295+
long currentMergeCount = indicesStatsResponse.getIndices().get(indexName).getPrimaries().merge.getCurrent();
296+
assertThat(currentMergeCount, equalTo(0L));
297+
});
298+
// the force merge call is still blocked
299+
assertFalse(forceMergeBeforeRelocationFuture.isCancelled());
300+
assertFalse(forceMergeBeforeRelocationFuture.isDone());
301+
// merge executor still confirms merging is blocked due to insufficient disk space
302+
assertTrue(threadPoolMergeExecutorService.isMergingBlockedDueToInsufficientDiskSpace());
303+
IndicesSegmentResponse indicesSegmentResponseBeforeRelocation = indicesAdmin().prepareSegments(indexName).get();
304+
// the index should have more than 1 segments at this stage
305+
assertThat(
306+
indicesSegmentResponseBeforeRelocation.getIndices().get(indexName).iterator().next().shards()[0].getSegments(),
307+
iterableWithSize(greaterThan(1))
308+
);
309+
// start another node
310+
final String node2 = internalCluster().startNode();
311+
ensureStableCluster(2);
312+
setTotalSpace(node2, Long.MAX_VALUE);
313+
// relocate the shard from node1 to node2
314+
ClusterRerouteUtils.reroute(client(), new MoveAllocationCommand(indexName, 0, node1, node2));
315+
ClusterHealthResponse clusterHealthResponse = clusterAdmin().prepareHealth(TEST_REQUEST_TIMEOUT)
316+
.setWaitForEvents(Priority.LANGUID)
317+
.setWaitForNoRelocatingShards(true)
318+
.setTimeout(ACCEPTABLE_RELOCATION_TIME)
319+
.get();
320+
assertThat(clusterHealthResponse.isTimedOut(), equalTo(false));
321+
// the force merge call is now unblocked
322+
assertBusy(() -> {
323+
assertTrue(forceMergeBeforeRelocationFuture.isDone());
324+
assertFalse(forceMergeBeforeRelocationFuture.isCancelled());
325+
});
326+
// there is some merging going on in the {@code PostRecoveryMerger} after recovery, but that's not guaranteeing us a single segment,
327+
// so let's trigger a force merge to 1 segment again (this one should succeed promptly)
328+
indicesAdmin().prepareForceMerge(indexName).setMaxNumSegments(1).get();
329+
IndicesSegmentResponse indicesSegmentResponseAfterRelocation = indicesAdmin().prepareSegments(indexName).get();
330+
// assert there's only one segment now
331+
assertThat(
332+
indicesSegmentResponseAfterRelocation.getIndices().get(indexName).iterator().next().shards()[0].getSegments(),
333+
iterableWithSize(1)
334+
);
335+
// also assert that the shard was indeed moved to a different node
336+
assertThat(
337+
indicesSegmentResponseAfterRelocation.getIndices().get(indexName).iterator().next().shards()[0].getShardRouting()
338+
.currentNodeId(),
339+
not(
340+
equalTo(
341+
indicesSegmentResponseBeforeRelocation.getIndices().get(indexName).iterator().next().shards()[0].getShardRouting()
342+
.currentNodeId()
343+
)
344+
)
345+
);
346+
}
347+
238348
public void setTotalSpace(String dataNodeName, long totalSpace) {
239349
getTestFileStore(dataNodeName).setTotalSpace(totalSpace);
240350
refreshClusterInfo();

0 commit comments

Comments
 (0)