From 0eeb2d0e35e90490c02e33f0133efed4ba085b18 Mon Sep 17 00:00:00 2001
From: David Turner
Date: Fri, 11 Apr 2025 19:52:03 +0100
Subject: [PATCH 1/5] Fix shard size of initializing restored shard

For shards being restored from a snapshot we use `SnapshotShardSizeInfo` to
track their sizes while they're unassigned, and then use
`ShardRouting#expectedShardSize` when they start to recover. However we were
incorrectly ignoring the `ShardRouting#expectedShardSize` value when
accounting for the movements of shards in the `ClusterInfoSimulator`, which
would sometimes cause us to assign more shards to a node than its disk space
should have allowed.

Closes #105331
---
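Note (illustration only, not part of the change itself): the one-line fix below swaps the
fallback argument passed to getExpectedShardSize(...) from UNAVAILABLE_EXPECTED_SHARD_SIZE to
shard.getExpectedShardSize(). The sketch that follows models that fallback idea in isolation;
the class, method and parameter names are hypothetical stand-ins rather than the real
Elasticsearch APIs, and the preference order shown is assumed for the sake of the example.

    // Hypothetical, self-contained model of "use the routing entry's expected size as the
    // fallback instead of treating the size as unavailable".
    class ShardSizeFallbackSketch {
        static final long UNAVAILABLE_EXPECTED_SHARD_SIZE = -1L;

        // Prefer an observed size, then a snapshot-reported size, then the supplied fallback.
        static long estimateShardSize(Long observedSize, Long snapshotSize, long fallback) {
            if (observedSize != null) {
                return observedSize;   // size already present in the simulated cluster info
            }
            if (snapshotSize != null) {
                return snapshotSize;   // size tracked while the restored shard was unassigned
            }
            return fallback;           // previously -1 here; the fix passes the expected size instead
        }

        public static void main(String[] args) {
            // Before: an initializing restored shard with no other size source counted as size -1.
            System.out.println(estimateShardSize(null, null, UNAVAILABLE_EXPECTED_SHARD_SIZE));
            // After: the ShardRouting#expectedShardSize value (say 50 MB) is used as the fallback.
            System.out.println(estimateShardSize(null, null, 50L * 1024 * 1024));
        }
    }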
"org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders:TRACE," - + "org.elasticsearch.cluster.routing.allocation.decider.DiskThresholdDecider:TRACE", - issueUrl = "https://github.com/elastic/elasticsearch/issues/105331" - ) public void testRestoreSnapshotAllocationDoesNotExceedWatermarkWithMultipleShards() throws Exception { internalCluster().startMasterOnlyNode(); internalCluster().startDataOnlyNode(); @@ -230,6 +224,110 @@ public void testRestoreSnapshotAllocationDoesNotExceedWatermarkWithMultipleShard assertBusyWithDiskUsageRefresh(dataNode0Id, indexName, contains(in(shardSizes.getShardIdsWithSizeSmallerOrEqual(usableSpace)))); } + public void testRestoreSnapshotAllocationDoesNotExceedWatermarkWithMultipleRestores() { + internalCluster().startMasterOnlyNode(); + final String dataNodeName = internalCluster().startDataOnlyNode(); + internalCluster().startDataOnlyNode(); + ensureStableCluster(3); + + assertAcked( + clusterAdmin().preparePutRepository(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT, "repo") + .setType(FsRepository.TYPE) + .setSettings(Settings.builder().put("location", randomRepoPath()).put("compress", randomBoolean())) + ); + + final AtomicBoolean allowRelocations = new AtomicBoolean(true); + final InternalClusterInfoService clusterInfoService = getInternalClusterInfoService(); + internalCluster().getCurrentMasterNodeInstance(ClusterService.class).addListener(event -> { + ClusterInfoServiceUtils.refresh(clusterInfoService); + if (allowRelocations.get() == false) { + assertThat( + "Expects no relocating shards but got: " + event.state().getRoutingNodes(), + numberOfShardsWithState(event.state().getRoutingNodes(), ShardRoutingState.RELOCATING), + equalTo(0) + ); + } + }); + + final String indexName = randomIdentifier(); + createIndex(indexName, indexSettings(6, 0).put(INDEX_STORE_STATS_REFRESH_INTERVAL_SETTING.getKey(), "0ms").build()); + var shardSizes = createReasonableSizedShards(indexName); + + final CreateSnapshotResponse createSnapshotResponse = clusterAdmin().prepareCreateSnapshot(TEST_REQUEST_TIMEOUT, "repo", "snap") + .setIndices(indexName) + .setWaitForCompletion(true) + .get(); + final SnapshotInfo snapshotInfo = createSnapshotResponse.getSnapshotInfo(); + assertThat(snapshotInfo.successfulShards(), is(snapshotInfo.totalShards())); + assertThat(snapshotInfo.state(), is(SnapshotState.SUCCESS)); + + assertAcked(indicesAdmin().prepareDelete(indexName).get()); + updateClusterSettings(Settings.builder().put(CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING.getKey(), Rebalance.NONE.toString())); + allowRelocations.set(false); + + // reduce disk size of one data node so that only one shard copy fits there, forcing all the other shards to be assigned to the + // other data node + getTestFileStore(dataNodeName).setTotalSpace(shardSizes.getSmallestShardSize() * 2 + WATERMARK_BYTES - 1L); + refreshDiskUsage(); + + // We're going to restore the index twice in quick succession and verify that we don't assign more than one shard in total to the + // chosen node, but to do this we have to work backwards: first we have to set up listeners to react to events and then finally we + // trigger the whole chain by starting the first restore. 
+
+        final var copyIndexName = indexName + "-copy";
+
+        // set up a listener that explicitly forbids more than one shard to be assigned to the tiny node
+        final String dataNodeId = internalCluster().getInstance(NodeEnvironment.class, dataNodeName).nodeId();
+        final var allShardsActiveListener = ClusterServiceUtils.addTemporaryStateListener(cs -> {
+            assertThat(cs.getRoutingNodes().toString(), cs.getRoutingNodes().node(dataNodeId).size(), lessThanOrEqualTo(1));
+            var seenCopy = false;
+            for (final IndexRoutingTable indexRoutingTable : cs.routingTable()) {
+                if (indexRoutingTable.getIndex().getName().equals(copyIndexName)) {
+                    seenCopy = true;
+                }
+                if (indexRoutingTable.allShardsActive() == false) {
+                    return false;
+                }
+            }
+            return seenCopy; // only remove this listener when we've started both restores and all the resulting shards are complete
+        });
+
+        // set up a listener which waits for the shards from the first restore to start initializing and then kick off another restore
+        final var secondRestoreCompleteLatch = new CountDownLatch(1);
+        final var secondRestoreStartedListener = ClusterServiceUtils.addTemporaryStateListener(cs -> {
+            final var indexRoutingTable = cs.routingTable().index(indexName);
+            if (indexRoutingTable != null && indexRoutingTable.shardsWithState(ShardRoutingState.INITIALIZING).isEmpty() == false) {
+                clusterAdmin().prepareRestoreSnapshot(TEST_REQUEST_TIMEOUT, "repo", "snap")
+                    .setWaitForCompletion(true)
+                    .setRenamePattern(indexName)
+                    .setRenameReplacement(indexName + "-copy")
+                    .execute(ActionTestUtils.assertNoFailureListener(restoreSnapshotResponse -> {
+                        final RestoreInfo restoreInfo = restoreSnapshotResponse.getRestoreInfo();
+                        assertThat(restoreInfo.successfulShards(), is(snapshotInfo.totalShards()));
+                        assertThat(restoreInfo.failedShards(), is(0));
+                        secondRestoreCompleteLatch.countDown();
+                    }));
+                return true;
+            }
+            return false;
+        });
+
+        // now set the ball rolling by doing the first restore, waiting for it to complete
+        final RestoreSnapshotResponse restoreSnapshotResponse = clusterAdmin().prepareRestoreSnapshot(TEST_REQUEST_TIMEOUT, "repo", "snap")
+            .setWaitForCompletion(true)
+            .get();
+        final RestoreInfo restoreInfo = restoreSnapshotResponse.getRestoreInfo();
+        assertThat(restoreInfo.successfulShards(), is(snapshotInfo.totalShards()));
+        assertThat(restoreInfo.failedShards(), is(0));
+
+        // wait for the second restore to complete too
+        safeAwait(secondRestoreStartedListener);
+        safeAwait(secondRestoreCompleteLatch);
+
+        // wait for all the shards to finish moving
+        safeAwait(allShardsActiveListener);
+        ensureGreen(indexName, indexName + "-copy");
+    }
+
     private Set<ShardId> getShardIds(final String nodeId, final String indexName) {
         final Set<ShardId> shardIds = new HashSet<>();
         final IndexRoutingTable indexRoutingTable = clusterAdmin().prepareState(TEST_REQUEST_TIMEOUT)
diff --git a/server/src/main/java/org/elasticsearch/cluster/ClusterInfoSimulator.java b/server/src/main/java/org/elasticsearch/cluster/ClusterInfoSimulator.java
index a8feab6542da8..17cf5c7b8b7c7 100644
--- a/server/src/main/java/org/elasticsearch/cluster/ClusterInfoSimulator.java
+++ b/server/src/main/java/org/elasticsearch/cluster/ClusterInfoSimulator.java
@@ -92,7 +92,7 @@ public void simulateShardStarted(ShardRouting shard) {
         var project = allocation.metadata().projectFor(shard.index());
         var size = getExpectedShardSize(
             shard,
-            UNAVAILABLE_EXPECTED_SHARD_SIZE,
+            shard.getExpectedShardSize(),
             getClusterInfo(),
             allocation.snapshotShardSizeInfo(),
             project,

From dac24173334d6bdc214be81c14dc55ec1f5f4c11 Mon Sep 17 00:00:00 2001
From: David Turner
Date: Mon, 14 Apr 2025 16:00:32 +0100
Subject: [PATCH 2/5] Update docs/changelog/126783.yaml

---
 docs/changelog/126783.yaml | 6 ++++++
 1 file changed, 6 insertions(+)
 create mode 100644 docs/changelog/126783.yaml

diff --git a/docs/changelog/126783.yaml b/docs/changelog/126783.yaml
new file mode 100644
index 0000000000000..ac91c7cfd412b
--- /dev/null
+++ b/docs/changelog/126783.yaml
@@ -0,0 +1,6 @@
+pr: 126783
+summary: Fix shard size of initializing restored shard
+area: Allocation
+type: bug
+issues:
+ - 105331

From 8811ec5a12cef5db6a44256b188cf2fd8d68aa86 Mon Sep 17 00:00:00 2001
From: elasticsearchmachine
Date: Mon, 14 Apr 2025 15:20:58 +0000
Subject: [PATCH 3/5] [CI] Auto commit changes from spotless

---
 .../routing/allocation/decider/DiskThresholdDeciderIT.java | 1 -
 1 file changed, 1 deletion(-)

diff --git a/server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderIT.java b/server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderIT.java
index 7549a7be67073..59b6dbfdf9d2f 100644
--- a/server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderIT.java
+++ b/server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderIT.java
@@ -37,7 +37,6 @@
 import org.elasticsearch.snapshots.SnapshotState;
 import org.elasticsearch.test.ClusterServiceUtils;
 import org.elasticsearch.test.ESIntegTestCase;
-import org.elasticsearch.test.junit.annotations.TestIssueLogging;
 import org.hamcrest.Matcher;
 
 import java.util.Arrays;

From f36ed4bc2b145c6348608710b69eeaa1cc12d4a8 Mon Sep 17 00:00:00 2001
From: David Turner
Date: Mon, 14 Apr 2025 22:30:47 +0100
Subject: [PATCH 4/5] Update server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderIT.java

Co-authored-by: Jeremy Dahlgren
---
 .../routing/allocation/decider/DiskThresholdDeciderIT.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderIT.java b/server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderIT.java
index 59b6dbfdf9d2f..38dc6f14641b8 100644
--- a/server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderIT.java
+++ b/server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderIT.java
@@ -250,7 +250,7 @@ public void testRestoreSnapshotAllocationDoesNotExceedWatermarkWithMultipleResto
 
         final String indexName = randomIdentifier();
         createIndex(indexName, indexSettings(6, 0).put(INDEX_STORE_STATS_REFRESH_INTERVAL_SETTING.getKey(), "0ms").build());
-        var shardSizes = createReasonableSizedShards(indexName);
+        final var shardSizes = createReasonableSizedShards(indexName);
 
         final CreateSnapshotResponse createSnapshotResponse = clusterAdmin().prepareCreateSnapshot(TEST_REQUEST_TIMEOUT, "repo", "snap")
             .setIndices(indexName)
             .setWaitForCompletion(true)
             .get();

From 85b426752bcb1a46328a065536657a7ea5455574 Mon Sep 17 00:00:00 2001
From: David Turner
Date: Thu, 17 Apr 2025 09:38:40 +0100
Subject: [PATCH 5/5] Remove unnecessary test

---
 .../decider/DiskThresholdDeciderIT.java       | 90 ++++---------------
 1 file changed, 19 insertions(+), 71 deletions(-)

diff --git a/server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderIT.java b/server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderIT.java
index 38dc6f14641b8..dc0281f40bb6c 100644
--- a/server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderIT.java
+++ b/server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderIT.java
@@ -56,6 +56,7 @@
 import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasSize;
 import static org.hamcrest.Matchers.in;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.lessThanOrEqualTo;
@@ -166,63 +167,6 @@ public void testRestoreSnapshotAllocationDoesNotExceedWatermark() throws Excepti
         assertBusyWithDiskUsageRefresh(dataNode0Id, indexName, contains(in(shardSizes.getSmallestShardIds())));
     }
 
-    public void testRestoreSnapshotAllocationDoesNotExceedWatermarkWithMultipleShards() throws Exception {
-        internalCluster().startMasterOnlyNode();
-        internalCluster().startDataOnlyNode();
-        final String dataNodeName = internalCluster().startDataOnlyNode();
-        ensureStableCluster(3);
-
-        assertAcked(
-            clusterAdmin().preparePutRepository(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT, "repo")
-                .setType(FsRepository.TYPE)
-                .setSettings(Settings.builder().put("location", randomRepoPath()).put("compress", randomBoolean()))
-        );
-
-        final AtomicBoolean allowRelocations = new AtomicBoolean(true);
-        final InternalClusterInfoService clusterInfoService = getInternalClusterInfoService();
-        internalCluster().getCurrentMasterNodeInstance(ClusterService.class).addListener(event -> {
-            ClusterInfoServiceUtils.refresh(clusterInfoService);
-            if (allowRelocations.get() == false) {
-                assertThat(
-                    "Expects no relocating shards but got: " + event.state().getRoutingNodes(),
-                    numberOfShardsWithState(event.state().getRoutingNodes(), ShardRoutingState.RELOCATING),
-                    equalTo(0)
-                );
-            }
-        });
-
-        final String dataNode0Id = internalCluster().getInstance(NodeEnvironment.class, dataNodeName).nodeId();
-
-        final String indexName = randomIdentifier();
-        createIndex(indexName, indexSettings(6, 0).put(INDEX_STORE_STATS_REFRESH_INTERVAL_SETTING.getKey(), "0ms").build());
-        var shardSizes = createReasonableSizedShards(indexName);
-
-        final CreateSnapshotResponse createSnapshotResponse = clusterAdmin().prepareCreateSnapshot(TEST_REQUEST_TIMEOUT, "repo", "snap")
-            .setWaitForCompletion(true)
-            .get();
-        final SnapshotInfo snapshotInfo = createSnapshotResponse.getSnapshotInfo();
-        assertThat(snapshotInfo.successfulShards(), is(snapshotInfo.totalShards()));
-        assertThat(snapshotInfo.state(), is(SnapshotState.SUCCESS));
-
-        assertAcked(indicesAdmin().prepareDelete(indexName).get());
-        updateClusterSettings(Settings.builder().put(CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING.getKey(), Rebalance.NONE.toString()));
-        allowRelocations.set(false);
-
-        // reduce disk size of node 0 so that only 1 of 2 smallest shards can be allocated
-        var usableSpace = shardSizes.sizes().get(1).size();
-        getTestFileStore(dataNodeName).setTotalSpace(usableSpace + WATERMARK_BYTES);
-        refreshDiskUsage();
-
-        final RestoreSnapshotResponse restoreSnapshotResponse = clusterAdmin().prepareRestoreSnapshot(TEST_REQUEST_TIMEOUT, "repo", "snap")
-            .setWaitForCompletion(true)
-            .get();
-        final RestoreInfo restoreInfo = restoreSnapshotResponse.getRestoreInfo();
-        assertThat(restoreInfo.successfulShards(), is(snapshotInfo.totalShards()));
-        assertThat(restoreInfo.failedShards(), is(0));
-
-        assertBusyWithDiskUsageRefresh(dataNode0Id, indexName, contains(in(shardSizes.getShardIdsWithSizeSmallerOrEqual(usableSpace))));
-    }
-
     public void testRestoreSnapshotAllocationDoesNotExceedWatermarkWithMultipleRestores() {
         internalCluster().startMasterOnlyNode();
         final String dataNodeName = internalCluster().startDataOnlyNode();
         internalCluster().startDataOnlyNode();
         ensureStableCluster(3);
@@ -235,18 +179,9 @@ public void testRestoreSnapshotAllocationDoesNotExceedWatermarkWithMultipleResto
             .setSettings(Settings.builder().put("location", randomRepoPath()).put("compress", randomBoolean()))
         );
 
-        final AtomicBoolean allowRelocations = new AtomicBoolean(true);
         final InternalClusterInfoService clusterInfoService = getInternalClusterInfoService();
-        internalCluster().getCurrentMasterNodeInstance(ClusterService.class).addListener(event -> {
-            ClusterInfoServiceUtils.refresh(clusterInfoService);
-            if (allowRelocations.get() == false) {
-                assertThat(
-                    "Expects no relocating shards but got: " + event.state().getRoutingNodes(),
-                    numberOfShardsWithState(event.state().getRoutingNodes(), ShardRoutingState.RELOCATING),
-                    equalTo(0)
-                );
-            }
-        });
+        internalCluster().getCurrentMasterNodeInstance(ClusterService.class)
+            .addListener(event -> ClusterInfoServiceUtils.refresh(clusterInfoService));
 
         final String indexName = randomIdentifier();
         createIndex(indexName, indexSettings(6, 0).put(INDEX_STORE_STATS_REFRESH_INTERVAL_SETTING.getKey(), "0ms").build());
@@ -262,11 +197,20 @@ public void testRestoreSnapshotAllocationDoesNotExceedWatermarkWithMultipleResto
 
         assertAcked(indicesAdmin().prepareDelete(indexName).get());
         updateClusterSettings(Settings.builder().put(CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING.getKey(), Rebalance.NONE.toString()));
-        allowRelocations.set(false);
+
+        // Verify that from this point on we do not do any rebalancing
+        internalCluster().getCurrentMasterNodeInstance(ClusterService.class).addListener(event -> {
+            assertThat(
+                "Expects no relocating shards but got: " + event.state().getRoutingNodes(),
+                numberOfShardsWithState(event.state().getRoutingNodes(), ShardRoutingState.RELOCATING),
+                equalTo(0)
+            );
+        });
 
         // reduce disk size of one data node so that only one shard copy fits there, forcing all the other shards to be assigned to the
         // other data node
-        getTestFileStore(dataNodeName).setTotalSpace(shardSizes.getSmallestShardSize() * 2 + WATERMARK_BYTES - 1L);
+        final var usableSpace = randomLongBetween(shardSizes.getSmallestShardSize(), shardSizes.getSmallestShardSize() * 2 - 1L);
+        getTestFileStore(dataNodeName).setTotalSpace(usableSpace + WATERMARK_BYTES);
         refreshDiskUsage();
 
         // We're going to restore the index twice in quick succession and verify that we don't assign more than one shard in total to the
@@ -275,7 +219,7 @@ public void testRestoreSnapshotAllocationDoesNotExceedWatermarkWithMultipleResto
         final var copyIndexName = indexName + "-copy";
 
         // set up a listener that explicitly forbids more than one shard to be assigned to the tiny node
-        final String dataNodeId = internalCluster().getInstance(NodeEnvironment.class, dataNodeName).nodeId();
+        final var dataNodeId = internalCluster().getInstance(NodeEnvironment.class, dataNodeName).nodeId();
         final var allShardsActiveListener = ClusterServiceUtils.addTemporaryStateListener(cs -> {
             assertThat(cs.getRoutingNodes().toString(), cs.getRoutingNodes().node(dataNodeId).size(), lessThanOrEqualTo(1));
             var seenCopy = false;
             for (final IndexRoutingTable indexRoutingTable : cs.routingTable()) {
                 if (indexRoutingTable.getIndex().getName().equals(copyIndexName)) {
                     seenCopy = true;
                 }
                 if (indexRoutingTable.allShardsActive() == false) {
                     return false;
                 }
             }
             return seenCopy; // only remove this listener when we've started both restores and all the resulting shards are complete
         });
@@ -325,6 +269,10 @@ public void testRestoreSnapshotAllocationDoesNotExceedWatermarkWithMultipleResto
         // wait for all the shards to finish moving
         safeAwait(allShardsActiveListener);
         ensureGreen(indexName, indexName + "-copy");
+
+        final var tinyNodeShardIds = getShardIds(dataNodeId, indexName);
+        assertThat(tinyNodeShardIds, hasSize(1));
+        assertThat(tinyNodeShardIds.iterator().next(), in(shardSizes.getShardIdsWithSizeSmallerOrEqual(usableSpace)));
     }
 
     private Set<ShardId> getShardIds(final String nodeId, final String indexName) {