Skip to content

Commit 9a81043

Browse files
authored
Only simulate legal desired moves (#93654)
Today when setting up for the desired balance computation we move all shards to their desired locations without checking any allocation rules. However, certain allocation rules (e.g. those related to node versions and shutdowns) may prevent these movements in reality, resulting in a shard which cannot move to its desired location but which may not remain on its current node either. This commit adds some checks to verify that these preliminary moves are still legal when setting up the computation. Backport of #93635 Closes #93271
1 parent 8aed262 commit 9a81043

File tree

6 files changed

+154
-22
lines changed

6 files changed

+154
-22
lines changed

docs/changelog/93635.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
pr: 93635
2+
summary: Only simulate legal desired moves
3+
area: Allocation
4+
type: bug
5+
issues:
6+
- 93271

qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/SnapshotBasedRecoveryIT.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@
4040
import static org.hamcrest.Matchers.notNullValue;
4141

4242
public class SnapshotBasedRecoveryIT extends AbstractRollingTestCase {
43-
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/91383")
43+
4444
public void testSnapshotBasedRecovery() throws Exception {
4545
final String indexName = "snapshot_based_recovery";
4646
final String repositoryName = "snapshot_based_recovery_repo";

server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceComputer.java

Lines changed: 34 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import org.elasticsearch.cluster.routing.ShardRouting;
1818
import org.elasticsearch.cluster.routing.UnassignedInfo;
1919
import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationCommand;
20+
import org.elasticsearch.cluster.routing.allocation.decider.Decision;
2021
import org.elasticsearch.common.metrics.MeanMetric;
2122
import org.elasticsearch.core.Strings;
2223
import org.elasticsearch.index.shard.ShardId;
@@ -143,14 +144,21 @@ public DesiredBalance compute(
143144
// Here existing shards are moved to desired locations before initializing unassigned shards because we prefer not to leave
144145
// immovable shards allocated to undesirable locations (e.g. a node that is shutting down or an allocation filter which was
145146
// only recently applied). In contrast, reconciliation prefers to initialize the unassigned shards first.
146-
for (final var shardRouting : shardsToRelocate.values()) {
147+
relocateToDesiredLocation: for (final var shardRouting : shardsToRelocate.values()) {
147148
assert shardRouting.started();
148-
if (targetNodesIterator.hasNext()) {
149-
ShardRouting shardToRelocate = routingNodes.relocateShard(shardRouting, targetNodesIterator.next(), 0L, changes).v2();
150-
clusterInfoSimulator.simulateShardStarted(shardToRelocate);
151-
routingNodes.startShard(logger, shardToRelocate, changes, 0L);
152-
} else {
153-
break;
149+
150+
while (targetNodesIterator.hasNext()) {
151+
final var targetNodeId = targetNodesIterator.next();
152+
final var targetNode = routingNodes.node(targetNodeId);
153+
if (targetNode != null
154+
&& routingAllocation.deciders()
155+
.canAllocate(shardRouting, targetNode, routingAllocation)
156+
.type() != Decision.Type.NO) {
157+
final var shardToRelocate = routingNodes.relocateShard(shardRouting, targetNodeId, 0L, changes).v2();
158+
clusterInfoSimulator.simulateShardStarted(shardToRelocate);
159+
routingNodes.startShard(logger, shardToRelocate, changes, 0L);
160+
continue relocateToDesiredLocation;
161+
}
154162
}
155163
}
156164

@@ -172,9 +180,15 @@ public DesiredBalance compute(
172180
final var nodeIds = unassignedShardsToInitialize.get(shardRouting);
173181
if (nodeIds != null && nodeIds.isEmpty() == false) {
174182
final var nodeId = nodeIds.removeFirst();
175-
final var shardToInitialize = unassignedPrimaryIterator.initialize(nodeId, null, 0L, changes);
176-
clusterInfoSimulator.simulateShardStarted(shardToInitialize);
177-
routingNodes.startShard(logger, shardToInitialize, changes, 0L);
183+
final var routingNode = routingNodes.node(nodeId);
184+
if (routingNode != null
185+
&& routingAllocation.deciders()
186+
.canAllocate(shardRouting, routingNode, routingAllocation)
187+
.type() != Decision.Type.NO) {
188+
final var shardToInitialize = unassignedPrimaryIterator.initialize(nodeId, null, 0L, changes);
189+
clusterInfoSimulator.simulateShardStarted(shardToInitialize);
190+
routingNodes.startShard(logger, shardToInitialize, changes, 0L);
191+
}
178192
}
179193
}
180194
}
@@ -185,10 +199,16 @@ public DesiredBalance compute(
185199
if (unassignedPrimaries.contains(shardRouting.shardId()) == false) {
186200
final var nodeIds = unassignedShardsToInitialize.get(shardRouting);
187201
if (nodeIds != null && nodeIds.isEmpty() == false) {
188-
final String nodeId = nodeIds.removeFirst();
189-
ShardRouting shardToInitialize = unassignedReplicaIterator.initialize(nodeId, null, 0L, changes);
190-
clusterInfoSimulator.simulateShardStarted(shardToInitialize);
191-
routingNodes.startShard(logger, shardToInitialize, changes, 0L);
202+
final var nodeId = nodeIds.removeFirst();
203+
final var routingNode = routingNodes.node(nodeId);
204+
if (routingNode != null
205+
&& routingAllocation.deciders()
206+
.canAllocate(shardRouting, routingNode, routingAllocation)
207+
.type() != Decision.Type.NO) {
208+
final var shardToInitialize = unassignedReplicaIterator.initialize(nodeId, null, 0L, changes);
209+
clusterInfoSimulator.simulateShardStarted(shardToInitialize);
210+
routingNodes.startShard(logger, shardToInitialize, changes, 0L);
211+
}
192212
}
193213
}
194214
}

server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceShardsAllocator.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -318,13 +318,13 @@ private static String diff(DesiredBalance old, DesiredBalance updated) {
318318
var oldAssignment = old.getAssignment(shardId);
319319
var updatedAssignment = updated.getAssignment(shardId);
320320
if (Objects.equals(oldAssignment, updatedAssignment) == false) {
321-
builder.append(newLine).append(shardId).append(": ").append(oldAssignment).append(" --> ").append(updatedAssignment);
321+
builder.append(newLine).append(shardId).append(": ").append(oldAssignment).append(" -> ").append(updatedAssignment);
322322
}
323323
}
324324
for (ShardId shardId : diff) {
325325
var oldAssignment = old.getAssignment(shardId);
326326
var updatedAssignment = updated.getAssignment(shardId);
327-
builder.append(newLine).append(shardId).append(": ").append(oldAssignment).append(" --> ").append(updatedAssignment);
327+
builder.append(newLine).append(shardId).append(": ").append(oldAssignment).append(" -> ").append(updatedAssignment);
328328
}
329329
return builder.append(newLine).toString();
330330
}

server/src/test/java/org/elasticsearch/cluster/routing/allocation/AwarenessAllocationTests.java

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import org.elasticsearch.cluster.ESAllocationTestCase;
1818
import org.elasticsearch.cluster.metadata.IndexMetadata;
1919
import org.elasticsearch.cluster.metadata.Metadata;
20+
import org.elasticsearch.cluster.node.DiscoveryNode;
2021
import org.elasticsearch.cluster.node.DiscoveryNodes;
2122
import org.elasticsearch.cluster.routing.RoutingNode;
2223
import org.elasticsearch.cluster.routing.RoutingTable;
@@ -35,6 +36,7 @@
3536
import java.util.HashMap;
3637
import java.util.Map;
3738
import java.util.function.UnaryOperator;
39+
import java.util.stream.Collectors;
3840

3941
import static java.util.Collections.emptyMap;
4042
import static java.util.Collections.singletonList;
@@ -932,15 +934,24 @@ public void testUnassignedShardsWithUnbalancedZones() {
932934

933935
// Cancel all initializing shards and move started primary to another node.
934936
AllocationCommands commands = new AllocationCommands();
935-
String primaryNode = null;
937+
final var unusedNodes = clusterState.nodes().stream().map(DiscoveryNode::getId).collect(Collectors.toSet());
938+
// Cancel all initializing shards
936939
for (ShardRouting routing : clusterState.routingTable().allShards()) {
937-
if (routing.primary()) {
938-
primaryNode = routing.currentNodeId();
939-
} else if (routing.initializing()) {
940+
unusedNodes.remove(routing.currentNodeId());
941+
if (routing.initializing()) {
940942
commands.add(new CancelAllocationCommand(routing.shardId().getIndexName(), routing.id(), routing.currentNodeId(), false));
941943
}
942944
}
943-
commands.add(new MoveAllocationCommand("test", 0, primaryNode, "A-4"));
945+
// Move started primary to another node.
946+
for (ShardRouting routing : clusterState.routingTable().allShards()) {
947+
if (routing.primary()) {
948+
var currentNodeId = routing.currentNodeId();
949+
unusedNodes.remove(currentNodeId);
950+
var otherNodeId = randomFrom(unusedNodes);
951+
commands.add(new MoveAllocationCommand("test", 0, currentNodeId, otherNodeId));
952+
break;
953+
}
954+
}
944955

945956
clusterState = strategy.reroute(clusterState, commands, false, false, false, ActionListener.noop()).clusterState();
946957

Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.shutdown;
9+
10+
import org.elasticsearch.action.support.PlainActionFuture;
11+
import org.elasticsearch.action.support.master.AcknowledgedResponse;
12+
import org.elasticsearch.cluster.metadata.IndexMetadata;
13+
import org.elasticsearch.cluster.metadata.SingleNodeShutdownMetadata;
14+
import org.elasticsearch.cluster.service.ClusterService;
15+
import org.elasticsearch.common.Strings;
16+
import org.elasticsearch.common.settings.Settings;
17+
import org.elasticsearch.plugins.Plugin;
18+
import org.elasticsearch.test.ESIntegTestCase;
19+
import org.elasticsearch.test.InternalTestCluster;
20+
21+
import java.util.Collection;
22+
import java.util.List;
23+
import java.util.concurrent.TimeUnit;
24+
25+
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
26+
27+
public class DesiredBalanceShutdownIT extends ESIntegTestCase {
28+
29+
private static final String INDEX = "test-index";
30+
31+
@Override
32+
protected Collection<Class<? extends Plugin>> nodePlugins() {
33+
return List.of(ShutdownPlugin.class);
34+
}
35+
36+
public void testDesiredBalanceWithShutdown() throws Exception {
37+
38+
final var oldNodeName = internalCluster().startNode();
39+
final var oldNodeId = internalCluster().getInstance(ClusterService.class, oldNodeName).localNode().getId();
40+
41+
createIndex(
42+
INDEX,
43+
Settings.builder()
44+
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
45+
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
46+
.put(IndexMetadata.INDEX_ROUTING_REQUIRE_GROUP_PREFIX + "._name", oldNodeName)
47+
.build()
48+
);
49+
ensureGreen(INDEX);
50+
51+
internalCluster().restartNode(internalCluster().startNode(), new InternalTestCluster.RestartCallback() {
52+
@Override
53+
public Settings onNodeStopped(String newNodeName) {
54+
55+
logger.info("--> excluding index from [{}] and concurrently starting replacement with [{}]", oldNodeName, newNodeName);
56+
57+
final PlainActionFuture<AcknowledgedResponse> excludeFuture = new PlainActionFuture<>();
58+
client().admin()
59+
.indices()
60+
.prepareUpdateSettings(INDEX)
61+
.setSettings(
62+
Settings.builder()
63+
.put(IndexMetadata.INDEX_ROUTING_EXCLUDE_GROUP_PREFIX + "._name", oldNodeName)
64+
.putNull(IndexMetadata.INDEX_ROUTING_REQUIRE_GROUP_PREFIX + "._name")
65+
)
66+
.execute(excludeFuture);
67+
68+
assertAcked(
69+
client().execute(
70+
PutShutdownNodeAction.INSTANCE,
71+
new PutShutdownNodeAction.Request(oldNodeId, SingleNodeShutdownMetadata.Type.REPLACE, "test", null, newNodeName)
72+
).actionGet(10, TimeUnit.SECONDS)
73+
);
74+
75+
excludeFuture.actionGet(10, TimeUnit.SECONDS);
76+
77+
return Settings.EMPTY;
78+
}
79+
});
80+
81+
logger.info("--> waiting for replacement to complete");
82+
83+
assertBusy(() -> {
84+
final var getShutdownResponse = client().execute(GetShutdownStatusAction.INSTANCE, new GetShutdownStatusAction.Request())
85+
.actionGet(10, TimeUnit.SECONDS);
86+
assertTrue(
87+
Strings.toString(getShutdownResponse, true, true),
88+
getShutdownResponse.getShutdownStatuses()
89+
.stream()
90+
.allMatch(s -> s.overallStatus() == SingleNodeShutdownMetadata.Status.COMPLETE)
91+
);
92+
});
93+
}
94+
95+
}

0 commit comments

Comments
 (0)