Commit a053636

Add TEST MergeWithLowDiskSpaceIT testRelocationWhileForceMerging (#131767)
This adds a test that covers relocation of shards that are running a force merge. Relates #93503
1 parent 21eab62 commit a053636
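
In outline, the new test gets a force merge to block by shrinking the node's reported disk space below the merge high watermark, then relocates the shard to a node with ample space and expects the blocked merge to complete. The condensed sketch below is an illustration only, not the committed test: it reuses the same test-framework calls that appear in the diff further down and assumes the surrounding MergeWithLowDiskSpaceIT class context (its setTotalSpace() helper and the MERGE_DISK_HIGH_WATERMARK_BYTES threshold), while omitting the telemetry, node-stats, and segment-count assertions of the real test.

// Condensed sketch only (not the committed test); assumes the MergeWithLowDiskSpaceIT
// class context from the diff below, including setTotalSpace() and MERGE_DISK_HIGH_WATERMARK_BYTES.
public void sketchRelocationUnblocksForceMerge() throws Exception {
    final String node1 = internalCluster().startNode();
    setTotalSpace(node1, Long.MAX_VALUE);
    String indexName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
    prepareCreate(indexName, indexSettings(1, 0)).get();
    // shrink the node's total disk so a merge can no longer satisfy the high watermark
    long usedDiskSpace = indicesAdmin().prepareStats().clear().setStore(true).get().getTotal().getStore().sizeInBytes();
    setTotalSpace(node1, usedDiskSpace + MERGE_DISK_HIGH_WATERMARK_BYTES - 1L);
    // index a few batches so the shard has several segments worth merging
    indexRandom(
        true,
        true,
        true,
        false,
        IntStream.range(1, 10).mapToObj(i -> prepareIndex(indexName).setSource("field", randomAlphaOfLength(50))).toList()
    );
    // force merge down to one segment; with insufficient disk space the call stays blocked
    ActionFuture<BroadcastResponse> forceMerge = indicesAdmin().prepareForceMerge(indexName).setMaxNumSegments(1).execute();
    // bring up a second node with plenty of space and move the shard there
    final String node2 = internalCluster().startNode();
    setTotalSpace(node2, Long.MAX_VALUE);
    ClusterRerouteUtils.reroute(client(), new MoveAllocationCommand(indexName, 0, node1, node2, Metadata.DEFAULT_PROJECT_ID));
    // once the shard lives on the roomy node, the blocked force merge should complete
    assertBusy(() -> assertTrue(forceMerge.isDone()), 5, TimeUnit.MINUTES);
}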

1 file changed (+120, −0 lines)


server/src/internalClusterTest/java/org/elasticsearch/index/engine/MergeWithLowDiskSpaceIT.java

Lines changed: 120 additions & 0 deletions
@@ -10,16 +10,23 @@
 package org.elasticsearch.index.engine;
 
 import org.elasticsearch.action.ActionFuture;
+import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
 import org.elasticsearch.action.admin.cluster.node.stats.NodeStats;
 import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse;
+import org.elasticsearch.action.admin.cluster.reroute.ClusterRerouteUtils;
+import org.elasticsearch.action.admin.indices.segments.IndicesSegmentResponse;
 import org.elasticsearch.action.admin.indices.segments.ShardSegments;
 import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
 import org.elasticsearch.action.support.broadcast.BroadcastResponse;
 import org.elasticsearch.cluster.DiskUsageIntegTestCase;
 import org.elasticsearch.cluster.metadata.IndexMetadata;
+import org.elasticsearch.cluster.metadata.Metadata;
 import org.elasticsearch.cluster.routing.allocation.DiskThresholdSettings;
+import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationCommand;
+import org.elasticsearch.common.Priority;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.util.concurrent.EsExecutors;
+import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.index.IndexNotFoundException;
 import org.elasticsearch.indices.IndicesService;
 import org.elasticsearch.test.ESIntegTestCase;
@@ -28,16 +35,20 @@
 
 import java.util.List;
 import java.util.Locale;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.IntStream;
 
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.iterableWithSize;
 import static org.hamcrest.Matchers.lessThan;
+import static org.hamcrest.Matchers.not;
 
 @ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0)
 public class MergeWithLowDiskSpaceIT extends DiskUsageIntegTestCase {
+    private final TimeValue ACCEPTABLE_RELOCATION_TIME = new TimeValue(5, TimeUnit.MINUTES);
     protected static long MERGE_DISK_HIGH_WATERMARK_BYTES;
 
     @BeforeClass
@@ -238,6 +249,115 @@ public void testForceMergeIsBlockedThenUnblocked() throws Exception {
         assertAcked(indicesAdmin().prepareDelete(indexName).get());
     }
 
+    public void testRelocationWhileForceMerging() throws Exception {
+        final String node1 = internalCluster().startNode();
+        ensureStableCluster(1);
+        setTotalSpace(node1, Long.MAX_VALUE);
+        String indexName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
+        prepareCreate(indexName, indexSettings(1, 0)).get();
+        // get current disk space usage (for all indices on the node)
+        IndicesStatsResponse stats = indicesAdmin().prepareStats().clear().setStore(true).get();
+        long usedDiskSpaceAfterIndexing = stats.getTotal().getStore().sizeInBytes();
+        // restrict the total disk space such that the next merge does not have sufficient disk space
+        long insufficientTotalDiskSpace = usedDiskSpaceAfterIndexing + MERGE_DISK_HIGH_WATERMARK_BYTES - randomLongBetween(1L, 10L);
+        setTotalSpace(node1, insufficientTotalDiskSpace);
+        // node stats' FS stats should report that there is insufficient disk space available
+        assertBusy(() -> {
+            NodesStatsResponse nodesStatsResponse = client().admin().cluster().prepareNodesStats().setFs(true).get();
+            assertThat(nodesStatsResponse.getNodes().size(), equalTo(1));
+            NodeStats nodeStats = nodesStatsResponse.getNodes().get(0);
+            assertThat(nodeStats.getFs().getTotal().getTotal().getBytes(), equalTo(insufficientTotalDiskSpace));
+            assertThat(nodeStats.getFs().getTotal().getAvailable().getBytes(), lessThan(MERGE_DISK_HIGH_WATERMARK_BYTES));
+        });
+        int indexingRounds = randomIntBetween(5, 10);
+        while (indexingRounds-- > 0) {
+            indexRandom(
+                true,
+                true,
+                true,
+                false,
+                IntStream.range(1, randomIntBetween(5, 10))
+                    .mapToObj(i -> prepareIndex(indexName).setSource("field", randomAlphaOfLength(50)))
+                    .toList()
+            );
+        }
+        // the max segments argument makes it a blocking call
+        ActionFuture<BroadcastResponse> forceMergeBeforeRelocationFuture = indicesAdmin().prepareForceMerge(indexName)
+            .setMaxNumSegments(1)
+            .execute();
+        ThreadPoolMergeExecutorService threadPoolMergeExecutorService = internalCluster().getInstance(IndicesService.class, node1)
+            .getThreadPoolMergeExecutorService();
+        TestTelemetryPlugin testTelemetryPlugin = getTelemetryPlugin(node1);
+        assertBusy(() -> {
+            // merge executor says merging is blocked due to insufficient disk space while there is a single merge task enqueued
+            assertThat(threadPoolMergeExecutorService.getMergeTasksQueueLength(), equalTo(1));
+            assertTrue(threadPoolMergeExecutorService.isMergingBlockedDueToInsufficientDiskSpace());
+            // telemetry says that there are indeed some segments enqueued to be merged
+            testTelemetryPlugin.collect();
+            assertThat(
+                testTelemetryPlugin.getLongGaugeMeasurement(MergeMetrics.MERGE_SEGMENTS_QUEUED_USAGE).getLast().getLong(),
+                greaterThan(0L)
+            );
+            // but still no merges are currently running
+            assertThat(
+                testTelemetryPlugin.getLongGaugeMeasurement(MergeMetrics.MERGE_SEGMENTS_RUNNING_USAGE).getLast().getLong(),
+                equalTo(0L)
+            );
+            // indices stats also says that no merge is currently running (blocked merges are NOT considered as "running")
+            IndicesStatsResponse indicesStatsResponse = client().admin().indices().prepareStats(indexName).setMerge(true).get();
+            long currentMergeCount = indicesStatsResponse.getIndices().get(indexName).getPrimaries().merge.getCurrent();
+            assertThat(currentMergeCount, equalTo(0L));
+        });
+        // the force merge call is still blocked
+        assertFalse(forceMergeBeforeRelocationFuture.isCancelled());
+        assertFalse(forceMergeBeforeRelocationFuture.isDone());
+        // merge executor still confirms merging is blocked due to insufficient disk space
+        assertTrue(threadPoolMergeExecutorService.isMergingBlockedDueToInsufficientDiskSpace());
+        IndicesSegmentResponse indicesSegmentResponseBeforeRelocation = indicesAdmin().prepareSegments(indexName).get();
+        // the index should have more than 1 segments at this stage
+        assertThat(
+            indicesSegmentResponseBeforeRelocation.getIndices().get(indexName).iterator().next().shards()[0].getSegments(),
+            iterableWithSize(greaterThan(1))
+        );
+        // start another node
+        final String node2 = internalCluster().startNode();
+        ensureStableCluster(2);
+        setTotalSpace(node2, Long.MAX_VALUE);
+        // relocate the shard from node1 to node2
+        ClusterRerouteUtils.reroute(client(), new MoveAllocationCommand(indexName, 0, node1, node2, Metadata.DEFAULT_PROJECT_ID));
+        ClusterHealthResponse clusterHealthResponse = clusterAdmin().prepareHealth(TEST_REQUEST_TIMEOUT)
+            .setWaitForEvents(Priority.LANGUID)
+            .setWaitForNoRelocatingShards(true)
+            .setTimeout(ACCEPTABLE_RELOCATION_TIME)
+            .get();
+        assertThat(clusterHealthResponse.isTimedOut(), equalTo(false));
+        // the force merge call is now unblocked
+        assertBusy(() -> {
+            assertTrue(forceMergeBeforeRelocationFuture.isDone());
+            assertFalse(forceMergeBeforeRelocationFuture.isCancelled());
+        });
+        // there is some merging going on in the {@code PostRecoveryMerger} after recovery, but that's not guaranteeing us a single segment,
+        // so let's trigger a force merge to 1 segment again (this one should succeed promptly)
+        indicesAdmin().prepareForceMerge(indexName).setMaxNumSegments(1).get();
+        IndicesSegmentResponse indicesSegmentResponseAfterRelocation = indicesAdmin().prepareSegments(indexName).get();
+        // assert there's only one segment now
+        assertThat(
+            indicesSegmentResponseAfterRelocation.getIndices().get(indexName).iterator().next().shards()[0].getSegments(),
+            iterableWithSize(1)
+        );
+        // also assert that the shard was indeed moved to a different node
+        assertThat(
+            indicesSegmentResponseAfterRelocation.getIndices().get(indexName).iterator().next().shards()[0].getShardRouting()
+                .currentNodeId(),
+            not(
+                equalTo(
+                    indicesSegmentResponseBeforeRelocation.getIndices().get(indexName).iterator().next().shards()[0].getShardRouting()
+                        .currentNodeId()
+                )
+            )
+        );
+    }
+
     public void setTotalSpace(String dataNodeName, long totalSpace) {
         getTestFileStore(dataNodeName).setTotalSpace(totalSpace);
         refreshClusterInfo();
