Skip to content

Commit 455fde2

Browse files
Add documentation around desired balance (#119902)
Relates ES-10341
1 parent 5167497 commit 455fde2

File tree

8 files changed

+94
-24
lines changed

8 files changed

+94
-24
lines changed

server/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -158,9 +158,7 @@ public ClusterState applyStartedShards(ClusterState clusterState, List<ShardRout
158158
}
159159

160160
private static ClusterState buildResultAndLogHealthChange(ClusterState oldState, RoutingAllocation allocation, String reason) {
161-
final RoutingTable oldRoutingTable = oldState.routingTable();
162-
final RoutingNodes newRoutingNodes = allocation.routingNodes();
163-
final RoutingTable newRoutingTable = RoutingTable.of(newRoutingNodes);
161+
final RoutingTable newRoutingTable = RoutingTable.of(allocation.routingNodes());
164162
final Metadata newMetadata = allocation.updateMetadataWithRoutingChanges(newRoutingTable);
165163
assert newRoutingTable.validate(newMetadata); // validates the routing table is coherent with the cluster state metadata
166164

@@ -271,8 +269,7 @@ public ClusterState applyFailedShards(
271269
}
272270

273271
/**
274-
* unassigned an shards that are associated with nodes that are no longer part of the cluster, potentially promoting replicas
275-
* if needed.
272+
* Unassign any shards that are associated with nodes that are no longer part of the cluster, potentially promoting replicas if needed.
276273
*/
277274
public ClusterState disassociateDeadNodes(ClusterState clusterState, boolean reroute, String reason) {
278275
RoutingAllocation allocation = createRoutingAllocation(clusterState, currentNanoTime());
@@ -284,7 +281,7 @@ public ClusterState disassociateDeadNodes(ClusterState clusterState, boolean rer
284281
clusterState = buildResultAndLogHealthChange(clusterState, allocation, reason);
285282
}
286283
if (reroute) {
287-
return reroute(clusterState, reason, rerouteCompletionIsNotRequired());// this is not triggered by a user request
284+
return reroute(clusterState, reason, rerouteCompletionIsNotRequired() /* this is not triggered by a user request */);
288285
} else {
289286
return clusterState;
290287
}

server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalance.java

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,10 @@
1919
/**
2020
* The desired balance of the cluster, indicating which nodes should hold a copy of each shard.
2121
*
22+
* @param lastConvergedIndex Identifies what input data the balancer computation round used to produce this {@link DesiredBalance}. See
23+
* {@link DesiredBalanceInput#index()} for details. Each reroute request in the same master term is assigned a
24+
* strictly increasing sequence number. A new master term restarts the index values from zero. The balancer,
25+
* which runs asynchronously to reroute, uses the latest request's data to compute the desired balance.
2226
* @param assignments a set of the (persistent) node IDs to which each {@link ShardId} should be allocated
2327
* @param weightsPerNode The node weights calculated based on
2428
* {@link org.elasticsearch.cluster.routing.allocation.allocator.WeightFunction#calculateNodeWeight}
@@ -31,8 +35,11 @@ public record DesiredBalance(
3135
) {
3236

3337
enum ComputationFinishReason {
38+
/** Computation ran to completion */
3439
CONVERGED,
40+
/** Computation exited and published early because a new cluster event occurred that affects computation */
3541
YIELD_TO_NEW_INPUT,
42+
/** Computation stopped and published early to avoid delaying new shard assignment */
3643
STOP_EARLY
3744
}
3845

@@ -44,6 +51,7 @@ public DesiredBalance(long lastConvergedIndex, Map<ShardId, ShardAssignment> ass
4451
* The placeholder value for {@link DesiredBalance} when the node stands down as master.
4552
*/
4653
public static final DesiredBalance NOT_MASTER = new DesiredBalance(-2, Map.of());
54+
4755
/**
4856
* The starting value for {@link DesiredBalance} when the node becomes the master.
4957
*/
@@ -57,6 +65,10 @@ public static boolean hasChanges(DesiredBalance a, DesiredBalance b) {
5765
return Objects.equals(a.assignments, b.assignments) == false;
5866
}
5967

68+
/**
69+
* Returns the sum of shard movements needed to reach the new desired balance. Doesn't count new shard copies as a move, nor removal or
70+
* unassignment of a shard copy.
71+
*/
6072
public static int shardMovements(DesiredBalance old, DesiredBalance updated) {
6173
var intersection = Sets.intersection(old.assignments().keySet(), updated.assignments().keySet());
6274
int movements = 0;
@@ -70,8 +82,15 @@ public static int shardMovements(DesiredBalance old, DesiredBalance updated) {
7082
return movements;
7183
}
7284

85+
/**
86+
* Returns the number of shard movements needed to reach the new shard assignment. Doesn't count new shard copies as a move, nor removal
87+
* or unassignment of a shard copy.
88+
*/
7389
private static int shardMovements(ShardAssignment old, ShardAssignment updated) {
74-
var movements = Math.min(0, old.assigned() - updated.assigned());// compensate newly started shards
90+
// A shard move should retain the same number of assigned nodes, just swap out one node for another. We will compensate for newly
91+
// started shards -- adding a shard copy is not a move -- by initializing the count with a negative value so that incrementing later
92+
// for a new node zeros out.
93+
var movements = Math.min(0, old.assigned() - updated.assigned());
7594
for (String nodeId : updated.nodeIds()) {
7695
if (old.nodeIds().contains(nodeId) == false) {
7796
movements++;

server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceComputer.java

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -415,11 +415,14 @@ boolean hasEnoughIterations(int currentIteration) {
415415
}
416416

417417
private static Map<ShardId, ShardAssignment> collectShardAssignments(RoutingNodes routingNodes) {
418-
final var entries = routingNodes.getAssignedShards().entrySet();
419-
assert entries.stream().flatMap(t -> t.getValue().stream()).allMatch(ShardRouting::started) : routingNodes;
420-
final Map<ShardId, ShardAssignment> res = Maps.newHashMapWithExpectedSize(entries.size());
421-
for (var shardAndAssignments : entries) {
422-
res.put(shardAndAssignments.getKey(), ShardAssignment.ofAssignedShards(shardAndAssignments.getValue()));
418+
final var allAssignedShards = routingNodes.getAssignedShards().entrySet();
419+
assert allAssignedShards.stream().flatMap(t -> t.getValue().stream()).allMatch(ShardRouting::started) : routingNodes;
420+
final Map<ShardId, ShardAssignment> res = Maps.newHashMapWithExpectedSize(allAssignedShards.size());
421+
for (var shardIdAndShardRoutings : allAssignedShards) {
422+
res.put(
423+
shardIdAndShardRoutings.getKey(),
424+
ShardAssignment.createFromAssignedShardRoutingsList(shardIdAndShardRoutings.getValue())
425+
);
423426
}
424427
return res;
425428
}

server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceReconciler.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,10 @@ public class DesiredBalanceReconciler {
5454

5555
private static final Logger logger = LogManager.getLogger(DesiredBalanceReconciler.class);
5656

57+
/**
58+
* The minimum interval between log messages that are written when the number of undesired shard allocations reaches the percentage of total
59+
* shards set by {@link #UNDESIRED_ALLOCATIONS_LOG_THRESHOLD_SETTING}.
60+
*/
5761
public static final Setting<TimeValue> UNDESIRED_ALLOCATIONS_LOG_INTERVAL_SETTING = Setting.timeSetting(
5862
"cluster.routing.allocation.desired_balance.undesired_allocations.log_interval",
5963
TimeValue.timeValueHours(1),
@@ -62,6 +66,10 @@ public class DesiredBalanceReconciler {
6266
Setting.Property.NodeScope
6367
);
6468

69+
/**
70+
* Warning log messages may be periodically written if the number of shards that are on undesired nodes reaches this percentage setting.
71+
* Works together with {@link #UNDESIRED_ALLOCATIONS_LOG_INTERVAL_SETTING} to log on a periodic basis.
72+
*/
6573
public static final Setting<Double> UNDESIRED_ALLOCATIONS_LOG_THRESHOLD_SETTING = Setting.doubleSetting(
6674
"cluster.routing.allocation.desired_balance.undesired_allocations.threshold",
6775
0.1,
@@ -96,6 +104,13 @@ public DesiredBalanceReconciler(
96104
this.nodeAllocationStatsAndWeightsCalculator = nodeAllocationStatsAndWeightsCalculator;
97105
}
98106

107+
/**
108+
* Applies a desired shard allocation to the routing table by initializing and relocating shards in the cluster state.
109+
*
110+
* @param desiredBalance The new desired cluster shard allocation
111+
* @param allocation Cluster state information with which to make decisions, contains routing table metadata that will be modified to
112+
* reach the given desired balance.
113+
*/
99114
public void reconcile(DesiredBalance desiredBalance, RoutingAllocation allocation) {
100115
var nodeIds = allocation.routingNodes().getAllNodeIds();
101116
allocationOrdering.retainNodes(nodeIds);

server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceShardsAllocator.java

Lines changed: 26 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import org.elasticsearch.cluster.ClusterStateTaskListener;
1818
import org.elasticsearch.cluster.metadata.SingleNodeShutdownMetadata;
1919
import org.elasticsearch.cluster.routing.ShardRouting;
20+
import org.elasticsearch.cluster.routing.allocation.AllocationService;
2021
import org.elasticsearch.cluster.routing.allocation.AllocationService.RerouteStrategy;
2122
import org.elasticsearch.cluster.routing.allocation.NodeAllocationStatsAndWeightsCalculator;
2223
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
@@ -56,11 +57,30 @@ public class DesiredBalanceShardsAllocator implements ShardsAllocator {
5657

5758
private final ShardsAllocator delegateAllocator;
5859
private final ThreadPool threadPool;
60+
/**
61+
* This is a callback to run {@link AllocationService#executeWithRoutingAllocation(ClusterState, String, RerouteStrategy)}, which
62+
* produces a new ClusterState with the changes made by {@link DesiredBalanceReconciler#reconcile}. The {@link RerouteStrategy} provided
63+
* to the callback calls into {@link #desiredBalanceReconciler} for the changes. The {@link #masterServiceTaskQueue} will publish the
64+
* new cluster state after the cluster state is constructed by the {@link ReconcileDesiredBalanceExecutor}.
65+
*/
5966
private final DesiredBalanceReconcilerAction reconciler;
6067
private final DesiredBalanceComputer desiredBalanceComputer;
68+
/**
69+
* Reconciliation ({@link DesiredBalanceReconciler#reconcile(DesiredBalance, RoutingAllocation)}) takes the {@link DesiredBalance}
70+
* output of {@link DesiredBalanceComputer#compute} and identifies how shards need to be added, moved or removed to go from the current
71+
* cluster shard allocation to the new desired allocation.
72+
*/
6173
private final DesiredBalanceReconciler desiredBalanceReconciler;
6274
private final ContinuousComputation<DesiredBalanceInput> desiredBalanceComputation;
63-
private final PendingListenersQueue queue;
75+
/**
76+
* Saves and runs listeners after DesiredBalance computations complete.
77+
*/
78+
private final PendingListenersQueue pendingListenersQueue;
79+
/**
80+
* Each reroute request gets assigned a monotonically increasing sequence number. Many reroute requests may arrive before the balancer
81+
* asynchronously runs a computation. The balancer will use the latest request and save this sequence number to track back to the
82+
* request.
83+
*/
6484
private final AtomicLong indexGenerator = new AtomicLong(-1);
6585
private final ConcurrentLinkedQueue<List<MoveAllocationCommand>> pendingDesiredBalanceMoves = new ConcurrentLinkedQueue<>();
6686
private final MasterServiceTaskQueue<ReconcileDesiredBalanceTask> masterServiceTaskQueue;
@@ -199,7 +219,7 @@ public String toString() {
199219
return "DesiredBalanceShardsAllocator#allocate";
200220
}
201221
};
202-
this.queue = new PendingListenersQueue();
222+
this.pendingListenersQueue = new PendingListenersQueue();
203223
this.masterServiceTaskQueue = clusterService.createTaskQueue(
204224
"reconcile-desired-balance",
205225
Priority.URGENT,
@@ -235,7 +255,7 @@ public void allocate(RoutingAllocation allocation, ActionListener<Void> listener
235255

236256
var index = indexGenerator.incrementAndGet();
237257
logger.debug("Executing allocate for [{}]", index);
238-
queue.add(index, listener);
258+
pendingListenersQueue.add(index, listener);
239259
// This can only run on master, so unset not-master if exists
240260
if (currentDesiredBalanceRef.compareAndSet(DesiredBalance.NOT_MASTER, DesiredBalance.BECOME_MASTER_INITIAL)) {
241261
logger.debug("initialized desired balance for becoming master");
@@ -378,7 +398,7 @@ public DesiredBalanceStats getStats() {
378398
private void onNoLongerMaster() {
379399
if (indexGenerator.getAndSet(-1) != -1) {
380400
currentDesiredBalanceRef.set(DesiredBalance.NOT_MASTER);
381-
queue.completeAllAsNotMaster();
401+
pendingListenersQueue.completeAllAsNotMaster();
382402
pendingDesiredBalanceMoves.clear();
383403
desiredBalanceReconciler.clear();
384404
desiredBalanceMetrics.zeroAllMetrics();
@@ -428,7 +448,7 @@ private ClusterState applyBalance(
428448
batchExecutionContext.initialState(),
429449
createReconcileAllocationAction(latest.getTask().desiredBalance)
430450
);
431-
latest.success(() -> queue.complete(latest.getTask().desiredBalance.lastConvergedIndex()));
451+
latest.success(() -> pendingListenersQueue.complete(latest.getTask().desiredBalance.lastConvergedIndex()));
432452
return newState;
433453
}
434454
}
@@ -447,7 +467,7 @@ private static void discardSupersededTasks(
447467

448468
// only for tests - in production, this happens after reconciliation
449469
protected final void completeToLastConvergedIndex() {
450-
queue.complete(currentDesiredBalanceRef.get().lastConvergedIndex());
470+
pendingListenersQueue.complete(currentDesiredBalanceRef.get().lastConvergedIndex());
451471
}
452472

453473
private void recordTime(CounterMetric metric, Runnable action) {

server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/PendingListenersQueue.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,10 @@
2424
import static org.elasticsearch.cluster.service.ClusterApplierService.CLUSTER_UPDATE_THREAD_NAME;
2525
import static org.elasticsearch.cluster.service.MasterService.MASTER_UPDATE_THREAD_NAME;
2626

27+
/**
28+
* Registers listeners with an {@code index} number ({@link #add(long, ActionListener)}) and then completes them whenever the latest index number
29+
* is greater than or equal to a listener's index value ({@link #complete(long)}).
30+
*/
2731
public class PendingListenersQueue {
2832

2933
private static final Logger logger = LogManager.getLogger(PendingListenersQueue.class);

server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/ShardAssignment.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,14 @@
1717

1818
import static java.util.Collections.unmodifiableSet;
1919

20+
/**
21+
* Simple shard assignment summary of shard copies for a particular index shard.
22+
*
23+
* @param nodeIds The node IDs of nodes holding a shard copy.
24+
* @param total The total number of shard copies.
25+
* @param unassigned The number of unassigned shard copies.
26+
* @param ignored The number of ignored shard copies.
27+
*/
2028
public record ShardAssignment(Set<String> nodeIds, int total, int unassigned, int ignored) {
2129

2230
public ShardAssignment {
@@ -28,9 +36,13 @@ public int assigned() {
2836
return nodeIds.size();
2937
}
3038

31-
public static ShardAssignment ofAssignedShards(List<ShardRouting> routings) {
39+
/**
40+
* Helper method to instantiate a new ShardAssignment from a given list of ShardRouting instances. Assumes all shards are assigned.
41+
*/
42+
public static ShardAssignment createFromAssignedShardRoutingsList(List<ShardRouting> routings) {
3243
var nodeIds = new LinkedHashSet<String>();
3344
for (ShardRouting routing : routings) {
45+
assert routing.unassignedInfo() == null : "Expected assigned shard copies only, unassigned info: " + routing.unassignedInfo();
3446
nodeIds.add(routing.currentNodeId());
3547
}
3648
return new ShardAssignment(unmodifiableSet(nodeIds), routings.size(), 0, 0);

server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceTests.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -46,19 +46,19 @@ public void testShardMovements() {
4646
);
4747

4848
assertThat(
49-
"1 shard movements when existing shard is moved and new shard copy is unassigned",
49+
"1 shard movements when an existing shard copy is moved and new shard copy is unassigned",
5050
shardMovements(new ShardAssignment(Set.of("a", "b"), 2, 0, 0), new ShardAssignment(Set.of("a", "c"), 3, 1, 0)),
5151
equalTo(1)
5252
);
5353

5454
assertThat(
55-
"1 shard movement",
55+
"1 shard movement when an existing shard copy is moved",
5656
shardMovements(new ShardAssignment(Set.of("a", "b"), 2, 0, 0), new ShardAssignment(Set.of("a", "c"), 2, 0, 0)),
5757
equalTo(1)
5858
);
5959

6060
assertThat(
61-
"2 shard movement",
61+
"2 shard movements when both shard copies are move to new nodes",
6262
shardMovements(new ShardAssignment(Set.of("a", "b"), 2, 0, 0), new ShardAssignment(Set.of("c", "d"), 2, 0, 0)),
6363
equalTo(2)
6464
);
@@ -77,10 +77,10 @@ public void testShardMovements() {
7777
}
7878

7979
private static int shardMovements(ShardAssignment old, ShardAssignment updated) {
80-
return DesiredBalance.shardMovements(of(old), of(updated));
80+
return DesiredBalance.shardMovements(createDesiredBalanceWith(old), createDesiredBalanceWith(updated));
8181
}
8282

83-
private static DesiredBalance of(ShardAssignment assignment) {
83+
private static DesiredBalance createDesiredBalanceWith(ShardAssignment assignment) {
8484
return new DesiredBalance(1, Map.of(new ShardId("index", "_na_", 0), assignment));
8585
}
8686
}

0 commit comments

Comments
 (0)