Skip to content

Commit 45c6bbd

Browse files
authored
Allow balancing weights to be set per tier (#126091)
Instead of assuming the BalancedShardsAllocator applies to the entire cluster, I've added the concept of "partitions" to the balancing. The partitions must be mutually disjoint subsets of the shards and nodes, i.e. the shards in a partition are only ever allocated to the nodes in the same partition, as is the case in serverless. WeightFunctions and NodeSorters are scoped to partitions. A BalancingWeights implementation knows how to partition a cluster. The status quo behaviour is defined by the GlobalBalancingWeightsFactory, which produces a single global partition.
1 parent 9772b5e commit 45c6bbd

File tree

12 files changed

+481
-65
lines changed

12 files changed

+481
-65
lines changed

docs/changelog/126091.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 126091
2+
summary: Allow balancing weights to be set per tier
3+
area: Allocation
4+
type: enhancement
5+
issues: []

server/src/main/java/org/elasticsearch/cluster/ClusterModule.java

Lines changed: 34 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -38,8 +38,10 @@
3838
import org.elasticsearch.cluster.routing.allocation.WriteLoadForecaster;
3939
import org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator;
4040
import org.elasticsearch.cluster.routing.allocation.allocator.BalancerSettings;
41+
import org.elasticsearch.cluster.routing.allocation.allocator.BalancingWeightsFactory;
4142
import org.elasticsearch.cluster.routing.allocation.allocator.DesiredBalanceShardsAllocator;
4243
import org.elasticsearch.cluster.routing.allocation.allocator.DesiredBalanceShardsAllocator.DesiredBalanceReconcilerAction;
44+
import org.elasticsearch.cluster.routing.allocation.allocator.GlobalBalancingWeightsFactory;
4345
import org.elasticsearch.cluster.routing.allocation.allocator.ShardsAllocator;
4446
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDecider;
4547
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders;
@@ -146,11 +148,20 @@ public ClusterModule(
146148
this.deciderList = createAllocationDeciders(settings, clusterService.getClusterSettings(), clusterPlugins);
147149
this.allocationDeciders = new AllocationDeciders(deciderList);
148150
final BalancerSettings balancerSettings = new BalancerSettings(clusterService.getClusterSettings());
149-
var nodeAllocationStatsAndWeightsCalculator = new NodeAllocationStatsAndWeightsCalculator(writeLoadForecaster, balancerSettings);
151+
final BalancingWeightsFactory balancingWeightsFactory = getBalancingWeightsFactory(
152+
clusterPlugins,
153+
balancerSettings,
154+
clusterService.getClusterSettings()
155+
);
156+
var nodeAllocationStatsAndWeightsCalculator = new NodeAllocationStatsAndWeightsCalculator(
157+
writeLoadForecaster,
158+
balancingWeightsFactory
159+
);
150160
this.shardsAllocator = createShardsAllocator(
151161
settings,
152162
clusterService.getClusterSettings(),
153163
balancerSettings,
164+
balancingWeightsFactory,
154165
threadPool,
155166
clusterPlugins,
156167
clusterService,
@@ -203,6 +214,22 @@ public ShardRouting.Role newEmptyRole(int copyIndex) {
203214
};
204215
}
205216

217+
static BalancingWeightsFactory getBalancingWeightsFactory(
218+
List<ClusterPlugin> clusterPlugins,
219+
BalancerSettings balancerSettings,
220+
ClusterSettings clusterSettings
221+
) {
222+
final var strategies = clusterPlugins.stream()
223+
.map(pl -> pl.getBalancingWeightsFactory(balancerSettings, clusterSettings))
224+
.filter(Objects::nonNull)
225+
.toList();
226+
return switch (strategies.size()) {
227+
case 0 -> new GlobalBalancingWeightsFactory(balancerSettings);
228+
case 1 -> strategies.getFirst();
229+
default -> throw new IllegalArgumentException("multiple plugins define balancing weights factories, which is not permitted");
230+
};
231+
}
232+
206233
private ClusterState reconcile(ClusterState clusterState, RerouteStrategy rerouteStrategy) {
207234
return allocationService.executeWithRoutingAllocation(clusterState, "reconcile-desired-balance", rerouteStrategy);
208235
}
@@ -439,6 +466,7 @@ private static ShardsAllocator createShardsAllocator(
439466
Settings settings,
440467
ClusterSettings clusterSettings,
441468
BalancerSettings balancerSettings,
469+
BalancingWeightsFactory balancingWeightsFactory,
442470
ThreadPool threadPool,
443471
List<ClusterPlugin> clusterPlugins,
444472
ClusterService clusterService,
@@ -448,12 +476,15 @@ private static ShardsAllocator createShardsAllocator(
448476
NodeAllocationStatsAndWeightsCalculator nodeAllocationStatsAndWeightsCalculator
449477
) {
450478
Map<String, Supplier<ShardsAllocator>> allocators = new HashMap<>();
451-
allocators.put(BALANCED_ALLOCATOR, () -> new BalancedShardsAllocator(balancerSettings, writeLoadForecaster));
479+
allocators.put(
480+
BALANCED_ALLOCATOR,
481+
() -> new BalancedShardsAllocator(balancerSettings, writeLoadForecaster, balancingWeightsFactory)
482+
);
452483
allocators.put(
453484
DESIRED_BALANCE_ALLOCATOR,
454485
() -> new DesiredBalanceShardsAllocator(
455486
clusterSettings,
456-
new BalancedShardsAllocator(balancerSettings, writeLoadForecaster),
487+
new BalancedShardsAllocator(balancerSettings, writeLoadForecaster, balancingWeightsFactory),
457488
threadPool,
458489
clusterService,
459490
reconciler,

server/src/main/java/org/elasticsearch/cluster/routing/allocation/NodeAllocationStatsAndWeightsCalculator.java

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,8 @@
1515
import org.elasticsearch.cluster.routing.RoutingNode;
1616
import org.elasticsearch.cluster.routing.RoutingNodes;
1717
import org.elasticsearch.cluster.routing.ShardRouting;
18-
import org.elasticsearch.cluster.routing.allocation.allocator.BalancerSettings;
18+
import org.elasticsearch.cluster.routing.allocation.allocator.BalancingWeights;
19+
import org.elasticsearch.cluster.routing.allocation.allocator.BalancingWeightsFactory;
1920
import org.elasticsearch.cluster.routing.allocation.allocator.DesiredBalance;
2021
import org.elasticsearch.cluster.routing.allocation.allocator.WeightFunction;
2122
import org.elasticsearch.common.util.Maps;
@@ -28,7 +29,7 @@
2829
*/
2930
public class NodeAllocationStatsAndWeightsCalculator {
3031
private final WriteLoadForecaster writeLoadForecaster;
31-
private final BalancerSettings balancerSettings;
32+
private final BalancingWeightsFactory balancingWeightsFactory;
3233

3334
/**
3435
* Node shard allocation stats and the total node weight.
@@ -42,9 +43,12 @@ public record NodeAllocationStatsAndWeight(
4243
float currentNodeWeight
4344
) {}
4445

45-
public NodeAllocationStatsAndWeightsCalculator(WriteLoadForecaster writeLoadForecaster, BalancerSettings balancerSettings) {
46+
public NodeAllocationStatsAndWeightsCalculator(
47+
WriteLoadForecaster writeLoadForecaster,
48+
BalancingWeightsFactory balancingWeightsFactory
49+
) {
4650
this.writeLoadForecaster = writeLoadForecaster;
47-
this.balancerSettings = balancerSettings;
51+
this.balancingWeightsFactory = balancingWeightsFactory;
4852
}
4953

5054
/**
@@ -60,18 +64,14 @@ public Map<String, NodeAllocationStatsAndWeight> nodesAllocationStatsAndWeights(
6064
// must not use licensed features when just starting up
6165
writeLoadForecaster.refreshLicense();
6266
}
63-
var weightFunction = new WeightFunction(
64-
balancerSettings.getShardBalanceFactor(),
65-
balancerSettings.getIndexBalanceFactor(),
66-
balancerSettings.getWriteLoadBalanceFactor(),
67-
balancerSettings.getDiskUsageBalanceFactor()
68-
);
67+
final BalancingWeights balancingWeights = balancingWeightsFactory.create();
6968
var avgShardsPerNode = WeightFunction.avgShardPerNode(metadata, routingNodes);
7069
var avgWriteLoadPerNode = WeightFunction.avgWriteLoadPerNode(writeLoadForecaster, metadata, routingNodes);
7170
var avgDiskUsageInBytesPerNode = WeightFunction.avgDiskUsageInBytesPerNode(clusterInfo, metadata, routingNodes);
7271

7372
var nodeAllocationStatsAndWeights = Maps.<String, NodeAllocationStatsAndWeight>newMapWithExpectedSize(routingNodes.size());
7473
for (RoutingNode node : routingNodes) {
74+
WeightFunction weightFunction = balancingWeights.weightFunctionForNode(node);
7575
int shards = 0;
7676
int undesiredShards = 0;
7777
double forecastedWriteLoad = 0.0;

0 commit comments

Comments
 (0)