Skip to content

Commit b7b616b

Browse files
[9.0] Add TEST MergeWithLowDiskSpaceIT testRelocationWhileForceMerging (elastic#131767) (elastic#131781)
* Add TEST MergeWithLowDiskSpaceIT testRelocationWhileForceMerging (elastic#131767) This adds a test that covers relocation for shards that are running a force merge. Relates elastic#93503 * Fix MergeWithLowDiskSpaceIT testRelocationWhileForceMerging (elastic#131806) The index settings are randomized in the test, but this test suite doesn't work when indices have a custom data path. * Fix compilation
1 parent f1715d3 commit b7b616b

File tree

1 file changed

+110
-0
lines changed

1 file changed

+110
-0
lines changed

server/src/internalClusterTest/java/org/elasticsearch/index/engine/MergeWithLowDiskSpaceIT.java

Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,16 +10,22 @@
1010
package org.elasticsearch.index.engine;
1111

1212
import org.elasticsearch.action.ActionFuture;
13+
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
1314
import org.elasticsearch.action.admin.cluster.node.stats.NodeStats;
1415
import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse;
16+
import org.elasticsearch.action.admin.cluster.reroute.ClusterRerouteUtils;
17+
import org.elasticsearch.action.admin.indices.segments.IndicesSegmentResponse;
1518
import org.elasticsearch.action.admin.indices.segments.ShardSegments;
1619
import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
1720
import org.elasticsearch.action.support.broadcast.BroadcastResponse;
1821
import org.elasticsearch.cluster.DiskUsageIntegTestCase;
1922
import org.elasticsearch.cluster.metadata.IndexMetadata;
2023
import org.elasticsearch.cluster.routing.allocation.DiskThresholdSettings;
24+
import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationCommand;
25+
import org.elasticsearch.common.Priority;
2126
import org.elasticsearch.common.settings.Settings;
2227
import org.elasticsearch.common.util.concurrent.EsExecutors;
28+
import org.elasticsearch.core.TimeValue;
2329
import org.elasticsearch.index.IndexNotFoundException;
2430
import org.elasticsearch.indices.IndicesService;
2531
import org.elasticsearch.test.ESIntegTestCase;
@@ -28,16 +34,20 @@
2834

2935
import java.util.List;
3036
import java.util.Locale;
37+
import java.util.concurrent.TimeUnit;
3138
import java.util.stream.IntStream;
3239

3340
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
3441
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;
3542
import static org.hamcrest.Matchers.equalTo;
3643
import static org.hamcrest.Matchers.greaterThan;
44+
import static org.hamcrest.Matchers.iterableWithSize;
3745
import static org.hamcrest.Matchers.lessThan;
46+
import static org.hamcrest.Matchers.not;
3847

3948
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0)
4049
public class MergeWithLowDiskSpaceIT extends DiskUsageIntegTestCase {
50+
private final TimeValue ACCEPTABLE_RELOCATION_TIME = new TimeValue(5, TimeUnit.MINUTES);
4151
protected static long MERGE_DISK_HIGH_WATERMARK_BYTES;
4252

4353
@BeforeClass
@@ -238,6 +248,106 @@ public void testForceMergeIsBlockedThenUnblocked() throws Exception {
238248
assertAcked(indicesAdmin().prepareDelete(indexName).get());
239249
}
240250

251+
public void testRelocationWhileForceMerging() throws Exception {
252+
final String node1 = internalCluster().startNode();
253+
ensureStableCluster(1);
254+
setTotalSpace(node1, Long.MAX_VALUE);
255+
String indexName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
256+
createIndex(
257+
indexName,
258+
Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0).put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).build()
259+
);
260+
// get current disk space usage (for all indices on the node)
261+
IndicesStatsResponse stats = indicesAdmin().prepareStats().clear().setStore(true).get();
262+
long usedDiskSpaceAfterIndexing = stats.getTotal().getStore().sizeInBytes();
263+
// restrict the total disk space such that the next merge does not have sufficient disk space
264+
long insufficientTotalDiskSpace = usedDiskSpaceAfterIndexing + MERGE_DISK_HIGH_WATERMARK_BYTES - randomLongBetween(1L, 10L);
265+
setTotalSpace(node1, insufficientTotalDiskSpace);
266+
// node stats' FS stats should report that there is insufficient disk space available
267+
assertBusy(() -> {
268+
NodesStatsResponse nodesStatsResponse = client().admin().cluster().prepareNodesStats().setFs(true).get();
269+
assertThat(nodesStatsResponse.getNodes().size(), equalTo(1));
270+
NodeStats nodeStats = nodesStatsResponse.getNodes().get(0);
271+
assertThat(nodeStats.getFs().getTotal().getTotal().getBytes(), equalTo(insufficientTotalDiskSpace));
272+
assertThat(nodeStats.getFs().getTotal().getAvailable().getBytes(), lessThan(MERGE_DISK_HIGH_WATERMARK_BYTES));
273+
});
274+
int indexingRounds = randomIntBetween(5, 10);
275+
while (indexingRounds-- > 0) {
276+
indexRandom(
277+
true,
278+
true,
279+
true,
280+
false,
281+
IntStream.range(1, randomIntBetween(5, 10))
282+
.mapToObj(i -> prepareIndex(indexName).setSource("field", randomAlphaOfLength(50)))
283+
.toList()
284+
);
285+
}
286+
// the max segments argument makes it a blocking call
287+
ActionFuture<BroadcastResponse> forceMergeBeforeRelocationFuture = indicesAdmin().prepareForceMerge(indexName)
288+
.setMaxNumSegments(1)
289+
.execute();
290+
ThreadPoolMergeExecutorService threadPoolMergeExecutorService = internalCluster().getInstance(IndicesService.class, node1)
291+
.getThreadPoolMergeExecutorService();
292+
assertBusy(() -> {
293+
// merge executor says merging is blocked due to insufficient disk space while there is a single merge task enqueued
294+
assertThat(threadPoolMergeExecutorService.getMergeTasksQueueLength(), equalTo(1));
295+
assertTrue(threadPoolMergeExecutorService.isMergingBlockedDueToInsufficientDiskSpace());
296+
// indices stats also says that no merge is currently running (blocked merges are NOT considered as "running")
297+
IndicesStatsResponse indicesStatsResponse = client().admin().indices().prepareStats(indexName).setMerge(true).get();
298+
long currentMergeCount = indicesStatsResponse.getIndices().get(indexName).getPrimaries().merge.getCurrent();
299+
assertThat(currentMergeCount, equalTo(0L));
300+
});
301+
// the force merge call is still blocked
302+
assertFalse(forceMergeBeforeRelocationFuture.isCancelled());
303+
assertFalse(forceMergeBeforeRelocationFuture.isDone());
304+
// merge executor still confirms merging is blocked due to insufficient disk space
305+
assertTrue(threadPoolMergeExecutorService.isMergingBlockedDueToInsufficientDiskSpace());
306+
IndicesSegmentResponse indicesSegmentResponseBeforeRelocation = indicesAdmin().prepareSegments(indexName).get();
307+
// the index should have more than 1 segments at this stage
308+
assertThat(
309+
indicesSegmentResponseBeforeRelocation.getIndices().get(indexName).iterator().next().shards()[0].getSegments(),
310+
iterableWithSize(greaterThan(1))
311+
);
312+
// start another node
313+
final String node2 = internalCluster().startNode();
314+
ensureStableCluster(2);
315+
setTotalSpace(node2, Long.MAX_VALUE);
316+
// relocate the shard from node1 to node2
317+
ClusterRerouteUtils.reroute(client(), new MoveAllocationCommand(indexName, 0, node1, node2));
318+
ClusterHealthResponse clusterHealthResponse = clusterAdmin().prepareHealth(TEST_REQUEST_TIMEOUT)
319+
.setWaitForEvents(Priority.LANGUID)
320+
.setWaitForNoRelocatingShards(true)
321+
.setTimeout(ACCEPTABLE_RELOCATION_TIME)
322+
.get();
323+
assertThat(clusterHealthResponse.isTimedOut(), equalTo(false));
324+
// the force merge call is now unblocked
325+
assertBusy(() -> {
326+
assertTrue(forceMergeBeforeRelocationFuture.isDone());
327+
assertFalse(forceMergeBeforeRelocationFuture.isCancelled());
328+
});
329+
// there is some merging going on in the {@code PostRecoveryMerger} after recovery, but that's not guaranteeing us a single segment,
330+
// so let's trigger a force merge to 1 segment again (this one should succeed promptly)
331+
indicesAdmin().prepareForceMerge(indexName).setMaxNumSegments(1).get();
332+
IndicesSegmentResponse indicesSegmentResponseAfterRelocation = indicesAdmin().prepareSegments(indexName).get();
333+
// assert there's only one segment now
334+
assertThat(
335+
indicesSegmentResponseAfterRelocation.getIndices().get(indexName).iterator().next().shards()[0].getSegments(),
336+
iterableWithSize(1)
337+
);
338+
// also assert that the shard was indeed moved to a different node
339+
assertThat(
340+
indicesSegmentResponseAfterRelocation.getIndices().get(indexName).iterator().next().shards()[0].getShardRouting()
341+
.currentNodeId(),
342+
not(
343+
equalTo(
344+
indicesSegmentResponseBeforeRelocation.getIndices().get(indexName).iterator().next().shards()[0].getShardRouting()
345+
.currentNodeId()
346+
)
347+
)
348+
);
349+
}
350+
241351
public void setTotalSpace(String dataNodeName, long totalSpace) {
242352
getTestFileStore(dataNodeName).setTotalSpace(totalSpace);
243353
refreshClusterInfo();

0 commit comments

Comments
 (0)