
Conversation


@ywangd ywangd commented Jul 21, 2025

This PR adds a new feature that allows configuring the maximum number of shard snapshots in the INIT state per node. Since shard snapshots in the INIT state prevent relocation, limiting their number allows more shards to relocate. Some important implementation details are:

  1. The QUEUED state is reused for the new state where a shard would have been in the INIT state but is held back by the new limit. To differentiate it from the existing unassigned-queued state, the new state has a non-null nodeId, similar to INIT. This new state is called assigned-queued (see the sketch after this list).
  2. When processing a shard snapshot update, in addition to processing it vertically across different snapshots for the same shard, we now also process it horizontally within the snapshot to kick off any assigned-queued shard snapshots based on node capacity. This ensures that a snapshot started earlier has a better chance of completing before snapshots started later.
  3. We also iterate through all snapshots across all repos and attempt to start any assigned-queued shard snapshots, regardless of whether the snapshot has received an update. This is because the limit is node-wide and applies across all repos/snapshots, so a shard snapshot may be held back by a different snapshot in a different repo.
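
For illustration, point 1 could be captured by a small predicate like the one below. This is a hedged sketch, not the PR's actual code; it only assumes the convention described above (QUEUED plus a non-null nodeId means assigned-queued) and the existing SnapshotsInProgress.ShardSnapshotStatus accessors.

```java
// Sketch only: distinguishes the new assigned-queued state (QUEUED with a node assignment,
// held back purely by the per-node INIT limit) from the pre-existing unassigned-queued state.
static boolean isAssignedQueued(SnapshotsInProgress.ShardSnapshotStatus status) {
    return status.state() == SnapshotsInProgress.ShardState.QUEUED && status.nodeId() != null;
}
```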

The new setting is dynamically configurable with a default of 0 meaning no limit, i.e. no behaviour change by default. The stress test has been updated to randomly (1) configure and update the limit and (2) force relocation.
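
As a sketch of how such a dynamic setting is typically declared (the setting key below is a placeholder, not necessarily the name used in this PR):

```java
import org.elasticsearch.common.settings.Setting;

// Placeholder key; the real name in the PR may differ. Default 0 keeps today's behaviour
// (no limit), and Property.Dynamic allows the limit to be updated at runtime.
public static final Setting<Integer> MAX_INIT_SHARD_SNAPSHOTS_PER_NODE = Setting.intSetting(
    "snapshot.max_init_shard_snapshots_per_node",  // hypothetical setting key
    0,   // default: 0 means no limit
    0,   // minimum allowed value
    Setting.Property.Dynamic,
    Setting.Property.NodeScope
);
```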

Resolves: ES-12377

ywangd added 8 commits July 20, 2025 17:21
With the limit enabled, shard snapshots that would have been in the INIT
state are now put into the assigned QUEUED state when the limit is reached
on the node. This new state transitions into INIT when a shard snapshot
completes and gives capacity back to the node. The completing shard
snapshot can be from the same snapshot or from a different snapshot in
either the same or a different repo.
* Treat queued-with-gen as started so that it blocks deletions from running
* An aborted queued-with-gen shard is failed with a separate cluster state update
  to mirror how an abort for INIT works.
Generation can still be null
Some other tweaks. Still WIP
@ywangd ywangd requested a review from DaveCTurner July 21, 2025 06:13
@ywangd ywangd added >enhancement, :Distributed Coordination/Snapshot/Restore, and v9.2.0 labels Jul 21, 2025
@elasticsearchmachine (Collaborator) commented

Hi @ywangd, I've created a changelog YAML for you.


@DaveCTurner DaveCTurner left a comment


Great stuff, I much prefer this approach.

final SnapshotsInProgress.Entry abortedEntry = existing.abort(
currentState.nodes().getLocalNodeId(),
((shardId, shardSnapshotStatus) -> completeAbortedAssignedQueuedRunnables.add(
() -> innerUpdateSnapshotState(existing.snapshot(), shardId, shardSnapshotStatus)
@ywangd (Member Author)

Apply updates as ShardSnapshotUpdate tasks. I think this is a better approach than trying to update them locally, which would require duplicating logic from the update task executors. We already have some duplication in processExternalChanges, and there is a TODO suggesting refactoring it away.

// We cannot directly update its status here because there may be another snapshot for
// the same shard that is QUEUED which must be updated as well, i.e. vertical update.
// So we submit the status update to let it be processed in a future cluster state update.
shardStatusUpdateConsumer.apply(entry.snapshot(), shardId, newShardSnapshotStatus);
@ywangd (Member Author)

Similarly, we send out a shard snapshot update so that the state change is propagated correctly by the task executor.

Comment on lines 3713 to 3714
// Check horizontally within the snapshot to see whether any previously limited shard snapshots can now start
maybeStartAssignedQueuedShardSnapshots(
@ywangd (Member Author)

This has a side effect: a snapshot will kick off assigned-queued shards belonging to the same snapshot before starting a shard snapshot of the same shard that belongs to a later snapshot. I think this is desirable. It means that once a shard finishes its snapshot, it does not immediately get another snapshot in the INIT state, which is something we previously wanted to prevent by limiting the number of concurrent snapshots. With the number of INIT shards now limited, this matters less but is still nice to have.

Comment on lines 3578 to 3582
for (var notUpdatedRepo : Sets.difference(existing.repos(), updatesByRepo.keySet())) {
if (perNodeShardSnapshotCounter.hasCapacityOnAnyNode() == false) {
break;
}
updated = maybeStartAssignedQueuedShardSnapshotsForRepo(
@ywangd (Member Author)

An optimization is to order the snapshots by start time and start assigned-queued shards for the earlier entries.
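
A rough sketch of what that ordering might look like as a fragment inside the existing logic; the entries variable, the single-argument helper call, and the use of Entry.startTime() are assumptions for illustration, while hasCapacityOnAnyNode() comes from the diff above.

```java
// Sketch of the suggested optimization: visit snapshot entries in start-time order so that
// earlier snapshots get first pick of any freed node capacity.
List<SnapshotsInProgress.Entry> byStartTime = entries.stream()
    .sorted(Comparator.comparingLong(SnapshotsInProgress.Entry::startTime))
    .toList();
for (SnapshotsInProgress.Entry entry : byStartTime) {
    if (perNodeShardSnapshotCounter.hasCapacityOnAnyNode() == false) {
        break; // no node can accept another INIT shard snapshot
    }
    maybeStartAssignedQueuedShardSnapshots(entry);
}
```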


ywangd commented Jul 25, 2025

@DaveCTurner The stress IT has been running successfully for more than 24 hours after fixing the last bug. I have also pondered the new logic for quite some time. I think it is sufficiently correct and we can start polishing it for a production version. One important thing is to enhance the stress IT to enable relocation and maybe to change the limit dynamically. But before I proceed further, what would your suggestion be for this PR? Should we treat it as a PoC, close it (and the associated ticket), and create a new task for the production PR? Or should we keep working on it towards the production version?

@ywangd ywangd requested a review from DaveCTurner August 19, 2025 01:49

ywangd commented Aug 21, 2025

@elasticmachine update branch


@DaveCTurner DaveCTurner left a comment


I'm still too uncomfortable with the assigned-queued state, but it's taken me a while to really get to the heart of that discomfort. Fundamentally I think it's that there should be no need to keep track of these assignments in the cluster state itself, and that using the cluster state for this is going to be something we have to maintain forevermore.

I recognise the value of tracking the assignments somewhere: if we don't, we have to do a rather naive search for shards to activate on each cluster state update. Could we explore tracking these things using an in-memory data structure on the elected master node instead? On a master failover, the new master can populate its own local tracking from the routing table, but from then on it should be able to maintain it alongside the cluster state.
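
To make the idea concrete, a very rough sketch of such master-local tracking might look like the following; all names here are hypothetical and not taken from any PR.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: the elected master keeps per-node INIT counts in memory, maintained
// alongside each snapshot-related cluster state update, and rebuilds them from the cluster
// state when a new master takes over, instead of persisting an assigned-queued marker in
// the cluster state itself.
final class MasterLocalInitShardTracker {
    private final Map<String, Integer> initShardsByNode = new HashMap<>();

    /** Rebuild from scratch, e.g. right after this node is elected master. */
    void rebuild(Iterable<TrackedShard> initShardsFromClusterState) {
        initShardsByNode.clear();
        for (TrackedShard shard : initShardsFromClusterState) {
            initShardsByNode.merge(shard.nodeId(), 1, Integer::sum);
        }
    }

    /** Called when the master moves a shard snapshot into INIT on a node. */
    void onShardStarted(String nodeId) {
        initShardsByNode.merge(nodeId, 1, Integer::sum);
    }

    /** Called when a shard snapshot on the node completes, fails or is aborted. */
    void onShardFinished(String nodeId) {
        initShardsByNode.computeIfPresent(nodeId, (n, c) -> c <= 1 ? null : c - 1);
    }

    boolean hasCapacity(String nodeId, int maxInitShardsPerNode) {
        return maxInitShardsPerNode <= 0 || initShardsByNode.getOrDefault(nodeId, 0) < maxInitShardsPerNode;
    }

    /** Minimal shard view for this sketch; in reality this information comes from the cluster state. */
    record TrackedShard(String nodeId) {}
}
```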

More generally, that's the direction I think we should take this area of the system: we're already doing rather a lot of work in the cluster state updates that relate to snapshots because of the tension between wanting a normalized data structure for efficient replication vs wanting a denormalized data structure for efficient update computations. This seems like a good opportunity to resolve that tension by separating those two concerns. In the short term I think we can continue to compute updates to both data structures on the master thread, but in future we may want to do more background work instead.

I opened #134795 to describe my thinking about the purpose of SnapshotsInProgress in its Javadocs.


ywangd commented Sep 17, 2025

Thanks for the feedback, David. I really appreciate it. Your suggestion makes sense to me. In practice, it feels more or less like a middle ground between the current "assigned-queued" approach and treating them entirely as "queued", so it could potentially have the benefits of both sides. Your articulation of mixing two separate concerns in one data structure is a great one. I hadn't thought of it that way. Thanks for sharing your insights.

I'll go ahead and experiment with your suggestion. I may raise it as a new PR since it's rather difficult to shift direction on this one and it might still be useful as a comparison once the other one is up.


ywangd commented Sep 18, 2025

Could we explore tracking these things using an in-memory data structure on the elected master node instead?

I have experimented with this idea in this draft PR (#134977). It does not work yet, but I am afraid it might not be the direction we want to take. Though it does feel theoretically great to keep the tracking outside the cluster state, it has quite a few downsides:

  1. The code is inherently more complex with state in two places. The separate tracking also needs to be passed around to every place that needs it, which makes the code more tangled. I think these issues are already evident in the existing changes in the draft PR.
  2. It makes debugging and investigation harder because we can no longer inspect cluster states and be sure about the behaviour. When I was working on the initial version of this PR, I wrote some scripts to analyze cluster state dumps and visualize them in an output format similar to what you had in this comment. These analyses were immensely helpful in identifying all sorts of issues, and I was able to further customize them based on different exceptions. This is no longer possible with separate tracking unless we also dump the tracking state. We can certainly dump it in tests as well, but it feels rather awkward to have to worry about one more thing, and it is likely to add to the difficulty. Along the same lines, separate state also makes assertions harder.
  3. Recomputing the tracking on master failover feels like it could mask subtle bugs, i.e. incorrect tracking could be fixed by recomputing it from scratch (or vice versa). It is yet another thing for which we need to ensure consistency.
  4. Separate tracking still needs to be propagated to later snapshots when a node-capacity-limited shard fails due to an index or snapshot deletion. So it does not help on this front, which was something I (incorrectly) hoped for.

With this PR, I also started by trying to treat the node-capacity-limited shards entirely as QUEUED instead of QUEUED-but-still-active (i.e. assigned-queued). I found it not really possible to treat them entirely as QUEUED unless we do not care about snapshot operation order at all. For example, do we want to allow a deletion to start when there are snapshots queued due to the node capacity limit? Similarly, do we want a clone to jump the queue ahead of earlier queued snapshots? And do we want a later snapshot to jump the queue? Jumping the queue not only disturbs operation order; it also means we must check both earlier and later snapshots when deciding whether to start an operation. So I think the answer is no to at least some of these questions. In that case, we have to treat these QUEUED shard snapshots specially, i.e. they should block later operations despite being queued. Hence they are really QUEUED-but-still-active in at least some cases. I find it really difficult to reason about them acting as both QUEUED and QUEUED-but-still-active depending on the context. Therefore I think we should always treat them as QUEUED-but-still-active.

In summary, I think the initial version of the assigned-queued approach, despite its downsides, has so far been the simplest solution overall, in terms of both the amount of change and the mental model. Therefore, I'd personally still suggest it unless we want to completely shift direction, i.e. to the other stateless PoC.


ywangd commented Oct 9, 2025

@DaveCTurner What is your take on my last comment? I wonder whether we should continue to allocate some time for this issue in the upcoming iteration, or whether we should reconsider it. Thanks!

@DaveCTurner (Contributor)

I think these are valid points but none are strong enough to be a blocker IMO.

  1. I find the tangle of state we would have to store in ClusterState to be worse (e.g. it leaks across nodes and has BwC implications). There's already other master-local state held in SnapshotsService but it's all a bit ad-hoc at the moment and I feel we need to establish something more structured before we can do this, and other, improvements to snapshots scalability.

  2. Likewise, we already have this problem; we need to make the existing master-local state visible too rather than just ignoring it.

  3. We can check consistency with assertions, like we do elsewhere (see the sketch after this list).

  4. This is something else that we can do better with in-memory master-local state than we could do within ClusterState itself.
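
For point 3, such a consistency check over a hypothetical master-local tracker (names assumed, not from any PR) could be as simple as:

```java
// Sketch: assert that the incrementally maintained master-local counts agree with a
// from-scratch recomputation over the cluster state; only runs with assertions enabled (-ea).
assert initShardsByNode.equals(recomputeInitShardsByNode(clusterState))
    : "master-local INIT shard counts " + initShardsByNode + " diverged from the cluster state";
```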

I think it would be best to take the other approach and make stateless snapshots more stateless instead.


ywangd commented Oct 14, 2025

David and I synced and agreed to not move further with this PR and instead take the other approach for stateless snapshots. See also ES-12377 for details.

@ywangd ywangd closed this Oct 14, 2025