diff --git a/docs/changelog/126091.yaml b/docs/changelog/126091.yaml new file mode 100644 index 0000000000000..beba808c96fc6 --- /dev/null +++ b/docs/changelog/126091.yaml @@ -0,0 +1,5 @@ +pr: 126091 +summary: Allow balancing weights to be set per tier +area: Allocation +type: enhancement +issues: [] diff --git a/server/src/main/java/org/elasticsearch/cluster/ClusterModule.java b/server/src/main/java/org/elasticsearch/cluster/ClusterModule.java index e0c78780c3b3f..33a7139213f93 100644 --- a/server/src/main/java/org/elasticsearch/cluster/ClusterModule.java +++ b/server/src/main/java/org/elasticsearch/cluster/ClusterModule.java @@ -38,8 +38,10 @@ import org.elasticsearch.cluster.routing.allocation.WriteLoadForecaster; import org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator; import org.elasticsearch.cluster.routing.allocation.allocator.BalancerSettings; +import org.elasticsearch.cluster.routing.allocation.allocator.BalancingWeightsFactory; import org.elasticsearch.cluster.routing.allocation.allocator.DesiredBalanceShardsAllocator; import org.elasticsearch.cluster.routing.allocation.allocator.DesiredBalanceShardsAllocator.DesiredBalanceReconcilerAction; +import org.elasticsearch.cluster.routing.allocation.allocator.GlobalBalancingWeightsFactory; import org.elasticsearch.cluster.routing.allocation.allocator.ShardsAllocator; import org.elasticsearch.cluster.routing.allocation.decider.AllocationDecider; import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders; @@ -146,11 +148,20 @@ public ClusterModule( this.deciderList = createAllocationDeciders(settings, clusterService.getClusterSettings(), clusterPlugins); this.allocationDeciders = new AllocationDeciders(deciderList); final BalancerSettings balancerSettings = new BalancerSettings(clusterService.getClusterSettings()); - var nodeAllocationStatsAndWeightsCalculator = new NodeAllocationStatsAndWeightsCalculator(writeLoadForecaster, balancerSettings); + final 
BalancingWeightsFactory balancingWeightsFactory = getBalancingWeightsFactory( + clusterPlugins, + balancerSettings, + clusterService.getClusterSettings() + ); + var nodeAllocationStatsAndWeightsCalculator = new NodeAllocationStatsAndWeightsCalculator( + writeLoadForecaster, + balancingWeightsFactory + ); this.shardsAllocator = createShardsAllocator( settings, clusterService.getClusterSettings(), balancerSettings, + balancingWeightsFactory, threadPool, clusterPlugins, clusterService, @@ -203,6 +214,22 @@ public ShardRouting.Role newEmptyRole(int copyIndex) { }; } + static BalancingWeightsFactory getBalancingWeightsFactory( + List clusterPlugins, + BalancerSettings balancerSettings, + ClusterSettings clusterSettings + ) { + final var strategies = clusterPlugins.stream() + .map(pl -> pl.getBalancingWeightsFactory(balancerSettings, clusterSettings)) + .filter(Objects::nonNull) + .toList(); + return switch (strategies.size()) { + case 0 -> new GlobalBalancingWeightsFactory(balancerSettings); + case 1 -> strategies.getFirst(); + default -> throw new IllegalArgumentException("multiple plugins define balancing weights factories, which is not permitted"); + }; + } + private ClusterState reconcile(ClusterState clusterState, RerouteStrategy rerouteStrategy) { return allocationService.executeWithRoutingAllocation(clusterState, "reconcile-desired-balance", rerouteStrategy); } @@ -439,6 +466,7 @@ private static ShardsAllocator createShardsAllocator( Settings settings, ClusterSettings clusterSettings, BalancerSettings balancerSettings, + BalancingWeightsFactory balancingWeightsFactory, ThreadPool threadPool, List clusterPlugins, ClusterService clusterService, @@ -448,12 +476,15 @@ private static ShardsAllocator createShardsAllocator( NodeAllocationStatsAndWeightsCalculator nodeAllocationStatsAndWeightsCalculator ) { Map> allocators = new HashMap<>(); - allocators.put(BALANCED_ALLOCATOR, () -> new BalancedShardsAllocator(balancerSettings, writeLoadForecaster)); + allocators.put( + 
BALANCED_ALLOCATOR, + () -> new BalancedShardsAllocator(balancerSettings, writeLoadForecaster, balancingWeightsFactory) + ); allocators.put( DESIRED_BALANCE_ALLOCATOR, () -> new DesiredBalanceShardsAllocator( clusterSettings, - new BalancedShardsAllocator(balancerSettings, writeLoadForecaster), + new BalancedShardsAllocator(balancerSettings, writeLoadForecaster, balancingWeightsFactory), threadPool, clusterService, reconciler, diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/NodeAllocationStatsAndWeightsCalculator.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/NodeAllocationStatsAndWeightsCalculator.java index f85b97125ceba..21e006f76b1d1 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/NodeAllocationStatsAndWeightsCalculator.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/NodeAllocationStatsAndWeightsCalculator.java @@ -15,7 +15,8 @@ import org.elasticsearch.cluster.routing.RoutingNode; import org.elasticsearch.cluster.routing.RoutingNodes; import org.elasticsearch.cluster.routing.ShardRouting; -import org.elasticsearch.cluster.routing.allocation.allocator.BalancerSettings; +import org.elasticsearch.cluster.routing.allocation.allocator.BalancingWeights; +import org.elasticsearch.cluster.routing.allocation.allocator.BalancingWeightsFactory; import org.elasticsearch.cluster.routing.allocation.allocator.DesiredBalance; import org.elasticsearch.cluster.routing.allocation.allocator.WeightFunction; import org.elasticsearch.common.util.Maps; @@ -28,7 +29,7 @@ */ public class NodeAllocationStatsAndWeightsCalculator { private final WriteLoadForecaster writeLoadForecaster; - private final BalancerSettings balancerSettings; + private final BalancingWeightsFactory balancingWeightsFactory; /** * Node shard allocation stats and the total node weight. 
@@ -42,9 +43,12 @@ public record NodeAllocationStatsAndWeight( float currentNodeWeight ) {} - public NodeAllocationStatsAndWeightsCalculator(WriteLoadForecaster writeLoadForecaster, BalancerSettings balancerSettings) { + public NodeAllocationStatsAndWeightsCalculator( + WriteLoadForecaster writeLoadForecaster, + BalancingWeightsFactory balancingWeightsFactory + ) { this.writeLoadForecaster = writeLoadForecaster; - this.balancerSettings = balancerSettings; + this.balancingWeightsFactory = balancingWeightsFactory; } /** @@ -60,18 +64,14 @@ public Map nodesAllocationStatsAndWeights( // must not use licensed features when just starting up writeLoadForecaster.refreshLicense(); } - var weightFunction = new WeightFunction( - balancerSettings.getShardBalanceFactor(), - balancerSettings.getIndexBalanceFactor(), - balancerSettings.getWriteLoadBalanceFactor(), - balancerSettings.getDiskUsageBalanceFactor() - ); + final BalancingWeights balancingWeights = balancingWeightsFactory.create(); var avgShardsPerNode = WeightFunction.avgShardPerNode(metadata, routingNodes); var avgWriteLoadPerNode = WeightFunction.avgWriteLoadPerNode(writeLoadForecaster, metadata, routingNodes); var avgDiskUsageInBytesPerNode = WeightFunction.avgDiskUsageInBytesPerNode(clusterInfo, metadata, routingNodes); var nodeAllocationStatsAndWeights = Maps.newMapWithExpectedSize(routingNodes.size()); for (RoutingNode node : routingNodes) { + WeightFunction weightFunction = balancingWeights.weightFunctionForNode(node); int shards = 0; int undesiredShards = 0; double forecastedWriteLoad = 0.0; diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java index 4173ff930cfef..2681c588dcf3f 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java +++ 
b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java @@ -114,6 +114,7 @@ public class BalancedShardsAllocator implements ShardsAllocator { private final BalancerSettings balancerSettings; private final WriteLoadForecaster writeLoadForecaster; + private final BalancingWeightsFactory balancingWeightsFactory; public BalancedShardsAllocator() { this(Settings.EMPTY); @@ -123,10 +124,19 @@ public BalancedShardsAllocator(Settings settings) { this(new BalancerSettings(settings), WriteLoadForecaster.DEFAULT); } - @Inject public BalancedShardsAllocator(BalancerSettings balancerSettings, WriteLoadForecaster writeLoadForecaster) { + this(balancerSettings, writeLoadForecaster, new GlobalBalancingWeightsFactory(balancerSettings)); + } + + @Inject + public BalancedShardsAllocator( + BalancerSettings balancerSettings, + WriteLoadForecaster writeLoadForecaster, + BalancingWeightsFactory balancingWeightsFactory + ) { this.balancerSettings = balancerSettings; this.writeLoadForecaster = writeLoadForecaster; + this.balancingWeightsFactory = balancingWeightsFactory; } @Override @@ -142,25 +152,21 @@ public void allocate(RoutingAllocation allocation) { failAllocationOfNewPrimaries(allocation); return; } - final WeightFunction weightFunction = new WeightFunction( - balancerSettings.getShardBalanceFactor(), - balancerSettings.getIndexBalanceFactor(), - balancerSettings.getWriteLoadBalanceFactor(), - balancerSettings.getDiskUsageBalanceFactor() - ); - final Balancer balancer = new Balancer(writeLoadForecaster, allocation, weightFunction, balancerSettings.getThreshold()); + final BalancingWeights balancingWeights = balancingWeightsFactory.create(); + final Balancer balancer = new Balancer(writeLoadForecaster, allocation, balancerSettings.getThreshold(), balancingWeights); balancer.allocateUnassigned(); balancer.moveShards(); balancer.balance(); // Node weights are calculated after each internal balancing round and saved to the 
RoutingNodes copy. - collectAndRecordNodeWeightStats(balancer, weightFunction, allocation); + collectAndRecordNodeWeightStats(balancer, balancingWeights, allocation); } - private void collectAndRecordNodeWeightStats(Balancer balancer, WeightFunction weightFunction, RoutingAllocation allocation) { + private void collectAndRecordNodeWeightStats(Balancer balancer, BalancingWeights balancingWeights, RoutingAllocation allocation) { Map nodeLevelWeights = new HashMap<>(); for (var entry : balancer.nodes.entrySet()) { var node = entry.getValue(); + var weightFunction = balancingWeights.weightFunctionForNode(node.routingNode); var nodeWeight = weightFunction.calculateNodeWeight( node.numShards(), balancer.avgShardsPerNode(), @@ -179,13 +185,12 @@ private void collectAndRecordNodeWeightStats(Balancer balancer, WeightFunction w @Override public ShardAllocationDecision decideShardAllocation(final ShardRouting shard, final RoutingAllocation allocation) { - WeightFunction weightFunction = new WeightFunction( - balancerSettings.getShardBalanceFactor(), - balancerSettings.getIndexBalanceFactor(), - balancerSettings.getWriteLoadBalanceFactor(), - balancerSettings.getDiskUsageBalanceFactor() + Balancer balancer = new Balancer( + writeLoadForecaster, + allocation, + balancerSettings.getThreshold(), + balancingWeightsFactory.create() ); - Balancer balancer = new Balancer(writeLoadForecaster, allocation, weightFunction, balancerSettings.getThreshold()); AllocateUnassignedDecision allocateUnassignedDecision = AllocateUnassignedDecision.NOT_TAKEN; MoveDecision moveDecision = MoveDecision.NOT_TAKEN; final ProjectIndex index = new ProjectIndex(allocation, shard); @@ -236,32 +241,32 @@ public static class Balancer { private final RoutingAllocation allocation; private final RoutingNodes routingNodes; private final Metadata metadata; - private final WeightFunction weightFunction; private final float threshold; private final float avgShardsPerNode; private final double avgWriteLoadPerNode; 
private final double avgDiskUsageInBytesPerNode; private final Map nodes; - private final NodeSorter sorter; + private final BalancingWeights balancingWeights; + private final NodeSorters nodeSorters; private Balancer( WriteLoadForecaster writeLoadForecaster, RoutingAllocation allocation, - WeightFunction weightFunction, - float threshold + float threshold, + BalancingWeights balancingWeights ) { this.writeLoadForecaster = writeLoadForecaster; this.allocation = allocation; this.routingNodes = allocation.routingNodes(); this.metadata = allocation.metadata(); - this.weightFunction = weightFunction; this.threshold = threshold; avgShardsPerNode = WeightFunction.avgShardPerNode(metadata, routingNodes); avgWriteLoadPerNode = WeightFunction.avgWriteLoadPerNode(writeLoadForecaster, metadata, routingNodes); avgDiskUsageInBytesPerNode = WeightFunction.avgDiskUsageInBytesPerNode(allocation.clusterInfo(), metadata, routingNodes); nodes = Collections.unmodifiableMap(buildModelFromAssigned()); - sorter = newNodeSorter(); + this.nodeSorters = balancingWeights.createNodeSorters(nodesArray(), this); + this.balancingWeights = balancingWeights; } private static long getShardDiskUsageInBytes(ShardRouting shardRouting, IndexMetadata indexMetadata, ClusterInfo clusterInfo) { @@ -327,16 +332,6 @@ public double avgDiskUsageInBytesPerNode() { return avgDiskUsageInBytesPerNode; } - /** - * Returns a new {@link NodeSorter} that sorts the nodes based on their - * current weight with respect to the index passed to the sorter. The - * returned sorter is not sorted. Use {@link NodeSorter#reset(ProjectIndex)} )} - * to sort based on an index. - */ - private NodeSorter newNodeSorter() { - return new NodeSorter(nodesArray(), weightFunction, this); - } - /** * The absolute value difference between two weights. */ @@ -362,7 +357,7 @@ private IndexMetadata indexMetadata(ProjectIndex index) { /** * Balances the nodes on the cluster model according to the weight function. 
- * The actual balancing is delegated to {@link #balanceByWeights()} + * The actual balancing is delegated to {@link #balanceByWeights(NodeSorter)} */ private void balance() { if (logger.isTraceEnabled()) { @@ -387,7 +382,11 @@ private void balance() { logger.trace("skipping rebalance as single node only"); return; } - balanceByWeights(); + + // Balance each partition + for (NodeSorter nodeSorter : nodeSorters) { + balanceByWeights(nodeSorter); + } } /** @@ -396,6 +395,7 @@ private void balance() { * explain API only. */ private MoveDecision decideRebalance(final ProjectIndex index, final ShardRouting shard, Decision canRemain) { + final NodeSorter sorter = nodeSorters.sorterForShard(shard); index.assertMatch(shard); if (shard.started() == false) { // we can only rebalance started shards @@ -418,7 +418,7 @@ private MoveDecision decideRebalance(final ProjectIndex index, final ShardRoutin assert currentNode != null : "currently assigned node could not be found"; // balance the shard, if a better node can be found - final float currentWeight = weightFunction.calculateNodeWeightWithIndex(this, currentNode, index); + final float currentWeight = sorter.getWeightFunction().calculateNodeWeightWithIndex(this, currentNode, index); final AllocationDeciders deciders = allocation.deciders(); Type rebalanceDecisionType = Type.NO; ModelNode targetNode = null; @@ -434,7 +434,7 @@ private MoveDecision decideRebalance(final ProjectIndex index, final ShardRoutin // this is a comparison of the number of shards on this node to the number of shards // that should be on each node on average (both taking the cluster as a whole into account // as well as shards per index) - final float nodeWeight = weightFunction.calculateNodeWeightWithIndex(this, node, index); + final float nodeWeight = sorter.getWeightFunction().calculateNodeWeightWithIndex(this, node, index); // if the node we are examining has a worse (higher) weight than the node the shard is // assigned to, then there is no way 
moving the shard to the node with the worse weight // can make the balance of the cluster better, so we check for that here @@ -532,11 +532,11 @@ private MoveDecision decideRebalance(final ProjectIndex index, final ShardRoutin * only, or in other words relocations that move the weight delta closer * to {@code 0.0} */ - private void balanceByWeights() { + private void balanceByWeights(NodeSorter sorter) { final AllocationDeciders deciders = allocation.deciders(); final ModelNode[] modelNodes = sorter.modelNodes; final float[] weights = sorter.weights; - for (var index : buildWeightOrderedIndices()) { + for (var index : buildWeightOrderedIndices(sorter)) { IndexMetadata indexMetadata = indexMetadata(index); // find nodes that have a shard of this index or where shards of this index are allowed to be allocated to, @@ -667,7 +667,7 @@ private void balanceByWeights() { * average. To re-balance we need to move shards back eventually likely * to the nodes we relocated them from. */ - private ProjectIndex[] buildWeightOrderedIndices() { + private ProjectIndex[] buildWeightOrderedIndices(NodeSorter sorter) { final ProjectIndex[] indices = allocation.globalRoutingTable() .routingTables() .entrySet() @@ -765,6 +765,7 @@ public void moveShards() { * {@link MoveDecision#getNodeDecisions} will have a non-null value. */ public MoveDecision decideMove(final ProjectIndex index, final ShardRouting shardRouting) { + NodeSorter sorter = nodeSorters.sorterForShard(shardRouting); index.assertMatch(shardRouting); if (shardRouting.started() == false) { @@ -787,17 +788,18 @@ public MoveDecision decideMove(final ProjectIndex index, final ShardRouting shar * This is not guaranteed to be balanced after this operation we still try best effort to * allocate on the minimal eligible node. 
*/ - MoveDecision moveDecision = decideMove(shardRouting, sourceNode, canRemain, this::decideCanAllocate); + MoveDecision moveDecision = decideMove(sorter, shardRouting, sourceNode, canRemain, this::decideCanAllocate); if (moveDecision.canRemain() == false && moveDecision.forceMove() == false) { final boolean shardsOnReplacedNode = allocation.metadata().nodeShutdowns().contains(shardRouting.currentNodeId(), REPLACE); if (shardsOnReplacedNode) { - return decideMove(shardRouting, sourceNode, canRemain, this::decideCanForceAllocateForVacate); + return decideMove(sorter, shardRouting, sourceNode, canRemain, this::decideCanForceAllocateForVacate); } } return moveDecision; } private MoveDecision decideMove( + NodeSorter sorter, ShardRouting shardRouting, ModelNode sourceNode, Decision remainDecision, @@ -998,6 +1000,7 @@ private ProjectIndex projectIndex(ShardRouting shardRouting) { * is of type {@link Type#NO}, then the assigned node will be null. */ private AllocateUnassignedDecision decideAllocateUnassigned(final ProjectIndex index, final ShardRouting shard) { + WeightFunction weightFunction = balancingWeights.weightFunctionForShard(shard); index.assertMatch(shard); if (shard.assignedToNode()) { // we only make decisions for unassigned shards here @@ -1143,7 +1146,7 @@ private boolean tryRelocateShard(ModelNode minNode, ModelNode maxNode, ProjectIn } } - static class ModelNode implements Iterable { + public static class ModelNode implements Iterable { private int numShards = 0; private double writeLoad = 0.0; private double diskUsageInBytes = 0.0; @@ -1153,7 +1156,7 @@ static class ModelNode implements Iterable { private final RoutingNode routingNode; private final Map indices; - ModelNode(WriteLoadForecaster writeLoadForecaster, Metadata metadata, ClusterInfo clusterInfo, RoutingNode routingNode) { + public ModelNode(WriteLoadForecaster writeLoadForecaster, Metadata metadata, ClusterInfo clusterInfo, RoutingNode routingNode) { this.writeLoadForecaster = 
writeLoadForecaster; this.metadata = metadata; this.clusterInfo = clusterInfo; @@ -1286,7 +1289,15 @@ public boolean containsShard(ShardRouting shard) { } } - static final class NodeSorter extends IntroSorter { + /** + * A NodeSorter sorts the set of nodes for a single partition using the {@link WeightFunction} + * for that partition. In partitioned cluster topologies there will be one for each partition + * (e.g. search/indexing in stateless). By default, there is a single partition containing + * a single weight function that applies to all nodes and shards. + * + * @see BalancingWeightsFactory + */ + public static final class NodeSorter extends IntroSorter { final ModelNode[] modelNodes; /* the nodes weights with respect to the current weight function / index */ @@ -1296,7 +1307,7 @@ static final class NodeSorter extends IntroSorter { private final Balancer balancer; private float pivotWeight; - NodeSorter(ModelNode[] modelNodes, WeightFunction function, Balancer balancer) { + public NodeSorter(ModelNode[] modelNodes, WeightFunction function, Balancer balancer) { this.function = function; this.balancer = balancer; this.modelNodes = modelNodes; @@ -1353,7 +1364,11 @@ protected int comparePivot(int j) { } public float delta() { - return weights[weights.length - 1] - weights[0]; + return weights.length == 0 ? 0.0f : weights[weights.length - 1] - weights[0]; + } + + public WeightFunction getWeightFunction() { + return function; } } diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancingWeights.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancingWeights.java new file mode 100644 index 0000000000000..b4ee53b996526 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancingWeights.java @@ -0,0 +1,45 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. 
Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.cluster.routing.allocation.allocator; + +import org.elasticsearch.cluster.routing.RoutingNode; +import org.elasticsearch.cluster.routing.ShardRouting; + +/** + * The cluster nodes and shards are partitioned into mutually disjoint partitions. Each partition + * has its own {@link WeightFunction}. + */ +public interface BalancingWeights { + + /** + * Get the weight function for the partition to which this shard belongs + * + * @param shard The shard + * @return The weight function that applies to the partition that shard belongs to + */ + WeightFunction weightFunctionForShard(ShardRouting shard); + + /** + * Get the weight function for the partition to which this node belongs + * + * @param node The node + * @return The weight function that applies to the partition that node belongs to + */ + WeightFunction weightFunctionForNode(RoutingNode node); + + /** + * Create the node sorters for the cluster + * + * @param modelNodes The full set of cluster nodes + * @param balancer The balancer + * @return a {@link NodeSorters} instance + */ + NodeSorters createNodeSorters(BalancedShardsAllocator.ModelNode[] modelNodes, BalancedShardsAllocator.Balancer balancer); +} diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancingWeightsFactory.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancingWeightsFactory.java new file mode 100644 index 0000000000000..23c556fe9d9d1 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancingWeightsFactory.java @@ -0,0 +1,21 @@ +/* + * Copyright 
Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.cluster.routing.allocation.allocator; + +/** + * A balancing weights factory must be able to divide all shards and nodes into mutually + * disjoint partitions. Allocation balancing will then be conducted sequentially for each partition. + * <p>
+ * If you can't partition your shards and nodes in this way, use {@link GlobalBalancingWeightsFactory} + */ +public interface BalancingWeightsFactory { + + BalancingWeights create(); +} diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/GlobalBalancingWeightsFactory.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/GlobalBalancingWeightsFactory.java new file mode 100644 index 0000000000000..eecc549c1ece4 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/GlobalBalancingWeightsFactory.java @@ -0,0 +1,72 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". 
+ */ + +package org.elasticsearch.cluster.routing.allocation.allocator; + +import org.elasticsearch.cluster.routing.RoutingNode; +import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.common.collect.Iterators; + +import java.util.Iterator; + +public class GlobalBalancingWeightsFactory implements BalancingWeightsFactory { + + private final BalancerSettings balancerSettings; + + public GlobalBalancingWeightsFactory(BalancerSettings balancerSettings) { + this.balancerSettings = balancerSettings; + } + + @Override + public BalancingWeights create() { + return new GlobalBalancingWeights(); + } + + private class GlobalBalancingWeights implements BalancingWeights { + + private final WeightFunction weightFunction; + + GlobalBalancingWeights() { + this.weightFunction = new WeightFunction( + balancerSettings.getShardBalanceFactor(), + balancerSettings.getIndexBalanceFactor(), + balancerSettings.getWriteLoadBalanceFactor(), + balancerSettings.getDiskUsageBalanceFactor() + ); + } + + @Override + public WeightFunction weightFunctionForShard(ShardRouting shard) { + return weightFunction; + } + + @Override + public WeightFunction weightFunctionForNode(RoutingNode node) { + return weightFunction; + } + + @Override + public NodeSorters createNodeSorters(BalancedShardsAllocator.ModelNode[] modelNodes, BalancedShardsAllocator.Balancer balancer) { + return new GlobalNodeSorters(new BalancedShardsAllocator.NodeSorter(modelNodes, weightFunction, balancer)); + } + + private record GlobalNodeSorters(BalancedShardsAllocator.NodeSorter nodeSorter) implements NodeSorters { + + @Override + public BalancedShardsAllocator.NodeSorter sorterForShard(ShardRouting shard) { + return nodeSorter; + } + + @Override + public Iterator iterator() { + return Iterators.single(nodeSorter); + } + } + } +} diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/NodeSorters.java 
b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/NodeSorters.java new file mode 100644 index 0000000000000..041dcba3e047e --- /dev/null +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/NodeSorters.java @@ -0,0 +1,27 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.cluster.routing.allocation.allocator; + +import org.elasticsearch.cluster.routing.ShardRouting; + +/** + * NodeSorters is just a cache of + * {@link org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator.NodeSorter} + * instances for each cluster partition + * <p>
+ * The returned iterator will return a node sorter for each partition in the cluster. + */ +public interface NodeSorters extends Iterable { + + /** + * Get the {@link BalancedShardsAllocator.NodeSorter} for the specified shard + */ + BalancedShardsAllocator.NodeSorter sorterForShard(ShardRouting shard); +} diff --git a/server/src/main/java/org/elasticsearch/plugins/ClusterPlugin.java b/server/src/main/java/org/elasticsearch/plugins/ClusterPlugin.java index f8f508fddb25b..3f71b901ee32f 100644 --- a/server/src/main/java/org/elasticsearch/plugins/ClusterPlugin.java +++ b/server/src/main/java/org/elasticsearch/plugins/ClusterPlugin.java @@ -12,6 +12,8 @@ import org.elasticsearch.cluster.routing.ShardRoutingRoleStrategy; import org.elasticsearch.cluster.routing.allocation.ExistingShardsAllocator; import org.elasticsearch.cluster.routing.allocation.WriteLoadForecaster; +import org.elasticsearch.cluster.routing.allocation.allocator.BalancerSettings; +import org.elasticsearch.cluster.routing.allocation.allocator.BalancingWeightsFactory; import org.elasticsearch.cluster.routing.allocation.allocator.ShardsAllocator; import org.elasticsearch.cluster.routing.allocation.decider.AllocationDecider; import org.elasticsearch.common.settings.ClusterSettings; @@ -75,6 +77,10 @@ default ShardRoutingRoleStrategy getShardRoutingRoleStrategy() { return null; } + default BalancingWeightsFactory getBalancingWeightsFactory(BalancerSettings balancerSettings, ClusterSettings clusterSettings) { + return null; + } + /** * Called when the node is started */ diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/AllocationStatsServiceTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/AllocationStatsServiceTests.java index 756f29384ec5c..cf2653bc6c559 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/AllocationStatsServiceTests.java +++ 
b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/AllocationStatsServiceTests.java @@ -24,6 +24,7 @@ import org.elasticsearch.cluster.routing.allocation.allocator.BalancerSettings; import org.elasticsearch.cluster.routing.allocation.allocator.DesiredBalance; import org.elasticsearch.cluster.routing.allocation.allocator.DesiredBalanceShardsAllocator; +import org.elasticsearch.cluster.routing.allocation.allocator.GlobalBalancingWeightsFactory; import org.elasticsearch.cluster.routing.allocation.allocator.ShardAssignment; import org.elasticsearch.cluster.routing.allocation.allocator.ShardsAllocator; import org.elasticsearch.common.settings.ClusterSettings; @@ -85,7 +86,10 @@ public void testShardStats() { clusterService, () -> clusterInfo, createShardAllocator(), - new NodeAllocationStatsAndWeightsCalculator(TEST_WRITE_LOAD_FORECASTER, BalancerSettings.DEFAULT) + new NodeAllocationStatsAndWeightsCalculator( + TEST_WRITE_LOAD_FORECASTER, + new GlobalBalancingWeightsFactory(BalancerSettings.DEFAULT) + ) ); assertThat( service.stats(), @@ -126,7 +130,10 @@ public void testRelocatingShardIsOnlyCountedOnceOnTargetNode() { clusterService, EmptyClusterInfoService.INSTANCE, createShardAllocator(), - new NodeAllocationStatsAndWeightsCalculator(TEST_WRITE_LOAD_FORECASTER, BalancerSettings.DEFAULT) + new NodeAllocationStatsAndWeightsCalculator( + TEST_WRITE_LOAD_FORECASTER, + new GlobalBalancingWeightsFactory(BalancerSettings.DEFAULT) + ) ); assertThat( service.stats(), @@ -183,7 +190,10 @@ public DesiredBalance getDesiredBalance() { ); } }, - new NodeAllocationStatsAndWeightsCalculator(TEST_WRITE_LOAD_FORECASTER, BalancerSettings.DEFAULT) + new NodeAllocationStatsAndWeightsCalculator( + TEST_WRITE_LOAD_FORECASTER, + new GlobalBalancingWeightsFactory(BalancerSettings.DEFAULT) + ) ); assertThat( service.stats(), diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocatorTests.java 
b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocatorTests.java index 92171b75b5181..7e05ae7c57f79 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocatorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocatorTests.java @@ -35,7 +35,9 @@ import org.elasticsearch.cluster.routing.ShardRoutingState; import org.elasticsearch.cluster.routing.allocation.AllocateUnassignedDecision; import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; +import org.elasticsearch.cluster.routing.allocation.decider.AllocationDecider; import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders; +import org.elasticsearch.cluster.routing.allocation.decider.Decision; import org.elasticsearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider; import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.settings.Settings; @@ -51,9 +53,11 @@ import java.util.Arrays; import java.util.Collections; import java.util.HashMap; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.function.Function; import java.util.stream.Collector; import java.util.stream.Collectors; import java.util.stream.StreamSupport; @@ -611,6 +615,71 @@ public void testShardSizeDiscrepancyWithinIndex() { assertSame(clusterState, reroute(allocationService, clusterState)); } + public void testPartitionedClusterWithSeparateWeights() { + var allocationService = new MockAllocationService( + prefixAllocationDeciders(), + new TestGatewayAllocator(), + new BalancedShardsAllocator( + BalancerSettings.DEFAULT, + TEST_WRITE_LOAD_FORECASTER, + new PrefixBalancingWeightsFactory( + Map.of("shardsOnly", new WeightFunction(1, 0, 0, 0), "weightsOnly", new WeightFunction(0, 0, 1, 0)) + ) + ), + EmptyClusterInfoService.INSTANCE, + SNAPSHOT_INFO_SERVICE_WITH_NO_SHARD_SIZES 
+ ); + + var clusterState = applyStartedShardsUntilNoChange( + createStateWithIndices( + List.of("shardsOnly-1", "shardsOnly-2", "weightsOnly-1", "weightsOnly-2"), + shardId -> prefix(shardId.getIndexName()) + "-1", + anIndex("weightsOnly-heavy-index").indexWriteLoadForecast(8.0), + anIndex("weightsOnly-light-index-1").indexWriteLoadForecast(1.0), + anIndex("weightsOnly-light-index-2").indexWriteLoadForecast(2.0), + anIndex("weightsOnly-light-index-3").indexWriteLoadForecast(3.0), + anIndex("weightsOnly-zero-write-load-index").indexWriteLoadForecast(0.0), + anIndex("weightsOnly-no-write-load-index"), + anIndex("shardsOnly-heavy-index").indexWriteLoadForecast(8.0), + anIndex("shardsOnly-light-index-1").indexWriteLoadForecast(1.0), + anIndex("shardsOnly-light-index-2").indexWriteLoadForecast(2.0), + anIndex("shardsOnly-light-index-3").indexWriteLoadForecast(3.0), + anIndex("shardsOnly-zero-write-load-index").indexWriteLoadForecast(0.0), + anIndex("shardsOnly-no-write-load-index") + ), + allocationService + ); + + Map> shardsPerNode = getShardsPerNode(clusterState); + Map> shardBalancedPartition = shardsPerNode.entrySet() + .stream() + .filter(e -> e.getKey().startsWith("shardsOnly")) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + Map> weightBalancedPartition = shardsPerNode.entrySet() + .stream() + .filter(e -> e.getKey().startsWith("weightsOnly")) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + + // The partition that balances on weights only is skewed + assertThat( + weightBalancedPartition.values(), + containsInAnyOrder( + Set.of("weightsOnly-heavy-index"), + Set.of( + "weightsOnly-light-index-1", + "weightsOnly-light-index-2", + "weightsOnly-light-index-3", + "weightsOnly-zero-write-load-index", + "weightsOnly-no-write-load-index" + ) + ) + ); + + // The partition that balances on shard count only has an even distribution of shards + assertThat(shardBalancedPartition.get("shardsOnly-1"), hasSize(3)); + 
assertThat(shardBalancedPartition.get("shardsOnly-2"), hasSize(3)); + } + private Map getTargetShardPerNodeCount(IndexRoutingTable indexRoutingTable) { var counts = new HashMap(); for (int shardId = 0; shardId < indexRoutingTable.size(); shardId++) { @@ -647,6 +716,14 @@ private static IndexMetadata.Builder anIndex(String name, Settings.Builder setti } private static ClusterState createStateWithIndices(IndexMetadata.Builder... indexMetadataBuilders) { + return createStateWithIndices(List.of("node-1", "node-2"), shardId -> "node-1", indexMetadataBuilders); + } + + private static ClusterState createStateWithIndices( + List nodeNames, + Function unbalancedAllocator, + IndexMetadata.Builder... indexMetadataBuilders + ) { var metadataBuilder = Metadata.builder(); var routingTableBuilder = RoutingTable.builder(TestShardRoutingRoleStrategies.DEFAULT_ROLE_ONLY); if (randomBoolean()) { @@ -663,10 +740,11 @@ private static ClusterState createStateWithIndices(IndexMetadata.Builder... inde var inSyncId = UUIDs.randomBase64UUID(); var indexMetadata = index.putInSyncAllocationIds(0, Set.of(inSyncId)).build(); metadataBuilder.put(indexMetadata, false); + ShardId shardId = new ShardId(indexMetadata.getIndex(), 0); routingTableBuilder.add( IndexRoutingTable.builder(indexMetadata.getIndex()) .addShard( - shardRoutingBuilder(new ShardId(indexMetadata.getIndex(), 0), "node-1", true, ShardRoutingState.STARTED) + shardRoutingBuilder(shardId, unbalancedAllocator.apply(shardId), true, ShardRoutingState.STARTED) .withAllocationId(AllocationId.newInitializing(inSyncId)) .build() ) @@ -674,8 +752,13 @@ private static ClusterState createStateWithIndices(IndexMetadata.Builder... 
inde } } + DiscoveryNodes.Builder discoveryNodesBuilder = DiscoveryNodes.builder(); + for (String nodeName : nodeNames) { + discoveryNodesBuilder.add(newNode(nodeName)); + } + return ClusterState.builder(ClusterName.DEFAULT) - .nodes(DiscoveryNodes.builder().add(newNode("node-1")).add(newNode("node-2"))) + .nodes(discoveryNodesBuilder) .metadata(metadataBuilder) .routingTable(routingTableBuilder) .build(); @@ -712,4 +795,101 @@ private void addIndex( } routingTableBuilder.add(indexRoutingTableBuilder); } + + /** + * A {@link BalancingWeightsFactory} that assumes the cluster is partitioned by the prefix + * of the node and shard names before the `-`. + */ + class PrefixBalancingWeightsFactory implements BalancingWeightsFactory { + + private final Map prefixWeights; + + PrefixBalancingWeightsFactory(Map prefixWeights) { + this.prefixWeights = prefixWeights; + } + + @Override + public BalancingWeights create() { + return new PrefixBalancingWeights(); + } + + class PrefixBalancingWeights implements BalancingWeights { + + @Override + public WeightFunction weightFunctionForShard(ShardRouting shard) { + return prefixWeights.get(prefix(shard.getIndexName())); + } + + @Override + public WeightFunction weightFunctionForNode(RoutingNode node) { + return prefixWeights.get(prefix(node.node().getId())); + } + + @Override + public NodeSorters createNodeSorters( + BalancedShardsAllocator.ModelNode[] modelNodes, + BalancedShardsAllocator.Balancer balancer + ) { + final HashMap prefixNodeSorters = new HashMap<>(); + for (var entry : prefixWeights.entrySet()) { + prefixNodeSorters.put( + entry.getKey(), + new BalancedShardsAllocator.NodeSorter( + Arrays.stream(modelNodes) + .filter(node -> prefix(node.getRoutingNode().node().getId()).equals(entry.getKey())) + .toArray(BalancedShardsAllocator.ModelNode[]::new), + entry.getValue(), + balancer + ) + ); + } + return new NodeSorters() { + + @Override + public Iterator iterator() { + return prefixNodeSorters.values().iterator(); + } + + 
@Override + public BalancedShardsAllocator.NodeSorter sorterForShard(ShardRouting shard) { + return prefixNodeSorters.get(prefix(shard.getIndexName())); + } + }; + } + } + } + + /** + * Allocation deciders that only allow shards to be allocated to nodes whose names share the same prefix + * as the index they're from + */ + private AllocationDeciders prefixAllocationDeciders() { + return new AllocationDeciders(List.of(new AllocationDecider() { + @Override + public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) { + return nodePrefixMatchesIndexPrefix(shardRouting, node); + } + + @Override + public Decision canRemain( + IndexMetadata indexMetadata, + ShardRouting shardRouting, + RoutingNode node, + RoutingAllocation allocation + ) { + return nodePrefixMatchesIndexPrefix(shardRouting, node); + } + + private Decision nodePrefixMatchesIndexPrefix(ShardRouting shardRouting, RoutingNode node) { + var indexPrefix = prefix(shardRouting.index().getName()); + var nodePrefix = prefix(node.node().getId()); + return nodePrefix.equals(indexPrefix) ? 
Decision.YES : Decision.NO; + } + })); + } + + private static String prefix(String value) { + assert value != null && value.contains("-") : "Invalid name passed: " + value; + return value.substring(0, value.indexOf("-")); + } } diff --git a/test/framework/src/main/java/org/elasticsearch/cluster/ESAllocationTestCase.java b/test/framework/src/main/java/org/elasticsearch/cluster/ESAllocationTestCase.java index d2f6c05a5d3f6..e6a3f7664bd28 100644 --- a/test/framework/src/main/java/org/elasticsearch/cluster/ESAllocationTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/cluster/ESAllocationTestCase.java @@ -33,6 +33,7 @@ import org.elasticsearch.cluster.routing.allocation.allocator.BalancerSettings; import org.elasticsearch.cluster.routing.allocation.allocator.DesiredBalance; import org.elasticsearch.cluster.routing.allocation.allocator.DesiredBalanceShardsAllocator; +import org.elasticsearch.cluster.routing.allocation.allocator.GlobalBalancingWeightsFactory; import org.elasticsearch.cluster.routing.allocation.allocator.ShardsAllocator; import org.elasticsearch.cluster.routing.allocation.decider.AllocationDecider; import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders; @@ -443,7 +444,10 @@ public void allocateUnassigned( } protected static final NodeAllocationStatsAndWeightsCalculator EMPTY_NODE_ALLOCATION_STATS = - new NodeAllocationStatsAndWeightsCalculator(WriteLoadForecaster.DEFAULT, BalancerSettings.DEFAULT) { + new NodeAllocationStatsAndWeightsCalculator( + WriteLoadForecaster.DEFAULT, + new GlobalBalancingWeightsFactory(BalancerSettings.DEFAULT) + ) { @Override public Map nodesAllocationStatsAndWeights( Metadata metadata,