Skip to content

Commit b698d38

Browse files
authored
Allocation: introduce a new decider that balances the index shard count among nodes (#135875)
* Allocation: Include index shard counts as a criterion. In a balanced allocation, for an index with n shards on a cluster of m nodes, each node should not host significantly more than n / m shards. This decider enforces this principle.
1 parent 6b2d5c6 commit b698d38

File tree

13 files changed

+634
-21
lines changed

13 files changed

+634
-21
lines changed

server/src/main/java/org/elasticsearch/cluster/ClusterModule.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@
5858
import org.elasticsearch.cluster.routing.allocation.decider.DiskThresholdDecider;
5959
import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider;
6060
import org.elasticsearch.cluster.routing.allocation.decider.FilterAllocationDecider;
61+
import org.elasticsearch.cluster.routing.allocation.decider.IndexBalanceAllocationDecider;
6162
import org.elasticsearch.cluster.routing.allocation.decider.IndexVersionAllocationDecider;
6263
import org.elasticsearch.cluster.routing.allocation.decider.MaxRetryAllocationDecider;
6364
import org.elasticsearch.cluster.routing.allocation.decider.NodeReplacementAllocationDecider;
@@ -497,6 +498,7 @@ public static Collection<AllocationDecider> createAllocationDeciders(
497498
addAllocationDecider(deciders, new ThrottlingAllocationDecider(clusterSettings));
498499
addAllocationDecider(deciders, new ShardsLimitAllocationDecider(clusterSettings));
499500
addAllocationDecider(deciders, new AwarenessAllocationDecider(settings, clusterSettings));
501+
addAllocationDecider(deciders, new IndexBalanceAllocationDecider(settings, clusterSettings));
500502

501503
clusterPlugins.stream()
502504
.flatMap(p -> p.createAllocationDeciders(settings, clusterSettings).stream())

server/src/main/java/org/elasticsearch/cluster/node/DiscoveryNodeFilters.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -250,6 +250,10 @@ private boolean isSingleNodeFilterInternal() {
250250
|| (filters.size() > 1 && opType == OpType.AND && NON_ATTRIBUTE_NAMES.containsAll(filters.keySet()));
251251
}
252252

253+
public boolean hasFilters() {
254+
return filters.isEmpty() == false;
255+
}
256+
253257
/**
254258
* Generates a human-readable string for the DiscoveryNodeFilters.
255259
* Example: {@code _id:"id1 OR blah",name:"blah OR name2"}
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.cluster.routing.allocation;
11+
12+
import org.elasticsearch.common.settings.ClusterSettings;
13+
import org.elasticsearch.common.settings.Setting;
14+
15+
/**
16+
* Settings definitions for the index shard count allocation decider and associated infrastructure
17+
*/
18+
public class IndexBalanceConstraintSettings {
19+
20+
private static final String SETTING_PREFIX = "cluster.routing.allocation.index_balance_decider.";
21+
22+
public static final Setting<Boolean> INDEX_BALANCE_DECIDER_ENABLED_SETTING = Setting.boolSetting(
23+
SETTING_PREFIX + "enabled",
24+
false,
25+
Setting.Property.Dynamic,
26+
Setting.Property.NodeScope
27+
);
28+
29+
/**
30+
* This setting permits nodes to host more than ideally balanced number of index shards.
31+
* Maximum tolerated index shard count = ideal + skew_tolerance
32+
* i.e. ideal = 4 shards, skew_tolerance = 1
33+
* maximum tolerated index shards = 4 + 1 = 5.
34+
*/
35+
public static final Setting<Integer> INDEX_BALANCE_DECIDER_EXCESS_SHARDS = Setting.intSetting(
36+
SETTING_PREFIX + "excess_shards",
37+
0,
38+
0,
39+
Setting.Property.Dynamic,
40+
Setting.Property.NodeScope
41+
);
42+
43+
private volatile boolean deciderEnabled;
44+
private volatile int excessShards;
45+
46+
public IndexBalanceConstraintSettings(ClusterSettings clusterSettings) {
47+
clusterSettings.initializeAndWatch(INDEX_BALANCE_DECIDER_ENABLED_SETTING, enabled -> this.deciderEnabled = enabled);
48+
clusterSettings.initializeAndWatch(INDEX_BALANCE_DECIDER_EXCESS_SHARDS, value -> this.excessShards = value);
49+
}
50+
51+
public boolean isDeciderEnabled() {
52+
return this.deciderEnabled;
53+
}
54+
55+
public int getExcessShards() {
56+
return this.excessShards;
57+
}
58+
59+
}

server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1581,6 +1581,11 @@ private boolean tryRelocateShard(ModelNode minNode, ModelNode maxNode, ProjectIn
15811581
logger.trace("No shards of [{}] can relocate from [{}] to [{}]", idx, maxNode.getNodeId(), minNode.getNodeId());
15821582
return false;
15831583
}
1584+
1585+
// Visible for testing.
1586+
public RoutingAllocation getAllocation() {
1587+
return this.allocation;
1588+
}
15841589
}
15851590

15861591
public static class ModelNode implements Iterable<ModelIndex> {
@@ -1824,7 +1829,8 @@ public WeightFunction getWeightFunction() {
18241829
}
18251830
}
18261831

1827-
record ProjectIndex(ProjectId project, String indexName) {
1832+
// Visible for testing.
1833+
public record ProjectIndex(ProjectId project, String indexName) {
18281834
ProjectIndex(RoutingAllocation allocation, ShardRouting shard) {
18291835
this(allocation.metadata().projectFor(shard.index()).id(), shard.getIndexName());
18301836
}

server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/WeightFunction.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,8 @@ public WeightFunction(float shardBalance, float indexBalance, float writeLoadBal
6060
theta3 = diskUsageBalance / sum;
6161
}
6262

63-
float calculateNodeWeightWithIndex(
63+
// Visible for testing
64+
public float calculateNodeWeightWithIndex(
6465
BalancedShardsAllocator.Balancer balancer,
6566
BalancedShardsAllocator.ModelNode node,
6667
ProjectIndex index

server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/FilterAllocationDecider.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -62,9 +62,9 @@ public class FilterAllocationDecider extends AllocationDecider {
6262

6363
public static final String NAME = "filter";
6464

65-
private static final String CLUSTER_ROUTING_REQUIRE_GROUP_PREFIX = "cluster.routing.allocation.require";
66-
private static final String CLUSTER_ROUTING_INCLUDE_GROUP_PREFIX = "cluster.routing.allocation.include";
67-
private static final String CLUSTER_ROUTING_EXCLUDE_GROUP_PREFIX = "cluster.routing.allocation.exclude";
65+
public static final String CLUSTER_ROUTING_REQUIRE_GROUP_PREFIX = "cluster.routing.allocation.require";
66+
public static final String CLUSTER_ROUTING_INCLUDE_GROUP_PREFIX = "cluster.routing.allocation.include";
67+
public static final String CLUSTER_ROUTING_EXCLUDE_GROUP_PREFIX = "cluster.routing.allocation.exclude";
6868

6969
public static final Setting.AffixSetting<List<String>> CLUSTER_ROUTING_REQUIRE_GROUP_SETTING = Setting.prefixKeySetting(
7070
CLUSTER_ROUTING_REQUIRE_GROUP_PREFIX + ".",
Lines changed: 172 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,172 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.cluster.routing.allocation.decider;
11+
12+
import org.apache.logging.log4j.LogManager;
13+
import org.apache.logging.log4j.Logger;
14+
import org.elasticsearch.cluster.metadata.IndexMetadata;
15+
import org.elasticsearch.cluster.metadata.ProjectId;
16+
import org.elasticsearch.cluster.node.DiscoveryNode;
17+
import org.elasticsearch.cluster.node.DiscoveryNodeFilters;
18+
import org.elasticsearch.cluster.node.DiscoveryNodeRole;
19+
import org.elasticsearch.cluster.routing.RoutingNode;
20+
import org.elasticsearch.cluster.routing.ShardRouting;
21+
import org.elasticsearch.cluster.routing.allocation.IndexBalanceConstraintSettings;
22+
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
23+
import org.elasticsearch.common.settings.ClusterSettings;
24+
import org.elasticsearch.common.settings.Settings;
25+
import org.elasticsearch.core.Strings;
26+
import org.elasticsearch.index.Index;
27+
28+
import java.util.HashSet;
29+
import java.util.List;
30+
import java.util.Map;
31+
import java.util.Set;
32+
33+
import static org.elasticsearch.cluster.node.DiscoveryNodeFilters.OpType.AND;
34+
import static org.elasticsearch.cluster.node.DiscoveryNodeFilters.OpType.OR;
35+
import static org.elasticsearch.cluster.node.DiscoveryNodeRole.INDEX_ROLE;
36+
import static org.elasticsearch.cluster.node.DiscoveryNodeRole.SEARCH_ROLE;
37+
import static org.elasticsearch.cluster.routing.allocation.decider.FilterAllocationDecider.CLUSTER_ROUTING_EXCLUDE_GROUP_SETTING;
38+
import static org.elasticsearch.cluster.routing.allocation.decider.FilterAllocationDecider.CLUSTER_ROUTING_INCLUDE_GROUP_SETTING;
39+
import static org.elasticsearch.cluster.routing.allocation.decider.FilterAllocationDecider.CLUSTER_ROUTING_REQUIRE_GROUP_SETTING;
40+
41+
/**
42+
* For an index of n shards hosted by a cluster of m nodes, a node should not host
43+
* significantly more than n / m shards. This allocation decider enforces this principle.
44+
* This allocation decider excludes any nodes flagged for shutdown from consideration
45+
* when computing optimal shard distributions.
46+
*/
47+
public class IndexBalanceAllocationDecider extends AllocationDecider {
48+
49+
private static final Logger logger = LogManager.getLogger(IndexBalanceAllocationDecider.class);
50+
private static final String EMPTY = "";
51+
52+
public static final String NAME = "index_balance";
53+
54+
private final IndexBalanceConstraintSettings indexBalanceConstraintSettings;
55+
private final boolean isStateless;
56+
57+
private volatile DiscoveryNodeFilters clusterRequireFilters;
58+
private volatile DiscoveryNodeFilters clusterIncludeFilters;
59+
private volatile DiscoveryNodeFilters clusterExcludeFilters;
60+
61+
public IndexBalanceAllocationDecider(Settings settings, ClusterSettings clusterSettings) {
62+
this.indexBalanceConstraintSettings = new IndexBalanceConstraintSettings(clusterSettings);
63+
setClusterRequireFilters(CLUSTER_ROUTING_REQUIRE_GROUP_SETTING.getAsMap(settings));
64+
setClusterExcludeFilters(CLUSTER_ROUTING_EXCLUDE_GROUP_SETTING.getAsMap(settings));
65+
setClusterIncludeFilters(CLUSTER_ROUTING_INCLUDE_GROUP_SETTING.getAsMap(settings));
66+
clusterSettings.addAffixMapUpdateConsumer(CLUSTER_ROUTING_REQUIRE_GROUP_SETTING, this::setClusterRequireFilters, (a, b) -> {});
67+
clusterSettings.addAffixMapUpdateConsumer(CLUSTER_ROUTING_EXCLUDE_GROUP_SETTING, this::setClusterExcludeFilters, (a, b) -> {});
68+
clusterSettings.addAffixMapUpdateConsumer(CLUSTER_ROUTING_INCLUDE_GROUP_SETTING, this::setClusterIncludeFilters, (a, b) -> {});
69+
isStateless = DiscoveryNode.isStateless(settings);
70+
}
71+
72+
@Override
73+
public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
74+
if (indexBalanceConstraintSettings.isDeciderEnabled() == false || isStateless == false || hasFilters()) {
75+
return allocation.decision(Decision.YES, NAME, "Decider is disabled.");
76+
}
77+
78+
Index index = shardRouting.index();
79+
if (node.hasIndex(index) == false) {
80+
return allocation.decision(Decision.YES, NAME, "Node does not currently host this index.");
81+
}
82+
83+
assert node.node() != null;
84+
assert node.node().getRoles().contains(INDEX_ROLE) || node.node().getRoles().contains(SEARCH_ROLE);
85+
86+
if (node.node().getRoles().contains(INDEX_ROLE) && shardRouting.primary() == false) {
87+
return allocation.decision(Decision.YES, NAME, "An index node cannot own search shards. Decider inactive.");
88+
}
89+
90+
if (node.node().getRoles().contains(SEARCH_ROLE) && shardRouting.primary()) {
91+
return allocation.decision(Decision.YES, NAME, "A search node cannot own primary shards. Decider inactive.");
92+
}
93+
94+
final ProjectId projectId = allocation.getClusterState().metadata().projectFor(index).id();
95+
final Set<DiscoveryNode> eligibleNodes = new HashSet<>();
96+
int totalShards = 0;
97+
String nomenclature = EMPTY;
98+
99+
if (node.node().getRoles().contains(INDEX_ROLE)) {
100+
collectEligibleNodes(allocation, eligibleNodes, INDEX_ROLE);
101+
// Primary shards only.
102+
totalShards = allocation.getClusterState().routingTable(projectId).index(index).size();
103+
nomenclature = "index";
104+
} else if (node.node().getRoles().contains(SEARCH_ROLE)) {
105+
collectEligibleNodes(allocation, eligibleNodes, SEARCH_ROLE);
106+
// Replicas only.
107+
final IndexMetadata indexMetadata = allocation.getClusterState().metadata().getProject(projectId).index(index);
108+
totalShards = indexMetadata.getNumberOfShards() * indexMetadata.getNumberOfReplicas();
109+
nomenclature = "search";
110+
}
111+
112+
assert eligibleNodes.isEmpty() == false;
113+
if (eligibleNodes.isEmpty()) {
114+
return allocation.decision(Decision.YES, NAME, "There are no eligible nodes available.");
115+
}
116+
assert totalShards > 0;
117+
final double idealAllocation = Math.ceil((double) totalShards / eligibleNodes.size());
118+
119+
// Adding the excess shards before division ensures that with tolerance 1 we get:
120+
// 2 shards, 2 nodes, allow 2 on each
121+
// 3 shards, 2 nodes, allow 2 on each etc.
122+
final int threshold = Math.ceilDiv(totalShards + indexBalanceConstraintSettings.getExcessShards(), eligibleNodes.size());
123+
final int currentAllocation = node.numberOfOwningShardsForIndex(index);
124+
125+
if (currentAllocation >= threshold) {
126+
String explanation = Strings.format(
127+
"There are [%d] eligible nodes in the [%s] tier for assignment of [%d] shards in index [%s]. Ideally no more than [%.0f] "
128+
+ "shard would be assigned per node (the index balance excess shards setting is [%d]). This node is already assigned"
129+
+ " [%d] shards of the index.",
130+
eligibleNodes.size(),
131+
nomenclature,
132+
totalShards,
133+
index,
134+
idealAllocation,
135+
indexBalanceConstraintSettings.getExcessShards(),
136+
currentAllocation
137+
);
138+
139+
logger.trace(explanation);
140+
141+
return allocation.decision(Decision.NOT_PREFERRED, NAME, explanation);
142+
}
143+
144+
return allocation.decision(Decision.YES, NAME, "Node index shard allocation is under the threshold.");
145+
}
146+
147+
private void collectEligibleNodes(RoutingAllocation allocation, Set<DiscoveryNode> eligibleNodes, DiscoveryNodeRole role) {
148+
for (DiscoveryNode discoveryNode : allocation.nodes()) {
149+
if (discoveryNode.getRoles().contains(role) && allocation.metadata().nodeShutdowns().contains(discoveryNode.getId()) == false) {
150+
eligibleNodes.add(discoveryNode);
151+
}
152+
}
153+
}
154+
155+
private void setClusterRequireFilters(Map<String, List<String>> filters) {
156+
clusterRequireFilters = DiscoveryNodeFilters.trimTier(DiscoveryNodeFilters.buildFromKeyValues(AND, filters));
157+
}
158+
159+
private void setClusterIncludeFilters(Map<String, List<String>> filters) {
160+
clusterIncludeFilters = DiscoveryNodeFilters.trimTier(DiscoveryNodeFilters.buildFromKeyValues(OR, filters));
161+
}
162+
163+
private void setClusterExcludeFilters(Map<String, List<String>> filters) {
164+
clusterExcludeFilters = DiscoveryNodeFilters.trimTier(DiscoveryNodeFilters.buildFromKeyValues(OR, filters));
165+
}
166+
167+
private boolean hasFilters() {
168+
return (clusterExcludeFilters != null && clusterExcludeFilters.hasFilters())
169+
|| (clusterIncludeFilters != null && clusterIncludeFilters.hasFilters())
170+
|| (clusterRequireFilters != null && clusterRequireFilters.hasFilters());
171+
}
172+
}

server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
import org.elasticsearch.cluster.routing.OperationRouting;
4747
import org.elasticsearch.cluster.routing.allocation.DataTier;
4848
import org.elasticsearch.cluster.routing.allocation.DiskThresholdSettings;
49+
import org.elasticsearch.cluster.routing.allocation.IndexBalanceConstraintSettings;
4950
import org.elasticsearch.cluster.routing.allocation.WriteLoadConstraintSettings;
5051
import org.elasticsearch.cluster.routing.allocation.allocator.AllocationBalancingRoundSummaryService;
5152
import org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator;
@@ -659,6 +660,8 @@ public void apply(Settings value, Settings current, Settings previous) {
659660
WriteLoadConstraintSettings.WRITE_LOAD_DECIDER_HIGH_UTILIZATION_DURATION_SETTING,
660661
WriteLoadConstraintSettings.WRITE_LOAD_DECIDER_QUEUE_LATENCY_THRESHOLD_SETTING,
661662
WriteLoadConstraintSettings.WRITE_LOAD_DECIDER_REROUTE_INTERVAL_SETTING,
663+
IndexBalanceConstraintSettings.INDEX_BALANCE_DECIDER_ENABLED_SETTING,
664+
IndexBalanceConstraintSettings.INDEX_BALANCE_DECIDER_EXCESS_SHARDS,
662665
WriteLoadConstraintSettings.WRITE_LOAD_DECIDER_MINIMUM_LOGGING_INTERVAL,
663666
SamplingService.TTL_POLL_INTERVAL_SETTING,
664667
BlobStoreRepository.MAX_HEAP_SIZE_FOR_SNAPSHOT_DELETION_SETTING,

server/src/test/java/org/elasticsearch/cluster/ClusterModuleTests.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.elasticsearch.cluster.routing.allocation.decider.DiskThresholdDecider;
2424
import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider;
2525
import org.elasticsearch.cluster.routing.allocation.decider.FilterAllocationDecider;
26+
import org.elasticsearch.cluster.routing.allocation.decider.IndexBalanceAllocationDecider;
2627
import org.elasticsearch.cluster.routing.allocation.decider.IndexVersionAllocationDecider;
2728
import org.elasticsearch.cluster.routing.allocation.decider.MaxRetryAllocationDecider;
2829
import org.elasticsearch.cluster.routing.allocation.decider.NodeReplacementAllocationDecider;
@@ -286,7 +287,8 @@ public void testAllocationDeciderOrder() {
286287
DiskThresholdDecider.class,
287288
ThrottlingAllocationDecider.class,
288289
ShardsLimitAllocationDecider.class,
289-
AwarenessAllocationDecider.class
290+
AwarenessAllocationDecider.class,
291+
IndexBalanceAllocationDecider.class
290292
);
291293
Collection<AllocationDecider> deciders = ClusterModule.createAllocationDeciders(
292294
Settings.EMPTY,

server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocatorTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1277,7 +1277,7 @@ private static class NodeNameDrivenWeightFunction extends WeightFunction {
12771277
}
12781278

12791279
@Override
1280-
float calculateNodeWeightWithIndex(
1280+
public float calculateNodeWeightWithIndex(
12811281
BalancedShardsAllocator.Balancer balancer,
12821282
BalancedShardsAllocator.ModelNode node,
12831283
BalancedShardsAllocator.ProjectIndex index

0 commit comments

Comments
 (0)