Skip to content

Commit 2806d30

Browse files
committed
Breakdown undesired allocations by tier
1 parent c0facac commit 2806d30

File tree

13 files changed

+155
-37
lines changed

13 files changed

+155
-37
lines changed

server/src/main/java/org/elasticsearch/cluster/ClusterModule.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -496,7 +496,8 @@ private static ShardsAllocator createShardsAllocator(
496496
clusterService,
497497
reconciler,
498498
telemetryProvider,
499-
nodeAllocationStatsAndWeightsCalculator
499+
nodeAllocationStatsAndWeightsCalculator,
500+
balancingWeightsFactory
500501
)
501502
);
502503

server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancingWeightsFactory.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@
99

1010
package org.elasticsearch.cluster.routing.allocation.allocator;
1111

12+
import org.elasticsearch.cluster.routing.ShardRouting;
13+
1214
/**
1315
* A balancing weights factory must be able to divide all shards and nodes into mutually
1416
* disjoint partitions. Allocation balancing will then be conducted sequentially for each partition.
@@ -18,4 +20,11 @@
1820
public interface BalancingWeightsFactory {
1921

2022
BalancingWeights create();
23+
24+
/**
25+
* Get partition for shard
26+
*
27+
* @return A string identifying which partition the shard belongs to
28+
*/
29+
String partitionForShard(ShardRouting shardRouting);
2130
}

