
Commit a096e42

Add TEST MergeWithLowDiskSpaceIT testRelocationWhileForceMerging (#131767)
This adds a test that covers relocation of shards that are running a force merge. Relates #93503
1 parent: 3212ec2 · commit: a096e42
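At a glance, the scenario the new test exercises can be condensed as follows. This is only an illustrative sketch, not part of the commit: it assumes a hypothetical method added to the MergeWithLowDiskSpaceIT class shown in the diff below, relies on the same helpers the real test uses (setTotalSpace, indexRandom, ClusterRerouteUtils.reroute, MoveAllocationCommand), and omits the telemetry, node-stats, and segment-count assertions the real test makes.

    // Hypothetical, condensed version of the new test; method name is illustrative only.
    public void testRelocationUnblocksForceMergeSketch() throws Exception {
        final String node1 = internalCluster().startNode();
        setTotalSpace(node1, Long.MAX_VALUE);
        final String indexName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
        prepareCreate(indexName, indexSettings(1, 0)).get();
        // leave less free space than the merge disk high watermark, so the merge executor blocks merging on node1
        long used = indicesAdmin().prepareStats().clear().setStore(true).get().getTotal().getStore().sizeInBytes();
        setTotalSpace(node1, used + MERGE_DISK_HIGH_WATERMARK_BYTES - 1L);
        // index a few refreshed batches so the shard accumulates multiple segments worth merging
        for (int round = 0; round < 5; round++) {
            indexRandom(
                true,
                true,
                true,
                false,
                IntStream.range(1, 10).mapToObj(i -> prepareIndex(indexName).setSource("field", randomAlphaOfLength(50))).toList()
            );
        }
        // the max segments argument makes force merge a blocking call, so run it asynchronously; it stays blocked on node1
        ActionFuture<BroadcastResponse> forceMerge = indicesAdmin().prepareForceMerge(indexName).setMaxNumSegments(1).execute();
        // bring up a second node with ample disk space and move the shard there
        final String node2 = internalCluster().startNode();
        setTotalSpace(node2, Long.MAX_VALUE);
        ClusterRerouteUtils.reroute(client(), new MoveAllocationCommand(indexName, 0, node1, node2, Metadata.DEFAULT_PROJECT_ID));
        // once the shard lives on node2, the blocked force merge can complete
        assertBusy(() -> assertTrue(forceMerge.isDone()));
    }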

File tree: 1 file changed (+120, -0 lines)

server/src/internalClusterTest/java/org/elasticsearch/index/engine/MergeWithLowDiskSpaceIT.java

Lines changed: 120 additions & 0 deletions
@@ -10,16 +10,23 @@
 package org.elasticsearch.index.engine;
 
 import org.elasticsearch.action.ActionFuture;
+import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
 import org.elasticsearch.action.admin.cluster.node.stats.NodeStats;
 import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse;
+import org.elasticsearch.action.admin.cluster.reroute.ClusterRerouteUtils;
+import org.elasticsearch.action.admin.indices.segments.IndicesSegmentResponse;
 import org.elasticsearch.action.admin.indices.segments.ShardSegments;
 import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
 import org.elasticsearch.action.support.broadcast.BroadcastResponse;
 import org.elasticsearch.cluster.DiskUsageIntegTestCase;
 import org.elasticsearch.cluster.metadata.IndexMetadata;
+import org.elasticsearch.cluster.metadata.Metadata;
 import org.elasticsearch.cluster.routing.allocation.DiskThresholdSettings;
+import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationCommand;
+import org.elasticsearch.common.Priority;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.util.concurrent.EsExecutors;
+import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.index.IndexNotFoundException;
 import org.elasticsearch.indices.IndicesService;
 import org.elasticsearch.plugins.Plugin;
@@ -33,16 +40,20 @@
 import java.util.Collection;
 import java.util.List;
 import java.util.Locale;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.IntStream;
 
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.iterableWithSize;
 import static org.hamcrest.Matchers.lessThan;
+import static org.hamcrest.Matchers.not;
 
 @ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0)
 public class MergeWithLowDiskSpaceIT extends DiskUsageIntegTestCase {
+    private final TimeValue ACCEPTABLE_RELOCATION_TIME = new TimeValue(5, TimeUnit.MINUTES);
     protected static long MERGE_DISK_HIGH_WATERMARK_BYTES;
 
     @BeforeClass
@@ -266,6 +277,115 @@ public void testForceMergeIsBlockedThenUnblocked() throws Exception {
         assertAcked(indicesAdmin().prepareDelete(indexName).get());
     }
 
+    public void testRelocationWhileForceMerging() throws Exception {
+        final String node1 = internalCluster().startNode();
+        ensureStableCluster(1);
+        setTotalSpace(node1, Long.MAX_VALUE);
+        String indexName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
+        prepareCreate(indexName, indexSettings(1, 0)).get();
+        // get current disk space usage (for all indices on the node)
+        IndicesStatsResponse stats = indicesAdmin().prepareStats().clear().setStore(true).get();
+        long usedDiskSpaceAfterIndexing = stats.getTotal().getStore().sizeInBytes();
+        // restrict the total disk space such that the next merge does not have sufficient disk space
+        long insufficientTotalDiskSpace = usedDiskSpaceAfterIndexing + MERGE_DISK_HIGH_WATERMARK_BYTES - randomLongBetween(1L, 10L);
+        setTotalSpace(node1, insufficientTotalDiskSpace);
+        // node stats' FS stats should report that there is insufficient disk space available
+        assertBusy(() -> {
+            NodesStatsResponse nodesStatsResponse = client().admin().cluster().prepareNodesStats().setFs(true).get();
+            assertThat(nodesStatsResponse.getNodes().size(), equalTo(1));
+            NodeStats nodeStats = nodesStatsResponse.getNodes().get(0);
+            assertThat(nodeStats.getFs().getTotal().getTotal().getBytes(), equalTo(insufficientTotalDiskSpace));
+            assertThat(nodeStats.getFs().getTotal().getAvailable().getBytes(), lessThan(MERGE_DISK_HIGH_WATERMARK_BYTES));
+        });
+        int indexingRounds = randomIntBetween(5, 10);
+        while (indexingRounds-- > 0) {
+            indexRandom(
+                true,
+                true,
+                true,
+                false,
+                IntStream.range(1, randomIntBetween(5, 10))
+                    .mapToObj(i -> prepareIndex(indexName).setSource("field", randomAlphaOfLength(50)))
+                    .toList()
+            );
+        }
+        // the max segments argument makes it a blocking call
+        ActionFuture<BroadcastResponse> forceMergeBeforeRelocationFuture = indicesAdmin().prepareForceMerge(indexName)
+            .setMaxNumSegments(1)
+            .execute();
+        ThreadPoolMergeExecutorService threadPoolMergeExecutorService = internalCluster().getInstance(IndicesService.class, node1)
+            .getThreadPoolMergeExecutorService();
+        TestTelemetryPlugin testTelemetryPlugin = getTelemetryPlugin(node1);
+        assertBusy(() -> {
+            // merge executor says merging is blocked due to insufficient disk space while there is a single merge task enqueued
+            assertThat(threadPoolMergeExecutorService.getMergeTasksQueueLength(), equalTo(1));
+            assertTrue(threadPoolMergeExecutorService.isMergingBlockedDueToInsufficientDiskSpace());
+            // telemetry says that there are indeed some segments enqueued to be merged
+            testTelemetryPlugin.collect();
+            assertThat(
+                testTelemetryPlugin.getLongGaugeMeasurement(MergeMetrics.MERGE_SEGMENTS_QUEUED_USAGE).getLast().getLong(),
+                greaterThan(0L)
+            );
+            // but still no merges are currently running
+            assertThat(
+                testTelemetryPlugin.getLongGaugeMeasurement(MergeMetrics.MERGE_SEGMENTS_RUNNING_USAGE).getLast().getLong(),
+                equalTo(0L)
+            );
+            // indices stats also says that no merge is currently running (blocked merges are NOT considered as "running")
+            IndicesStatsResponse indicesStatsResponse = client().admin().indices().prepareStats(indexName).setMerge(true).get();
+            long currentMergeCount = indicesStatsResponse.getIndices().get(indexName).getPrimaries().merge.getCurrent();
+            assertThat(currentMergeCount, equalTo(0L));
+        });
+        // the force merge call is still blocked
+        assertFalse(forceMergeBeforeRelocationFuture.isCancelled());
+        assertFalse(forceMergeBeforeRelocationFuture.isDone());
+        // merge executor still confirms merging is blocked due to insufficient disk space
+        assertTrue(threadPoolMergeExecutorService.isMergingBlockedDueToInsufficientDiskSpace());
+        IndicesSegmentResponse indicesSegmentResponseBeforeRelocation = indicesAdmin().prepareSegments(indexName).get();
+        // the index should have more than 1 segments at this stage
+        assertThat(
+            indicesSegmentResponseBeforeRelocation.getIndices().get(indexName).iterator().next().shards()[0].getSegments(),
+            iterableWithSize(greaterThan(1))
+        );
+        // start another node
+        final String node2 = internalCluster().startNode();
+        ensureStableCluster(2);
+        setTotalSpace(node2, Long.MAX_VALUE);
+        // relocate the shard from node1 to node2
+        ClusterRerouteUtils.reroute(client(), new MoveAllocationCommand(indexName, 0, node1, node2, Metadata.DEFAULT_PROJECT_ID));
+        ClusterHealthResponse clusterHealthResponse = clusterAdmin().prepareHealth(TEST_REQUEST_TIMEOUT)
+            .setWaitForEvents(Priority.LANGUID)
+            .setWaitForNoRelocatingShards(true)
+            .setTimeout(ACCEPTABLE_RELOCATION_TIME)
+            .get();
+        assertThat(clusterHealthResponse.isTimedOut(), equalTo(false));
+        // the force merge call is now unblocked
+        assertBusy(() -> {
+            assertTrue(forceMergeBeforeRelocationFuture.isDone());
+            assertFalse(forceMergeBeforeRelocationFuture.isCancelled());
+        });
+        // there is some merging going on in the {@code PostRecoveryMerger} after recovery, but that's not guaranteeing us a single segment,
+        // so let's trigger a force merge to 1 segment again (this one should succeed promptly)
+        indicesAdmin().prepareForceMerge(indexName).setMaxNumSegments(1).get();
+        IndicesSegmentResponse indicesSegmentResponseAfterRelocation = indicesAdmin().prepareSegments(indexName).get();
+        // assert there's only one segment now
+        assertThat(
+            indicesSegmentResponseAfterRelocation.getIndices().get(indexName).iterator().next().shards()[0].getSegments(),
+            iterableWithSize(1)
+        );
+        // also assert that the shard was indeed moved to a different node
+        assertThat(
+            indicesSegmentResponseAfterRelocation.getIndices().get(indexName).iterator().next().shards()[0].getShardRouting()
+                .currentNodeId(),
+            not(
+                equalTo(
+                    indicesSegmentResponseBeforeRelocation.getIndices().get(indexName).iterator().next().shards()[0].getShardRouting()
+                        .currentNodeId()
+                )
+            )
+        );
+    }
+
     public void setTotalSpace(String dataNodeName, long totalSpace) {
         getTestFileStore(dataNodeName).setTotalSpace(totalSpace);
         refreshClusterInfo();
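As a side note on why the merge stays blocked (not part of the commit, just the arithmetic behind the setTotalSpace call in the test): restricting the total space to usedDiskSpaceAfterIndexing + MERGE_DISK_HIGH_WATERMARK_BYTES minus a small delta leaves the reported available space just below the merge disk high watermark, which is the condition the merge executor checks.

    // Illustrative arithmetic only, with hypothetical numbers:
    long used = 1_000_000L;                           // bytes already used by indices on the node
    long watermark = MERGE_DISK_HIGH_WATERMARK_BYTES; // merge disk high watermark configured by the test class
    long delta = 7L;                                  // stands in for randomLongBetween(1L, 10L)
    long total = used + watermark - delta;            // what setTotalSpace(node1, insufficientTotalDiskSpace) reports
    long available = total - used;                    // == watermark - delta, i.e. strictly below the watermark
    assert available < watermark;                     // so isMergingBlockedDueToInsufficientDiskSpace() stays true until the shard moves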
