diff --git a/server/src/main/java/org/elasticsearch/cluster/ClusterModule.java b/server/src/main/java/org/elasticsearch/cluster/ClusterModule.java index c5fd400070936..10d7ebc92f798 100644 --- a/server/src/main/java/org/elasticsearch/cluster/ClusterModule.java +++ b/server/src/main/java/org/elasticsearch/cluster/ClusterModule.java @@ -33,6 +33,7 @@ import org.elasticsearch.cluster.routing.allocation.AllocationService; import org.elasticsearch.cluster.routing.allocation.AllocationService.RerouteStrategy; import org.elasticsearch.cluster.routing.allocation.AllocationStatsService; +import org.elasticsearch.cluster.routing.allocation.BalancedAllocatorSettings; import org.elasticsearch.cluster.routing.allocation.ExistingShardsAllocator; import org.elasticsearch.cluster.routing.allocation.NodeAllocationStatsAndWeightsCalculator; import org.elasticsearch.cluster.routing.allocation.WriteLoadForecaster; @@ -145,13 +146,15 @@ public ClusterModule( this.clusterPlugins = clusterPlugins; this.deciderList = createAllocationDeciders(settings, clusterService.getClusterSettings(), clusterPlugins); this.allocationDeciders = new AllocationDeciders(deciderList); + BalancedAllocatorSettings balancedAllocatorSettings = new BalancedAllocatorSettings(clusterService.getClusterSettings()); var nodeAllocationStatsAndWeightsCalculator = new NodeAllocationStatsAndWeightsCalculator( writeLoadForecaster, - clusterService.getClusterSettings() + balancedAllocatorSettings ); this.shardsAllocator = createShardsAllocator( settings, clusterService.getClusterSettings(), + balancedAllocatorSettings, threadPool, clusterPlugins, clusterService, @@ -440,6 +443,7 @@ private static void addAllocationDecider(Map, AllocationDecider> decide private static ShardsAllocator createShardsAllocator( Settings settings, ClusterSettings clusterSettings, + BalancedAllocatorSettings balancedAllocatorSettings, ThreadPool threadPool, List clusterPlugins, ClusterService clusterService, @@ -449,12 +453,12 @@ private static 
ShardsAllocator createShardsAllocator( NodeAllocationStatsAndWeightsCalculator nodeAllocationStatsAndWeightsCalculator ) { Map> allocators = new HashMap<>(); - allocators.put(BALANCED_ALLOCATOR, () -> new BalancedShardsAllocator(clusterSettings, writeLoadForecaster)); + allocators.put(BALANCED_ALLOCATOR, () -> new BalancedShardsAllocator(balancedAllocatorSettings, writeLoadForecaster)); allocators.put( DESIRED_BALANCE_ALLOCATOR, () -> new DesiredBalanceShardsAllocator( clusterSettings, - new BalancedShardsAllocator(clusterSettings, writeLoadForecaster), + new BalancedShardsAllocator(balancedAllocatorSettings, writeLoadForecaster), threadPool, clusterService, reconciler, diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/BalancedAllocatorSettings.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/BalancedAllocatorSettings.java new file mode 100644 index 0000000000000..a988a915191e9 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/BalancedAllocatorSettings.java @@ -0,0 +1,113 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". 
+ */ + +package org.elasticsearch.cluster.routing.allocation; + +import org.elasticsearch.common.settings.ClusterSettings; +import org.elasticsearch.common.settings.Settings; + +import static org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator.DISK_USAGE_BALANCE_FACTOR_SETTING; +import static org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator.INDEXING_TIER_SHARD_BALANCE_FACTOR_SETTING; +import static org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator.INDEXING_TIER_WRITE_LOAD_BALANCE_FACTOR_SETTING; +import static org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator.INDEX_BALANCE_FACTOR_SETTING; +import static org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator.SEARCH_TIER_SHARD_BALANCE_FACTOR_SETTING; +import static org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator.SEARCH_TIER_WRITE_LOAD_BALANCE_FACTOR_SETTING; +import static org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator.SHARD_BALANCE_FACTOR_SETTING; +import static org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator.THRESHOLD_SETTING; +import static org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator.WRITE_LOAD_BALANCE_FACTOR_SETTING; + +public class BalancedAllocatorSettings { + + public static BalancedAllocatorSettings DEFAULT = new BalancedAllocatorSettings(ClusterSettings.createBuiltInClusterSettings()); + private volatile float indexBalanceFactor; + private volatile float shardBalanceFactor; + private volatile float indexingTierShardBalanceFactor; + private volatile float searchTierShardBalanceFactor; + private volatile float writeLoadBalanceFactor; + private volatile float indexingTierWriteLoadBalanceFactor; + private volatile float searchTierWriteLoadBalanceFactor; + private volatile float diskUsageBalanceFactor; + private volatile float threshold; + + public 
BalancedAllocatorSettings(Settings settings) { + this(ClusterSettings.createBuiltInClusterSettings(settings)); + } + + public BalancedAllocatorSettings(ClusterSettings clusterSettings) { + clusterSettings.initializeAndWatch(SHARD_BALANCE_FACTOR_SETTING, value -> this.shardBalanceFactor = value); + clusterSettings.initializeAndWatch( + INDEXING_TIER_SHARD_BALANCE_FACTOR_SETTING, + value -> this.indexingTierShardBalanceFactor = value + ); + clusterSettings.initializeAndWatch(SEARCH_TIER_SHARD_BALANCE_FACTOR_SETTING, value -> this.searchTierShardBalanceFactor = value); + clusterSettings.initializeAndWatch(INDEX_BALANCE_FACTOR_SETTING, value -> this.indexBalanceFactor = value); + clusterSettings.initializeAndWatch(WRITE_LOAD_BALANCE_FACTOR_SETTING, value -> this.writeLoadBalanceFactor = value); + clusterSettings.initializeAndWatch( + INDEXING_TIER_WRITE_LOAD_BALANCE_FACTOR_SETTING, + value -> this.indexingTierWriteLoadBalanceFactor = value + ); + clusterSettings.initializeAndWatch( + SEARCH_TIER_WRITE_LOAD_BALANCE_FACTOR_SETTING, + value -> this.searchTierWriteLoadBalanceFactor = value + ); + clusterSettings.initializeAndWatch(DISK_USAGE_BALANCE_FACTOR_SETTING, value -> this.diskUsageBalanceFactor = value); + clusterSettings.initializeAndWatch(THRESHOLD_SETTING, value -> this.threshold = value); + } + + /** + * Returns the index related weight factor. + */ + public float getIndexBalanceFactor() { + return indexBalanceFactor; + } + + /** + * Returns the shard related weight factor. + */ + public float getShardBalanceFactor() { + return shardBalanceFactor; + } + + /** + * Returns the shard related weight factor. + */ + public float getIndexingTierShardBalanceFactor() { + return indexingTierShardBalanceFactor; + } + + /** + * Returns the shard related weight factor. 
+ */ + public float getSearchTierShardBalanceFactor() { + return searchTierShardBalanceFactor; + } + + public float getWriteLoadBalanceFactor() { + return writeLoadBalanceFactor; + } + + public float getIndexingTierWriteLoadBalanceFactor() { + return indexingTierWriteLoadBalanceFactor; + } + + public float getSearchTierWriteLoadBalanceFactor() { + return searchTierWriteLoadBalanceFactor; + } + + public float getDiskUsageBalanceFactor() { + return diskUsageBalanceFactor; + } + + /** + * Returns the currently configured delta threshold + */ + public float getThreshold() { + return threshold; + } +} diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/NodeAllocationStatsAndWeightsCalculator.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/NodeAllocationStatsAndWeightsCalculator.java index 6466736e1e8ab..eb426b8b18b23 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/NodeAllocationStatsAndWeightsCalculator.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/NodeAllocationStatsAndWeightsCalculator.java @@ -15,10 +15,9 @@ import org.elasticsearch.cluster.routing.RoutingNode; import org.elasticsearch.cluster.routing.RoutingNodes; import org.elasticsearch.cluster.routing.ShardRouting; -import org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator; import org.elasticsearch.cluster.routing.allocation.allocator.DesiredBalance; +import org.elasticsearch.cluster.routing.allocation.allocator.SpecialisedWeightFunction; import org.elasticsearch.cluster.routing.allocation.allocator.WeightFunction; -import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.util.Maps; import org.elasticsearch.core.Nullable; @@ -29,11 +28,7 @@ */ public class NodeAllocationStatsAndWeightsCalculator { private final WriteLoadForecaster writeLoadForecaster; - - private volatile float indexBalanceFactor; - private volatile float shardBalanceFactor; 
- private volatile float writeLoadBalanceFactor; - private volatile float diskUsageBalanceFactor; + private final BalancedAllocatorSettings settings; /** * Node shard allocation stats and the total node weight. @@ -47,18 +42,9 @@ public record NodeAllocationStatsAndWeight( float currentNodeWeight ) {} - public NodeAllocationStatsAndWeightsCalculator(WriteLoadForecaster writeLoadForecaster, ClusterSettings clusterSettings) { + public NodeAllocationStatsAndWeightsCalculator(WriteLoadForecaster writeLoadForecaster, BalancedAllocatorSettings settings) { this.writeLoadForecaster = writeLoadForecaster; - clusterSettings.initializeAndWatch(BalancedShardsAllocator.SHARD_BALANCE_FACTOR_SETTING, value -> this.shardBalanceFactor = value); - clusterSettings.initializeAndWatch(BalancedShardsAllocator.INDEX_BALANCE_FACTOR_SETTING, value -> this.indexBalanceFactor = value); - clusterSettings.initializeAndWatch( - BalancedShardsAllocator.WRITE_LOAD_BALANCE_FACTOR_SETTING, - value -> this.writeLoadBalanceFactor = value - ); - clusterSettings.initializeAndWatch( - BalancedShardsAllocator.DISK_USAGE_BALANCE_FACTOR_SETTING, - value -> this.diskUsageBalanceFactor = value - ); + this.settings = settings; } /** @@ -74,7 +60,7 @@ public Map nodesAllocationStatsAndWeights( // must not use licensed features when just starting up writeLoadForecaster.refreshLicense(); } - var weightFunction = new WeightFunction(shardBalanceFactor, indexBalanceFactor, writeLoadBalanceFactor, diskUsageBalanceFactor); + var weightFunction = new SpecialisedWeightFunction(settings); var avgShardsPerNode = WeightFunction.avgShardPerNode(metadata, routingNodes); var avgWriteLoadPerNode = WeightFunction.avgWriteLoadPerNode(writeLoadForecaster, metadata, routingNodes); var avgDiskUsageInBytesPerNode = WeightFunction.avgDiskUsageInBytesPerNode(clusterInfo, metadata, routingNodes); @@ -102,6 +88,7 @@ public Map nodesAllocationStatsAndWeights( currentDiskUsage += shardSize; } float currentNodeWeight = 
weightFunction.calculateNodeWeight( + node, shards, avgShardsPerNode, forecastedWriteLoad, diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java index cc6f1bcbf3477..7714aef584178 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java @@ -27,6 +27,7 @@ import org.elasticsearch.cluster.routing.UnassignedInfo.AllocationStatus; import org.elasticsearch.cluster.routing.allocation.AllocateUnassignedDecision; import org.elasticsearch.cluster.routing.allocation.AllocationDecision; +import org.elasticsearch.cluster.routing.allocation.BalancedAllocatorSettings; import org.elasticsearch.cluster.routing.allocation.MoveDecision; import org.elasticsearch.cluster.routing.allocation.NodeAllocationResult; import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; @@ -85,6 +86,20 @@ public class BalancedShardsAllocator implements ShardsAllocator { Property.Dynamic, Property.NodeScope ); + public static final Setting INDEXING_TIER_SHARD_BALANCE_FACTOR_SETTING = Setting.floatSetting( + "cluster.routing.allocation.balance.shard.indexing", + SHARD_BALANCE_FACTOR_SETTING, + 0.0f, + Property.Dynamic, + Property.NodeScope + ); + public static final Setting SEARCH_TIER_SHARD_BALANCE_FACTOR_SETTING = Setting.floatSetting( + "cluster.routing.allocation.balance.shard.search", + SHARD_BALANCE_FACTOR_SETTING, + 0.0f, + Property.Dynamic, + Property.NodeScope + ); public static final Setting INDEX_BALANCE_FACTOR_SETTING = Setting.floatSetting( "cluster.routing.allocation.balance.index", 0.55f, @@ -99,6 +114,20 @@ public class BalancedShardsAllocator implements ShardsAllocator { Property.Dynamic, Property.NodeScope ); + public static final Setting 
INDEXING_TIER_WRITE_LOAD_BALANCE_FACTOR_SETTING = Setting.floatSetting( + "cluster.routing.allocation.balance.write_load.indexing", + WRITE_LOAD_BALANCE_FACTOR_SETTING, + 0.0f, + Property.Dynamic, + Property.NodeScope + ); + public static final Setting SEARCH_TIER_WRITE_LOAD_BALANCE_FACTOR_SETTING = Setting.floatSetting( + "cluster.routing.allocation.balance.write_load.search", + WRITE_LOAD_BALANCE_FACTOR_SETTING, + 0.0f, + Property.Dynamic, + Property.NodeScope + ); public static final Setting DISK_USAGE_BALANCE_FACTOR_SETTING = Setting.floatSetting( "cluster.routing.allocation.balance.disk_usage", 2e-11f, @@ -114,13 +143,7 @@ public class BalancedShardsAllocator implements ShardsAllocator { Property.NodeScope ); - // TODO: deduplicate these fields, use the fields in NodeAllocationStatsAndWeightsCalculator instead. - private volatile float indexBalanceFactor; - private volatile float shardBalanceFactor; - private volatile float writeLoadBalanceFactor; - private volatile float diskUsageBalanceFactor; - private volatile float threshold; - + private final BalancedAllocatorSettings settings; private final WriteLoadForecaster writeLoadForecaster; public BalancedShardsAllocator() { @@ -135,13 +158,13 @@ public BalancedShardsAllocator(ClusterSettings clusterSettings) { this(clusterSettings, WriteLoadForecaster.DEFAULT); } - @Inject public BalancedShardsAllocator(ClusterSettings clusterSettings, WriteLoadForecaster writeLoadForecaster) { - clusterSettings.initializeAndWatch(SHARD_BALANCE_FACTOR_SETTING, value -> this.shardBalanceFactor = value); - clusterSettings.initializeAndWatch(INDEX_BALANCE_FACTOR_SETTING, value -> this.indexBalanceFactor = value); - clusterSettings.initializeAndWatch(WRITE_LOAD_BALANCE_FACTOR_SETTING, value -> this.writeLoadBalanceFactor = value); - clusterSettings.initializeAndWatch(DISK_USAGE_BALANCE_FACTOR_SETTING, value -> this.diskUsageBalanceFactor = value); - clusterSettings.initializeAndWatch(THRESHOLD_SETTING, value -> this.threshold = 
value); + this(new BalancedAllocatorSettings(clusterSettings), writeLoadForecaster); + } + + @Inject + public BalancedShardsAllocator(BalancedAllocatorSettings settings, WriteLoadForecaster writeLoadForecaster) { + this.settings = settings; this.writeLoadForecaster = writeLoadForecaster; } @@ -158,13 +181,8 @@ public void allocate(RoutingAllocation allocation) { failAllocationOfNewPrimaries(allocation); return; } - final WeightFunction weightFunction = new WeightFunction( - shardBalanceFactor, - indexBalanceFactor, - writeLoadBalanceFactor, - diskUsageBalanceFactor - ); - final Balancer balancer = new Balancer(writeLoadForecaster, allocation, weightFunction, threshold); + final WeightFunction weightFunction = new SpecialisedWeightFunction(settings); + final Balancer balancer = new Balancer(writeLoadForecaster, allocation, weightFunction, settings.getThreshold()); balancer.allocateUnassigned(); balancer.moveShards(); balancer.balance(); @@ -178,6 +196,7 @@ private void collectAndRecordNodeWeightStats(Balancer balancer, WeightFunction w for (var entry : balancer.nodes.entrySet()) { var node = entry.getValue(); var nodeWeight = weightFunction.calculateNodeWeight( + node.routingNode, node.numShards(), balancer.avgShardsPerNode(), node.writeLoad(), @@ -195,13 +214,8 @@ private void collectAndRecordNodeWeightStats(Balancer balancer, WeightFunction w @Override public ShardAllocationDecision decideShardAllocation(final ShardRouting shard, final RoutingAllocation allocation) { - WeightFunction weightFunction = new WeightFunction( - shardBalanceFactor, - indexBalanceFactor, - writeLoadBalanceFactor, - diskUsageBalanceFactor - ); - Balancer balancer = new Balancer(writeLoadForecaster, allocation, weightFunction, threshold); + WeightFunction weightFunction = new SpecialisedWeightFunction(settings); + Balancer balancer = new Balancer(writeLoadForecaster, allocation, weightFunction, settings.getThreshold()); AllocateUnassignedDecision allocateUnassignedDecision = 
AllocateUnassignedDecision.NOT_TAKEN; MoveDecision moveDecision = MoveDecision.NOT_TAKEN; final ProjectIndex index = new ProjectIndex(allocation, shard); @@ -248,21 +262,21 @@ private void failAllocationOfNewPrimaries(RoutingAllocation allocation) { * Returns the currently configured delta threshold */ public float getThreshold() { - return threshold; + return settings.getThreshold(); } /** * Returns the index related weight factor. */ public float getIndexBalance() { - return indexBalanceFactor; + return settings.getIndexBalanceFactor(); } /** * Returns the shard related weight factor. */ public float getShardBalance() { - return shardBalanceFactor; + return settings.getShardBalanceFactor(); } /** @@ -484,7 +498,7 @@ private MoveDecision decideRebalance(final ProjectIndex index, final ShardRoutin // then even though the node we are examining has a better weight and may make the cluster balance // more even, it doesn't make sense to execute the heavyweight operation of relocating a shard unless // the gains make it worth it, as defined by the threshold - final float localThreshold = sorter.minWeightDelta() * threshold; + final float localThreshold = sorter.minWeightDelta(node) * threshold; boolean deltaAboveThreshold = lessThan(currentDelta, localThreshold) == false; // calculate the delta of the weights of the two nodes if we were to add the shard to the // node in question and move it away from the node that currently holds it. 
@@ -597,10 +611,10 @@ private void balanceByWeights() { sorter.reset(index, 0, relevantNodes); int lowIdx = 0; int highIdx = relevantNodes - 1; - final float localThreshold = sorter.minWeightDelta() * threshold; while (true) { final ModelNode minNode = modelNodes[lowIdx]; final ModelNode maxNode = modelNodes[highIdx]; + final float localThreshold = sorter.minWeightDelta(minNode) * threshold; advance_range: if (maxNode.numShards(index) > 0) { final float delta = absDelta(weights[lowIdx], weights[highIdx]); if (lessThan(delta, localThreshold)) { @@ -1189,6 +1203,7 @@ static class ModelNode implements Iterable { private final ClusterInfo clusterInfo; private final RoutingNode routingNode; private final Map indices; + private final NodeType nodeType; ModelNode(WriteLoadForecaster writeLoadForecaster, Metadata metadata, ClusterInfo clusterInfo, RoutingNode routingNode) { this.writeLoadForecaster = writeLoadForecaster; @@ -1196,6 +1211,7 @@ static class ModelNode implements Iterable { this.clusterInfo = clusterInfo; this.routingNode = routingNode; this.indices = Maps.newMapWithExpectedSize(routingNode.size() + 10);// some extra to account for shard movements + this.nodeType = NodeType.forNode(routingNode); } public ModelIndex getIndex(ProjectIndex index) { @@ -1275,6 +1291,10 @@ public boolean containsShard(ProjectIndex projIndex, ShardRouting shard) { ModelIndex index = getIndex(projIndex); return index != null && index.containsShard(shard); } + + public NodeType nodeType() { + return nodeType; + } } static final class ModelIndex implements Iterable { @@ -1360,8 +1380,8 @@ public float weight(ModelNode node) { return function.calculateNodeWeightWithIndex(balancer, node, index); } - public float minWeightDelta() { - return function.minWeightDelta(balancer.getShardWriteLoad(index), balancer.maxShardSizeBytes(index)); + public float minWeightDelta(ModelNode targetNode) { + return function.minWeightDelta(targetNode, balancer.getShardWriteLoad(index), 
balancer.maxShardSizeBytes(index)); } @Override diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/NodeType.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/NodeType.java new file mode 100644 index 0000000000000..f8c4e23eb4b11 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/NodeType.java @@ -0,0 +1,36 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.cluster.routing.allocation.allocator; + +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodeRole; +import org.elasticsearch.cluster.routing.RoutingNode; + +/** + * The set of "node types" that are significant for balancer weight calculations. 
+ */ +enum NodeType { + INDEXING, + SEARCH, + OTHER; + + static NodeType forNode(RoutingNode node) { + final DiscoveryNode discoveryNode = node.node(); + if (discoveryNode == null) { + return NodeType.OTHER; + } else if (discoveryNode.getRoles().contains(DiscoveryNodeRole.INDEX_ROLE)) { + return INDEXING; + } else if (discoveryNode.getRoles().contains(DiscoveryNodeRole.SEARCH_ROLE)) { + return SEARCH; + } else { + return NodeType.OTHER; + } + } +} diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/SingleWeightFunction.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/SingleWeightFunction.java new file mode 100644 index 0000000000000..53e62d14f98d1 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/SingleWeightFunction.java @@ -0,0 +1,96 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.cluster.routing.allocation.allocator; + +import org.elasticsearch.cluster.routing.RoutingNode; +import org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator.ProjectIndex; + +/** + * This class is the primary weight function used to create balanced over nodes and shards in the cluster. + * Currently this function has 3 properties: + *
 <ul>
 + * <li><code>index balance</code> - balance property over shards per index</li>
 + * <li><code>shard balance</code> - balance property over shards per cluster</li>
 + * </ul>
 + * <p>
 + * Each of these properties is expressed as a factor such that the property's factor defines the relative
 + * importance of the property for the weight function. For example, if the weight function should calculate
 + * the weights based only on a global (shard) balance, the index balance can be set to {@code 0.0} and will
 + * in turn have no effect on the distribution.
 + * </p>
 + * The weight per index is calculated based on the following formula:
 + * <ul>
 + * <li>
 + * <code>weight<sub>index</sub>(node, index) = indexBalance * (node.numShards(index) - avgShardsPerNode(index))</code>
 + * </li>
 + * <li>
 + * <code>weight<sub>node</sub>(node, index) = shardBalance * (node.numShards() - avgShardsPerNode)</code>
 + * </li>
 + * </ul>
+ * weight(node, index) = weightindex(node, index) + weightnode(node, index) + */ +public class SingleWeightFunction implements WeightFunction { + + private final float theta0; + private final float theta1; + private final float theta2; + private final float theta3; + + public SingleWeightFunction(float shardBalance, float indexBalance, float writeLoadBalance, float diskUsageBalance) { + float sum = shardBalance + indexBalance + writeLoadBalance + diskUsageBalance; + if (sum <= 0.0f) { + throw new IllegalArgumentException("Balance factors must sum to a value > 0 but was: " + sum); + } + theta0 = shardBalance / sum; + theta1 = indexBalance / sum; + theta2 = writeLoadBalance / sum; + theta3 = diskUsageBalance / sum; + } + + @Override + public float calculateNodeWeightWithIndex( + BalancedShardsAllocator.Balancer balancer, + BalancedShardsAllocator.ModelNode node, + ProjectIndex index + ) { + final float weightIndex = node.numShards(index) - balancer.avgShardsPerNode(index); + final float nodeWeight = calculateNodeWeight( + node.getRoutingNode(), + node.numShards(), + balancer.avgShardsPerNode(), + node.writeLoad(), + balancer.avgWriteLoadPerNode(), + node.diskUsageInBytes(), + balancer.avgDiskUsageInBytesPerNode() + ); + return nodeWeight + theta1 * weightIndex; + } + + @Override + public float calculateNodeWeight( + RoutingNode routingNode, + int nodeNumShards, + float avgShardsPerNode, + double nodeWriteLoad, + double avgWriteLoadPerNode, + double diskUsageInBytes, + double avgDiskUsageInBytesPerNode + ) { + final float weightShard = nodeNumShards - avgShardsPerNode; + final float ingestLoad = (float) (nodeWriteLoad - avgWriteLoadPerNode); + final float diskUsage = (float) (diskUsageInBytes - avgDiskUsageInBytesPerNode); + return theta0 * weightShard + theta2 * ingestLoad + theta3 * diskUsage; + } + + @Override + public float minWeightDelta(BalancedShardsAllocator.ModelNode modelNode, float shardWriteLoad, float shardSizeBytes) { + return theta0 * 1 + theta1 * 1 + 
theta2 * shardWriteLoad + theta3 * shardSizeBytes; + } +} diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/SpecialisedWeightFunction.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/SpecialisedWeightFunction.java new file mode 100644 index 0000000000000..6fffb17336713 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/SpecialisedWeightFunction.java @@ -0,0 +1,84 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.cluster.routing.allocation.allocator; + +import org.elasticsearch.cluster.routing.RoutingNode; +import org.elasticsearch.cluster.routing.allocation.BalancedAllocatorSettings; + +public class SpecialisedWeightFunction implements WeightFunction { + + private final SingleWeightFunction defaultWeightFunction; + private final SingleWeightFunction indexingWeightFunction; + private final SingleWeightFunction searchWeightFunction; + + public SpecialisedWeightFunction(BalancedAllocatorSettings balancedAllocatorSettings) { + this.defaultWeightFunction = new SingleWeightFunction( + balancedAllocatorSettings.getShardBalanceFactor(), + balancedAllocatorSettings.getIndexBalanceFactor(), + balancedAllocatorSettings.getWriteLoadBalanceFactor(), + balancedAllocatorSettings.getDiskUsageBalanceFactor() + ); + this.indexingWeightFunction = new SingleWeightFunction( + balancedAllocatorSettings.getIndexingTierShardBalanceFactor(), + balancedAllocatorSettings.getIndexBalanceFactor(), + 
balancedAllocatorSettings.getIndexingTierWriteLoadBalanceFactor(), + balancedAllocatorSettings.getDiskUsageBalanceFactor() + ); + this.searchWeightFunction = new SingleWeightFunction( + balancedAllocatorSettings.getSearchTierShardBalanceFactor(), + balancedAllocatorSettings.getIndexBalanceFactor(), + balancedAllocatorSettings.getSearchTierWriteLoadBalanceFactor(), + balancedAllocatorSettings.getDiskUsageBalanceFactor() + ); + } + + @Override + public float calculateNodeWeightWithIndex( + BalancedShardsAllocator.Balancer balancer, + BalancedShardsAllocator.ModelNode node, + BalancedShardsAllocator.ProjectIndex index + ) { + return weightFunctionForType(node.nodeType()).calculateNodeWeightWithIndex(balancer, node, index); + } + + @Override + public float calculateNodeWeight( + RoutingNode node, + int nodeNumShards, + float avgShardsPerNode, + double nodeWriteLoad, + double avgWriteLoadPerNode, + double diskUsageInBytes, + double avgDiskUsageInBytesPerNode + ) { + return weightFunctionForType(NodeType.forNode(node)).calculateNodeWeight( + node, + nodeNumShards, + avgShardsPerNode, + nodeWriteLoad, + avgWriteLoadPerNode, + diskUsageInBytes, + avgDiskUsageInBytesPerNode + ); + } + + @Override + public float minWeightDelta(BalancedShardsAllocator.ModelNode modelNode, float shardWriteLoad, float shardSizeBytes) { + return weightFunctionForType(modelNode.nodeType()).minWeightDelta(modelNode, shardWriteLoad, shardSizeBytes); + } + + private WeightFunction weightFunctionForType(NodeType nodeType) { + return switch (nodeType) { + case SEARCH -> searchWeightFunction; + case INDEXING -> indexingWeightFunction; + default -> defaultWeightFunction; + }; + } +} diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/WeightFunction.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/WeightFunction.java index 5b58b1d022590..4cc17e251fb80 100644 --- 
a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/WeightFunction.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/WeightFunction.java @@ -13,97 +13,40 @@ import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.cluster.metadata.ProjectMetadata; +import org.elasticsearch.cluster.routing.RoutingNode; import org.elasticsearch.cluster.routing.RoutingNodes; import org.elasticsearch.cluster.routing.allocation.WriteLoadForecaster; -import org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator.ProjectIndex; import org.elasticsearch.index.shard.ShardId; -/** - * This class is the primary weight function used to create balanced over nodes and shards in the cluster. - * Currently this function has 3 properties: - *
 <ul>
 - * <li><code>index balance</code> - balance property over shards per index</li>
 - * <li><code>shard balance</code> - balance property over shards per cluster</li>
 - * </ul>
 - * <p>
 - * Each of these properties are expressed as factor such that the properties factor defines the relative
 - * importance of the property for the weight function. For example if the weight function should calculate
 - * the weights only based on a global (shard) balance the index balance can be set to {@code 0.0} and will
 - * in turn have no effect on the distribution.
 - * </p>
 - * The weight per index is calculated based on the following formula:
 - * <ul>
 - * <li>
 - * <code>weight<sub>index</sub>(node, index) = indexBalance * (node.numShards(index) - avgShardsPerNode(index))</code>
 - * </li>
 - * <li>
 - * <code>weight<sub>node</sub>(node, index) = shardBalance * (node.numShards() - avgShardsPerNode)</code>
 - * </li>
 - * </ul>
- * weight(node, index) = weightindex(node, index) + weightnode(node, index) - */ -public class WeightFunction { - - private final float theta0; - private final float theta1; - private final float theta2; - private final float theta3; - - public WeightFunction(float shardBalance, float indexBalance, float writeLoadBalance, float diskUsageBalance) { - float sum = shardBalance + indexBalance + writeLoadBalance + diskUsageBalance; - if (sum <= 0.0f) { - throw new IllegalArgumentException("Balance factors must sum to a value > 0 but was: " + sum); - } - theta0 = shardBalance / sum; - theta1 = indexBalance / sum; - theta2 = writeLoadBalance / sum; - theta3 = diskUsageBalance / sum; - } +public interface WeightFunction { float calculateNodeWeightWithIndex( BalancedShardsAllocator.Balancer balancer, BalancedShardsAllocator.ModelNode node, - ProjectIndex index - ) { - final float weightIndex = node.numShards(index) - balancer.avgShardsPerNode(index); - final float nodeWeight = calculateNodeWeight( - node.numShards(), - balancer.avgShardsPerNode(), - node.writeLoad(), - balancer.avgWriteLoadPerNode(), - node.diskUsageInBytes(), - balancer.avgDiskUsageInBytesPerNode() - ); - return nodeWeight + theta1 * weightIndex; - } + BalancedShardsAllocator.ProjectIndex index + ); - public float calculateNodeWeight( + float calculateNodeWeight( + RoutingNode node, int nodeNumShards, float avgShardsPerNode, double nodeWriteLoad, double avgWriteLoadPerNode, double diskUsageInBytes, double avgDiskUsageInBytesPerNode - ) { - final float weightShard = nodeNumShards - avgShardsPerNode; - final float ingestLoad = (float) (nodeWriteLoad - avgWriteLoadPerNode); - final float diskUsage = (float) (diskUsageInBytes - avgDiskUsageInBytesPerNode); - return theta0 * weightShard + theta2 * ingestLoad + theta3 * diskUsage; - } + ); - float minWeightDelta(float shardWriteLoad, float shardSizeBytes) { - return theta0 * 1 + theta1 * 1 + theta2 * shardWriteLoad + theta3 * shardSizeBytes; - } + float 
minWeightDelta(BalancedShardsAllocator.ModelNode targetNode, float shardWriteLoad, float shardSizeBytes); - public static float avgShardPerNode(Metadata metadata, RoutingNodes routingNodes) { + static float avgShardPerNode(Metadata metadata, RoutingNodes routingNodes) { return ((float) metadata.getTotalNumberOfShards()) / routingNodes.size(); } - public static double avgWriteLoadPerNode(WriteLoadForecaster writeLoadForecaster, Metadata metadata, RoutingNodes routingNodes) { + static double avgWriteLoadPerNode(WriteLoadForecaster writeLoadForecaster, Metadata metadata, RoutingNodes routingNodes) { return getTotalWriteLoad(writeLoadForecaster, metadata) / routingNodes.size(); } - public static double avgDiskUsageInBytesPerNode(ClusterInfo clusterInfo, Metadata metadata, RoutingNodes routingNodes) { + static double avgDiskUsageInBytesPerNode(ClusterInfo clusterInfo, Metadata metadata, RoutingNodes routingNodes) { return ((double) getTotalDiskUsageInBytes(clusterInfo, metadata) / routingNodes.size()); } diff --git a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java index 9cee0872f3f7d..4b44f0c4ad3ac 100644 --- a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java @@ -222,7 +222,11 @@ public void apply(Settings value, Settings current, Settings previous) { AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_FORCE_GROUP_SETTING, BalancedShardsAllocator.INDEX_BALANCE_FACTOR_SETTING, BalancedShardsAllocator.SHARD_BALANCE_FACTOR_SETTING, + BalancedShardsAllocator.INDEXING_TIER_SHARD_BALANCE_FACTOR_SETTING, + BalancedShardsAllocator.SEARCH_TIER_SHARD_BALANCE_FACTOR_SETTING, BalancedShardsAllocator.WRITE_LOAD_BALANCE_FACTOR_SETTING, + BalancedShardsAllocator.INDEXING_TIER_WRITE_LOAD_BALANCE_FACTOR_SETTING, + 
BalancedShardsAllocator.SEARCH_TIER_WRITE_LOAD_BALANCE_FACTOR_SETTING, BalancedShardsAllocator.DISK_USAGE_BALANCE_FACTOR_SETTING, BalancedShardsAllocator.THRESHOLD_SETTING, DataStreamAutoShardingService.DATA_STREAMS_AUTO_SHARDING_DECREASE_SHARDS_COOLDOWN, diff --git a/server/src/main/java/org/elasticsearch/common/settings/Setting.java b/server/src/main/java/org/elasticsearch/common/settings/Setting.java index 7ec4399b4ff94..b442f09d39d8c 100644 --- a/server/src/main/java/org/elasticsearch/common/settings/Setting.java +++ b/server/src/main/java/org/elasticsearch/common/settings/Setting.java @@ -1368,7 +1368,16 @@ public static Setting floatSetting(String key, float defaultValue, Proper public static Setting floatSetting(String key, float defaultValue, float minValue, Property... properties) { final boolean isFiltered = isFiltered(properties); - return new Setting<>(key, Float.toString(defaultValue), (s) -> { + return new Setting<>(key, Float.toString(defaultValue), floatParser(key, minValue, properties), properties); + } + + public static Setting floatSetting(String key, Setting fallbackSetting, float minValue, Property... properties) { + return new Setting<>(key, fallbackSetting, floatParser(key, minValue, properties), properties); + } + + private static Function floatParser(String key, float minValue, Property... 
properties) { + final boolean isFiltered = isFiltered(properties); + return (s) -> { float value = Float.parseFloat(s); if (value < minValue) { String err = "Failed to parse value" @@ -1380,7 +1389,7 @@ public static Setting floatSetting(String key, float defaultValue, float throw new IllegalArgumentException(err); } return value; - }, properties); + }; } private static boolean isFiltered(Property[] properties) { diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/AllocationStatsServiceTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/AllocationStatsServiceTests.java index 2a4d91437df0b..889868b7cfb2e 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/AllocationStatsServiceTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/AllocationStatsServiceTests.java @@ -84,7 +84,7 @@ public void testShardStats() { clusterService, () -> clusterInfo, createShardAllocator(), - new NodeAllocationStatsAndWeightsCalculator(TEST_WRITE_LOAD_FORECASTER, ClusterSettings.createBuiltInClusterSettings()) + new NodeAllocationStatsAndWeightsCalculator(TEST_WRITE_LOAD_FORECASTER, BalancedAllocatorSettings.DEFAULT) ); assertThat( service.stats(), @@ -125,7 +125,7 @@ public void testRelocatingShardIsOnlyCountedOnceOnTargetNode() { clusterService, EmptyClusterInfoService.INSTANCE, createShardAllocator(), - new NodeAllocationStatsAndWeightsCalculator(TEST_WRITE_LOAD_FORECASTER, ClusterSettings.createBuiltInClusterSettings()) + new NodeAllocationStatsAndWeightsCalculator(TEST_WRITE_LOAD_FORECASTER, BalancedAllocatorSettings.DEFAULT) ); assertThat( service.stats(), @@ -182,7 +182,7 @@ public DesiredBalance getDesiredBalance() { ); } }, - new NodeAllocationStatsAndWeightsCalculator(TEST_WRITE_LOAD_FORECASTER, ClusterSettings.createBuiltInClusterSettings()) + new NodeAllocationStatsAndWeightsCalculator(TEST_WRITE_LOAD_FORECASTER, BalancedAllocatorSettings.DEFAULT) ); assertThat( 
service.stats(), diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocatorTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocatorTests.java index b32621e62ab58..c25e4611ccc0f 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocatorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocatorTests.java @@ -34,11 +34,11 @@ import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.ShardRoutingState; import org.elasticsearch.cluster.routing.allocation.AllocateUnassignedDecision; +import org.elasticsearch.cluster.routing.allocation.BalancedAllocatorSettings; import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders; import org.elasticsearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider; import org.elasticsearch.common.UUIDs; -import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; @@ -225,7 +225,7 @@ public void testBalanceByForecastWriteLoad() { var allocationService = new MockAllocationService( yesAllocationDeciders(), new TestGatewayAllocator(), - new BalancedShardsAllocator(ClusterSettings.createBuiltInClusterSettings(), TEST_WRITE_LOAD_FORECASTER), + new BalancedShardsAllocator(BalancedAllocatorSettings.DEFAULT, TEST_WRITE_LOAD_FORECASTER), EmptyClusterInfoService.INSTANCE, SNAPSHOT_INFO_SERVICE_WITH_NO_SHARD_SIZES ); diff --git a/test/framework/src/main/java/org/elasticsearch/cluster/ESAllocationTestCase.java b/test/framework/src/main/java/org/elasticsearch/cluster/ESAllocationTestCase.java index a6d18a46016e9..ba5790f4d43cc 100644 --- 
a/test/framework/src/main/java/org/elasticsearch/cluster/ESAllocationTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/cluster/ESAllocationTestCase.java @@ -25,6 +25,7 @@ import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.UnassignedInfo; import org.elasticsearch.cluster.routing.allocation.AllocationService; +import org.elasticsearch.cluster.routing.allocation.BalancedAllocatorSettings; import org.elasticsearch.cluster.routing.allocation.FailedShard; import org.elasticsearch.cluster.routing.allocation.NodeAllocationStatsAndWeightsCalculator; import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; @@ -442,7 +443,7 @@ public void allocateUnassigned( } protected static final NodeAllocationStatsAndWeightsCalculator EMPTY_NODE_ALLOCATION_STATS = - new NodeAllocationStatsAndWeightsCalculator(WriteLoadForecaster.DEFAULT, createBuiltInClusterSettings()) { + new NodeAllocationStatsAndWeightsCalculator(WriteLoadForecaster.DEFAULT, BalancedAllocatorSettings.DEFAULT) { @Override public Map nodesAllocationStatsAndWeights( Metadata metadata,