server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceMetrics.java

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -31,12 +31,30 @@ public class DesiredBalanceMetrics {
3131

3232
/**
3333
* @param unassignedShards Shards that are not assigned to any node.
34+
* @param partitionStats Allocation stats broken down by balancer partition
35+
*/
36+
public record AllocationStats(long unassignedShards, Map<String, PartitionStats> partitionStats) {
37+
38+
public AllocationStats(long unassignedShards, long totalAllocations, long undesiredAllocationsExcludingShuttingDownNodes) {
39+
this(unassignedShards, Map.of("global", new PartitionStats(totalAllocations, undesiredAllocationsExcludingShuttingDownNodes)));
40+
}
41+
42+
public long totalAllocations() {
43+
return partitionStats.values().stream().mapToLong(PartitionStats::totalAllocations).sum();
44+
}
45+
46+
public long undesiredAllocationsExcludingShuttingDownNodes() {
47+
return partitionStats.values().stream().mapToLong(PartitionStats::undesiredAllocationsExcludingShuttingDownNodes).sum();
48+
}
49+
}
50+
51+
/**
3452
* @param totalAllocations Shards that are assigned to a node.
3553
* @param undesiredAllocationsExcludingShuttingDownNodes Shards that are assigned to a node but must move to alleviate a resource
3654
* constraint per the {@link AllocationDeciders}. Excludes shards that must move
3755
* because of a node shutting down.
3856
*/
39-
public record AllocationStats(long unassignedShards, long totalAllocations, long undesiredAllocationsExcludingShuttingDownNodes) {}
57+
public record PartitionStats(long totalAllocations, long undesiredAllocationsExcludingShuttingDownNodes) {}
4058

4159
public record NodeWeightStats(long shardCount, double diskUsageInBytes, double writeLoad, double nodeWeight) {
4260
public static final NodeWeightStats ZERO = new NodeWeightStats(0, 0, 0, 0);
@@ -71,7 +89,7 @@ public record NodeWeightStats(long shardCount, double diskUsageInBytes, double w
7189
public static final String CURRENT_NODE_FORECASTED_DISK_USAGE_METRIC_NAME =
7290
"es.allocator.allocations.node.forecasted_disk_usage_bytes.current";
7391

74-
public static final AllocationStats EMPTY_ALLOCATION_STATS = new AllocationStats(-1, -1, -1);
92+
public static final AllocationStats EMPTY_ALLOCATION_STATS = new AllocationStats(-1, Map.of());
7593

7694
private volatile boolean nodeIsMaster = false;
7795

@@ -105,8 +123,8 @@ public void updateMetrics(
105123
assert weightStatsPerNode != null : "node balance weight stats cannot be null";
106124
if (allocationStats != EMPTY_ALLOCATION_STATS) {
107125
this.unassignedShards = allocationStats.unassignedShards;
108-
this.totalAllocations = allocationStats.totalAllocations;
109-
this.undesiredAllocationsExcludingShuttingDownNodes = allocationStats.undesiredAllocationsExcludingShuttingDownNodes;
126+
this.totalAllocations = allocationStats.totalAllocations();
127+
this.undesiredAllocationsExcludingShuttingDownNodes = allocationStats.undesiredAllocationsExcludingShuttingDownNodes();
110128
}
111129
weightStatsPerNodeRef.set(weightStatsPerNode);
112130
allocationStatsPerNodeRef.set(nodeAllocationStats);

server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceReconciler.java

Lines changed: 55 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -27,15 +27,19 @@
2727
import org.elasticsearch.common.Strings;
2828
import org.elasticsearch.common.settings.ClusterSettings;
2929
import org.elasticsearch.common.settings.Setting;
30+
import org.elasticsearch.common.util.Maps;
3031
import org.elasticsearch.core.TimeValue;
3132
import org.elasticsearch.core.Tuple;
3233
import org.elasticsearch.gateway.PriorityComparator;
3334
import org.elasticsearch.index.IndexVersions;
3435
import org.elasticsearch.index.shard.ShardId;
3536
import org.elasticsearch.threadpool.ThreadPool;
3637

38+
import java.util.Collections;
3739
import java.util.Comparator;
40+
import java.util.HashMap;
3841
import java.util.Iterator;
42+
import java.util.Map;
3943
import java.util.Set;
4044
import java.util.function.BiFunction;
4145
import java.util.stream.Collectors;
@@ -79,12 +83,18 @@ public class DesiredBalanceReconciler {
7983
private double undesiredAllocationsLogThreshold;
8084
private final NodeAllocationOrdering allocationOrdering = new NodeAllocationOrdering();
8185
private final NodeAllocationOrdering moveOrdering = new NodeAllocationOrdering();
86+
private final BalancingWeightsFactory balancingWeightsFactory;
8287

83-
public DesiredBalanceReconciler(ClusterSettings clusterSettings, ThreadPool threadPool) {
88+
public DesiredBalanceReconciler(
89+
ClusterSettings clusterSettings,
90+
ThreadPool threadPool,
91+
BalancingWeightsFactory balancingWeightsFactory
92+
) {
8493
this.undesiredAllocationLogInterval = new FrequencyCappedAction(
8594
threadPool.relativeTimeInMillisSupplier(),
8695
TimeValue.timeValueMinutes(5)
8796
);
97+
this.balancingWeightsFactory = balancingWeightsFactory;
8898
clusterSettings.initializeAndWatch(UNDESIRED_ALLOCATIONS_LOG_INTERVAL_SETTING, this.undesiredAllocationLogInterval::setMinInterval);
8999
clusterSettings.initializeAndWatch(
90100
UNDESIRED_ALLOCATIONS_LOG_THRESHOLD_SETTING,
@@ -523,16 +533,16 @@ private void moveShards() {
523533

524534
private DesiredBalanceMetrics.AllocationStats balance() {
525535
int unassignedShards = routingNodes.unassigned().size() + routingNodes.unassigned().ignored().size();
526-
int totalAllocations = 0;
527-
int undesiredAllocationsExcludingShuttingDownNodes = 0;
536+
final AllocationStatsBuilder allocationStatsBuilder = new AllocationStatsBuilder(unassignedShards);
528537

529538
// Iterate over all started shards and try to move any which are on undesired nodes. In the presence of throttling shard
530539
// movements, the goal of this iteration order is to achieve a fairer movement of shards from the nodes that are offloading the
531540
// shards.
532541
for (final var iterator = OrderedShardsIterator.createForBalancing(allocation, moveOrdering); iterator.hasNext();) {
533542
final var shardRouting = iterator.next();
543+
final String shardPartition = balancingWeightsFactory.partitionForShard(shardRouting);
534544

535-
totalAllocations++;
545+
allocationStatsBuilder.incrementTotalAllocations(shardPartition);
536546

537547
if (shardRouting.started() == false) {
538548
// can only rebalance started shards
@@ -552,7 +562,7 @@ private DesiredBalanceMetrics.AllocationStats balance() {
552562

553563
if (allocation.metadata().nodeShutdowns().contains(shardRouting.currentNodeId()) == false) {
554564
// shard is not on a shutting down node, nor is it on a desired node per the previous check.
555-
undesiredAllocationsExcludingShuttingDownNodes++;
565+
allocationStatsBuilder.incrementUndesiredAllocationsExcludingShuttingDownNodes(shardPartition);
556566
}
557567

558568
if (allocation.deciders().canRebalance(allocation).type() != Decision.Type.YES) {
@@ -591,15 +601,16 @@ private DesiredBalanceMetrics.AllocationStats balance() {
591601
}
592602
}
593603

594-
maybeLogUndesiredAllocationsWarning(totalAllocations, undesiredAllocationsExcludingShuttingDownNodes, routingNodes.size());
595-
return new DesiredBalanceMetrics.AllocationStats(
596-
unassignedShards,
597-
totalAllocations,
598-
undesiredAllocationsExcludingShuttingDownNodes
604+
final DesiredBalanceMetrics.AllocationStats allocationStats = allocationStatsBuilder.create();
605+
maybeLogUndesiredAllocationsWarning(
606+
allocationStats.totalAllocations(),
607+
allocationStats.undesiredAllocationsExcludingShuttingDownNodes(),
608+
routingNodes.size()
599609
);
610+
return allocationStats;
600611
}
601612

602-
private void maybeLogUndesiredAllocationsWarning(int totalAllocations, int undesiredAllocations, int nodeCount) {
613+
private void maybeLogUndesiredAllocationsWarning(long totalAllocations, long undesiredAllocations, int nodeCount) {
603614
// more shards than cluster can relocate with one reroute
604615
final boolean nonEmptyRelocationBacklog = undesiredAllocations > 2L * nodeCount;
605616
final boolean warningThresholdReached = undesiredAllocations > undesiredAllocationsLogThreshold * totalAllocations;
@@ -662,5 +673,38 @@ private Decision decideCanForceAllocateForVacate(ShardRouting shardRouting, Rout
662673
assert target != null : "Target node is not found";
663674
return allocation.deciders().canForceAllocateDuringReplace(shardRouting, target, allocation);
664675
}
676+
677+
private static class AllocationStatsBuilder {
678+
private final int unassignedShards;
679+
private final Map<String, PartitionStats> partitionStats = new HashMap<>();
680+
681+
private AllocationStatsBuilder(int unassignedShards) {
682+
this.unassignedShards = unassignedShards;
683+
}
684+
685+
public DesiredBalanceMetrics.AllocationStats create() {
686+
return new DesiredBalanceMetrics.AllocationStats(
687+
unassignedShards,
688+
Collections.unmodifiableMap(Maps.transformValues(partitionStats, PartitionStats::create))
689+
);
690+
}
691+
692+
public void incrementTotalAllocations(String partition) {
693+
partitionStats.computeIfAbsent(partition, p -> new PartitionStats()).totalAllocations++;
694+
}
695+
696+
public void incrementUndesiredAllocationsExcludingShuttingDownNodes(String partition) {
697+
partitionStats.computeIfAbsent(partition, p -> new PartitionStats()).undesiredAllocationsExcludingShuttingDownNodes++;
698+
}
699+
700+
private static class PartitionStats {
701+
long totalAllocations;
702+
long undesiredAllocationsExcludingShuttingDownNodes;
703+
704+
public DesiredBalanceMetrics.PartitionStats create() {
705+
return new DesiredBalanceMetrics.PartitionStats(totalAllocations, undesiredAllocationsExcludingShuttingDownNodes);
706+
}
707+
}
708+
}
665709
}
666710
}

server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceShardsAllocator.java

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,8 @@ public DesiredBalanceShardsAllocator(
116116
ClusterService clusterService,
117117
DesiredBalanceReconcilerAction reconciler,
118118
TelemetryProvider telemetryProvider,
119-
NodeAllocationStatsAndWeightsCalculator nodeAllocationStatsAndWeightsCalculator
119+
NodeAllocationStatsAndWeightsCalculator nodeAllocationStatsAndWeightsCalculator,
120+
BalancingWeightsFactory balancingWeightsFactory
120121
) {
121122
this(
122123
delegateAllocator,
@@ -125,7 +126,8 @@ public DesiredBalanceShardsAllocator(
125126
new DesiredBalanceComputer(clusterSettings, threadPool, delegateAllocator),
126127
reconciler,
127128
telemetryProvider,
128-
nodeAllocationStatsAndWeightsCalculator
129+
nodeAllocationStatsAndWeightsCalculator,
130+
balancingWeightsFactory
129131
);
130132
}
131133

@@ -136,7 +138,8 @@ public DesiredBalanceShardsAllocator(
136138
DesiredBalanceComputer desiredBalanceComputer,
137139
DesiredBalanceReconcilerAction reconciler,
138140
TelemetryProvider telemetryProvider,
139-
NodeAllocationStatsAndWeightsCalculator nodeAllocationStatsAndWeightsCalculator
141+
NodeAllocationStatsAndWeightsCalculator nodeAllocationStatsAndWeightsCalculator,
142+
BalancingWeightsFactory balancingWeightsFactory
140143
) {
141144
this.desiredBalanceMetrics = new DesiredBalanceMetrics(telemetryProvider.getMeterRegistry());
142145
this.nodeAllocationStatsAndWeightsCalculator = nodeAllocationStatsAndWeightsCalculator;
@@ -145,7 +148,11 @@ public DesiredBalanceShardsAllocator(
145148
this.threadPool = threadPool;
146149
this.reconciler = reconciler;
147150
this.desiredBalanceComputer = desiredBalanceComputer;
148-
this.desiredBalanceReconciler = new DesiredBalanceReconciler(clusterService.getClusterSettings(), threadPool);
151+
this.desiredBalanceReconciler = new DesiredBalanceReconciler(
152+
clusterService.getClusterSettings(),
153+
threadPool,
154+
balancingWeightsFactory
155+
);
149156
this.desiredBalanceComputation = new ContinuousComputation<>(threadPool.generic()) {
150157

151158
@Override

server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/GlobalBalancingWeightsFactory.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
public class GlobalBalancingWeightsFactory implements BalancingWeightsFactory {
1919

20+
public static final String GLOBAL_PARTITION_NAME = "global";
2021
private final BalancerSettings balancerSettings;
2122

2223
public GlobalBalancingWeightsFactory(BalancerSettings balancerSettings) {
@@ -28,6 +29,11 @@ public BalancingWeights create() {
2829
return new GlobalBalancingWeights();
2930
}
3031

32+
@Override
33+
public String partitionForShard(ShardRouting shardRouting) {
34+
return GLOBAL_PARTITION_NAME;
35+
}
36+
3137
private class GlobalBalancingWeights implements BalancingWeights {
3238

3339
private final WeightFunction weightFunction;

server/src/test/java/org/elasticsearch/action/admin/cluster/allocation/TransportDeleteDesiredBalanceActionTests.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,10 +25,12 @@
2525
import org.elasticsearch.cluster.routing.RoutingTable;
2626
import org.elasticsearch.cluster.routing.allocation.AllocationService;
2727
import org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator;
28+
import org.elasticsearch.cluster.routing.allocation.allocator.BalancerSettings;
2829
import org.elasticsearch.cluster.routing.allocation.allocator.DesiredBalance;
2930
import org.elasticsearch.cluster.routing.allocation.allocator.DesiredBalanceComputer;
3031
import org.elasticsearch.cluster.routing.allocation.allocator.DesiredBalanceInput;
3132
import org.elasticsearch.cluster.routing.allocation.allocator.DesiredBalanceShardsAllocator;
33+
import org.elasticsearch.cluster.routing.allocation.allocator.GlobalBalancingWeightsFactory;
3234
import org.elasticsearch.cluster.routing.allocation.allocator.ShardsAllocator;
3335
import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationCommand;
3436
import org.elasticsearch.cluster.service.ClusterService;
@@ -121,7 +123,8 @@ public DesiredBalance compute(
121123
computer,
122124
(state, action) -> state,
123125
TelemetryProvider.NOOP,
124-
EMPTY_NODE_ALLOCATION_STATS
126+
EMPTY_NODE_ALLOCATION_STATS,
127+
new GlobalBalancingWeightsFactory(BalancerSettings.DEFAULT)
125128
);
126129
var allocationService = new MockAllocationService(
127130
randomAllocationDeciders(settings, clusterSettings),

server/src/test/java/org/elasticsearch/cluster/routing/allocation/AllocationStatsServiceTests.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -176,7 +176,8 @@ public void testUndesiredShardCount() {
176176
clusterService,
177177
(innerState, strategy) -> innerState,
178178
TelemetryProvider.NOOP,
179-
EMPTY_NODE_ALLOCATION_STATS
179+
EMPTY_NODE_ALLOCATION_STATS,
180+
new GlobalBalancingWeightsFactory(BalancerSettings.DEFAULT)
180181
) {
181182
@Override
182183
public DesiredBalance getDesiredBalance() {

server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocatorTests.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -797,7 +797,7 @@ private void addIndex(
797797
* A {@link BalancingWeightsFactory} that assumes the cluster is partitioned by the prefix
798798
* of the node and shard names before the `-`.
799799
*/
800-
class PrefixBalancingWeightsFactory implements BalancingWeightsFactory {
800+
static class PrefixBalancingWeightsFactory implements BalancingWeightsFactory {
801801

802802
private final Map<String, WeightFunction> prefixWeights;
803803

@@ -810,6 +810,11 @@ public BalancingWeights create() {
810810
return new PrefixBalancingWeights();
811811
}
812812

813+
@Override
814+
public String partitionForShard(ShardRouting shardRouting) {
815+
return prefix(shardRouting.getIndexName());
816+
}
817+
813818
class PrefixBalancingWeights implements BalancingWeights {
814819

815820
@Override

server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/ClusterAllocationSimulationTests.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -490,7 +490,8 @@ private Map.Entry<MockAllocationService, ShardsAllocator> createNewAllocationSer
490490
(clusterState, routingAllocationAction) -> strategyRef.get()
491491
.executeWithRoutingAllocation(clusterState, "reconcile-desired-balance", routingAllocationAction),
492492
TelemetryProvider.NOOP,
493-
EMPTY_NODE_ALLOCATION_STATS
493+
EMPTY_NODE_ALLOCATION_STATS,
494+
new GlobalBalancingWeightsFactory(BalancerSettings.DEFAULT)
494495
) {
495496
@Override
496497
public void allocate(RoutingAllocation allocation, ActionListener<Void> listener) {

0 commit comments

Comments
 (0)