|
10 | 10 | package org.elasticsearch.index.engine;
|
11 | 11 |
|
12 | 12 | import org.elasticsearch.action.ActionFuture;
|
| 13 | +import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; |
13 | 14 | import org.elasticsearch.action.admin.cluster.node.stats.NodeStats;
|
14 | 15 | import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse;
|
| 16 | +import org.elasticsearch.action.admin.cluster.reroute.ClusterRerouteUtils; |
| 17 | +import org.elasticsearch.action.admin.indices.segments.IndicesSegmentResponse; |
15 | 18 | import org.elasticsearch.action.admin.indices.segments.ShardSegments;
|
16 | 19 | import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
|
17 | 20 | import org.elasticsearch.action.support.broadcast.BroadcastResponse;
|
18 | 21 | import org.elasticsearch.cluster.DiskUsageIntegTestCase;
|
19 | 22 | import org.elasticsearch.cluster.metadata.IndexMetadata;
|
| 23 | +import org.elasticsearch.cluster.metadata.Metadata; |
20 | 24 | import org.elasticsearch.cluster.routing.allocation.DiskThresholdSettings;
|
| 25 | +import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationCommand; |
| 26 | +import org.elasticsearch.common.Priority; |
21 | 27 | import org.elasticsearch.common.settings.Settings;
|
22 | 28 | import org.elasticsearch.common.util.concurrent.EsExecutors;
|
| 29 | +import org.elasticsearch.core.TimeValue; |
23 | 30 | import org.elasticsearch.index.IndexNotFoundException;
|
24 | 31 | import org.elasticsearch.indices.IndicesService;
|
25 | 32 | import org.elasticsearch.plugins.Plugin;
|
|
33 | 40 | import java.util.Collection;
|
34 | 41 | import java.util.List;
|
35 | 42 | import java.util.Locale;
|
| 43 | +import java.util.concurrent.TimeUnit; |
36 | 44 | import java.util.stream.IntStream;
|
37 | 45 |
|
38 | 46 | import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
|
39 | 47 | import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;
|
40 | 48 | import static org.hamcrest.Matchers.equalTo;
|
41 | 49 | import static org.hamcrest.Matchers.greaterThan;
|
| 50 | +import static org.hamcrest.Matchers.iterableWithSize; |
42 | 51 | import static org.hamcrest.Matchers.lessThan;
|
| 52 | +import static org.hamcrest.Matchers.not; |
43 | 53 |
|
44 | 54 | @ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0)
|
45 | 55 | public class MergeWithLowDiskSpaceIT extends DiskUsageIntegTestCase {
|
| 56 | + private final TimeValue ACCEPTABLE_RELOCATION_TIME = new TimeValue(5, TimeUnit.MINUTES); |
46 | 57 | protected static long MERGE_DISK_HIGH_WATERMARK_BYTES;
|
47 | 58 |
|
48 | 59 | @BeforeClass
|
@@ -266,6 +277,118 @@ public void testForceMergeIsBlockedThenUnblocked() throws Exception {
|
266 | 277 | assertAcked(indicesAdmin().prepareDelete(indexName).get());
|
267 | 278 | }
|
268 | 279 |
|
| 280 | + public void testRelocationWhileForceMerging() throws Exception { |
| 281 | + final String node1 = internalCluster().startNode(); |
| 282 | + ensureStableCluster(1); |
| 283 | + setTotalSpace(node1, Long.MAX_VALUE); |
| 284 | + String indexName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT); |
| 285 | + createIndex( |
| 286 | + indexName, |
| 287 | + Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0).put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).build() |
| 288 | + ); |
| 289 | + // get current disk space usage (for all indices on the node) |
| 290 | + IndicesStatsResponse stats = indicesAdmin().prepareStats().clear().setStore(true).get(); |
| 291 | + long usedDiskSpaceAfterIndexing = stats.getTotal().getStore().sizeInBytes(); |
| 292 | + // restrict the total disk space such that the next merge does not have sufficient disk space |
| 293 | + long insufficientTotalDiskSpace = usedDiskSpaceAfterIndexing + MERGE_DISK_HIGH_WATERMARK_BYTES - randomLongBetween(1L, 10L); |
| 294 | + setTotalSpace(node1, insufficientTotalDiskSpace); |
| 295 | + // node stats' FS stats should report that there is insufficient disk space available |
| 296 | + assertBusy(() -> { |
| 297 | + NodesStatsResponse nodesStatsResponse = client().admin().cluster().prepareNodesStats().setFs(true).get(); |
| 298 | + assertThat(nodesStatsResponse.getNodes().size(), equalTo(1)); |
| 299 | + NodeStats nodeStats = nodesStatsResponse.getNodes().get(0); |
| 300 | + assertThat(nodeStats.getFs().getTotal().getTotal().getBytes(), equalTo(insufficientTotalDiskSpace)); |
| 301 | + assertThat(nodeStats.getFs().getTotal().getAvailable().getBytes(), lessThan(MERGE_DISK_HIGH_WATERMARK_BYTES)); |
| 302 | + }); |
| 303 | + int indexingRounds = randomIntBetween(5, 10); |
| 304 | + while (indexingRounds-- > 0) { |
| 305 | + indexRandom( |
| 306 | + true, |
| 307 | + true, |
| 308 | + true, |
| 309 | + false, |
| 310 | + IntStream.range(1, randomIntBetween(5, 10)) |
| 311 | + .mapToObj(i -> prepareIndex(indexName).setSource("field", randomAlphaOfLength(50))) |
| 312 | + .toList() |
| 313 | + ); |
| 314 | + } |
| 315 | + // the max segments argument makes it a blocking call |
| 316 | + ActionFuture<BroadcastResponse> forceMergeBeforeRelocationFuture = indicesAdmin().prepareForceMerge(indexName) |
| 317 | + .setMaxNumSegments(1) |
| 318 | + .execute(); |
| 319 | + ThreadPoolMergeExecutorService threadPoolMergeExecutorService = internalCluster().getInstance(IndicesService.class, node1) |
| 320 | + .getThreadPoolMergeExecutorService(); |
| 321 | + TestTelemetryPlugin testTelemetryPlugin = getTelemetryPlugin(node1); |
| 322 | + assertBusy(() -> { |
| 323 | + // merge executor says merging is blocked due to insufficient disk space while there is a single merge task enqueued |
| 324 | + assertThat(threadPoolMergeExecutorService.getMergeTasksQueueLength(), equalTo(1)); |
| 325 | + assertTrue(threadPoolMergeExecutorService.isMergingBlockedDueToInsufficientDiskSpace()); |
| 326 | + // telemetry says that there are indeed some segments enqueued to be merged |
| 327 | + testTelemetryPlugin.collect(); |
| 328 | + assertThat( |
| 329 | + testTelemetryPlugin.getLongGaugeMeasurement(MergeMetrics.MERGE_SEGMENTS_QUEUED_USAGE).getLast().getLong(), |
| 330 | + greaterThan(0L) |
| 331 | + ); |
| 332 | + // but still no merges are currently running |
| 333 | + assertThat( |
| 334 | + testTelemetryPlugin.getLongGaugeMeasurement(MergeMetrics.MERGE_SEGMENTS_RUNNING_USAGE).getLast().getLong(), |
| 335 | + equalTo(0L) |
| 336 | + ); |
| 337 | + // indices stats also says that no merge is currently running (blocked merges are NOT considered as "running") |
| 338 | + IndicesStatsResponse indicesStatsResponse = client().admin().indices().prepareStats(indexName).setMerge(true).get(); |
| 339 | + long currentMergeCount = indicesStatsResponse.getIndices().get(indexName).getPrimaries().merge.getCurrent(); |
| 340 | + assertThat(currentMergeCount, equalTo(0L)); |
| 341 | + }); |
| 342 | + // the force merge call is still blocked |
| 343 | + assertFalse(forceMergeBeforeRelocationFuture.isCancelled()); |
| 344 | + assertFalse(forceMergeBeforeRelocationFuture.isDone()); |
| 345 | + // merge executor still confirms merging is blocked due to insufficient disk space |
| 346 | + assertTrue(threadPoolMergeExecutorService.isMergingBlockedDueToInsufficientDiskSpace()); |
| 347 | + IndicesSegmentResponse indicesSegmentResponseBeforeRelocation = indicesAdmin().prepareSegments(indexName).get(); |
| 348 | + // the index should have more than 1 segments at this stage |
| 349 | + assertThat( |
| 350 | + indicesSegmentResponseBeforeRelocation.getIndices().get(indexName).iterator().next().shards()[0].getSegments(), |
| 351 | + iterableWithSize(greaterThan(1)) |
| 352 | + ); |
| 353 | + // start another node |
| 354 | + final String node2 = internalCluster().startNode(); |
| 355 | + ensureStableCluster(2); |
| 356 | + setTotalSpace(node2, Long.MAX_VALUE); |
| 357 | + // relocate the shard from node1 to node2 |
| 358 | + ClusterRerouteUtils.reroute(client(), new MoveAllocationCommand(indexName, 0, node1, node2, Metadata.DEFAULT_PROJECT_ID)); |
| 359 | + ClusterHealthResponse clusterHealthResponse = clusterAdmin().prepareHealth(TEST_REQUEST_TIMEOUT) |
| 360 | + .setWaitForEvents(Priority.LANGUID) |
| 361 | + .setWaitForNoRelocatingShards(true) |
| 362 | + .setTimeout(ACCEPTABLE_RELOCATION_TIME) |
| 363 | + .get(); |
| 364 | + assertThat(clusterHealthResponse.isTimedOut(), equalTo(false)); |
| 365 | + // the force merge call is now unblocked |
| 366 | + assertBusy(() -> { |
| 367 | + assertTrue(forceMergeBeforeRelocationFuture.isDone()); |
| 368 | + assertFalse(forceMergeBeforeRelocationFuture.isCancelled()); |
| 369 | + }); |
| 370 | + // there is some merging going on in the {@code PostRecoveryMerger} after recovery, but that's not guaranteeing us a single segment, |
| 371 | + // so let's trigger a force merge to 1 segment again (this one should succeed promptly) |
| 372 | + indicesAdmin().prepareForceMerge(indexName).setMaxNumSegments(1).get(); |
| 373 | + IndicesSegmentResponse indicesSegmentResponseAfterRelocation = indicesAdmin().prepareSegments(indexName).get(); |
| 374 | + // assert there's only one segment now |
| 375 | + assertThat( |
| 376 | + indicesSegmentResponseAfterRelocation.getIndices().get(indexName).iterator().next().shards()[0].getSegments(), |
| 377 | + iterableWithSize(1) |
| 378 | + ); |
| 379 | + // also assert that the shard was indeed moved to a different node |
| 380 | + assertThat( |
| 381 | + indicesSegmentResponseAfterRelocation.getIndices().get(indexName).iterator().next().shards()[0].getShardRouting() |
| 382 | + .currentNodeId(), |
| 383 | + not( |
| 384 | + equalTo( |
| 385 | + indicesSegmentResponseBeforeRelocation.getIndices().get(indexName).iterator().next().shards()[0].getShardRouting() |
| 386 | + .currentNodeId() |
| 387 | + ) |
| 388 | + ) |
| 389 | + ); |
| 390 | + } |
| 391 | + |
269 | 392 | public void setTotalSpace(String dataNodeName, long totalSpace) {
|
270 | 393 | getTestFileStore(dataNodeName).setTotalSpace(totalSpace);
|
271 | 394 | refreshClusterInfo();
|
|
0 commit comments