Skip to content

Commit ee72be0

Browse files
Add unthrottled path for replicas in ThrottlingAllocationDecider (#138545)
Adds a new cluster setting that allows the ThrottlingAllocationDecider to bypass replica shard throttling during balancer simulation. Primary shards already bypass throttling during simulation, so that new index shards are assigned (and made available) as quickly as possible; replicas need the same quick availability in some environments. Relates ES-12942. ------------------ I'm splitting the work for ES-12942 into pieces. Next, I'll need to make sure that BalancedShardsAllocator#allocate() can assign all the replicas in one call, and then change [the DesiredBalanceComputer's early return logic](https://github.com/elastic/elasticsearch/blob/e33f903e40ff88d1ef146f8f40e3400e2a9a6c67/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceComputer.java#L434-L447) so that it picks up new assignments of unassigned replicas.
1 parent aed9d84 commit ee72be0

File tree

7 files changed

+340
-3
lines changed

7 files changed

+340
-3
lines changed

server/src/main/java/org/elasticsearch/cluster/routing/ShardRouting.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ public final class ShardRouting implements Writeable, ToXContentObject {
7474
*/
7575
ShardRouting(
7676
ShardId shardId,
77-
String currentNodeId,
77+
@Nullable String currentNodeId,
7878
String relocatingNodeId,
7979
boolean primary,
8080
ShardRoutingState state,

server/src/main/java/org/elasticsearch/cluster/routing/allocation/RoutingAllocation.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,7 @@ public RoutingAllocation(
123123
* @param currentNanoTime the nano time to use for all delay allocation calculation (typically {@link System#nanoTime()})
124124
* @param isSimulating {@code true} if "transient" deciders should be ignored because we are simulating the final allocation
125125
*/
126-
private RoutingAllocation(
126+
public RoutingAllocation(
127127
AllocationDeciders deciders,
128128
@Nullable RoutingNodes routingNodes,
129129
ClusterState clusterState,

server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ThrottlingAllocationDecider.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,9 +77,17 @@ public class ThrottlingAllocationDecider extends AllocationDecider {
7777
Property.NodeScope
7878
);
7979

80+
/**
 * Dynamic, node-scoped boolean setting, {@code false} by default. When it is {@code true} and the allocation is simulating,
 * {@code canAllocate} answers YES for an unassigned replica instead of running the peer-recovery throttling checks.
 * The bypass is conditional on {@code allocation.isSimulating()}, so decisions taken outside of simulation keep obeying throttling.
 */
public static final Setting<Boolean> CLUSTER_ROUTING_ALLOCATION_UNTHROTTLE_REPLICA_ASSIGNMENT_IN_SIMULATION = Setting.boolSetting(
81+
"cluster.routing.allocation.unthrottle_replica_assignment_in_simulation",
82+
false,
83+
Property.Dynamic,
84+
Property.NodeScope
85+
);
86+
8087
private volatile int primariesInitialRecoveries;
8188
private volatile int concurrentIncomingRecoveries;
8289
private volatile int concurrentOutgoingRecoveries;
90+
// Current value of CLUSTER_ROUTING_ALLOCATION_UNTHROTTLE_REPLICA_ASSIGNMENT_IN_SIMULATION; written by the settings watcher
// registered in the constructor and read by canAllocate.
private volatile boolean unthrottleReplicaAssignmentInSimulation = false;
8391

8492
public ThrottlingAllocationDecider(ClusterSettings clusterSettings) {
8593
clusterSettings.initializeAndWatch(
@@ -94,6 +102,9 @@ public ThrottlingAllocationDecider(ClusterSettings clusterSettings) {
94102
CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_OUTGOING_RECOVERIES_SETTING,
95103
this::setConcurrentOutgoingRecoverries
96104
);
105+
clusterSettings.initializeAndWatch(CLUSTER_ROUTING_ALLOCATION_UNTHROTTLE_REPLICA_ASSIGNMENT_IN_SIMULATION, (settingVal) -> {
106+
unthrottleReplicaAssignmentInSimulation = settingVal;
107+
});
97108
logger.debug(
98109
"using node_concurrent_outgoing_recoveries [{}], node_concurrent_incoming_recoveries [{}], "
99110
+ "node_initial_primaries_recoveries [{}]",
@@ -150,6 +161,13 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing
150161
);
151162
}
152163
return allocation.decision(YES, NAME, "below primary recovery limit of [%d]", primariesInitialRecoveries);
164+
} else if (allocation.isSimulating() && unthrottleReplicaAssignmentInSimulation && shardRouting.unassigned()) {
165+
// CLUSTER_ROUTING_ALLOCATION_UNTHROTTLE_REPLICA_ASSIGNMENT_IN_SIMULATION permits us to treat unassigned replicas as equally
166+
// urgent as unassigned primaries to allocate, for availability reasons.
167+
// During simulation, this supports early publishing DesiredBalance, with all unassigned shards assigned.
168+
// Notably, this bypass is only in simulation decisions. Reconciliation will continue to obey throttling, in particular the
169+
// requirement to assign a primary before allowing its replicas to begin initializing.
170+
return allocation.decision(Decision.YES, NAME, "replica allocation is not throttled when simulating");
153171
} else {
154172
// Peer recovery
155173
assert initializingShard(shardRouting, node.nodeId()).recoverySource().getType() == RecoverySource.Type.PEER;

server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -291,6 +291,7 @@ public void apply(Settings value, Settings current, Settings previous) {
291291
ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_INCOMING_RECOVERIES_SETTING,
292292
ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_OUTGOING_RECOVERIES_SETTING,
293293
ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES_SETTING,
294+
ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_UNTHROTTLE_REPLICA_ASSIGNMENT_IN_SIMULATION,
294295
DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING,
295296
DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_MAX_HEADROOM_SETTING,
296297
DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING,
Lines changed: 234 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,234 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.cluster.routing.allocation;
11+
12+
import org.elasticsearch.action.support.replication.ClusterStateCreationUtils;
13+
import org.elasticsearch.cluster.ClusterInfo;
14+
import org.elasticsearch.cluster.ClusterState;
15+
import org.elasticsearch.cluster.ESAllocationTestCase;
16+
import org.elasticsearch.cluster.metadata.IndexMetadata;
17+
import org.elasticsearch.cluster.metadata.ProjectId;
18+
import org.elasticsearch.cluster.routing.RoutingChangesObserver;
19+
import org.elasticsearch.cluster.routing.RoutingNode;
20+
import org.elasticsearch.cluster.routing.RoutingNodes;
21+
import org.elasticsearch.cluster.routing.RoutingTable;
22+
import org.elasticsearch.cluster.routing.ShardRouting;
23+
import org.elasticsearch.cluster.routing.allocation.decider.Decision;
24+
import org.elasticsearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider;
25+
import org.elasticsearch.common.settings.ClusterSettings;
26+
import org.elasticsearch.common.settings.Settings;
27+
import org.elasticsearch.index.Index;
28+
import org.elasticsearch.index.shard.ShardId;
29+
30+
import static org.hamcrest.Matchers.equalTo;
31+
import static org.junit.Assert.assertFalse;
32+
import static org.junit.Assert.assertTrue;
33+
34+
public class ThrottlingAllocationDeciderTests extends ESAllocationTestCase {
35+
36+
private record TestHarness(
37+
ClusterState clusterState,
38+
RoutingNodes mutableRoutingNodes,
39+
RoutingNode mutableRoutingNode1,
40+
RoutingNode mutableRoutingNode2,
41+
ShardRouting unassignedShardRouting1Primary,
42+
ShardRouting unassignedShardRouting1Replica,
43+
ShardRouting unassignedShardRouting2Primary,
44+
ShardRouting unassignedShardRouting2Replica
45+
) {}
46+
47+
private TestHarness setUpTwoNodesAndIndexWithTwoUnassignedPrimariesAndReplicas() {
48+
int numberOfShards = 2;
49+
ClusterState clusterState = ClusterStateCreationUtils.stateWithUnassignedPrimariesAndReplicas(
50+
new String[] { "test-index" },
51+
numberOfShards,
52+
1
53+
);
54+
// The number of data nodes the util method above creates is numberOfReplicas+1.
55+
assertEquals(2, clusterState.nodes().size());
56+
assertEquals(1, clusterState.metadata().getTotalNumberOfIndices());
57+
58+
var indexIterator = clusterState.metadata().indicesAllProjects().iterator();
59+
assertTrue(indexIterator.hasNext());
60+
IndexMetadata testIndexMetadata = indexIterator.next();
61+
assertFalse(indexIterator.hasNext());
62+
Index testIndex = testIndexMetadata.getIndex();
63+
assertEquals(numberOfShards, testIndexMetadata.getNumberOfShards());
64+
ShardId testShardId1 = new ShardId(testIndex, 0);
65+
ShardId testShardId2 = new ShardId(testIndex, 1);
66+
67+
var mutableRoutingNodes = clusterState.mutableRoutingNodes();
68+
69+
// The RoutingNode references must be to the RoutingAllocation's RoutingNodes, so that changes to one is reflected in the other.
70+
var routingNodesIterator = mutableRoutingNodes.iterator();
71+
assertTrue(routingNodesIterator.hasNext());
72+
var mutableRoutingNode1 = routingNodesIterator.next();
73+
assertTrue(routingNodesIterator.hasNext());
74+
var mutableRoutingNode2 = routingNodesIterator.next();
75+
assertFalse(routingNodesIterator.hasNext());
76+
77+
RoutingTable routingTable = clusterState.routingTable(ProjectId.DEFAULT);
78+
79+
assertThat(routingTable.shardRoutingTable(testShardId1).replicaShards().size(), equalTo(1));
80+
assertThat(routingTable.shardRoutingTable(testShardId2).replicaShards().size(), equalTo(1));
81+
82+
ShardRouting unassignedShardRouting1Primary = routingTable.shardRoutingTable(testShardId1).primaryShard();
83+
ShardRouting unassignedShardRouting1Replica = routingTable.shardRoutingTable(testShardId1).replicaShards().get(0);
84+
ShardRouting unassignedShardRouting2Primary = routingTable.shardRoutingTable(testShardId2).primaryShard();
85+
ShardRouting unassignedShardRouting2Replica = routingTable.shardRoutingTable(testShardId2).replicaShards().get(0);
86+
87+
assertFalse(unassignedShardRouting1Primary.assignedToNode());
88+
assertFalse(unassignedShardRouting1Replica.assignedToNode());
89+
assertFalse(unassignedShardRouting2Primary.assignedToNode());
90+
assertFalse(unassignedShardRouting2Replica.assignedToNode());
91+
92+
return new TestHarness(
93+
clusterState,
94+
mutableRoutingNodes,
95+
mutableRoutingNode1,
96+
mutableRoutingNode2,
97+
unassignedShardRouting1Primary,
98+
unassignedShardRouting1Replica,
99+
unassignedShardRouting2Primary,
100+
unassignedShardRouting2Replica
101+
);
102+
}
103+
104+
public void testPrimaryAndReplicaThrottlingNotSimulation() {
105+
/* Create cluster state for multiple nodes and an index with _unassigned_ shards. */
106+
TestHarness harness = setUpTwoNodesAndIndexWithTwoUnassignedPrimariesAndReplicas();
107+
108+
/* Decider Testing */
109+
110+
// Set up RoutingAllocation in non-simulation mode.
111+
var routingAllocation = new RoutingAllocation(
112+
null,
113+
harness.mutableRoutingNodes,
114+
harness.clusterState,
115+
ClusterInfo.builder().build(),
116+
null,
117+
System.nanoTime(),
118+
false // Turn off isSimulating
119+
);
120+
121+
final RoutingChangesObserver NOOP = new RoutingChangesObserver() {
122+
};
123+
Settings settings = Settings.builder()
124+
.put("cluster.routing.allocation.unthrottle_replica_assignment_in_simulation", randomBoolean() ? true : false)
125+
.put("cluster.routing.allocation.node_concurrent_recoveries", 1)
126+
.put("cluster.routing.allocation.node_initial_primaries_recoveries", 1)
127+
.build();
128+
assertFalse(routingAllocation.isSimulating());
129+
ThrottlingAllocationDecider decider = new ThrottlingAllocationDecider(ClusterSettings.createBuiltInClusterSettings(settings));
130+
131+
// A single primary can be allocated.
132+
assertThat(
133+
decider.canAllocate(harness.unassignedShardRouting1Primary, harness.mutableRoutingNode1, routingAllocation),
134+
equalTo(Decision.YES)
135+
);
136+
var shardRouting1PrimaryInitializing = harness.mutableRoutingNodes.initializeShard(
137+
harness.unassignedShardRouting1Primary,
138+
harness.mutableRoutingNode1.nodeId(),
139+
null,
140+
0,
141+
NOOP
142+
);
143+
144+
// Leaving the first shard's primary in an INITIALIZING state should THROTTLE further allocation.
145+
// Only 1 concurrent allocation is allowed.
146+
assertThat(
147+
decider.canAllocate(harness.unassignedShardRouting2Primary, harness.mutableRoutingNode1, routingAllocation),
148+
equalTo(Decision.THROTTLE)
149+
);
150+
151+
// The first shard's replica should receive a simple NO because the corresponding primary is not active yet.
152+
assertThat(
153+
decider.canAllocate(harness.unassignedShardRouting1Replica, harness.mutableRoutingNode2, routingAllocation),
154+
equalTo(Decision.NO)
155+
);
156+
157+
// Start the first shard's primary, and initialize the second shard's primary to again reach the 1 concurrency limit.
158+
harness.mutableRoutingNodes.startShard(shardRouting1PrimaryInitializing, NOOP, 0);
159+
assertThat(
160+
decider.canAllocate(harness.unassignedShardRouting2Primary, harness.mutableRoutingNode2, routingAllocation),
161+
equalTo(Decision.YES)
162+
);
163+
harness.mutableRoutingNodes.initializeShard(
164+
harness.unassignedShardRouting2Primary,
165+
harness.mutableRoutingNode2.nodeId(),
166+
null,
167+
0,
168+
NOOP
169+
);
170+
171+
// The first shard's replica should receive THROTTLE now, since the primary is active.
172+
// There is still already 1 allocation in progress, which is the limit.
173+
assertThat(
174+
decider.canAllocate(harness.unassignedShardRouting1Replica, harness.mutableRoutingNode2, routingAllocation),
175+
equalTo(Decision.THROTTLE)
176+
);
177+
}
178+
179+
public void testPrimaryAndReplicaThrottlingInSimulation() {
180+
/* Create cluster state for multiple nodes and an index with _unassigned_ shards. */
181+
TestHarness harness = setUpTwoNodesAndIndexWithTwoUnassignedPrimariesAndReplicas();
182+
var mutableRoutingNodes = harness.clusterState.mutableRoutingNodes();
183+
184+
/* Decider Testing */
185+
186+
// Set up RoutingAllocation in simulation mode.
187+
var routingAllocation = new RoutingAllocation(
188+
null,
189+
mutableRoutingNodes,
190+
harness.clusterState,
191+
ClusterInfo.builder().build(),
192+
null,
193+
System.nanoTime(),
194+
true // Turn on isSimulating
195+
);
196+
197+
final RoutingChangesObserver NOOP = new RoutingChangesObserver() {
198+
};
199+
Settings settings = Settings.builder()
200+
.put("cluster.routing.allocation.unthrottle_replica_assignment_in_simulation", true)
201+
.put("cluster.routing.allocation.node_concurrent_recoveries", 1)
202+
.put("cluster.routing.allocation.node_initial_primaries_recoveries", 1)
203+
.build();
204+
assertTrue(routingAllocation.isSimulating());
205+
ThrottlingAllocationDecider decider = new ThrottlingAllocationDecider(ClusterSettings.createBuiltInClusterSettings(settings));
206+
207+
// Primary path is unthrottled during simulation, regardless of the `node_initial_primaries_recoveries` setting
208+
assertThat(
209+
decider.canAllocate(harness.unassignedShardRouting1Primary, harness.mutableRoutingNode1, routingAllocation),
210+
equalTo(Decision.YES)
211+
);
212+
mutableRoutingNodes.initializeShard(harness.unassignedShardRouting1Primary, harness.mutableRoutingNode1.nodeId(), null, 0, NOOP);
213+
assertThat(
214+
decider.canAllocate(harness.unassignedShardRouting2Primary, harness.mutableRoutingNode1, routingAllocation),
215+
equalTo(Decision.YES)
216+
);
217+
mutableRoutingNodes.initializeShard(harness.unassignedShardRouting2Primary, harness.mutableRoutingNode1.nodeId(), null, 0, NOOP);
218+
219+
// Replica path is unthrottled during simulation AND `unthrottle_replica_assignment_in_simulation` is set to true.
220+
assertThat(
221+
decider.canAllocate(harness.unassignedShardRouting1Replica, harness.mutableRoutingNode2, routingAllocation),
222+
equalTo(Decision.YES)
223+
);
224+
mutableRoutingNodes.initializeShard(harness.unassignedShardRouting1Replica, harness.mutableRoutingNode2.nodeId(), null, 0, NOOP);
225+
assertThat(
226+
decider.canAllocate(harness.unassignedShardRouting2Replica, harness.mutableRoutingNode2, routingAllocation),
227+
equalTo(Decision.YES)
228+
);
229+
mutableRoutingNodes.initializeShard(harness.unassignedShardRouting2Replica, harness.mutableRoutingNode2.nodeId(), null, 0, NOOP);
230+
231+
// Note: INITIALIZING was chosen above, not STARTED, because the BalancedShardsAllocator only initializes. We want that path to be
232+
// unthrottled in simulation.
233+
}
234+
}

0 commit comments

Comments
 (0)