Skip to content

Commit a02ffaf

Browse files
authored
Fix RebalanceOnlyWhenActiveAllocationDecider (#96025)
Currently above decider has 2 issues: * it does not allow rebalance is relocation target is initializing * it does allow rebalancing when one of the shards is relocating and another is unassigned This commit fixes above 2 issues
1 parent e0a4edc commit a02ffaf

File tree

6 files changed

+159
-27
lines changed

6 files changed

+159
-27
lines changed

docs/changelog/96025.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 96025
2+
summary: Fix `RebalanceOnlyWhenActiveAllocationDecider`
3+
area: Allocation
4+
type: bug
5+
issues: []

server/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -375,17 +375,20 @@ public ShardRouting activePromotableReplicaWithHighestVersion(ShardId shardId) {
375375
/**
376376
* Returns <code>true</code> iff all replicas are active for the given shard routing. Otherwise <code>false</code>
377377
*/
378-
public boolean allReplicasActive(ShardId shardId, Metadata metadata) {
378+
public boolean allShardsActive(ShardId shardId, Metadata metadata) {
379379
final List<ShardRouting> shards = assignedShards(shardId);
380-
if (shards.isEmpty() || shards.size() < metadata.getIndexSafe(shardId.getIndex()).getNumberOfReplicas() + 1) {
380+
final int shardCopies = metadata.getIndexSafe(shardId.getIndex()).getNumberOfReplicas() + 1;
381+
if (shards.size() < shardCopies) {
381382
return false; // if we are empty nothing is active if we have less than total at least one is unassigned
382383
}
384+
int active = 0;
383385
for (ShardRouting shard : shards) {
384-
if (shard.active() == false) {
385-
return false;
386+
if (shard.active()) {
387+
active++;
386388
}
387389
}
388-
return true;
390+
assert active <= shardCopies;
391+
return active == shardCopies;
389392
}
390393

391394
@Override

server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/RebalanceOnlyWhenActiveAllocationDecider.java

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,21 @@ public class RebalanceOnlyWhenActiveAllocationDecider extends AllocationDecider
1818

1919
public static final String NAME = "rebalance_only_when_active";
2020

21+
static final Decision YES_ALL_REPLICAS_ACTIVE = Decision.single(
22+
Decision.Type.YES,
23+
NAME,
24+
"rebalancing is allowed as all copies of this shard are active"
25+
);
26+
static final Decision NO_SOME_REPLICAS_INACTIVE = Decision.single(
27+
Decision.Type.NO,
28+
NAME,
29+
"rebalancing is not allowed until all copies of this shard are active"
30+
);
31+
2132
@Override
2233
public Decision canRebalance(ShardRouting shardRouting, RoutingAllocation allocation) {
23-
if (allocation.routingNodes().allReplicasActive(shardRouting.shardId(), allocation.metadata()) == false) {
24-
return allocation.decision(Decision.NO, NAME, "rebalancing is not allowed until all replicas in the cluster are active");
25-
}
26-
return allocation.decision(Decision.YES, NAME, "rebalancing is allowed as all replicas are active in the cluster");
34+
return allocation.routingNodes().allShardsActive(shardRouting.shardId(), allocation.metadata())
35+
? YES_ALL_REPLICAS_ACTIVE
36+
: NO_SOME_REPLICAS_INACTIVE;
2737
}
2838
}

server/src/test/java/org/elasticsearch/cluster/routing/allocation/ClusterRebalanceRoutingTests.java

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@
2424
import org.elasticsearch.cluster.routing.ShardRouting;
2525
import org.elasticsearch.cluster.routing.UnassignedInfo;
2626
import org.elasticsearch.cluster.routing.allocation.decider.ClusterRebalanceAllocationDecider;
27-
import org.elasticsearch.common.collect.Iterators;
2827
import org.elasticsearch.common.settings.Settings;
2928
import org.elasticsearch.test.gateway.TestGatewayAllocator;
3029

@@ -52,8 +51,11 @@ public void testAlways() {
5251
);
5352

5453
Metadata metadata = Metadata.builder()
55-
.put(IndexMetadata.builder("test1").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(1))
56-
.put(IndexMetadata.builder("test2").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(1))
54+
.put(IndexMetadata.builder("test1").settings(indexSettings(Version.CURRENT, 1, 1)))
55+
.put(
56+
IndexMetadata.builder("test2")
57+
.settings(indexSettings(Version.CURRENT, 1, 1).put("index.routing.allocation.include._id", "node1,node2"))
58+
)
5759
.build();
5860

5961
RoutingTable initialRoutingTable = RoutingTable.builder(TestShardRoutingRoleStrategies.DEFAULT_ROLE_ONLY)
@@ -86,7 +88,7 @@ public void testAlways() {
8688

8789
for (int i = 0; i < clusterState.routingTable().index("test1").size(); i++) {
8890
assertThat(clusterState.routingTable().index("test1").shard(i).size(), equalTo(2));
89-
// assertThat(clusterState.routingTable().index("test1").shard(i).primaryShard().state(), equalTo(STARTED));
91+
assertThat(clusterState.routingTable().index("test1").shard(i).primaryShard().state(), equalTo(STARTED));
9092
assertThat(clusterState.routingTable().index("test1").shard(i).replicaShards().get(0).state(), equalTo(INITIALIZING));
9193
}
9294

@@ -111,14 +113,11 @@ public void testAlways() {
111113
assertThat(clusterState.routingTable().index("test2").shard(i).replicaShards().get(0).state(), equalTo(UNASSIGNED));
112114
}
113115

114-
logger.info("now, start 2 more nodes, check that rebalancing will happen (for test1) because we set it to always");
115-
clusterState = ClusterState.builder(clusterState)
116-
.nodes(DiscoveryNodes.builder(clusterState.nodes()).add(newNode("node3")).add(newNode("node4")))
117-
.build();
116+
logger.info("now, start 1 more nodes, check that rebalancing will happen (for test1) because we set it to always");
117+
clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder(clusterState.nodes()).add(newNode("node3"))).build();
118118
clusterState = strategy.reroute(clusterState, "reroute", ActionListener.noop());
119-
RoutingNodes routingNodes = clusterState.getRoutingNodes();
120119

121-
final var newNodesIterator = Iterators.concat(routingNodes.node("node3").iterator(), routingNodes.node("node4").iterator());
120+
final var newNodesIterator = clusterState.getRoutingNodes().node("node3").iterator();
122121
assertThat(newNodesIterator.next().shardId().getIndex().getName(), equalTo("test1"));
123122
assertFalse(newNodesIterator.hasNext());
124123
}

server/src/test/java/org/elasticsearch/cluster/routing/allocation/RebalanceAfterActiveTests.java

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,8 @@
3333
import static org.elasticsearch.cluster.routing.ShardRoutingState.STARTED;
3434
import static org.elasticsearch.cluster.routing.ShardRoutingState.UNASSIGNED;
3535
import static org.hamcrest.Matchers.equalTo;
36+
import static org.hamcrest.Matchers.hasSize;
3637
import static org.hamcrest.Matchers.nullValue;
37-
import static org.hamcrest.Matchers.oneOf;
3838

3939
public class RebalanceAfterActiveTests extends ESAllocationTestCase {
4040
private final Logger logger = LogManager.getLogger(RebalanceAfterActiveTests.class);
@@ -63,7 +63,7 @@ public Long getShardSize(ShardRouting shardRouting) {
6363
);
6464
logger.info("Building initial routing table");
6565

66-
var indexMetadata = IndexMetadata.builder("test").settings(settings(Version.CURRENT)).numberOfShards(5).numberOfReplicas(1).build();
66+
var indexMetadata = IndexMetadata.builder("test").settings(indexSettings(Version.CURRENT, 5, 1)).build();
6767

6868
ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT)
6969
.metadata(Metadata.builder().put(indexMetadata, false))
@@ -128,11 +128,8 @@ public Long getShardSize(ShardRouting shardRouting) {
128128
logger.info("start the replica shards, rebalancing should start");
129129
clusterState = startInitializingShardsAndReroute(strategy, clusterState);
130130

131-
// both primary and replica should not be rebalanced at once so 5 replicas should start moving
132-
// unless we computed the balance where one of the indices already have both primary and replica on desired nodes
133-
// in such case only 4 shards are immediately relocating
134-
assertThat(shardsWithState(clusterState.getRoutingNodes(), STARTED).size(), oneOf(5, 6));
135-
assertThat(shardsWithState(clusterState.getRoutingNodes(), RELOCATING).size(), oneOf(4, 5));
131+
assertThat(shardsWithState(clusterState.getRoutingNodes(), STARTED), hasSize(2));
132+
assertThat(shardsWithState(clusterState.getRoutingNodes(), RELOCATING), hasSize(8));
136133

137134
logger.info("complete all relocations");
138135
clusterState = applyStartedShardsUntilNoChange(clusterState, strategy);
@@ -141,7 +138,7 @@ public Long getShardSize(ShardRouting shardRouting) {
141138
clusterState = startInitializingShardsAndReroute(strategy, clusterState);
142139
RoutingNodes routingNodes = clusterState.getRoutingNodes();
143140

144-
assertThat(shardsWithState(clusterState.getRoutingNodes(), STARTED).size(), equalTo(10));
141+
assertThat(shardsWithState(clusterState.getRoutingNodes(), STARTED), hasSize(10));
145142
// make sure we have an even relocation
146143
for (RoutingNode routingNode : routingNodes) {
147144
assertThat(routingNode.size(), equalTo(1));
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0 and the Server Side Public License, v 1; you may not use this file except
5+
* in compliance with, at your election, the Elastic License 2.0 or the Server
6+
* Side Public License, v 1.
7+
*/
8+
9+
package org.elasticsearch.cluster.routing.allocation.decider;
10+
11+
import org.elasticsearch.Version;
12+
import org.elasticsearch.cluster.ClusterName;
13+
import org.elasticsearch.cluster.ClusterState;
14+
import org.elasticsearch.cluster.ESAllocationTestCase;
15+
import org.elasticsearch.cluster.metadata.IndexMetadata;
16+
import org.elasticsearch.cluster.metadata.Metadata;
17+
import org.elasticsearch.cluster.node.DiscoveryNodes;
18+
import org.elasticsearch.cluster.routing.IndexRoutingTable;
19+
import org.elasticsearch.cluster.routing.RoutingTable;
20+
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
21+
import org.elasticsearch.index.Index;
22+
import org.elasticsearch.index.shard.ShardId;
23+
24+
import java.util.List;
25+
26+
import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING;
27+
import static org.elasticsearch.cluster.routing.ShardRoutingState.RELOCATING;
28+
import static org.elasticsearch.cluster.routing.ShardRoutingState.STARTED;
29+
import static org.elasticsearch.cluster.routing.ShardRoutingState.UNASSIGNED;
30+
import static org.elasticsearch.cluster.routing.TestShardRouting.newShardRouting;
31+
import static org.elasticsearch.cluster.routing.allocation.decider.RebalanceOnlyWhenActiveAllocationDecider.NO_SOME_REPLICAS_INACTIVE;
32+
import static org.elasticsearch.cluster.routing.allocation.decider.RebalanceOnlyWhenActiveAllocationDecider.YES_ALL_REPLICAS_ACTIVE;
33+
import static org.hamcrest.Matchers.equalTo;
34+
35+
public class RebalanceOnlyWhenActiveAllocationDeciderTests extends ESAllocationTestCase {
36+
37+
public void testAllowRebalanceWhenAllShardsActive() {
38+
39+
var index = new Index("test", "_na_");
40+
var primary = newShardRouting(new ShardId(index, 0), "node-1", true, STARTED);
41+
var replica = newShardRouting(new ShardId(index, 0), "node-2", false, STARTED);
42+
43+
var state = ClusterState.builder(ClusterName.DEFAULT)
44+
.metadata(Metadata.builder().put(IndexMetadata.builder(index.getName()).settings(indexSettings(Version.CURRENT, 1, 1))))
45+
.nodes(DiscoveryNodes.builder().add(newNode("node-1")).add(newNode("node-2")).add(newNode("node-3")))
46+
.routingTable(RoutingTable.builder().add(IndexRoutingTable.builder(index).addShard(primary).addShard(replica)))
47+
.build();
48+
49+
var allocation = createRoutingAllocation(state);
50+
var decider = new RebalanceOnlyWhenActiveAllocationDecider();
51+
52+
assertThat(decider.canRebalance(primary, allocation), equalTo(YES_ALL_REPLICAS_ACTIVE));
53+
assertThat(decider.canRebalance(replica, allocation), equalTo(YES_ALL_REPLICAS_ACTIVE));
54+
}
55+
56+
public void testDoNotAllowRebalanceWhenSomeShardsAreNotActive() {
57+
58+
var index = new Index("test", "_na_");
59+
var primary = newShardRouting(new ShardId(index, 0), "node-1", true, STARTED);
60+
var replica = randomBoolean()
61+
? newShardRouting(new ShardId(index, 0), null, false, UNASSIGNED)
62+
: newShardRouting(new ShardId(index, 0), "node-2", false, INITIALIZING);
63+
64+
var state = ClusterState.builder(ClusterName.DEFAULT)
65+
.metadata(Metadata.builder().put(IndexMetadata.builder(index.getName()).settings(indexSettings(Version.CURRENT, 1, 1))))
66+
.nodes(DiscoveryNodes.builder().add(newNode("node-1")).add(newNode("node-2")).add(newNode("node-3")))
67+
.routingTable(RoutingTable.builder().add(IndexRoutingTable.builder(index).addShard(primary).addShard(replica)))
68+
.build();
69+
70+
var allocation = createRoutingAllocation(state);
71+
var decider = new RebalanceOnlyWhenActiveAllocationDecider();
72+
73+
assertThat(decider.canRebalance(primary, allocation), equalTo(NO_SOME_REPLICAS_INACTIVE));
74+
}
75+
76+
public void testDoNotAllowRebalanceWhenSomeShardsAreNotActiveAndRebalancing() {
77+
78+
var index = new Index("test", "_na_");
79+
var primary = newShardRouting(new ShardId(index, 0), "node-1", true, STARTED);
80+
var replica1 = newShardRouting(new ShardId(index, 0), "node-2", "node-3", false, RELOCATING);
81+
var replica2 = newShardRouting(new ShardId(index, 0), null, false, UNASSIGNED);
82+
83+
var state = ClusterState.builder(ClusterName.DEFAULT)
84+
.metadata(Metadata.builder().put(IndexMetadata.builder(index.getName()).settings(indexSettings(Version.CURRENT, 1, 2))))
85+
.nodes(DiscoveryNodes.builder().add(newNode("node-1")).add(newNode("node-2")).add(newNode("node-3")))
86+
.routingTable(
87+
RoutingTable.builder().add(IndexRoutingTable.builder(index).addShard(primary).addShard(replica1).addShard(replica2))
88+
)
89+
.build();
90+
91+
var allocation = createRoutingAllocation(state);
92+
var decider = new RebalanceOnlyWhenActiveAllocationDecider();
93+
94+
assertThat(decider.canRebalance(primary, allocation), equalTo(NO_SOME_REPLICAS_INACTIVE));
95+
}
96+
97+
public void testAllowConcurrentRebalance() {
98+
99+
var index = new Index("test", "_na_");
100+
var primary = newShardRouting(new ShardId(index, 0), "node-1", true, STARTED);
101+
var replica = newShardRouting(new ShardId(index, 0), "node-2", "node-3", false, RELOCATING);
102+
103+
var state = ClusterState.builder(ClusterName.DEFAULT)
104+
.metadata(Metadata.builder().put(IndexMetadata.builder(index.getName()).settings(indexSettings(Version.CURRENT, 1, 1))))
105+
.nodes(DiscoveryNodes.builder().add(newNode("node-1")).add(newNode("node-2")).add(newNode("node-3")))
106+
.routingTable(RoutingTable.builder().add(IndexRoutingTable.builder(index).addShard(primary).addShard(replica)))
107+
.build();
108+
109+
var allocation = createRoutingAllocation(state);
110+
var decider = new RebalanceOnlyWhenActiveAllocationDecider();
111+
112+
assertThat(decider.canRebalance(primary, allocation), equalTo(YES_ALL_REPLICAS_ACTIVE));
113+
}
114+
115+
private static RoutingAllocation createRoutingAllocation(ClusterState state) {
116+
return new RoutingAllocation(new AllocationDeciders(List.of()), state, null, null, 0L);
117+
}
118+
}

0 commit comments

Comments
 (0)