Skip to content

Commit 12dfdc3

Browse files
committed
Allow relocation and add an IT
Some other tweaks. Still WIP
1 parent 4292017 commit 12dfdc3

File tree

3 files changed

+144
-6
lines changed

3 files changed

+144
-6
lines changed
Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,129 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
package org.elasticsearch.snapshots;
10+
11+
import org.elasticsearch.cluster.SnapshotsInProgress;
12+
import org.elasticsearch.cluster.service.ClusterService;
13+
import org.elasticsearch.common.settings.Settings;
14+
import org.elasticsearch.core.CheckedRunnable;
15+
import org.elasticsearch.index.shard.ShardId;
16+
import org.elasticsearch.indices.IndicesService;
17+
import org.elasticsearch.plugins.Plugin;
18+
import org.elasticsearch.snapshots.mockstore.MockRepository;
19+
import org.elasticsearch.test.ESIntegTestCase;
20+
import org.elasticsearch.test.transport.MockTransportService;
21+
22+
import java.util.Arrays;
23+
import java.util.Collection;
24+
import java.util.Map;
25+
import java.util.concurrent.CountDownLatch;
26+
import java.util.concurrent.atomic.AtomicBoolean;
27+
import java.util.concurrent.atomic.AtomicReference;
28+
import java.util.stream.IntStream;
29+
import java.util.stream.StreamSupport;
30+
31+
import static org.hamcrest.Matchers.equalTo;
32+
import static org.hamcrest.Matchers.is;
33+
34+
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0)
35+
public class SnapshotAndRelocationIT extends AbstractSnapshotIntegTestCase {
36+
37+
@Override
38+
protected Collection<Class<? extends Plugin>> nodePlugins() {
39+
return Arrays.asList(MockTransportService.TestPlugin.class, MockRepository.Plugin.class);
40+
}
41+
42+
@Override
43+
protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
44+
return Settings.builder()
45+
.put(super.nodeSettings(nodeOrdinal, otherSettings))
46+
.put(SnapshotsService.SHARD_SNAPSHOT_PER_NODE_LIMIT_SETTING.getKey(), 1)
47+
.build();
48+
}
49+
50+
public void testLimitingInitAndRelocationForAssignedQueueShards() throws Exception {
51+
final String masterNode = internalCluster().startMasterOnlyNode();
52+
final String dataNodeA = internalCluster().startDataOnlyNode();
53+
final String dataNodeB = internalCluster().startDataOnlyNode();
54+
final String repoName = "test-repo";
55+
createRepository(repoName, "mock");
56+
57+
final AtomicBoolean delayOnce = new AtomicBoolean(false);
58+
final AtomicReference<CheckedRunnable<Exception>> delayedAction = new AtomicReference<>();
59+
final var delayedActionSetLatch = new CountDownLatch(1);
60+
MockTransportService.getInstance(masterNode)
61+
.addRequestHandlingBehavior(SnapshotsService.UPDATE_SNAPSHOT_STATUS_ACTION_NAME, (handler, request, channel, task) -> {
62+
if (delayOnce.compareAndSet(false, true)) {
63+
delayedAction.set(() -> handler.messageReceived(request, channel, task));
64+
delayedActionSetLatch.countDown();
65+
} else {
66+
handler.messageReceived(request, channel, task);
67+
}
68+
});
69+
70+
final var numIndices = between(2, 4);
71+
final var indexNames = IntStream.range(0, numIndices).mapToObj(i -> "index-" + i).toList();
72+
73+
for (var indexName : indexNames) {
74+
createIndex(indexName, indexSettings(1, 0).put("index.routing.allocation.include._name", dataNodeA).build());
75+
indexRandomDocs(indexName, between(10, 42));
76+
}
77+
ensureGreen();
78+
79+
final var future = startFullSnapshot(repoName, "snapshot");
80+
safeAwait(delayedActionSetLatch);
81+
82+
final var clusterService = internalCluster().getCurrentMasterNodeInstance(ClusterService.class);
83+
final SnapshotsInProgress.Entry snapshot = SnapshotsInProgress.get(clusterService.state()).asStream().iterator().next();
84+
logger.info("--> snapshot=[{}]", snapshot);
85+
final var shards = snapshot.shards();
86+
assertThat(shards.size(), equalTo(numIndices));
87+
88+
final var dataNodeAId = getNodeId(dataNodeA);
89+
final var initShards = shards.entrySet()
90+
.stream()
91+
.filter(entry -> entry.getValue().state() == SnapshotsInProgress.ShardState.INIT)
92+
.peek(entry -> assertThat(entry.getValue().nodeId(), equalTo(dataNodeAId)))
93+
.map(Map.Entry::getKey)
94+
.toList();
95+
logger.info("--> init shards [{}]", initShards);
96+
assertThat(initShards.size(), equalTo(1));
97+
98+
final var assignedQueuedShards = shards.entrySet()
99+
.stream()
100+
.filter(entry -> entry.getValue().isAssignedQueued())
101+
.peek(entry -> assertThat(entry.getValue().nodeId(), equalTo(dataNodeAId)))
102+
.map(Map.Entry::getKey)
103+
.toList();
104+
logger.info("--> assigned queued shards [{}]", assignedQueuedShards);
105+
assertThat(assignedQueuedShards.size(), equalTo(numIndices - 1));
106+
107+
// Relocate indices that are assigned queued
108+
final String[] indices = assignedQueuedShards.stream().map(ShardId::getIndexName).toArray(String[]::new);
109+
logger.info("--> relocate indices [{}]", Arrays.toString(indices));
110+
updateIndexSettings(Settings.builder().put("index.routing.allocation.include._name", dataNodeB), indices);
111+
ensureGreen(indices);
112+
113+
final var dataNodeBIndicesService = internalCluster().getInstance(IndicesService.class, dataNodeB);
114+
for (var shardId : assignedQueuedShards) {
115+
assertTrue(
116+
"indices: "
117+
+ StreamSupport.stream(dataNodeBIndicesService.spliterator(), false)
118+
.map(indexService -> indexService.index().getName())
119+
.toList(),
120+
dataNodeBIndicesService.hasIndex(shardId.getIndex())
121+
);
122+
}
123+
124+
assertThat(future.isDone(), is(false));
125+
logger.info("--> run delayed action");
126+
delayedAction.get().run();
127+
assertSuccessful(future);
128+
}
129+
}

