Fix shard size of initializing restored shard #126783
New changelog entry:

    @@ -0,0 +1,6 @@
    +pr: 126783
    +summary: Fix shard size of initializing restored shard
    +area: Allocation
    +type: bug
    +issues:
    + - 105331

Changes to the `DiskThresholdDeciderIT` test:

    @@ -13,6 +13,7 @@
     import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
     import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
     import org.elasticsearch.action.admin.indices.stats.ShardStats;
    +import org.elasticsearch.action.support.ActionTestUtils;
     import org.elasticsearch.cluster.ClusterInfoService;
     import org.elasticsearch.cluster.ClusterInfoServiceUtils;
     import org.elasticsearch.cluster.DiskUsageIntegTestCase;
    @@ -34,6 +35,7 @@
     import org.elasticsearch.snapshots.RestoreInfo;
     import org.elasticsearch.snapshots.SnapshotInfo;
     import org.elasticsearch.snapshots.SnapshotState;
    +import org.elasticsearch.test.ClusterServiceUtils;
     import org.elasticsearch.test.ESIntegTestCase;
     import org.elasticsearch.test.junit.annotations.TestIssueLogging;
     import org.hamcrest.Matcher;
    @@ -43,6 +45,7 @@
     import java.util.HashSet;
     import java.util.List;
     import java.util.Set;
    +import java.util.concurrent.CountDownLatch;
     import java.util.concurrent.TimeUnit;
     import java.util.concurrent.atomic.AtomicBoolean;
    @@ -56,6 +59,7 @@
     import static org.hamcrest.Matchers.equalTo;
     import static org.hamcrest.Matchers.in;
     import static org.hamcrest.Matchers.is;
    +import static org.hamcrest.Matchers.lessThanOrEqualTo;

     @ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0)
     public class DiskThresholdDeciderIT extends DiskUsageIntegTestCase {
    @@ -163,16 +167,6 @@ public void testRestoreSnapshotAllocationDoesNotExceedWatermark() throws Exception {
             assertBusyWithDiskUsageRefresh(dataNode0Id, indexName, contains(in(shardSizes.getSmallestShardIds())));
         }

    -    @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/105331")
    -    @TestIssueLogging(
    -        value = "org.elasticsearch.cluster.routing.allocation.allocator.DesiredBalanceComputer:TRACE,"
    -            + "org.elasticsearch.cluster.routing.allocation.allocator.DesiredBalanceReconciler:DEBUG,"
    -            + "org.elasticsearch.cluster.routing.allocation.allocator.DesiredBalanceShardsAllocator:TRACE,"
    -            + "org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator:TRACE,"
    -            + "org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders:TRACE,"
    -            + "org.elasticsearch.cluster.routing.allocation.decider.DiskThresholdDecider:TRACE",
    -        issueUrl = "https://github.com/elastic/elasticsearch/issues/105331"
    -    )
         public void testRestoreSnapshotAllocationDoesNotExceedWatermarkWithMultipleShards() throws Exception {
             internalCluster().startMasterOnlyNode();
             internalCluster().startDataOnlyNode();
    @@ -230,6 +224,110 @@ public void testRestoreSnapshotAllocationDoesNotExceedWatermarkWithMultipleShards() throws Exception {
             assertBusyWithDiskUsageRefresh(dataNode0Id, indexName, contains(in(shardSizes.getShardIdsWithSizeSmallerOrEqual(usableSpace))));
         }

    +    public void testRestoreSnapshotAllocationDoesNotExceedWatermarkWithMultipleRestores() {
    +        internalCluster().startMasterOnlyNode();
    +        final String dataNodeName = internalCluster().startDataOnlyNode();
    +        internalCluster().startDataOnlyNode();
    +        ensureStableCluster(3);
    +
    +        assertAcked(
    +            clusterAdmin().preparePutRepository(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT, "repo")
    +                .setType(FsRepository.TYPE)
    +                .setSettings(Settings.builder().put("location", randomRepoPath()).put("compress", randomBoolean()))
    +        );
    +
    +        final AtomicBoolean allowRelocations = new AtomicBoolean(true);
    +        final InternalClusterInfoService clusterInfoService = getInternalClusterInfoService();
    +        internalCluster().getCurrentMasterNodeInstance(ClusterService.class).addListener(event -> {
    +            ClusterInfoServiceUtils.refresh(clusterInfoService);
    +            if (allowRelocations.get() == false) {
    +                assertThat(
    +                    "Expects no relocating shards but got: " + event.state().getRoutingNodes(),
    +                    numberOfShardsWithState(event.state().getRoutingNodes(), ShardRoutingState.RELOCATING),
    +                    equalTo(0)
    +                );
    +            }
    +        });
    +
    +        final String indexName = randomIdentifier();
    +        createIndex(indexName, indexSettings(6, 0).put(INDEX_STORE_STATS_REFRESH_INTERVAL_SETTING.getKey(), "0ms").build());
    +        var shardSizes = createReasonableSizedShards(indexName);
    +
    +        final CreateSnapshotResponse createSnapshotResponse = clusterAdmin().prepareCreateSnapshot(TEST_REQUEST_TIMEOUT, "repo", "snap")
    +            .setIndices(indexName)
    +            .setWaitForCompletion(true)
    +            .get();
    +        final SnapshotInfo snapshotInfo = createSnapshotResponse.getSnapshotInfo();
    +        assertThat(snapshotInfo.successfulShards(), is(snapshotInfo.totalShards()));
    +        assertThat(snapshotInfo.state(), is(SnapshotState.SUCCESS));
    +
    +        assertAcked(indicesAdmin().prepareDelete(indexName).get());
    +        updateClusterSettings(Settings.builder().put(CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING.getKey(), Rebalance.NONE.toString()));
    +        allowRelocations.set(false);
    +
    +        // reduce disk size of one data node so that only one shard copy fits there, forcing all the other shards to be assigned to the
    +        // other data node
    +        getTestFileStore(dataNodeName).setTotalSpace(shardSizes.getSmallestShardSize() * 2 + WATERMARK_BYTES - 1L);
    +        refreshDiskUsage();
    +
    +        // We're going to restore the index twice in quick succession and verify that we don't assign more than one shard in total to the
    +        // chosen node, but to do this we have to work backwards: first we have to set up listeners to react to events and then finally we
    +        // trigger the whole chain by starting the first restore.
    +        final var copyIndexName = indexName + "-copy";
    +
    +        // set up a listener that explicitly forbids more than one shard to be assigned to the tiny node
    +        final String dataNodeId = internalCluster().getInstance(NodeEnvironment.class, dataNodeName).nodeId();
    +        final var allShardsActiveListener = ClusterServiceUtils.addTemporaryStateListener(cs -> {
    +            assertThat(cs.getRoutingNodes().toString(), cs.getRoutingNodes().node(dataNodeId).size(), lessThanOrEqualTo(1));

Review comment (on the assertion above): Oooph, `RoutingNode#size()` is an unhelpful method name 😓

Reply: Naming things is hard, but yeah this is not good. At least it has Javadocs :)

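For readers puzzling over that assertion: `RoutingNode#size()` returns the number of shards currently allocated to the node, not a size in bytes, so the check caps the shrunken node at a single shard. A hypothetical rewording with a named local, purely illustrative and not part of this PR, would read:

```java
// Hypothetical clarification only: RoutingNode#size() counts the shards assigned
// to the node, so this forbids more than one shard landing on the tiny node.
final int shardsOnTinyNode = cs.getRoutingNodes().node(dataNodeId).size();
assertThat(cs.getRoutingNodes().toString(), shardsOnTinyNode, lessThanOrEqualTo(1));
```
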
    +
    +            var seenCopy = false;
    +            for (final IndexRoutingTable indexRoutingTable : cs.routingTable()) {
    +                if (indexRoutingTable.getIndex().getName().equals(copyIndexName)) {
    +                    seenCopy = true;
    +                }
    +                if (indexRoutingTable.allShardsActive() == false) {
    +                    return false;
    +                }
    +            }
    +            return seenCopy; // only remove this listener when we've started both restores and all the resulting shards are complete
    +        });
    +
    +        // set up a listener which waits for the shards from the first restore to start initializing and then kick off another restore
    +        final var secondRestoreCompleteLatch = new CountDownLatch(1);
    +        final var secondRestoreStartedListener = ClusterServiceUtils.addTemporaryStateListener(cs -> {
    +            final var indexRoutingTable = cs.routingTable().index(indexName);
    +            if (indexRoutingTable != null && indexRoutingTable.shardsWithState(ShardRoutingState.INITIALIZING).isEmpty() == false) {
    +                clusterAdmin().prepareRestoreSnapshot(TEST_REQUEST_TIMEOUT, "repo", "snap")
    +                    .setWaitForCompletion(true)
    +                    .setRenamePattern(indexName)
    +                    .setRenameReplacement(indexName + "-copy")

Review comment (on `setRenamePattern`/`setRenameReplacement`): Huh, is this purely a test-only feature? Doesn't look like it's used anyplace else. (not actionable, I'm just surprised)

Reply: We use the feature in production, see here: Line 564 in 77ef1d4. We just don't use the […]

Reply: Ohh 🤔 Got it 👍

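As an aside on the rename used here: the restore request's rename pattern is applied to each restored index name as a regular-expression replacement, which is why the second restore can bring the same snapshot back under a new name. A simplified standalone sketch of that idea, using hypothetical names rather than the actual RestoreService code:

```java
// Simplified sketch of snapshot-restore index renaming; hypothetical helper,
// not the real Elasticsearch implementation.
public class RestoreRenameSketch {

    // The rename pattern behaves like a regular expression, so the replacement
    // can reference capture groups, much as String#replaceAll does.
    static String renameIndex(String indexName, String renamePattern, String renameReplacement) {
        return indexName.replaceAll(renamePattern, renameReplacement);
    }

    public static void main(String[] args) {
        // Mirrors the test above, where the second restore renames the index to "<indexName>-copy".
        // "my-index" stands in for the randomIdentifier() the test generates.
        System.out.println(renameIndex("my-index", "my-index", "my-index-copy")); // prints "my-index-copy"
    }
}
```
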
    +                    .execute(ActionTestUtils.assertNoFailureListener(restoreSnapshotResponse -> {
    +                        final RestoreInfo restoreInfo = restoreSnapshotResponse.getRestoreInfo();
    +                        assertThat(restoreInfo.successfulShards(), is(snapshotInfo.totalShards()));
    +                        assertThat(restoreInfo.failedShards(), is(0));
    +                        secondRestoreCompleteLatch.countDown();
    +                    }));
    +                return true;
    +            }
    +            return false;
    +        });
    +
    +        // now set the ball rolling by doing the first restore, waiting for it to complete
    +        final RestoreSnapshotResponse restoreSnapshotResponse = clusterAdmin().prepareRestoreSnapshot(TEST_REQUEST_TIMEOUT, "repo", "snap")
    +            .setWaitForCompletion(true)
    +            .get();
    +        final RestoreInfo restoreInfo = restoreSnapshotResponse.getRestoreInfo();
    +        assertThat(restoreInfo.successfulShards(), is(snapshotInfo.totalShards()));
    +        assertThat(restoreInfo.failedShards(), is(0));
    +
    +        // wait for the second restore to complete too
    +        safeAwait(secondRestoreStartedListener);
    +        safeAwait(secondRestoreCompleteLatch);
    +
    +        // wait for all the shards to finish moving
    +        safeAwait(allShardsActiveListener);
    +        ensureGreen(indexName, indexName + "-copy");
    +    }
    +
         private Set<ShardId> getShardIds(final String nodeId, final String indexName) {
             final Set<ShardId> shardIds = new HashSet<>();
             final IndexRoutingTable indexRoutingTable = clusterAdmin().prepareState(TEST_REQUEST_TIMEOUT)

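The test drives all of its waiting through `ClusterServiceUtils.addTemporaryStateListener`: as used above, the supplied predicate runs on every cluster-state update (assertions included), and the returned handle completes, removing the listener, once the predicate first returns true; the test then awaits it with `safeAwait`. A stripped-down, standalone sketch of that wait-for-predicate pattern (hypothetical classes, not the Elasticsearch utility):

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;
import java.util.function.Predicate;

// Hypothetical stand-ins for a cluster state and its update stream; this illustrates
// the wait-for-predicate pattern only, not ClusterServiceUtils itself.
record State(int activeShards) {}

class StatePublisher {
    private final List<Consumer<State>> listeners = new CopyOnWriteArrayList<>();

    void publish(State state) {
        listeners.forEach(l -> l.accept(state));
    }

    // Registers a temporary listener: the returned future completes, and the listener
    // removes itself, the first time the predicate matches a published state.
    CompletableFuture<Void> addTemporaryStateListener(Predicate<State> predicate) {
        var future = new CompletableFuture<Void>();
        listeners.add(new Consumer<>() {
            @Override
            public void accept(State state) {
                if (predicate.test(state)) {
                    listeners.remove(this);
                    future.complete(null);
                }
            }
        });
        return future;
    }
}

class TemporaryListenerDemo {
    public static void main(String[] args) {
        var publisher = new StatePublisher();
        var allShardsActive = publisher.addTemporaryStateListener(state -> state.activeShards() >= 6);
        publisher.publish(new State(3)); // predicate false: keep listening
        publisher.publish(new State(6)); // predicate true: complete and deregister
        System.out.println("completed: " + allShardsActive.isDone()); // prints "completed: true"
    }
}
```
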
The fix itself, in `simulateShardStarted`:

    @@ -92,7 +92,7 @@ public void simulateShardStarted(ShardRouting shard) {
             var project = allocation.metadata().projectFor(shard.index());
             var size = getExpectedShardSize(
                 shard,
    -            UNAVAILABLE_EXPECTED_SHARD_SIZE,
    +            shard.getExpectedShardSize(),
                 getClusterInfo(),
                 allocation.snapshotShardSizeInfo(),
                 project,

Review comment (on the changed line): One-line fix \o/

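The change makes the simulator fall back to the expected size already recorded on the initializing `ShardRouting` instead of treating it as unavailable; as the call above shows, the real `getExpectedShardSize` also consults the `ClusterInfo` and snapshot shard size info passed to it. A minimal standalone sketch of that fallback idea, with hypothetical names and data structures rather than the actual simulator code:

```java
// Minimal sketch of the "expected size with fallback" idea; hypothetical names,
// not the actual Elasticsearch simulator code.
import java.util.Map;

public class ExpectedShardSizeSketch {

    // Stand-in for UNAVAILABLE_EXPECTED_SHARD_SIZE.
    static final long UNAVAILABLE_EXPECTED_SHARD_SIZE = -1L;

    // Returns the size observed in the cluster info if present, otherwise the supplied fallback.
    static long getExpectedShardSize(String shardId, Map<String, Long> observedSizes, long fallback) {
        return observedSizes.getOrDefault(shardId, fallback);
    }

    public static void main(String[] args) {
        // A freshly restored shard that is still initializing has no observed size yet.
        Map<String, Long> observedSizes = Map.of();
        long sizeOnRouting = 1024L; // expected size carried by the initializing ShardRouting

        // Before the fix: the fallback was "unavailable", so the simulation ignored the shard's size.
        System.out.println(getExpectedShardSize("[index][0]", observedSizes, UNAVAILABLE_EXPECTED_SHARD_SIZE)); // -1
        // After the fix: the expected size from the shard routing is used instead.
        System.out.println(getExpectedShardSize("[index][0]", observedSizes, sizeOnRouting)); // 1024
    }
}
```
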
Review comment: `testRestoreSnapshotAllocationDoesNotExceedWatermarkWithMultipleShards` was flaky because of this bug, but only failing once every few hundred iterations. This test fails more reliably for the same reason, although still not all that reliably (after around 20-30 iterations on my laptop). I could make it exercise the exact path that hits the bug every time, but it'd be very specific to this one bug and I'd rather have something a little more general to look out for related bugs too.

Reply: So `testRestoreSnapshotAllocationDoesNotExceedWatermarkWithMultipleShards` does a little more disk usage checking as well as the shard assignment. This new test checks that shards are assigned correctly, not so much disk usage. They feel a little redundant: might we add a couple more touches to the new one and delete the old?

Reply: Yeah, I was sort of inclined to keep them both, but you're right, we're not really testing anything different in the old test.

Reply: Great, thanks for updating.