@@ -324,8 +324,10 @@ private Balancer(
this.threshold = threshold;
avgShardsPerNode = WeightFunction.avgShardPerNode(metadata, routingNodes);
avgWriteLoadPerNode = WeightFunction.avgWriteLoadPerNode(writeLoadForecaster, metadata, routingNodes);
avgDiskUsageInBytesPerNode = WeightFunction.avgDiskUsageInBytesPerNode(allocation.clusterInfo(), metadata, routingNodes);
nodes = Collections.unmodifiableMap(buildModelFromAssigned());
avgDiskUsageInBytesPerNode = balancingWeights.diskUsageIgnored()
? 0
: WeightFunction.avgDiskUsageInBytesPerNode(allocation.clusterInfo(), metadata, routingNodes);
nodes = Collections.unmodifiableMap(buildModelFromAssigned(balancingWeights.diskUsageIgnored()));
this.nodeSorters = balancingWeights.createNodeSorters(nodesArray(), this);
this.balancingWeights = balancingWeights;
this.completeEarlyOnShardAssignmentChange = completeEarlyOnShardAssignmentChange;
@@ -345,7 +347,12 @@ private float getShardWriteLoad(ProjectIndex index) {
return (float) writeLoadForecaster.getForecastedWriteLoad(projectMetadata.index(index.indexName)).orElse(0.0);
}

// This method is used only by NodeSorter#minWeightDelta to compute the node weight delta.
// Hence, we can return 0 when disk usage is ignored. Any future usage of this method should review whether this still holds.
private float maxShardSizeBytes(ProjectIndex index) {
if (balancingWeights.diskUsageIgnored()) {
return 0;
}
Contributor:

This bit I have minor apprehensions about. But it seems like it's not an easy one to skip on the caller side. And we do have that information available to us here via the balancingWeights.

Member Author:

I can remove this change if you prefer. It does not really show up in the flamegraph, so I could be over-zealous.

Contributor:

Yeah, maybe that would be nicer. The behaviour potentially seems a little surprising. It would seem safer if the caller skipped the call rather than the callee just returning zero.

Member Author:

It turns out that we can check it at the call site. Not sure why I initially thought it was not feasible ... Pushed 0affb99
Let me know if this works for you. Thanks!
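A minimal sketch of the caller-side alternative discussed above, assuming a small helper inside Balancer that NodeSorter#minWeightDelta could call; the name and placement here are illustrative, not the actual change pushed in 0affb99:

    // Illustrative only: guard at the call site so that maxShardSizeBytes
    // itself no longer needs to special-case the disk-usage-ignored mode.
    private float maxShardSizeBytesForWeightDelta(ProjectIndex index) {
        return balancingWeights.diskUsageIgnored() ? 0 : maxShardSizeBytes(index);
    }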

final var indexMetadata = indexMetadata(index);
if (indexMetadata.ignoreDiskWatermarks()) {
// disk watermarks are ignored for partial searchable snapshots
@@ -1156,10 +1163,10 @@ private Decision decideCanForceAllocateForVacate(ShardRouting shardRouting, Rout
* on the target node which we respect during the allocation / balancing
* process. In short, this method recreates the status-quo in the cluster.
*/
private Map<String, ModelNode> buildModelFromAssigned() {
private Map<String, ModelNode> buildModelFromAssigned(boolean diskUsageIgnored) {
Map<String, ModelNode> nodes = Maps.newMapWithExpectedSize(routingNodes.size());
for (RoutingNode rn : routingNodes) {
ModelNode node = new ModelNode(writeLoadForecaster, metadata, allocation.clusterInfo(), rn);
ModelNode node = new ModelNode(writeLoadForecaster, metadata, allocation.clusterInfo(), rn, diskUsageIgnored);
nodes.put(rn.nodeId(), node);
for (ShardRouting shard : rn) {
assert rn.nodeId().equals(shard.currentNodeId());
@@ -1476,13 +1483,21 @@ public static class ModelNode implements Iterable<ModelIndex> {
private final ClusterInfo clusterInfo;
private final RoutingNode routingNode;
private final Map<ProjectIndex, ModelIndex> indices;
private final boolean diskUsageIgnored;

public ModelNode(WriteLoadForecaster writeLoadForecaster, Metadata metadata, ClusterInfo clusterInfo, RoutingNode routingNode) {
public ModelNode(
WriteLoadForecaster writeLoadForecaster,
Metadata metadata,
ClusterInfo clusterInfo,
RoutingNode routingNode,
boolean diskUsageIgnored
) {
this.writeLoadForecaster = writeLoadForecaster;
this.metadata = metadata;
this.clusterInfo = clusterInfo;
this.routingNode = routingNode;
this.indices = Maps.newMapWithExpectedSize(routingNode.size() + 10);// some extra to account for shard movements
this.diskUsageIgnored = diskUsageIgnored;
}

public ModelIndex getIndex(ProjectIndex index) {
@@ -1527,7 +1542,9 @@ public void addShard(ProjectIndex index, ShardRouting shard) {
indices.computeIfAbsent(index, t -> new ModelIndex()).addShard(shard);
IndexMetadata indexMetadata = metadata.getProject(index.project).index(shard.index());
writeLoad += writeLoadForecaster.getForecastedWriteLoad(indexMetadata).orElse(0.0);
diskUsageInBytes += Balancer.getShardDiskUsageInBytes(shard, indexMetadata, clusterInfo);
if (diskUsageIgnored == false) {
diskUsageInBytes += Balancer.getShardDiskUsageInBytes(shard, indexMetadata, clusterInfo);
}
numShards++;
}

@@ -1541,7 +1558,9 @@ public void removeShard(ProjectIndex projectIndex, ShardRouting shard) {
}
IndexMetadata indexMetadata = metadata.getProject(projectIndex.project).index(shard.index());
writeLoad -= writeLoadForecaster.getForecastedWriteLoad(indexMetadata).orElse(0.0);
diskUsageInBytes -= Balancer.getShardDiskUsageInBytes(shard, indexMetadata, clusterInfo);
if (diskUsageIgnored == false) {
diskUsageInBytes -= Balancer.getShardDiskUsageInBytes(shard, indexMetadata, clusterInfo);
}
numShards--;
}

@@ -42,4 +42,9 @@ public interface BalancingWeights {
* @return a {@link NodeSorters} instance
*/
NodeSorters createNodeSorters(BalancedShardsAllocator.ModelNode[] modelNodes, BalancedShardsAllocator.Balancer balancer);

/**
* Returns true if disk usage is ignored for the purposes of weight calculations
*/
boolean diskUsageIgnored();
}
@@ -31,14 +31,17 @@ public BalancingWeights create() {
private class GlobalBalancingWeights implements BalancingWeights {

private final WeightFunction weightFunction;
private final boolean diskUsageIgnored;

GlobalBalancingWeights() {
final float diskUsageBalanceFactor = balancerSettings.getDiskUsageBalanceFactor();
this.weightFunction = new WeightFunction(
balancerSettings.getShardBalanceFactor(),
balancerSettings.getIndexBalanceFactor(),
balancerSettings.getWriteLoadBalanceFactor(),
balancerSettings.getDiskUsageBalanceFactor()
diskUsageBalanceFactor
);
this.diskUsageIgnored = diskUsageBalanceFactor == 0;
}

@Override
@@ -56,6 +59,11 @@ public NodeSorters createNodeSorters(BalancedShardsAllocator.ModelNode[] modelNo
return new GlobalNodeSorters(new BalancedShardsAllocator.NodeSorter(modelNodes, weightFunction, balancer));
}

@Override
public boolean diskUsageIgnored() {
return diskUsageIgnored;
}

private record GlobalNodeSorters(BalancedShardsAllocator.NodeSorter nodeSorter) implements NodeSorters {

@Override
@@ -43,6 +43,7 @@
import org.elasticsearch.cluster.routing.allocation.decider.SameShardAllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.collect.Iterators;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
@@ -63,6 +64,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.DoubleSupplier;
import java.util.function.Function;
import java.util.stream.Collector;
@@ -74,6 +76,7 @@
import static java.util.stream.Collectors.summingDouble;
import static java.util.stream.Collectors.summingLong;
import static java.util.stream.Collectors.toSet;
import static org.elasticsearch.cluster.ClusterInfo.shardIdentifierFromRouting;
import static org.elasticsearch.cluster.routing.ShardRoutingState.RELOCATING;
import static org.elasticsearch.cluster.routing.TestShardRouting.shardRoutingBuilder;
import static org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator.Balancer.PrioritiseByShardWriteLoadComparator.THRESHOLD_RATIO;
@@ -614,9 +617,9 @@ public void testShardSizeDiscrepancyWithinIndex() {
() -> ClusterInfo.builder()
.shardSizes(
Map.of(
ClusterInfo.shardIdentifierFromRouting(new ShardId(index, 0), true),
shardIdentifierFromRouting(new ShardId(index, 0), true),
0L,
ClusterInfo.shardIdentifierFromRouting(new ShardId(index, 1), true),
shardIdentifierFromRouting(new ShardId(index, 1), true),
ByteSizeUnit.GB.toBytes(500)
)
)
@@ -691,6 +694,87 @@ public void testPartitionedClusterWithSeparateWeights() {
assertThat(shardBalancedPartition.get("shardsOnly-2"), hasSize(3));
}

public void testSkipDiskUsageComputation() {
final var modelNodesRef = new AtomicReference<BalancedShardsAllocator.ModelNode[]>();
final var balancerRef = new AtomicReference<BalancedShardsAllocator.Balancer>();
// Intentionally configure disk weight factor to be non-zero so that the test would fail if disk usage is not ignored
final var weightFunction = new WeightFunction(1, 1, 1, 1);
final var allocator = new BalancedShardsAllocator(
BalancerSettings.DEFAULT,
TEST_WRITE_LOAD_FORECASTER,
() -> new BalancingWeights() {
@Override
public WeightFunction weightFunctionForShard(ShardRouting shard) {
return weightFunction;
}

@Override
public WeightFunction weightFunctionForNode(RoutingNode node) {
return weightFunction;
}

@Override
public NodeSorters createNodeSorters(
BalancedShardsAllocator.ModelNode[] modelNodes,
BalancedShardsAllocator.Balancer balancer
) {
modelNodesRef.set(modelNodes);
balancerRef.set(balancer);
final BalancedShardsAllocator.NodeSorter nodeSorter = new BalancedShardsAllocator.NodeSorter(
modelNodes,
weightFunction,
balancer
);
return new NodeSorters() {
@Override
public BalancedShardsAllocator.NodeSorter sorterForShard(ShardRouting shard) {
return nodeSorter;
}

@Override
public Iterator<BalancedShardsAllocator.NodeSorter> iterator() {
return Iterators.single(nodeSorter);
}
};
}

@Override
public boolean diskUsageIgnored() {
return true; // This makes the computation ignore disk usage
}
}
);

final String indexName = randomIdentifier();
final var clusterState = createStateWithIndices(anIndex(indexName).shardSizeInBytesForecast(ByteSizeValue.ofGb(8).getBytes()));
final var index = clusterState.metadata().getProject(ProjectId.DEFAULT).index(indexName);

// A cluster info with shard sizes
final var clusterInfo = new ClusterInfo(
Map.of(),
Map.of(),
Map.of(shardIdentifierFromRouting(new ShardId(index.getIndex(), 0), true), ByteSizeValue.ofGb(8).getBytes()),
Map.of(),
Map.of(),
Map.of(),
Map.of(),
Map.of(),
Map.of(),
Map.of()
);
Contributor:

Nit: could use ClusterInfo.builder().shardSizes(...).build()?

Member Author:

I forgot there is a builder. Pushed b7e22fc, looks much nicer! Thanks!
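For reference, a sketch of the builder-based form the nit suggests; ClusterInfo.builder().shardSizes(...) is already used earlier in this test file, and the actual cleanup landed in b7e22fc:

    // Sketch of the suggested construction via the builder:
    final var clusterInfo = ClusterInfo.builder()
        .shardSizes(Map.of(shardIdentifierFromRouting(new ShardId(index.getIndex(), 0), true), ByteSizeValue.ofGb(8).getBytes()))
        .build();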

allocator.allocate(
new RoutingAllocation(new AllocationDeciders(List.of()), clusterState, clusterInfo, null, 0L).mutableCloneForSimulation()
);

final var modelNodes = modelNodesRef.get();
assertNotNull(modelNodes);
final var balancer = balancerRef.get();
assertNotNull(balancer);

assertThat(balancer.avgDiskUsageInBytesPerNode(), equalTo(0.0));
Arrays.stream(modelNodes).forEach(modelNode -> assertThat(modelNode.diskUsageInBytes(), equalTo(0.0)));
}

public void testReturnEarlyOnShardAssignmentChanges() {
var shardWriteLoads = new HashMap<ShardId, Double>();
var allocationService = new MockAllocationService(
@@ -1228,6 +1312,11 @@ public BalancedShardsAllocator.NodeSorter sorterForShard(ShardRouting shard) {
}
};
}

@Override
public boolean diskUsageIgnored() {
return true;
}
}
}
