@@ -10,16 +10,23 @@
 package org.elasticsearch.index.engine;

 import org.elasticsearch.action.ActionFuture;
+import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
 import org.elasticsearch.action.admin.cluster.node.stats.NodeStats;
 import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse;
+import org.elasticsearch.action.admin.cluster.reroute.ClusterRerouteUtils;
+import org.elasticsearch.action.admin.indices.segments.IndicesSegmentResponse;
 import org.elasticsearch.action.admin.indices.segments.ShardSegments;
 import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
 import org.elasticsearch.action.support.broadcast.BroadcastResponse;
 import org.elasticsearch.cluster.DiskUsageIntegTestCase;
 import org.elasticsearch.cluster.metadata.IndexMetadata;
+import org.elasticsearch.cluster.metadata.Metadata;
 import org.elasticsearch.cluster.routing.allocation.DiskThresholdSettings;
+import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationCommand;
+import org.elasticsearch.common.Priority;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.util.concurrent.EsExecutors;
+import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.index.IndexNotFoundException;
 import org.elasticsearch.indices.IndicesService;
 import org.elasticsearch.plugins.Plugin;
@@ -33,16 +40,20 @@
 import java.util.Collection;
 import java.util.List;
 import java.util.Locale;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.IntStream;

 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.iterableWithSize;
 import static org.hamcrest.Matchers.lessThan;
+import static org.hamcrest.Matchers.not;

 @ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0)
 public class MergeWithLowDiskSpaceIT extends DiskUsageIntegTestCase {
+    private final TimeValue ACCEPTABLE_RELOCATION_TIME = new TimeValue(5, TimeUnit.MINUTES);
     protected static long MERGE_DISK_HIGH_WATERMARK_BYTES;

     @BeforeClass
@@ -266,6 +277,118 @@ public void testForceMergeIsBlockedThenUnblocked() throws Exception {
         assertAcked(indicesAdmin().prepareDelete(indexName).get());
     }

+    public void testRelocationWhileForceMerging() throws Exception {
+        final String node1 = internalCluster().startNode();
+        ensureStableCluster(1);
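+        // start with a virtually unlimited disk so that nothing is blocked on disk space before the test restricts it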
+        setTotalSpace(node1, Long.MAX_VALUE);
+        String indexName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
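+        // a single shard with no replicas, so all the disk usage accounting in this test happens on node1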
+        createIndex(
+            indexName,
+            Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0).put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).build()
+        );
+        // get current disk space usage (for all indices on the node)
+        IndicesStatsResponse stats = indicesAdmin().prepareStats().clear().setStore(true).get();
+        long usedDiskSpaceAfterIndexing = stats.getTotal().getStore().sizeInBytes();
+        // restrict the total disk space such that the next merge does not have sufficient disk space
+        long insufficientTotalDiskSpace = usedDiskSpaceAfterIndexing + MERGE_DISK_HIGH_WATERMARK_BYTES - randomLongBetween(1L, 10L);
+        setTotalSpace(node1, insufficientTotalDiskSpace);
+        // node stats' FS stats should report that there is insufficient disk space available
+        assertBusy(() -> {
+            NodesStatsResponse nodesStatsResponse = client().admin().cluster().prepareNodesStats().setFs(true).get();
+            assertThat(nodesStatsResponse.getNodes().size(), equalTo(1));
+            NodeStats nodeStats = nodesStatsResponse.getNodes().get(0);
+            assertThat(nodeStats.getFs().getTotal().getTotal().getBytes(), equalTo(insufficientTotalDiskSpace));
+            assertThat(nodeStats.getFs().getTotal().getAvailable().getBytes(), lessThan(MERGE_DISK_HIGH_WATERMARK_BYTES));
+        });
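+        // index several batches of documents so that the shard accumulates multiple segments for the force merge to work on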
+        int indexingRounds = randomIntBetween(5, 10);
+        while (indexingRounds-- > 0) {
+            indexRandom(
+                true,
+                true,
+                true,
+                false,
+                IntStream.range(1, randomIntBetween(5, 10))
+                    .mapToObj(i -> prepareIndex(indexName).setSource("field", randomAlphaOfLength(50)))
+                    .toList()
+            );
+        }
+        // the max segments argument makes it a blocking call
+        ActionFuture<BroadcastResponse> forceMergeBeforeRelocationFuture = indicesAdmin().prepareForceMerge(indexName)
+            .setMaxNumSegments(1)
+            .execute();
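+        // look up the merge executor service and the telemetry plugin on node1 in order to observe the blocked merge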
+        ThreadPoolMergeExecutorService threadPoolMergeExecutorService = internalCluster().getInstance(IndicesService.class, node1)
+            .getThreadPoolMergeExecutorService();
+        TestTelemetryPlugin testTelemetryPlugin = getTelemetryPlugin(node1);
+        assertBusy(() -> {
+            // merge executor says merging is blocked due to insufficient disk space while there is a single merge task enqueued
+            assertThat(threadPoolMergeExecutorService.getMergeTasksQueueLength(), equalTo(1));
+            assertTrue(threadPoolMergeExecutorService.isMergingBlockedDueToInsufficientDiskSpace());
+            // telemetry says that there are indeed some segments enqueued to be merged
+            testTelemetryPlugin.collect();
+            assertThat(
+                testTelemetryPlugin.getLongGaugeMeasurement(MergeMetrics.MERGE_SEGMENTS_QUEUED_USAGE).getLast().getLong(),
+                greaterThan(0L)
+            );
+            // but still no merges are currently running
+            assertThat(
+                testTelemetryPlugin.getLongGaugeMeasurement(MergeMetrics.MERGE_SEGMENTS_RUNNING_USAGE).getLast().getLong(),
+                equalTo(0L)
+            );
+            // indices stats also says that no merge is currently running (blocked merges are NOT considered "running")
+            IndicesStatsResponse indicesStatsResponse = client().admin().indices().prepareStats(indexName).setMerge(true).get();
+            long currentMergeCount = indicesStatsResponse.getIndices().get(indexName).getPrimaries().merge.getCurrent();
+            assertThat(currentMergeCount, equalTo(0L));
+        });
+        // the force merge call is still blocked
+        assertFalse(forceMergeBeforeRelocationFuture.isCancelled());
+        assertFalse(forceMergeBeforeRelocationFuture.isDone());
+        // merge executor still confirms merging is blocked due to insufficient disk space
+        assertTrue(threadPoolMergeExecutorService.isMergingBlockedDueToInsufficientDiskSpace());
+        IndicesSegmentResponse indicesSegmentResponseBeforeRelocation = indicesAdmin().prepareSegments(indexName).get();
+        // the index should have more than one segment at this stage
+        assertThat(
+            indicesSegmentResponseBeforeRelocation.getIndices().get(indexName).iterator().next().shards()[0].getSegments(),
+            iterableWithSize(greaterThan(1))
+        );
+        // start another node
+        final String node2 = internalCluster().startNode();
+        ensureStableCluster(2);
+        setTotalSpace(node2, Long.MAX_VALUE);
+        // relocate the shard from node1 to node2
+        ClusterRerouteUtils.reroute(client(), new MoveAllocationCommand(indexName, 0, node1, node2, Metadata.DEFAULT_PROJECT_ID));
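+        // wait until the relocation completes and no shards are relocating anymore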
+        ClusterHealthResponse clusterHealthResponse = clusterAdmin().prepareHealth(TEST_REQUEST_TIMEOUT)
+            .setWaitForEvents(Priority.LANGUID)
+            .setWaitForNoRelocatingShards(true)
+            .setTimeout(ACCEPTABLE_RELOCATION_TIME)
+            .get();
+        assertThat(clusterHealthResponse.isTimedOut(), equalTo(false));
+        // the force merge call is now unblocked
+        assertBusy(() -> {
+            assertTrue(forceMergeBeforeRelocationFuture.isDone());
+            assertFalse(forceMergeBeforeRelocationFuture.isCancelled());
+        });
+        // there is some merging going on in the {@code PostRecoveryMerger} after recovery, but that doesn't guarantee a single segment,
+        // so let's trigger a force merge down to 1 segment again (this one should succeed promptly)
+        indicesAdmin().prepareForceMerge(indexName).setMaxNumSegments(1).get();
+        IndicesSegmentResponse indicesSegmentResponseAfterRelocation = indicesAdmin().prepareSegments(indexName).get();
+        // assert there's only one segment now
+        assertThat(
+            indicesSegmentResponseAfterRelocation.getIndices().get(indexName).iterator().next().shards()[0].getSegments(),
+            iterableWithSize(1)
+        );
+        // also assert that the shard was indeed moved to a different node
+        assertThat(
+            indicesSegmentResponseAfterRelocation.getIndices().get(indexName).iterator().next().shards()[0].getShardRouting()
+                .currentNodeId(),
+            not(
+                equalTo(
+                    indicesSegmentResponseBeforeRelocation.getIndices().get(indexName).iterator().next().shards()[0].getShardRouting()
+                        .currentNodeId()
+                )
+            )
+        );
+    }
+
     public void setTotalSpace(String dataNodeName, long totalSpace) {
         getTestFileStore(dataNodeName).setTotalSpace(totalSpace);
         refreshClusterInfo();