diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java index 5e8297fd393c5..5bc40f53cab30 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java @@ -324,8 +324,10 @@ private Balancer( this.threshold = threshold; avgShardsPerNode = WeightFunction.avgShardPerNode(metadata, routingNodes); avgWriteLoadPerNode = WeightFunction.avgWriteLoadPerNode(writeLoadForecaster, metadata, routingNodes); - avgDiskUsageInBytesPerNode = WeightFunction.avgDiskUsageInBytesPerNode(allocation.clusterInfo(), metadata, routingNodes); - nodes = Collections.unmodifiableMap(buildModelFromAssigned()); + avgDiskUsageInBytesPerNode = balancingWeights.diskUsageIgnored() + ? 0 + : WeightFunction.avgDiskUsageInBytesPerNode(allocation.clusterInfo(), metadata, routingNodes); + nodes = Collections.unmodifiableMap(buildModelFromAssigned(balancingWeights.diskUsageIgnored())); this.nodeSorters = balancingWeights.createNodeSorters(nodesArray(), this); this.balancingWeights = balancingWeights; this.completeEarlyOnShardAssignmentChange = completeEarlyOnShardAssignmentChange; @@ -1156,10 +1158,10 @@ private Decision decideCanForceAllocateForVacate(ShardRouting shardRouting, Rout * on the target node which we respect during the allocation / balancing * process. In short, this method recreates the status-quo in the cluster. */ - private Map buildModelFromAssigned() { + private Map buildModelFromAssigned(boolean diskUsageIgnored) { Map nodes = Maps.newMapWithExpectedSize(routingNodes.size()); for (RoutingNode rn : routingNodes) { - ModelNode node = new ModelNode(writeLoadForecaster, metadata, allocation.clusterInfo(), rn); + ModelNode node = new ModelNode(writeLoadForecaster, metadata, allocation.clusterInfo(), rn, diskUsageIgnored); nodes.put(rn.nodeId(), node); for (ShardRouting shard : rn) { assert rn.nodeId().equals(shard.currentNodeId()); @@ -1476,13 +1478,21 @@ public static class ModelNode implements Iterable { private final ClusterInfo clusterInfo; private final RoutingNode routingNode; private final Map indices; + private final boolean diskUsageIgnored; - public ModelNode(WriteLoadForecaster writeLoadForecaster, Metadata metadata, ClusterInfo clusterInfo, RoutingNode routingNode) { + public ModelNode( + WriteLoadForecaster writeLoadForecaster, + Metadata metadata, + ClusterInfo clusterInfo, + RoutingNode routingNode, + boolean diskUsageIgnored + ) { this.writeLoadForecaster = writeLoadForecaster; this.metadata = metadata; this.clusterInfo = clusterInfo; this.routingNode = routingNode; this.indices = Maps.newMapWithExpectedSize(routingNode.size() + 10);// some extra to account for shard movements + this.diskUsageIgnored = diskUsageIgnored; } public ModelIndex getIndex(ProjectIndex index) { @@ -1527,7 +1537,9 @@ public void addShard(ProjectIndex index, ShardRouting shard) { indices.computeIfAbsent(index, t -> new ModelIndex()).addShard(shard); IndexMetadata indexMetadata = metadata.getProject(index.project).index(shard.index()); writeLoad += writeLoadForecaster.getForecastedWriteLoad(indexMetadata).orElse(0.0); - diskUsageInBytes += Balancer.getShardDiskUsageInBytes(shard, indexMetadata, clusterInfo); + if (diskUsageIgnored == false) { + diskUsageInBytes += Balancer.getShardDiskUsageInBytes(shard, indexMetadata, clusterInfo); + } numShards++; } @@ -1541,7 +1553,9 @@ public void removeShard(ProjectIndex projectIndex, ShardRouting shard) { } IndexMetadata indexMetadata = metadata.getProject(projectIndex.project).index(shard.index()); writeLoad -= writeLoadForecaster.getForecastedWriteLoad(indexMetadata).orElse(0.0); - diskUsageInBytes -= Balancer.getShardDiskUsageInBytes(shard, indexMetadata, clusterInfo); + if (diskUsageIgnored == false) { + diskUsageInBytes -= Balancer.getShardDiskUsageInBytes(shard, indexMetadata, clusterInfo); + } numShards--; } @@ -1656,7 +1670,10 @@ public float weight(ModelNode node) { } public float minWeightDelta() { - return function.minWeightDelta(balancer.getShardWriteLoad(index), balancer.maxShardSizeBytes(index)); + return function.minWeightDelta( + balancer.getShardWriteLoad(index), + balancer.balancingWeights.diskUsageIgnored() ? 0 : balancer.maxShardSizeBytes(index) + ); } @Override diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancingWeights.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancingWeights.java index b4ee53b996526..be3b2351c4a83 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancingWeights.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancingWeights.java @@ -42,4 +42,9 @@ public interface BalancingWeights { * @return a {@link NodeSorters} instance */ NodeSorters createNodeSorters(BalancedShardsAllocator.ModelNode[] modelNodes, BalancedShardsAllocator.Balancer balancer); + + /** + * Returns true if disk usage is ignored for the purposes of weight calculations + */ + boolean diskUsageIgnored(); } diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/GlobalBalancingWeightsFactory.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/GlobalBalancingWeightsFactory.java index eecc549c1ece4..c408e7a39d981 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/GlobalBalancingWeightsFactory.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/GlobalBalancingWeightsFactory.java @@ -31,14 +31,17 @@ public BalancingWeights create() { private class GlobalBalancingWeights implements BalancingWeights { private final WeightFunction weightFunction; + private final boolean diskUsageIgnored; GlobalBalancingWeights() { + final float diskUsageBalanceFactor = balancerSettings.getDiskUsageBalanceFactor(); this.weightFunction = new WeightFunction( balancerSettings.getShardBalanceFactor(), balancerSettings.getIndexBalanceFactor(), balancerSettings.getWriteLoadBalanceFactor(), - balancerSettings.getDiskUsageBalanceFactor() + diskUsageBalanceFactor ); + this.diskUsageIgnored = diskUsageBalanceFactor == 0; } @Override @@ -56,6 +59,11 @@ public NodeSorters createNodeSorters(BalancedShardsAllocator.ModelNode[] modelNo return new GlobalNodeSorters(new BalancedShardsAllocator.NodeSorter(modelNodes, weightFunction, balancer)); } + @Override + public boolean diskUsageIgnored() { + return diskUsageIgnored; + } + private record GlobalNodeSorters(BalancedShardsAllocator.NodeSorter nodeSorter) implements NodeSorters { @Override diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocatorTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocatorTests.java index 38dba2acf3250..cdbf1a81b24df 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocatorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocatorTests.java @@ -43,6 +43,7 @@ import org.elasticsearch.cluster.routing.allocation.decider.SameShardAllocationDecider; import org.elasticsearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider; import org.elasticsearch.common.UUIDs; +import org.elasticsearch.common.collect.Iterators; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; @@ -63,6 +64,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.DoubleSupplier; import java.util.function.Function; import java.util.stream.Collector; @@ -74,6 +76,7 @@ import static java.util.stream.Collectors.summingDouble; import static java.util.stream.Collectors.summingLong; import static java.util.stream.Collectors.toSet; +import static org.elasticsearch.cluster.ClusterInfo.shardIdentifierFromRouting; import static org.elasticsearch.cluster.routing.ShardRoutingState.RELOCATING; import static org.elasticsearch.cluster.routing.TestShardRouting.shardRoutingBuilder; import static org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator.Balancer.PrioritiseByShardWriteLoadComparator.THRESHOLD_RATIO; @@ -90,8 +93,6 @@ import static org.hamcrest.Matchers.lessThanOrEqualTo; import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.sameInstance; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; public class BalancedShardsAllocatorTests extends ESAllocationTestCase { @@ -614,9 +615,9 @@ public void testShardSizeDiscrepancyWithinIndex() { () -> ClusterInfo.builder() .shardSizes( Map.of( - ClusterInfo.shardIdentifierFromRouting(new ShardId(index, 0), true), + shardIdentifierFromRouting(new ShardId(index, 0), true), 0L, - ClusterInfo.shardIdentifierFromRouting(new ShardId(index, 1), true), + shardIdentifierFromRouting(new ShardId(index, 1), true), ByteSizeUnit.GB.toBytes(500) ) ) @@ -691,6 +692,78 @@ public void testPartitionedClusterWithSeparateWeights() { assertThat(shardBalancedPartition.get("shardsOnly-2"), hasSize(3)); } + public void testSkipDiskUsageComputation() { + final var modelNodesRef = new AtomicReference(); + final var balancerRef = new AtomicReference(); + // Intentionally configure disk weight factor to be non-zero so that the test would fail if disk usage is not ignored + final var weightFunction = new WeightFunction(1, 1, 1, 1); + final var allocator = new BalancedShardsAllocator( + BalancerSettings.DEFAULT, + TEST_WRITE_LOAD_FORECASTER, + () -> new BalancingWeights() { + @Override + public WeightFunction weightFunctionForShard(ShardRouting shard) { + return weightFunction; + } + + @Override + public WeightFunction weightFunctionForNode(RoutingNode node) { + return weightFunction; + } + + @Override + public NodeSorters createNodeSorters( + BalancedShardsAllocator.ModelNode[] modelNodes, + BalancedShardsAllocator.Balancer balancer + ) { + modelNodesRef.set(modelNodes); + balancerRef.set(balancer); + final BalancedShardsAllocator.NodeSorter nodeSorter = new BalancedShardsAllocator.NodeSorter( + modelNodes, + weightFunction, + balancer + ); + return new NodeSorters() { + @Override + public BalancedShardsAllocator.NodeSorter sorterForShard(ShardRouting shard) { + return nodeSorter; + } + + @Override + public Iterator iterator() { + return Iterators.single(nodeSorter); + } + }; + } + + @Override + public boolean diskUsageIgnored() { + return true; // This makes the computation ignore disk usage + } + } + ); + + final String indexName = randomIdentifier(); + final var clusterState = createStateWithIndices(anIndex(indexName).shardSizeInBytesForecast(ByteSizeValue.ofGb(8).getBytes())); + final var index = clusterState.metadata().getProject(ProjectId.DEFAULT).index(indexName); + + // A cluster info with shard sizes + final var clusterInfo = ClusterInfo.builder() + .shardSizes(Map.of(shardIdentifierFromRouting(new ShardId(index.getIndex(), 0), true), ByteSizeValue.ofGb(8).getBytes())) + .build(); + allocator.allocate( + new RoutingAllocation(new AllocationDeciders(List.of()), clusterState, clusterInfo, null, 0L).mutableCloneForSimulation() + ); + + final var modelNodes = modelNodesRef.get(); + assertNotNull(modelNodes); + final var balancer = balancerRef.get(); + assertNotNull(balancer); + + assertThat(balancer.avgDiskUsageInBytesPerNode(), equalTo(0.0)); + Arrays.stream(modelNodes).forEach(modelNode -> assertThat(modelNode.diskUsageInBytes(), equalTo(0.0))); + } + public void testReturnEarlyOnShardAssignmentChanges() { var shardWriteLoads = new HashMap(); var allocationService = new MockAllocationService( @@ -1228,6 +1301,11 @@ public BalancedShardsAllocator.NodeSorter sorterForShard(ShardRouting shard) { } }; } + + @Override + public boolean diskUsageIgnored() { + return true; + } } }