Skip to content

Commit a4e59ae

Browse files
authored
Balance priorities during reconciliation (#95454) (#96148)
When reconciling a balance with a lot of shards on undesired nodes there is a possibility of causing node hot spots due to usage of nodeInterleavedShardIterator. This iterator orders shards based on nodes they are located and order nodes based hash map iteration. This means it tends to pick shards returned first by the iterator. This change uses OrderedShardsIterator that applies custom shards order based on allocation recency. (cherry picked from commit 6ecd74d)
1 parent a72e422 commit a4e59ae

File tree

7 files changed

+344
-58
lines changed

7 files changed

+344
-58
lines changed

docs/changelog/95454.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 95454
2+
summary: Balance priorities during reconciliation
3+
area: Allocation
4+
type: bug
5+
issues: []

server/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -249,6 +249,10 @@ private ShardRouting findAssignedPrimaryIfPeerRecovery(ShardRouting routing) {
249249
return primary;
250250
}
251251

252+
public Set<String> getAllNodeIds() {
253+
return Collections.unmodifiableSet(nodesToShards.keySet());
254+
}
255+
252256
@Override
253257
public Iterator<RoutingNode> iterator() {
254258
return Collections.unmodifiableCollection(nodesToShards.values()).iterator();

server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceReconciler.java

Lines changed: 27 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -47,16 +47,19 @@ public class DesiredBalanceReconciler {
4747
private final RoutingAllocation allocation; // name chosen to align with code in BalancedShardsAllocator but TODO rename
4848
private final RoutingNodes routingNodes;
4949
private final NodeAllocationOrdering allocationOrdering;
50+
private final NodeAllocationOrdering moveOrdering;
5051

5152
DesiredBalanceReconciler(
5253
DesiredBalance desiredBalance,
5354
RoutingAllocation routingAllocation,
54-
NodeAllocationOrdering allocationOrdering
55+
NodeAllocationOrdering allocationOrdering,
56+
NodeAllocationOrdering moveOrdering
5557
) {
5658
this.desiredBalance = desiredBalance;
5759
this.allocation = routingAllocation;
5860
this.routingNodes = routingAllocation.routingNodes();
5961
this.allocationOrdering = allocationOrdering;
62+
this.moveOrdering = moveOrdering;
6063
}
6164

6265
void run() {
@@ -220,9 +223,7 @@ private void allocateUnassigned() {
220223
final var decision = allocation.deciders().canAllocate(shard, routingNode, allocation);
221224
switch (decision.type()) {
222225
case YES -> {
223-
if (logger.isTraceEnabled()) {
224-
logger.trace("Assigned shard [{}] to [{}]", shard, desiredNodeId);
225-
}
226+
logger.debug("Assigning shard [{}] to [{}]", shard, desiredNodeId);
226227
final long shardSize = DiskThresholdDecider.getExpectedShardSize(
227228
shard,
228229
ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE,
@@ -253,9 +254,7 @@ private void allocateUnassigned() {
253254
}
254255
}
255256

256-
if (logger.isTraceEnabled()) {
257-
logger.trace("No eligible node found to assign shard [{}] amongst [{}]", shard, assignment);
258-
}
257+
logger.debug("No eligible node found to assign shard [{}] amongst [{}]", shard, assignment);
259258

260259
final UnassignedInfo.AllocationStatus allocationStatus;
261260
if (assignment == null || assignment.isIgnored(shard.primary())) {
@@ -284,20 +283,16 @@ private void allocateUnassigned() {
284283

285284
private Iterable<String> getDesiredNodesIds(ShardRouting shard, ShardAssignment assignment) {
286285
return allocationOrdering.sort(allocation.deciders().getForcedInitialShardAllocationToNodes(shard, allocation).map(forced -> {
287-
if (logger.isDebugEnabled()) {
288-
logger.debug("Shard [{}] assignment is ignored. Initial allocation forced to {}", shard.shardId(), forced);
289-
}
286+
logger.debug("Shard [{}] assignment is ignored. Initial allocation forced to {}", shard.shardId(), forced);
290287
return forced;
291288
}).orElse(assignment.nodeIds()));
292289
}
293290

294291
private Iterable<String> getFallbackNodeIds(ShardRouting shard, AtomicBoolean isThrottled) {
295292
return () -> {
296293
if (shard.primary() && isThrottled.get() == false) {
297-
var fallbackNodeIds = allocation.routingNodes().stream().map(RoutingNode::nodeId).toList();
298-
if (logger.isDebugEnabled()) {
299-
logger.trace("Shard [{}] assignment is temporary not possible. Falling back to {}", shard.shardId(), fallbackNodeIds);
300-
}
294+
var fallbackNodeIds = allocation.routingNodes().getAllNodeIds();
295+
logger.debug("Shard [{}] assignment is temporary not possible. Falling back to {}", shard.shardId(), fallbackNodeIds);
301296
return allocationOrdering.sort(fallbackNodeIds).iterator();
302297
} else {
303298
return Collections.emptyIterator();
@@ -306,10 +301,9 @@ private Iterable<String> getFallbackNodeIds(ShardRouting shard, AtomicBoolean is
306301
}
307302

308303
private void moveShards() {
309-
// Iterate over the started shards interleaving between nodes, and check if they can remain. In the presence of throttling
310-
// shard movements, the goal of this iteration order is to achieve a fairer movement of shards from the nodes that are
311-
// offloading the shards.
312-
for (final var iterator = routingNodes.nodeInterleavedShardIterator(); iterator.hasNext();) {
304+
// Iterate over all started shards and check if they can remain. In the presence of throttling shard movements,
305+
// the goal of this iteration order is to achieve a fairer movement of shards from the nodes that are offloading the shards.
306+
for (final var iterator = OrderedShardsIterator.create(routingNodes, moveOrdering); iterator.hasNext();) {
313307
final var shardRouting = iterator.next();
314308

315309
if (shardRouting.started() == false) {
@@ -343,12 +337,15 @@ private void moveShards() {
343337

344338
final var moveTarget = findRelocationTarget(shardRouting, assignment.nodeIds());
345339
if (moveTarget != null) {
340+
logger.debug("Moving shard {} from {} to {}", shardRouting.shardId(), shardRouting.currentNodeId(), moveTarget.getId());
346341
routingNodes.relocateOrReinitializeShard(
347342
shardRouting,
348343
moveTarget.getId(),
349344
allocation.clusterInfo().getShardSize(shardRouting, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE),
350345
allocation.changes()
351346
);
347+
iterator.dePrioritizeNode(shardRouting.currentNodeId());
348+
moveOrdering.recordAllocation(shardRouting.currentNodeId());
352349
}
353350
}
354351
}
@@ -358,10 +355,9 @@ private void balance() {
358355
return;
359356
}
360357

361-
// Iterate over the started shards interleaving between nodes, and try to move any which are on undesired nodes. In the presence of
362-
// throttling shard movements, the goal of this iteration order is to achieve a fairer movement of shards from the nodes that are
363-
// offloading the shards.
364-
for (final var iterator = routingNodes.nodeInterleavedShardIterator(); iterator.hasNext();) {
358+
// Iterate over all started shards and try to move any which are on undesired nodes. In the presence of throttling shard movements,
359+
// the goal of this iteration order is to achieve a fairer movement of shards from the nodes that are offloading the shards.
360+
for (final var iterator = OrderedShardsIterator.create(routingNodes, moveOrdering); iterator.hasNext();) {
365361
final var shardRouting = iterator.next();
366362

367363
if (shardRouting.started() == false) {
@@ -392,12 +388,21 @@ private void balance() {
392388

393389
final var rebalanceTarget = findRelocationTarget(shardRouting, assignment.nodeIds(), this::decideCanAllocate);
394390
if (rebalanceTarget != null) {
391+
logger.debug(
392+
"Rebalancing shard {} from {} to {}",
393+
shardRouting.shardId(),
394+
shardRouting.currentNodeId(),
395+
rebalanceTarget.getId()
396+
);
397+
395398
routingNodes.relocateOrReinitializeShard(
396399
shardRouting,
397400
rebalanceTarget.getId(),
398401
allocation.clusterInfo().getShardSize(shardRouting, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE),
399402
allocation.changes()
400403
);
404+
iterator.dePrioritizeNode(shardRouting.currentNodeId());
405+
moveOrdering.recordAllocation(shardRouting.currentNodeId());
401406
}
402407
}
403408
}

server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceShardsAllocator.java

Lines changed: 8 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,6 @@
1414
import org.elasticsearch.cluster.ClusterState;
1515
import org.elasticsearch.cluster.ClusterStateTaskExecutor;
1616
import org.elasticsearch.cluster.ClusterStateTaskListener;
17-
import org.elasticsearch.cluster.routing.RoutingNode;
18-
import org.elasticsearch.cluster.routing.RoutingNodes;
1917
import org.elasticsearch.cluster.routing.ShardRouting;
2018
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
2119
import org.elasticsearch.cluster.routing.allocation.RoutingExplanations;
@@ -37,13 +35,10 @@
3735
import java.util.Comparator;
3836
import java.util.List;
3937
import java.util.Map;
40-
import java.util.Set;
4138
import java.util.concurrent.ConcurrentLinkedQueue;
4239
import java.util.concurrent.atomic.AtomicLong;
4340
import java.util.function.Consumer;
4441

45-
import static java.util.stream.Collectors.toSet;
46-
4742
/**
4843
* A {@link ShardsAllocator} which asynchronously refreshes the desired balance held by the {@link DesiredBalanceComputer} and then takes
4944
* steps towards the desired balance using the {@link DesiredBalanceReconciler}.
@@ -62,6 +57,7 @@ public class DesiredBalanceShardsAllocator implements ShardsAllocator {
6257
private final ConcurrentLinkedQueue<List<MoveAllocationCommand>> pendingDesiredBalanceMoves = new ConcurrentLinkedQueue<>();
6358
private final MasterServiceTaskQueue<ReconcileDesiredBalanceTask> masterServiceTaskQueue;
6459
private final NodeAllocationOrdering allocationOrdering = new NodeAllocationOrdering();
60+
private final NodeAllocationOrdering moveOrdering = new NodeAllocationOrdering();
6561
private volatile DesiredBalance currentDesiredBalance = DesiredBalance.INITIAL;
6662
private volatile boolean resetCurrentDesiredBalance = false;
6763

@@ -228,8 +224,13 @@ protected void reconcile(DesiredBalance desiredBalance, RoutingAllocation alloca
228224
} else {
229225
logger.debug("Reconciling desired balance for [{}]", desiredBalance.lastConvergedIndex());
230226
}
231-
allocationOrdering.retainNodes(getNodeIds(allocation.routingNodes()));
232-
recordTime(cumulativeReconciliationTime, new DesiredBalanceReconciler(desiredBalance, allocation, allocationOrdering)::run);
227+
var allNodeIds = allocation.routingNodes().getAllNodeIds();
228+
allocationOrdering.retainNodes(allNodeIds);
229+
moveOrdering.retainNodes(allNodeIds);
230+
recordTime(
231+
cumulativeReconciliationTime,
232+
new DesiredBalanceReconciler(desiredBalance, allocation, allocationOrdering, moveOrdering)::run
233+
);
233234
if (logger.isTraceEnabled()) {
234235
logger.trace("Reconciled desired balance: {}", desiredBalance);
235236
} else {
@@ -346,8 +347,4 @@ private void recordTime(CounterMetric metric, Runnable action) {
346347
metric.inc(finished - started);
347348
}
348349
}
349-
350-
private static Set<String> getNodeIds(RoutingNodes nodes) {
351-
return nodes.stream().map(RoutingNode::nodeId).collect(toSet());
352-
}
353350
}
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0 and the Server Side Public License, v 1; you may not use this file except
5+
* in compliance with, at your election, the Elastic License 2.0 or the Server
6+
* Side Public License, v 1.
7+
*/
8+
9+
package org.elasticsearch.cluster.routing.allocation.allocator;
10+
11+
import org.elasticsearch.cluster.routing.RoutingNodes;
12+
import org.elasticsearch.cluster.routing.ShardRouting;
13+
import org.elasticsearch.common.collect.Iterators;
14+
15+
import java.util.ArrayDeque;
16+
import java.util.Iterator;
17+
import java.util.NoSuchElementException;
18+
import java.util.Objects;
19+
20+
/**
21+
* This class iterates all shards from all nodes in order of allocation recency.
22+
* Shards from the node that had a new shard allocation would appear in the end of iteration.
23+
*/
24+
public class OrderedShardsIterator implements Iterator<ShardRouting> {
25+
26+
private final ArrayDeque<NodeAndShardIterator> queue;
27+
28+
public static OrderedShardsIterator create(RoutingNodes routingNodes, NodeAllocationOrdering ordering) {
29+
var queue = new ArrayDeque<NodeAndShardIterator>(routingNodes.size());
30+
for (var nodeId : ordering.sort(routingNodes.getAllNodeIds())) {
31+
var node = routingNodes.node(nodeId);
32+
if (node.size() > 0) {
33+
queue.add(new NodeAndShardIterator(nodeId, Iterators.forArray(node.copyShards())));
34+
}
35+
}
36+
return new OrderedShardsIterator(queue);
37+
}
38+
39+
private OrderedShardsIterator(ArrayDeque<NodeAndShardIterator> queue) {
40+
this.queue = queue;
41+
}
42+
43+
@Override
44+
public boolean hasNext() {
45+
return queue.isEmpty() == false;
46+
}
47+
48+
@Override
49+
public ShardRouting next() {
50+
if (queue.isEmpty()) {
51+
throw new NoSuchElementException();
52+
}
53+
var entry = queue.peek();
54+
assert entry.iterator.hasNext();
55+
final var nextShard = entry.iterator.next();
56+
if (entry.iterator.hasNext() == false) {
57+
queue.poll();
58+
}
59+
return nextShard;
60+
}
61+
62+
public void dePrioritizeNode(String nodeId) {
63+
var entry = queue.peek();
64+
if (entry != null && Objects.equals(nodeId, entry.nodeId)) {
65+
queue.offer(queue.poll());
66+
}
67+
}
68+
69+
private record NodeAndShardIterator(String nodeId, Iterator<ShardRouting> iterator) {}
70+
}

0 commit comments

Comments
 (0)