Skip to content
Closed
Show file tree
Hide file tree
Changes from 60 commits
Commits
Show all changes
61 commits
Select commit Hold shift + click to select a range
447de70
Add assigned queued and limit INIT per node
ywangd Jul 19, 2025
a74f94b
renames and tweak
ywangd Jul 20, 2025
f76b13a
Fix deletions
ywangd Jul 20, 2025
55a6907
Extract common method
ywangd Jul 20, 2025
8eb71ef
Fix allQueued snapshot test for deletion
ywangd Jul 20, 2025
82765b2
Change back to assigned queued
ywangd Jul 21, 2025
4292017
improve node counting a bit
ywangd Jul 21, 2025
12dfdc3
Allow relocation and add an IT
ywangd Jul 21, 2025
cd574b8
Update docs/changelog/131592.yaml
ywangd Jul 21, 2025
3a76132
rename
ywangd Jul 21, 2025
49e1b42
Fix a bug where AssignedQueued shard fails with MISSING
ywangd Jul 22, 2025
6c9fdaa
Merge remote-tracking branch 'origin/main' into shard-snapshot-limit-…
ywangd Jul 22, 2025
5ecc2cf
Fix bug where assigned-queued can be left forever
ywangd Jul 23, 2025
ae42b23
comments
ywangd Jul 23, 2025
fcfbc63
actually use updated states
ywangd Jul 23, 2025
329a2c5
TODO about removing the kickoff logic after deletion removal
ywangd Jul 24, 2025
f60009a
Merge remote-tracking branch 'origin/main' into shard-snapshot-limit-…
ywangd Jul 24, 2025
8396c67
Merge remote-tracking branch 'origin/main' into shard-snapshot-limit-…
ywangd Jul 25, 2025
c6d86c9
Revert "TODO about removing the kickoff logic after deletion removal"
ywangd Jul 27, 2025
726da41
remove kickoff process after deletion removal
ywangd Jul 25, 2025
0fde650
Fix a bug where assigned-queued shards are not kicked off
ywangd Jul 27, 2025
c76f244
One more TODO
ywangd Jul 27, 2025
d6d0ac4
comments
ywangd Jul 27, 2025
d20856e
Simplify how we kick off assigned queued shards for entries not seeing …
ywangd Jul 27, 2025
b1de4c6
Merge remote-tracking branch 'origin/main' into shard-snapshot-limit-…
ywangd Jul 27, 2025
57ac6aa
half the per-node-limit in stress IT
ywangd Jul 27, 2025
9ec50c1
Honor running shard-snapshot counting in processWaitingShardsAndRemov…
ywangd Jul 29, 2025
5bd6ca2
enable rebalance
ywangd Jul 31, 2025
b5c4907
Add random allocation filtering
ywangd Jul 31, 2025
713d86a
rename
ywangd Jul 31, 2025
8a95b99
Randomize and update per-node limit in stress IT
ywangd Aug 2, 2025
292649d
Actually randomize initial shard snapshot limit
ywangd Aug 2, 2025
4109412
Pre-compute hasAssignedQueuedShards
ywangd Aug 7, 2025
f5d3b64
No need for static method
ywangd Aug 11, 2025
4768d27
Tweak and add tests for PerNodeShardSnapshotCounter
ywangd Aug 11, 2025
c3c56d2
more test assertions
ywangd Aug 11, 2025
6745235
Merge remote-tracking branch 'origin/main' into shard-snapshot-limit-…
ywangd Aug 12, 2025
287aca2
update changelog
ywangd Aug 12, 2025
55d4502
bwc default
ywangd Aug 12, 2025
6277c7c
Merge remote-tracking branch 'origin/main' into shard-snapshot-limit-…
ywangd Aug 14, 2025
dfb035f
augment tests for PerNodeShardSnapshotCounter
ywangd Aug 14, 2025
e2ef7e4
Add test for completion order
ywangd Aug 14, 2025
095010f
Merge remote-tracking branch 'origin/main' into shard-snapshot-limit-…
ywangd Aug 19, 2025
1337704
sometimes neither shutdown nor allocation filter
ywangd Aug 19, 2025
b9e4925
reinstate class qualification
ywangd Aug 19, 2025
fe7be7a
Use null nodeId to differentiate aborted assigned-queued
ywangd Aug 19, 2025
99c81e7
fix test
ywangd Aug 19, 2025
1b90ef9
fix test
ywangd Aug 19, 2025
dab105d
Merge remote-tracking branch 'origin/main' into shard-snapshot-limit-…
ywangd Aug 19, 2025
6996653
Merge remote-tracking branch 'origin/main' into shard-snapshot-limit-…
ywangd Aug 21, 2025
0b64258
Complete assigned-queued shard snapshots inline without one more upda…
ywangd Aug 21, 2025
8bef3f0
Merge branch 'main' into shard-snapshot-limit-init-poc
elasticmachine Aug 21, 2025
a29ba9b
Fix propagation for clones
ywangd Aug 21, 2025
1d7f484
Revert "Fix propagation for clones"
ywangd Aug 21, 2025
5d8c51e
Revert "Complete assigned-queued shard snapshots inline without one m…
ywangd Aug 21, 2025
cf96d35
Refactor and reuse EntryContext for propagation
ywangd Aug 21, 2025
0198d7b
fail assigned-queued shards directly in processExternalChanges
ywangd Aug 21, 2025
0b9c6f2
Comments, renames, assertions and deleted status update
ywangd Aug 22, 2025
68f8663
Fix deletion where it may need to start a previous deletion
ywangd Aug 22, 2025
65888c5
Merge branch 'main' into shard-snapshot-limit-init-poc
DaveCTurner Sep 15, 2025
096a9df
[CI] Update transport version definitions
Oct 2, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/131592.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 131592
summary: "Limited number of shard snapshots in INIT state per node"
area: Snapshot/Restore
type: enhancement
issues: []
Original file line number Diff line number Diff line change
@@ -0,0 +1,187 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/
package org.elasticsearch.snapshots;