server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/SnapshotInProgressAllocationDecider.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,10 @@ private static Decision canMove(ShardRouting shardRouting, RoutingAllocation all
118118
}
119119
}
120120

121+
if (shardSnapshotStatus.isAssignedQueued()) {
122+
continue;
123+
}
124+
121125
return allocation.decision(
122126
Decision.THROTTLE,
123127
NAME,

server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2852,6 +2852,8 @@ public ClusterState execute(ClusterState currentState) {
28522852

28532853
ClusterState updatedState = res.v1();
28542854

2855+
// Check whether we can start any assigned-queued shard snapshots in other repositories. They may have been
2856+
// limited because the deleted entry took all node capacity.
28552857
// TODO: deduplicate the code for starting queued-with-gen shards across repos
28562858
final var snapshotsInProgress = SnapshotsInProgress.get(updatedState);
28572859
final var perNodeShardSnapshotCounter = new PerNodeShardSnapshotCounter(
@@ -2867,7 +2869,7 @@ public ClusterState execute(ClusterState currentState) {
28672869
if (repo.equals(repoForDeletedEntry)) {
28682870
continue;
28692871
}
2870-
updatedSnapshotsInProgress = maybeStartQueuedWithGenerationShardSnapshotsForRepo(
2872+
updatedSnapshotsInProgress = maybeStartAssignedQueuedShardSnapshotsForRepo(
28712873
repo,
28722874
updatedState,
28732875
updatedSnapshotsInProgress,
@@ -3102,7 +3104,7 @@ public String toString() {
31023104
}
31033105
}
31043106

3105-
private static SnapshotsInProgress maybeStartQueuedWithGenerationShardSnapshotsForRepo(
3107+
private static SnapshotsInProgress maybeStartAssignedQueuedShardSnapshotsForRepo(
31063108
ProjectRepo projectRepo,
31073109
ClusterState clusterState,
31083110
SnapshotsInProgress snapshotsInProgress,
@@ -3119,7 +3121,7 @@ private static SnapshotsInProgress maybeStartQueuedWithGenerationShardSnapshotsF
31193121
for (SnapshotsInProgress.Entry entry : oldEntries) {
31203122
if (entry.isClone() == false && perNodeShardSnapshotCounter.hasCapacityOnAnyNode()) {
31213123
final var shardsBuilder = ImmutableOpenMap.builder(entry.shards());
3122-
maybeStartQueuedWithGenerationShardSnapshots(
3124+
maybeStartAssignedQueuedShardSnapshots(
31233125
clusterState,
31243126
entry,
31253127
snapshotsInProgress::isNodeIdForRemoval,
@@ -3140,7 +3142,7 @@ private static SnapshotsInProgress maybeStartQueuedWithGenerationShardSnapshotsF
31403142
return snapshotsInProgress.createCopyWithUpdatedEntriesForRepo(projectRepo.projectId(), projectRepo.name(), newEntries);
31413143
}
31423144

3143-
private static void maybeStartQueuedWithGenerationShardSnapshots(
3145+
private static void maybeStartAssignedQueuedShardSnapshots(
31443146
ClusterState clusterState,
31453147
SnapshotsInProgress.Entry entry,
31463148
Predicate<String> nodeIdRemovalPredicate,
@@ -3558,12 +3560,14 @@ SnapshotsInProgress computeUpdatedState() {
35583560
updated = updated.createCopyWithUpdatedEntriesForRepo(projectRepo.projectId(), projectRepo.name(), newEntries);
35593561
}
35603562

3563+
// Also check snapshots in other repositories since they might have been limited earlier due to shard snapshots running
3564+
// from repositories currently seeing updates
35613565
// TODO: deduplicate the code for starting queued-with-gen shards across repos
35623566
for (var notUpdatedRepo : Sets.difference(existing.repos(), updatesByRepo.keySet())) {
35633567
if (perNodeShardSnapshotCounter.hasCapacityOnAnyNode() == false) {
35643568
break;
35653569
}
3566-
updated = maybeStartQueuedWithGenerationShardSnapshotsForRepo(
3570+
updated = maybeStartAssignedQueuedShardSnapshotsForRepo(
35673571
notUpdatedRepo,
35683572
initialState,
35693573
updated,
@@ -3693,7 +3697,8 @@ SnapshotsInProgress.Entry computeUpdatedSnapshotEntryFromShardUpdates() {
36933697
+ " as well as "
36943698
+ shardsBuilder;
36953699

3696-
maybeStartQueuedWithGenerationShardSnapshots(
3700+
// Check horizontally within the snapshot to see whether any previously limited shard snapshots can now start
3701+
maybeStartAssignedQueuedShardSnapshots(
36973702
initialState,
36983703
entry,
36993704
nodeIdRemovalPredicate,

0 commit comments

Comments
 (0)