@@ -324,8 +324,10 @@ private Balancer(
this.threshold = threshold;
avgShardsPerNode = WeightFunction.avgShardPerNode(metadata, routingNodes);
avgWriteLoadPerNode = WeightFunction.avgWriteLoadPerNode(writeLoadForecaster, metadata, routingNodes);
-avgDiskUsageInBytesPerNode = WeightFunction.avgDiskUsageInBytesPerNode(allocation.clusterInfo(), metadata, routingNodes);
-nodes = Collections.unmodifiableMap(buildModelFromAssigned());
+avgDiskUsageInBytesPerNode = balancingWeights.diskUsageIgnored()
+    ? 0
+    : WeightFunction.avgDiskUsageInBytesPerNode(allocation.clusterInfo(), metadata, routingNodes);
+nodes = Collections.unmodifiableMap(buildModelFromAssigned(balancingWeights.diskUsageIgnored()));
this.nodeSorters = balancingWeights.createNodeSorters(nodesArray(), this);
this.balancingWeights = balancingWeights;
this.completeEarlyOnShardAssignmentChange = completeEarlyOnShardAssignmentChange;
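A note on why the average is pinned to 0 rather than left as computed: the disk component of a node's weight appears to be proportional to the node's deviation from the cluster-wide average disk usage, so both the per-node value and the average must be zeroed for the factor to drop out. A minimal sketch of that cancellation, with the term's exact shape assumed rather than taken from this diff:

// Illustrative sketch only, not the actual WeightFunction source. Assumption:
// the disk term has the shape factor * (nodeDiskUsage - avgDiskUsagePerNode).
static float diskWeightTerm(float diskBalanceFactor, double nodeDiskUsageInBytes, double avgDiskUsageInBytesPerNode) {
    return (float) (diskBalanceFactor * (nodeDiskUsageInBytes - avgDiskUsageInBytesPerNode));
}

// With diskUsageIgnored(), every ModelNode keeps diskUsageInBytes at 0 (see
// buildModelFromAssigned below) and the average is pinned to 0 here, so the
// term is 0 for every node regardless of the configured balance factor.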
@@ -1156,10 +1158,10 @@ private Decision decideCanForceAllocateForVacate(ShardRouting shardRouting, Rout
* on the target node which we respect during the allocation / balancing
* process. In short, this method recreates the status-quo in the cluster.
*/
-private Map<String, ModelNode> buildModelFromAssigned() {
+private Map<String, ModelNode> buildModelFromAssigned(boolean diskUsageIgnored) {
Map<String, ModelNode> nodes = Maps.newMapWithExpectedSize(routingNodes.size());
for (RoutingNode rn : routingNodes) {
-ModelNode node = new ModelNode(writeLoadForecaster, metadata, allocation.clusterInfo(), rn);
+ModelNode node = new ModelNode(writeLoadForecaster, metadata, allocation.clusterInfo(), rn, diskUsageIgnored);
nodes.put(rn.nodeId(), node);
for (ShardRouting shard : rn) {
assert rn.nodeId().equals(shard.currentNodeId());
@@ -1476,13 +1478,21 @@ public static class ModelNode implements Iterable<ModelIndex> {
private final ClusterInfo clusterInfo;
private final RoutingNode routingNode;
private final Map<ProjectIndex, ModelIndex> indices;
+private final boolean diskUsageIgnored;

-public ModelNode(WriteLoadForecaster writeLoadForecaster, Metadata metadata, ClusterInfo clusterInfo, RoutingNode routingNode) {
+public ModelNode(
+    WriteLoadForecaster writeLoadForecaster,
+    Metadata metadata,
+    ClusterInfo clusterInfo,
+    RoutingNode routingNode,
+    boolean diskUsageIgnored
+) {
this.writeLoadForecaster = writeLoadForecaster;
this.metadata = metadata;
this.clusterInfo = clusterInfo;
this.routingNode = routingNode;
this.indices = Maps.newMapWithExpectedSize(routingNode.size() + 10);// some extra to account for shard movements
+this.diskUsageIgnored = diskUsageIgnored;
}

public ModelIndex getIndex(ProjectIndex index) {
@@ -1527,7 +1537,9 @@ public void addShard(ProjectIndex index, ShardRouting shard) {
indices.computeIfAbsent(index, t -> new ModelIndex()).addShard(shard);
IndexMetadata indexMetadata = metadata.getProject(index.project).index(shard.index());
writeLoad += writeLoadForecaster.getForecastedWriteLoad(indexMetadata).orElse(0.0);
-diskUsageInBytes += Balancer.getShardDiskUsageInBytes(shard, indexMetadata, clusterInfo);
+if (diskUsageIgnored == false) {
+    diskUsageInBytes += Balancer.getShardDiskUsageInBytes(shard, indexMetadata, clusterInfo);
+}
numShards++;
}

@@ -1541,7 +1553,9 @@ public void removeShard(ProjectIndex projectIndex, ShardRouting shard) {
}
IndexMetadata indexMetadata = metadata.getProject(projectIndex.project).index(shard.index());
writeLoad -= writeLoadForecaster.getForecastedWriteLoad(indexMetadata).orElse(0.0);
-diskUsageInBytes -= Balancer.getShardDiskUsageInBytes(shard, indexMetadata, clusterInfo);
+if (diskUsageIgnored == false) {
+    diskUsageInBytes -= Balancer.getShardDiskUsageInBytes(shard, indexMetadata, clusterInfo);
+}
numShards--;
}

@@ -1656,7 +1670,10 @@ public float weight(ModelNode node) {
}

public float minWeightDelta() {
-return function.minWeightDelta(balancer.getShardWriteLoad(index), balancer.maxShardSizeBytes(index));
+return function.minWeightDelta(
+    balancer.getShardWriteLoad(index),
+    balancer.balancingWeights.diskUsageIgnored() ? 0 : balancer.maxShardSizeBytes(index)
+);
}

@Override
@@ -42,4 +42,9 @@ public interface BalancingWeights {
* @return a {@link NodeSorters} instance
*/
NodeSorters createNodeSorters(BalancedShardsAllocator.ModelNode[] modelNodes, BalancedShardsAllocator.Balancer balancer);

+/**
+ * Returns true if disk usage is ignored for the purposes of weight calculations
+ */
+boolean diskUsageIgnored();
}
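Since the new method has no default implementation, every BalancingWeights implementation must now declare whether disk usage takes part in its weight calculations. A hypothetical implementation is sketched below for illustration; the class name and factor values are invented, and only the signatures shown in this PR are relied on:

// Hypothetical sketch, assumed to live in the same package as BalancingWeights
// (org.elasticsearch.cluster.routing.allocation.allocator).
import java.util.Iterator;

import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.collect.Iterators;

class WriteLoadOnlyBalancingWeights implements BalancingWeights {

    // WeightFunction(shardBalance, indexBalance, writeLoadBalance, diskUsageBalance);
    // the zero disk factor is what makes ignoring disk usage safe here.
    private final WeightFunction weightFunction = new WeightFunction(0.45f, 0.55f, 10.0f, 0.0f);

    @Override
    public WeightFunction weightFunctionForShard(ShardRouting shard) {
        return weightFunction;
    }

    @Override
    public WeightFunction weightFunctionForNode(RoutingNode node) {
        return weightFunction;
    }

    @Override
    public NodeSorters createNodeSorters(BalancedShardsAllocator.ModelNode[] modelNodes, BalancedShardsAllocator.Balancer balancer) {
        // A single sorter for the whole cluster, mirroring the global case.
        var nodeSorter = new BalancedShardsAllocator.NodeSorter(modelNodes, weightFunction, balancer);
        return new NodeSorters() {
            @Override
            public BalancedShardsAllocator.NodeSorter sorterForShard(ShardRouting shard) {
                return nodeSorter;
            }

            @Override
            public Iterator<BalancedShardsAllocator.NodeSorter> iterator() {
                return Iterators.single(nodeSorter);
            }
        };
    }

    @Override
    public boolean diskUsageIgnored() {
        return true; // tells the Balancer to skip disk-usage bookkeeping entirely
    }
}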
@@ -31,14 +31,17 @@ public BalancingWeights create() {
private class GlobalBalancingWeights implements BalancingWeights {

private final WeightFunction weightFunction;
+private final boolean diskUsageIgnored;

GlobalBalancingWeights() {
+final float diskUsageBalanceFactor = balancerSettings.getDiskUsageBalanceFactor();
this.weightFunction = new WeightFunction(
balancerSettings.getShardBalanceFactor(),
balancerSettings.getIndexBalanceFactor(),
balancerSettings.getWriteLoadBalanceFactor(),
-balancerSettings.getDiskUsageBalanceFactor()
+diskUsageBalanceFactor
);
+this.diskUsageIgnored = diskUsageBalanceFactor == 0;
}

@Override
@@ -56,6 +59,11 @@ public NodeSorters createNodeSorters(BalancedShardsAllocator.ModelNode[] modelNo
return new GlobalNodeSorters(new BalancedShardsAllocator.NodeSorter(modelNodes, weightFunction, balancer));
}

+@Override
+public boolean diskUsageIgnored() {
+    return diskUsageIgnored;
+}

private record GlobalNodeSorters(BalancedShardsAllocator.NodeSorter nodeSorter) implements NodeSorters {

@Override
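In the global case the flag is thus derived from configuration rather than hard-coded: zeroing the disk-usage balance factor now disables disk accounting outright instead of merely multiplying it by zero. Assuming the factor is backed by the usual cluster setting (the setting key below is an assumption, not part of this diff):

// Assumption: BalancerSettings#getDiskUsageBalanceFactor() is backed by the
// "cluster.routing.allocation.balance.disk_usage" cluster setting. With the
// factor at 0, GlobalBalancingWeights#diskUsageIgnored() returns true and the
// Balancer never consults ClusterInfo for per-shard disk usage.
Settings settings = Settings.builder()
    .put("cluster.routing.allocation.balance.disk_usage", "0")
    .build();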
@@ -43,6 +43,7 @@
import org.elasticsearch.cluster.routing.allocation.decider.SameShardAllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider;
import org.elasticsearch.common.UUIDs;
+import org.elasticsearch.common.collect.Iterators;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
@@ -63,6 +64,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.function.DoubleSupplier;
import java.util.function.Function;
import java.util.stream.Collector;
@@ -74,6 +76,7 @@
import static java.util.stream.Collectors.summingDouble;
import static java.util.stream.Collectors.summingLong;
import static java.util.stream.Collectors.toSet;
+import static org.elasticsearch.cluster.ClusterInfo.shardIdentifierFromRouting;
import static org.elasticsearch.cluster.routing.ShardRoutingState.RELOCATING;
import static org.elasticsearch.cluster.routing.TestShardRouting.shardRoutingBuilder;
import static org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator.Balancer.PrioritiseByShardWriteLoadComparator.THRESHOLD_RATIO;
@@ -90,8 +93,6 @@
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.sameInstance;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;

public class BalancedShardsAllocatorTests extends ESAllocationTestCase {

@@ -614,9 +615,9 @@ public void testShardSizeDiscrepancyWithinIndex() {
() -> ClusterInfo.builder()
.shardSizes(
Map.of(
-ClusterInfo.shardIdentifierFromRouting(new ShardId(index, 0), true),
+shardIdentifierFromRouting(new ShardId(index, 0), true),
0L,
-ClusterInfo.shardIdentifierFromRouting(new ShardId(index, 1), true),
+shardIdentifierFromRouting(new ShardId(index, 1), true),
ByteSizeUnit.GB.toBytes(500)
)
)
@@ -691,6 +692,78 @@ public void testPartitionedClusterWithSeparateWeights() {
assertThat(shardBalancedPartition.get("shardsOnly-2"), hasSize(3));
}

+public void testSkipDiskUsageComputation() {
+    final var modelNodesRef = new AtomicReference<BalancedShardsAllocator.ModelNode[]>();
+    final var balancerRef = new AtomicReference<BalancedShardsAllocator.Balancer>();
+    // Intentionally configure disk weight factor to be non-zero so that the test would fail if disk usage is not ignored
+    final var weightFunction = new WeightFunction(1, 1, 1, 1);
+    final var allocator = new BalancedShardsAllocator(
+        BalancerSettings.DEFAULT,
+        TEST_WRITE_LOAD_FORECASTER,
+        () -> new BalancingWeights() {
+            @Override
+            public WeightFunction weightFunctionForShard(ShardRouting shard) {
+                return weightFunction;
+            }
+
+            @Override
+            public WeightFunction weightFunctionForNode(RoutingNode node) {
+                return weightFunction;
+            }
+
+            @Override
+            public NodeSorters createNodeSorters(
+                BalancedShardsAllocator.ModelNode[] modelNodes,
+                BalancedShardsAllocator.Balancer balancer
+            ) {
+                modelNodesRef.set(modelNodes);
+                balancerRef.set(balancer);
+                final BalancedShardsAllocator.NodeSorter nodeSorter = new BalancedShardsAllocator.NodeSorter(
+                    modelNodes,
+                    weightFunction,
+                    balancer
+                );
+                return new NodeSorters() {
+                    @Override
+                    public BalancedShardsAllocator.NodeSorter sorterForShard(ShardRouting shard) {
+                        return nodeSorter;
+                    }
+
+                    @Override
+                    public Iterator<BalancedShardsAllocator.NodeSorter> iterator() {
+                        return Iterators.single(nodeSorter);
+                    }
+                };
+            }
+
+            @Override
+            public boolean diskUsageIgnored() {
+                return true; // This makes the computation ignore disk usage
+            }
+        }
+    );
+
+    final String indexName = randomIdentifier();
+    final var clusterState = createStateWithIndices(anIndex(indexName).shardSizeInBytesForecast(ByteSizeValue.ofGb(8).getBytes()));
+    final var index = clusterState.metadata().getProject(ProjectId.DEFAULT).index(indexName);
+
+    // A cluster info with shard sizes
+    final var clusterInfo = ClusterInfo.builder()
+        .shardSizes(Map.of(shardIdentifierFromRouting(new ShardId(index.getIndex(), 0), true), ByteSizeValue.ofGb(8).getBytes()))
+        .build();
+    allocator.allocate(
+        new RoutingAllocation(new AllocationDeciders(List.of()), clusterState, clusterInfo, null, 0L).mutableCloneForSimulation()
+    );
+
+    final var modelNodes = modelNodesRef.get();
+    assertNotNull(modelNodes);
+    final var balancer = balancerRef.get();
+    assertNotNull(balancer);
+
+    assertThat(balancer.avgDiskUsageInBytesPerNode(), equalTo(0.0));
+    Arrays.stream(modelNodes).forEach(modelNode -> assertThat(modelNode.diskUsageInBytes(), equalTo(0.0)));
+}
+
public void testReturnEarlyOnShardAssignmentChanges() {
var shardWriteLoads = new HashMap<ShardId, Double>();
var allocationService = new MockAllocationService(
@@ -1228,6 +1301,11 @@ public BalancedShardsAllocator.NodeSorter sorterForShard(ShardRouting shard) {
}
};
}

+@Override
+public boolean diskUsageIgnored() {
+    return true;
+}
}
}