import org.elasticsearch.cluster.SnapshotsInProgress;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.CheckedRunnable;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.snapshots.mockstore.MockRepository;
import org.elasticsearch.test.ClusterServiceUtils;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.transport.MockTransportService;

import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.StreamSupport;

import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;

@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0)
public class SnapshotAndRelocationIT extends AbstractSnapshotIntegTestCase {

    @Override
    protected Collection<Class<? extends Plugin>> nodePlugins() {
        // Mock transport lets us intercept shard-status updates; the mock repository lets us block data nodes.
        return Arrays.asList(MockTransportService.TestPlugin.class, MockRepository.Plugin.class);
    }

    @Override
    protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
        // Permit at most one shard snapshot in INIT per node so the remaining shards become assigned-queued.
        return Settings.builder()
            .put(super.nodeSettings(nodeOrdinal, otherSettings))
            .put(SnapshotsService.SHARD_SNAPSHOT_PER_NODE_LIMIT_SETTING.getKey(), 1)
            .build();
    }

    /**
     * With the per-node limit set to 1, all but one shard of a snapshot stay assigned-queued.
     * Assigned-queued shards must not block index relocation to another node, and the snapshot
     * must still complete successfully once the withheld status update is delivered.
     */
    public void testLimitingInitAndRelocationForAssignedQueueShards() throws Exception {
        final String masterNode = internalCluster().startMasterOnlyNode();
        final String dataNodeA = internalCluster().startDataOnlyNode();
        final String dataNodeB = internalCluster().startDataOnlyNode();
        ensureStableCluster(3);
        final String repoName = "test-repo";
        createRepository(repoName, "mock");

        // Capture (and hold) the very first shard-status update on the master so the snapshot cannot
        // make progress until we deliberately release it at the end of the test.
        final AtomicBoolean interceptedFirstRequest = new AtomicBoolean(false);
        final AtomicReference<CheckedRunnable<Exception>> pendingStatusUpdate = new AtomicReference<>();
        final var statusUpdateCapturedLatch = new CountDownLatch(1);
        MockTransportService.getInstance(masterNode)
            .addRequestHandlingBehavior(TransportUpdateSnapshotStatusAction.NAME, (handler, request, channel, task) -> {
                if (interceptedFirstRequest.compareAndSet(false, true)) {
                    pendingStatusUpdate.set(() -> handler.messageReceived(request, channel, task));
                    statusUpdateCapturedLatch.countDown();
                } else {
                    handler.messageReceived(request, channel, task);
                }
            });

        final var indexCount = between(2, 4);
        final var indexNames = IntStream.range(0, indexCount).mapToObj(i -> "index-" + i).toList();

        // Pin every index to node A so all shard snapshots compete for the single INIT slot there.
        for (var indexName : indexNames) {
            createIndex(indexName, indexSettings(1, 0).put("index.routing.allocation.include._name", dataNodeA).build());
            indexRandomDocs(indexName, between(10, 42));
        }
        ensureGreen();

        final var snapshotFuture = startFullSnapshot(repoName, "snapshot");
        safeAwait(statusUpdateCapturedLatch);

        final var clusterService = internalCluster().getCurrentMasterNodeInstance(ClusterService.class);
        final SnapshotsInProgress.Entry entry = SnapshotsInProgress.get(clusterService.state()).asStream().iterator().next();
        logger.info("--> snapshot=[{}]", entry);
        final var shardStates = entry.shards();
        assertThat(shardStates.size(), equalTo(indexCount));

        final var nodeAId = getNodeId(dataNodeA);

        // Exactly one shard may be in INIT (the per-node limit), and it must be on node A.
        final var initEntries = shardStates.entrySet()
            .stream()
            .filter(e -> e.getValue().state() == SnapshotsInProgress.ShardState.INIT)
            .toList();
        initEntries.forEach(e -> assertThat(e.getValue().nodeId(), equalTo(nodeAId)));
        final var initShards = initEntries.stream().map(Map.Entry::getKey).toList();
        logger.info("--> init shards [{}]", initShards);
        assertThat(initShards.size(), equalTo(1));

        // Every remaining shard is queued behind the limit, also assigned to node A.
        final var queuedEntries = shardStates.entrySet()
            .stream()
            .filter(e -> e.getValue().isAssignedQueued())
            .toList();
        queuedEntries.forEach(e -> assertThat(e.getValue().nodeId(), equalTo(nodeAId)));
        final var assignedQueuedShards = queuedEntries.stream().map(Map.Entry::getKey).toList();
        logger.info("--> assigned queued shards [{}]", assignedQueuedShards);
        assertThat(assignedQueuedShards.size(), equalTo(indexCount - 1));

        // Assigned-queued shards must not block relocation: move their indices over to node B.
        final String[] indicesToMove = assignedQueuedShards.stream().map(ShardId::getIndexName).toArray(String[]::new);
        logger.info("--> relocate indices [{}]", Arrays.toString(indicesToMove));
        updateIndexSettings(Settings.builder().put("index.routing.allocation.include._name", dataNodeB), indicesToMove);
        ensureGreen(indicesToMove);

        // Confirm each relocated shard's index is now hosted by node B's indices service.
        final var indicesServiceB = internalCluster().getInstance(IndicesService.class, dataNodeB);
        for (var queuedShardId : assignedQueuedShards) {
            assertTrue(
                "indices: "
                    + StreamSupport.stream(indicesServiceB.spliterator(), false)
                        .map(indexService -> indexService.index().getName())
                        .toList(),
                indicesServiceB.hasIndex(queuedShardId.getIndex())
            );
        }

        // The snapshot is still held up by the captured status update; releasing it lets everything finish.
        assertThat(snapshotFuture.isDone(), is(false));
        logger.info("--> run delayed action");
        pendingStatusUpdate.get().run();
        assertSuccessful(snapshotFuture);
    }

    /**
     * Two snapshots started in order must also complete in order: the first one started
     * finishes before the second, observed via cluster-state transitions.
     */
    public void testSnapshotStartedEarlierCompletesFirst() throws Exception {
        final String masterNode = internalCluster().startMasterOnlyNode();
        final String dataNode = internalCluster().startDataOnlyNode();
        ensureStableCluster(2);
        final String repoName = "test-repo";
        createRepository(repoName, "mock");

        final var indexCount = between(2, 4);
        final var indexNames = IntStream.range(0, indexCount).mapToObj(i -> "index-" + i).toList();

        for (var indexName : indexNames) {
            createIndex(indexName, 1, 0);
            indexRandomDocs(indexName, between(10, 42));
        }
        ensureGreen();

        final String firstSnapshot = "snapshot-0";
        final String secondSnapshot = "snapshot-1";

        // Block the data node's repository so both snapshots stay in progress, then start them one
        // after the other, waiting until each appears in the cluster state before continuing.
        blockDataNode(repoName, dataNode);
        final var firstFuture = startFullSnapshot(repoName, firstSnapshot);
        safeAwait(ClusterServiceUtils.addMasterTemporaryStateListener(state -> {
            final List<String> inProgressNames = SnapshotsInProgress.get(state)
                .asStream()
                .map(e -> e.snapshot().getSnapshotId().getName())
                .toList();
            return inProgressNames.equals(List.of(firstSnapshot));
        }));
        final var secondFuture = startFullSnapshot(repoName, secondSnapshot);
        safeAwait(ClusterServiceUtils.addMasterTemporaryStateListener(state -> {
            final List<String> inProgressNames = SnapshotsInProgress.get(state)
                .asStream()
                .map(e -> e.snapshot().getSnapshotId().getName())
                .toList();
            return inProgressNames.equals(List.of(firstSnapshot, secondSnapshot));
        }));

        // The first snapshot must complete before the second: wait for a cluster state in which
        // only the second snapshot remains in progress.
        final var firstCompletionListener = ClusterServiceUtils.addMasterTemporaryStateListener(state -> {
            final Set<String> inProgressNames = SnapshotsInProgress.get(state)
                .asStream()
                .map(e -> e.snapshot().getSnapshotId().getName())
                .collect(Collectors.toSet());
            return inProgressNames.equals(Set.of(secondSnapshot));
        });
        unblockNode(repoName, dataNode);
        safeAwait(firstCompletionListener);

        assertSuccessful(firstFuture);
        assertSuccessful(secondFuture);
    }
}
Loading