Limit number of shard snapshot in INIT state per node #131592 (closed)

Commits (61)
- 447de70 (ywangd): Add assigned queued and limit INIT per node
- a74f94b (ywangd): renames and tweak
- f76b13a (ywangd): Fix deletions
- 55a6907 (ywangd): Extract common method
- 8eb71ef (ywangd): Fix allQueued snapshot test for deletion
- 82765b2 (ywangd): Change back to assigned queued
- 4292017 (ywangd): improve node counting a bit
- 12dfdc3 (ywangd): Allow relocation and add an IT
- cd574b8 (ywangd): Update docs/changelog/131592.yaml
- 3a76132 (ywangd): rename
- 49e1b42 (ywangd): Fix a bug where AssignedQueued shard fails with MISSING
- 6c9fdaa (ywangd): Merge remote-tracking branch 'origin/main' into shard-snapshot-limit-…
- 5ecc2cf (ywangd): Fix bug where assigned-queued can be left forever
- ae42b23 (ywangd): comments
- fcfbc63 (ywangd): actually use updated states
- 329a2c5 (ywangd): TODO about removing the kickoff logic after deletion removal
- f60009a (ywangd): Merge remote-tracking branch 'origin/main' into shard-snapshot-limit-…
- 8396c67 (ywangd): Merge remote-tracking branch 'origin/main' into shard-snapshot-limit-…
- c6d86c9 (ywangd): Revert "TODO about removing the kickoff logic after deletion removal"
- 726da41 (ywangd): remove kickoff process after deletion removal
- 0fde650 (ywangd): Fix a bug where assigned-queued shards are not kicked off
- c76f244 (ywangd): One more TODO
- d6d0ac4 (ywangd): comments
- d20856e (ywangd): Simply how we kick off assigned queued shards for entries not seeing …
- b1de4c6 (ywangd): Merge remote-tracking branch 'origin/main' into shard-snapshot-limit-…
- 57ac6aa (ywangd): half the per-node-limit in stress IT
- 9ec50c1 (ywangd): Honor running shard-snapshot counting in processWaitingShardsAndRemov…
- 5bd6ca2 (ywangd): enable rebalance
- b5c4907 (ywangd): Add random allocation filtering
- 713d86a (ywangd): rename
- 8a95b99 (ywangd): Randomize and update per-node limit in stress IT
- 292649d (ywangd): Actually randomize initial shard snapshot limit
- 4109412 (ywangd): Pre-compute hasAssignedQueuedShards
- f5d3b64 (ywangd): No need for static method
- 4768d27 (ywangd): Tweak and add tests for PerNodeShardSnapshotCounter
- c3c56d2 (ywangd): more test assertions
- 6745235 (ywangd): Merge remote-tracking branch 'origin/main' into shard-snapshot-limit-…
- 287aca2 (ywangd): update changelog
- 55d4502 (ywangd): bwc default
- 6277c7c (ywangd): Merge remote-tracking branch 'origin/main' into shard-snapshot-limit-…
- dfb035f (ywangd): augument tests for PerNodeShardSnapshotCounter
- e2ef7e4 (ywangd): Add test for completion order
- 095010f (ywangd): Merge remote-tracking branch 'origin/main' into shard-snapshot-limit-…
- 1337704 (ywangd): sometimes neither shutdown nor allocation filter
- b9e4925 (ywangd): reinstate class qualification
- fe7be7a (ywangd): Use null nodeId to different aborted assigned-queued
- 99c81e7 (ywangd): fix test
- 1b90ef9 (ywangd): fix test
- dab105d (ywangd): Merge remote-tracking branch 'origin/main' into shard-snapshot-limit-…
- 6996653 (ywangd): Merge remote-tracking branch 'origin/main' into shard-snapshot-limit-…
- 0b64258 (ywangd): Complete assigned-queued shard snapshots inline without one more upda…
- 8bef3f0 (elasticmachine): Merge branch 'main' into shard-snapshot-limit-init-poc
- a29ba9b (ywangd): Fix propagation for clones
- 1d7f484 (ywangd): Revert "Fix propagation for clones"
- 5d8c51e (ywangd): Revert "Complete assigned-queued shard snapshots inline without one m…
- cf96d35 (ywangd): Refactor and reuse EntryContext for propagation
- 0198d7b (ywangd): fail assigned-queued shards directly in processExternalChanges
- 0b9c6f2 (ywangd): Comments, renames, assertions and deleted status update
- 68f8663 (ywangd): Fix deletion where it may need to start a previous deletion
- 65888c5 (DaveCTurner): Merge branch 'main' into shard-snapshot-limit-init-poc
- 096a9df: [CI] Update transport version definitions
docs/changelog/131592.yaml (new file, 5 additions, 0 deletions):

```yaml
pr: 131592
summary: "[PoC] Limited number of shard snapshot in INIT state per node"
area: Snapshot/Restore
type: enhancement
issues: []
```
server/src/internalClusterTest/java/org/elasticsearch/snapshots/SnapshotAndRelocationIT.java (new file, 129 additions, 0 deletions):
```java
/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the "Elastic License
 * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
 * Public License v 1"; you may not use this file except in compliance with, at
 * your election, the "Elastic License 2.0", the "GNU Affero General Public
 * License v3.0 only", or the "Server Side Public License, v 1".
 */
package org.elasticsearch.snapshots;

import org.elasticsearch.cluster.SnapshotsInProgress;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.CheckedRunnable;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.snapshots.mockstore.MockRepository;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.transport.MockTransportService;

import java.util.Arrays;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.IntStream;
import java.util.stream.StreamSupport;

import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;

@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0)
public class SnapshotAndRelocationIT extends AbstractSnapshotIntegTestCase {

    @Override
    protected Collection<Class<? extends Plugin>> nodePlugins() {
        return Arrays.asList(MockTransportService.TestPlugin.class, MockRepository.Plugin.class);
    }

    @Override
    protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
        return Settings.builder()
            .put(super.nodeSettings(nodeOrdinal, otherSettings))
            .put(SnapshotsService.SHARD_SNAPSHOT_PER_NODE_LIMIT_SETTING.getKey(), 1)
            .build();
    }

    public void testLimitingInitAndRelocationForAssignedQueueShards() throws Exception {
        final String masterNode = internalCluster().startMasterOnlyNode();
        final String dataNodeA = internalCluster().startDataOnlyNode();
        final String dataNodeB = internalCluster().startDataOnlyNode();
        final String repoName = "test-repo";
        createRepository(repoName, "mock");

        // Delay the first shard-snapshot status update so the snapshot cannot
        // complete until we explicitly release it below.
        final AtomicBoolean delayOnce = new AtomicBoolean(false);
        final AtomicReference<CheckedRunnable<Exception>> delayedAction = new AtomicReference<>();
        final var delayedActionSetLatch = new CountDownLatch(1);
        MockTransportService.getInstance(masterNode)
            .addRequestHandlingBehavior(SnapshotsService.UPDATE_SNAPSHOT_STATUS_ACTION_NAME, (handler, request, channel, task) -> {
                if (delayOnce.compareAndSet(false, true)) {
                    delayedAction.set(() -> handler.messageReceived(request, channel, task));
                    delayedActionSetLatch.countDown();
                } else {
                    handler.messageReceived(request, channel, task);
                }
            });

        final var numIndices = between(2, 4);
        final var indexNames = IntStream.range(0, numIndices).mapToObj(i -> "index-" + i).toList();

        // All indices start out pinned to dataNodeA
        for (var indexName : indexNames) {
            createIndex(indexName, indexSettings(1, 0).put("index.routing.allocation.include._name", dataNodeA).build());
            indexRandomDocs(indexName, between(10, 42));
        }
        ensureGreen();

        final var future = startFullSnapshot(repoName, "snapshot");
        safeAwait(delayedActionSetLatch);

        final var clusterService = internalCluster().getCurrentMasterNodeInstance(ClusterService.class);
        final SnapshotsInProgress.Entry snapshot = SnapshotsInProgress.get(clusterService.state()).asStream().iterator().next();
        logger.info("--> snapshot=[{}]", snapshot);
        final var shards = snapshot.shards();
        assertThat(shards.size(), equalTo(numIndices));

        // With a per-node limit of 1, exactly one shard is in INIT on dataNodeA ...
        final var dataNodeAId = getNodeId(dataNodeA);
        final var initShards = shards.entrySet()
            .stream()
            .filter(entry -> entry.getValue().state() == SnapshotsInProgress.ShardState.INIT)
            .peek(entry -> assertThat(entry.getValue().nodeId(), equalTo(dataNodeAId)))
            .map(Map.Entry::getKey)
            .toList();
        logger.info("--> init shards [{}]", initShards);
        assertThat(initShards.size(), equalTo(1));

        // ... and all remaining shards are assigned-queued on the same node
        final var assignedQueuedShards = shards.entrySet()
            .stream()
            .filter(entry -> entry.getValue().isAssignedQueued())
            .peek(entry -> assertThat(entry.getValue().nodeId(), equalTo(dataNodeAId)))
            .map(Map.Entry::getKey)
            .toList();
        logger.info("--> assigned queued shards [{}]", assignedQueuedShards);
        assertThat(assignedQueuedShards.size(), equalTo(numIndices - 1));

        // Relocate indices that are assigned queued
        final String[] indices = assignedQueuedShards.stream().map(ShardId::getIndexName).toArray(String[]::new);
        logger.info("--> relocate indices [{}]", Arrays.toString(indices));
        updateIndexSettings(Settings.builder().put("index.routing.allocation.include._name", dataNodeB), indices);
        ensureGreen(indices);

        final var dataNodeBIndicesService = internalCluster().getInstance(IndicesService.class, dataNodeB);
        for (var shardId : assignedQueuedShards) {
            assertTrue(
                "indices: "
                    + StreamSupport.stream(dataNodeBIndicesService.spliterator(), false)
                        .map(indexService -> indexService.index().getName())
                        .toList(),
                dataNodeBIndicesService.hasIndex(shardId.getIndex())
            );
        }

        assertThat(future.isDone(), is(false));
        logger.info("--> run delayed action");
        delayedAction.get().run();
        assertSuccessful(future);
    }
}
```
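The integration test exercises a per-node cap on the number of shard snapshots in the INIT state. As a hedged sketch of the gating idea only (the PR's actual PerNodeShardSnapshotCounter is more involved; the class and method names below are hypothetical, not Elasticsearch APIs):

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: caps how many shard snapshots may be in INIT on any
// single data node; shards over the cap would stay assigned-queued.
class PerNodeInitLimiter {
    private final int perNodeLimit;
    private final Map<String, Integer> initCounts = new HashMap<>();

    PerNodeInitLimiter(int perNodeLimit) {
        this.perNodeLimit = perNodeLimit;
    }

    // Returns true if a shard may move to INIT on the given node, taking a
    // slot; false means the shard must wait in the queue.
    boolean tryStart(String nodeId) {
        int current = initCounts.getOrDefault(nodeId, 0);
        if (current >= perNodeLimit) {
            return false;
        }
        initCounts.put(nodeId, current + 1);
        return true;
    }

    // A shard snapshot on the node finished (or was aborted): free one slot.
    void complete(String nodeId) {
        initCounts.merge(nodeId, -1, Integer::sum);
    }
}
```

With a limit of 1, as in the test's node settings, a second tryStart on the same node fails until the first shard completes, which matches the observed one-INIT-shard, rest-assigned-queued split.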
Is the distinction between assigned-queued and unassigned-queued important? My thinking is that the point of leaving the shards in state QUEUED while a limit is in play is that we haven't committed to running them on a particular node yet: we only need to make that decision when we move them to INIT.
Yes, the distinction is important. My general principle is to have assigned-queued work similarly to the INIT state as much as possible. I think this helps leverage existing code logic the most.
- […] INIT, so that the new shard snapshot will be in QUEUED in this state.
- […] QUEUED state. This is done by sending the update to SnapshotTaskExecutor, which performs the state propagation. Again, a similar approach to updating the INIT state.
- […] INIT ones.
Hmm, I really don't like moving from QUEUED to ABORTED. ABORTED really means "aborting": it indicates there's work in progress on the data node and we must wait for the data node to complete it and update the master state. But if the shard was QUEUED then we aren't waiting for the data node any more.

I see two alternatives:

1. Extract out the common code that moves future snapshots of a shard to a ready (or at least assigned-queued) state, so we can re-use it on both paths.
2. Drop the assigned-queued state and instead move shards from unassigned-queued to INIT in the SnapshotTaskExecutor as other shard snapshots complete.

What do you think?
With this PR, we are giving new meanings to existing states. QUEUED can now be assigned and, similarly, ABORTED can now be unassigned, i.e. a null nodeId indicates no work left on the data node and the master will complete it with another update cycle. These new meanings seem fine to me. We are adding more state transitions after all.

On the first alternative: I think the logic gets more complicated with this approach. We are basically duplicating part of SnapshotShardsUpdateContext#EntryContext#computeUpdatedSnapshotEntryFromShardUpdates and its downstream methods. It's very hard to extract reusable code from it; we would have to duplicate. It's possible, but I'd prefer we keep the state updates in one place, as suggested by the existing comment there.

On the second alternative: not sure I follow. Do you mean completely removing the new assigned-queued state? I suspect the change would be a lot more complicated without it. We still need to differentiate one QUEUED shard from the others. If we don't have the new state, we will have to rely on checking (1) whether the shard is the first queued and (2) whether there is an INIT shard before it. This is rather cumbersome, especially when we update snapshots with shard updates in a loop. Deletion also needs to tell the difference. Currently, QUEUED does not stop a deletion from starting, because there must be an INIT (or some other active state) shard that prevents the deletion from starting. This PR prevents deletion with the assigned-queued state; without it, deletion would also need to check whether one of the QUEUED shards is special. Overall, I think the new state makes the new logic more explicit. It is also likely more performant, since we can quickly tell whether a snapshot has any shards queued due to the capacity limit. So I think I prefer it.
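The repurposed state meanings described in this thread (a QUEUED shard that already has a node chosen, an ABORTED shard with a null nodeId) can be sketched as a tiny model. This is illustrative only and is not the real SnapshotsInProgress.ShardSnapshotStatus API:

```java
// Illustrative model only; the real class is SnapshotsInProgress.ShardSnapshotStatus.
record ShardStatus(ShardStatus.State state, String nodeId) {
    enum State { QUEUED, INIT, ABORTED, SUCCESS, FAILED }

    // QUEUED with a node already chosen: waiting only for a free per-node slot.
    boolean isAssignedQueued() {
        return state == State.QUEUED && nodeId != null;
    }

    // ABORTED with null nodeId: no work left on any data node;
    // the master completes it in a later update cycle.
    boolean isUnassignedAborted() {
        return state == State.ABORTED && nodeId == null;
    }
}
```

The point of the sketch is that no new enum constant is needed: the (state, nodeId) pair alone distinguishes the new transitions.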
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To be clear, if we must choose one of the alternatives, I prefer the first one. It adds some complexity but comes with a potential performance benefit, i.e. no need for another shard update cycle.
I have now pushed 0b64258 to implement the first alternative. Please let me know whether it looks better. Thanks!
The new change does not work because it does not handle clones. It's been a while and I had forgotten that this was part of the complexity that led me to take the original approach. I am trying to extend the change to cover clones, but it's getting quite ugly and I am not sure whether it is worthwhile.
OK, I updated to a better implementation of the first alternative (cf96d35). It refactors and reuses EntryContext for propagating state changes of assigned-queued shards to later snapshots, i.e. it uses the same logic as ShardSnapshotUpdate tasks while avoiding extra cluster state update cycles. This is now ready for another look. Thanks!
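As a loose illustration of the propagation idea discussed in this thread, i.e. promoting the next assigned-queued shard on a node once a running shard snapshot there completes, here is a sketch with entirely hypothetical names (the real logic lives in the cluster-state update path, not a standalone class):

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: per-node FIFO of assigned-queued shards, promoted
// one at a time as INIT slots free up on that node.
class AssignedQueuedPromoter {
    private final Map<String, Deque<String>> queuedByNode = new HashMap<>();

    // Record a shard as assigned-queued on the given node.
    void enqueue(String nodeId, String shardId) {
        queuedByNode.computeIfAbsent(nodeId, k -> new ArrayDeque<>()).addLast(shardId);
    }

    // Called when a shard snapshot finishes on nodeId; returns the next shard
    // to move to INIT, or null if nothing is waiting on that node.
    String promoteNext(String nodeId) {
        Deque<String> queue = queuedByNode.get(nodeId);
        return queue == null ? null : queue.pollFirst();
    }
}
```

In the PR itself this promotion happens inside the same cluster-state update that processes the completing ShardSnapshotUpdate, which is what avoids the extra update cycle mentioned above.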
EntryContextfor propagating state changes of assigned-queued shards to later snapshots, i.e. it uses the same logic asShardSnapshotUpdatetasks while avoiding extra cluster state update cycles. This is now ready for another look. Thanks!