Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
f36bfd5
Different approach at allowing separate weights per tier
nicktindall Apr 2, 2025
45f8c44
Merge remote-tracking branch 'origin/main' into separate_weights_per_…
nicktindall Apr 2, 2025
b40f3a4
Compiling without errors
nicktindall Apr 3, 2025
91b2072
Minimised
nicktindall Apr 3, 2025
4807773
Hack in tiered partitions for serverless
nicktindall Apr 3, 2025
05a27b6
Merge branch 'main' into separate_weights_per_tier_v2
nicktindall Apr 4, 2025
539499f
Register settings
nicktindall Apr 4, 2025
2db4d2e
Use the same PartitionedClusterFactory for NodeAllocationStatsAndWeig…
nicktindall Apr 4, 2025
8bc0aba
Use the same PartitionedClusterFactory for NodeAllocationStatsAndWeig…
nicktindall Apr 4, 2025
6e7d2e0
Move balancer settings testing to use BalancerSettings
nicktindall Apr 4, 2025
453e819
Tidy
nicktindall Apr 4, 2025
daff958
Add assertion around tiering
nicktindall Apr 4, 2025
5fbf544
Handle case when there are no search nodes
nicktindall Apr 4, 2025
f8f579c
Handle zero nodes in NodeSorter
nicktindall Apr 8, 2025
afea2ef
Merge remote-tracking branch 'origin/main' into separate_weights_per_…
nicktindall Apr 8, 2025
2f87308
Update docs/changelog/126091.yaml
nicktindall Apr 8, 2025
321ce06
Rename PartitionedCluster(Factory), add javadoc
nicktindall Apr 14, 2025
3557b54
[CI] Auto commit changes from spotless
Apr 14, 2025
f186c56
TieredBalancingWeightsFactory -> StatelessBalancingWeightsFactory
nicktindall Apr 14, 2025
0679990
Allow balancer weights factory to be provided by ClusterPlugin
nicktindall Apr 14, 2025
0dbd4ca
Merge remote-tracking branch 'origin/main' into separate_weights_per_…
nicktindall Apr 14, 2025
49429f1
Move stateless parts to a separate PR
nicktindall Apr 14, 2025
6952169
Fix changelog message
nicktindall Apr 14, 2025
2d1ec90
Make ModelNode and NodeSorter public
nicktindall Apr 14, 2025
9e7f13d
Merge branch 'main' into separate_weights_per_tier_v2
nicktindall Apr 15, 2025
99c9144
Merge branch 'main' into separate_weights_per_tier_v2
nicktindall Apr 15, 2025
bfdc214
Merge remote-tracking branch 'origin/main' into separate_weights_per_…
nicktindall Apr 17, 2025
b9d0241
PartitionedNodeSorter -> NodeSorters
nicktindall Apr 17, 2025
218901f
Add tests for weights-by-partition
nicktindall Apr 17, 2025
da8a69e
Merge remote-tracking branch 'origin/main' into separate_weights_per_…
nicktindall Apr 17, 2025
e50bd1b
Merge remote-tracking branch 'origin/main' into separate_weights_per_…
nicktindall Apr 22, 2025
3361bbe
Make NodeSorters iterable
nicktindall Apr 22, 2025
b9403bb
Javadoc on NodeSorter
nicktindall Apr 22, 2025
da14071
Merge remote-tracking branch 'origin/main' into separate_weights_per_…
nicktindall Apr 25, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 13 additions & 3 deletions server/src/main/java/org/elasticsearch/cluster/ClusterModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,11 @@
import org.elasticsearch.cluster.routing.allocation.NodeAllocationStatsAndWeightsCalculator;
import org.elasticsearch.cluster.routing.allocation.WriteLoadForecaster;
import org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator;
import org.elasticsearch.cluster.routing.allocation.allocator.BalancerSettings;
import org.elasticsearch.cluster.routing.allocation.allocator.DesiredBalanceShardsAllocator;
import org.elasticsearch.cluster.routing.allocation.allocator.DesiredBalanceShardsAllocator.DesiredBalanceReconcilerAction;
import org.elasticsearch.cluster.routing.allocation.allocator.GlobalPartitionedClusterFactory;
import org.elasticsearch.cluster.routing.allocation.allocator.PartitionedClusterFactory;
import org.elasticsearch.cluster.routing.allocation.allocator.ShardsAllocator;
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders;
Expand Down Expand Up @@ -145,13 +148,15 @@ public ClusterModule(
this.clusterPlugins = clusterPlugins;
this.deciderList = createAllocationDeciders(settings, clusterService.getClusterSettings(), clusterPlugins);
this.allocationDeciders = new AllocationDeciders(deciderList);
final BalancerSettings balancerSettings = new BalancerSettings(clusterService.getClusterSettings());
var nodeAllocationStatsAndWeightsCalculator = new NodeAllocationStatsAndWeightsCalculator(
writeLoadForecaster,
clusterService.getClusterSettings()
new GlobalPartitionedClusterFactory(balancerSettings)
);
this.shardsAllocator = createShardsAllocator(
settings,
clusterService.getClusterSettings(),
balancerSettings,
threadPool,
clusterPlugins,
clusterService,
Expand Down Expand Up @@ -440,6 +445,7 @@ private static void addAllocationDecider(Map<Class<?>, AllocationDecider> decide
private static ShardsAllocator createShardsAllocator(
Settings settings,
ClusterSettings clusterSettings,
BalancerSettings balancerSettings,
ThreadPool threadPool,
List<ClusterPlugin> clusterPlugins,
ClusterService clusterService,
Expand All @@ -449,12 +455,16 @@ private static ShardsAllocator createShardsAllocator(
NodeAllocationStatsAndWeightsCalculator nodeAllocationStatsAndWeightsCalculator
) {
Map<String, Supplier<ShardsAllocator>> allocators = new HashMap<>();
allocators.put(BALANCED_ALLOCATOR, () -> new BalancedShardsAllocator(clusterSettings, writeLoadForecaster));
final PartitionedClusterFactory partitionedClusterFactory = new GlobalPartitionedClusterFactory(balancerSettings);
allocators.put(
BALANCED_ALLOCATOR,
() -> new BalancedShardsAllocator(balancerSettings, writeLoadForecaster, partitionedClusterFactory)
);
allocators.put(
DESIRED_BALANCE_ALLOCATOR,
() -> new DesiredBalanceShardsAllocator(
clusterSettings,
new BalancedShardsAllocator(clusterSettings, writeLoadForecaster),
new BalancedShardsAllocator(balancerSettings, writeLoadForecaster, partitionedClusterFactory),
threadPool,
clusterService,
reconciler,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator;
import org.elasticsearch.cluster.routing.allocation.allocator.DesiredBalance;
import org.elasticsearch.cluster.routing.allocation.allocator.PartitionedCluster;
import org.elasticsearch.cluster.routing.allocation.allocator.PartitionedClusterFactory;
import org.elasticsearch.cluster.routing.allocation.allocator.WeightFunction;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.util.Maps;
import org.elasticsearch.core.Nullable;

Expand All @@ -29,11 +29,7 @@
*/
public class NodeAllocationStatsAndWeightsCalculator {
private final WriteLoadForecaster writeLoadForecaster;

private volatile float indexBalanceFactor;
private volatile float shardBalanceFactor;
private volatile float writeLoadBalanceFactor;
private volatile float diskUsageBalanceFactor;
private final PartitionedClusterFactory partitionedClusterFactory;

/**
* Node shard allocation stats and the total node weight.
Expand All @@ -47,18 +43,12 @@ public record NodeAllocationStatsAndWeight(
float currentNodeWeight
) {}

public NodeAllocationStatsAndWeightsCalculator(WriteLoadForecaster writeLoadForecaster, ClusterSettings clusterSettings) {
public NodeAllocationStatsAndWeightsCalculator(
WriteLoadForecaster writeLoadForecaster,
PartitionedClusterFactory partitionedClusterFactory
) {
this.writeLoadForecaster = writeLoadForecaster;
clusterSettings.initializeAndWatch(BalancedShardsAllocator.SHARD_BALANCE_FACTOR_SETTING, value -> this.shardBalanceFactor = value);
clusterSettings.initializeAndWatch(BalancedShardsAllocator.INDEX_BALANCE_FACTOR_SETTING, value -> this.indexBalanceFactor = value);
clusterSettings.initializeAndWatch(
BalancedShardsAllocator.WRITE_LOAD_BALANCE_FACTOR_SETTING,
value -> this.writeLoadBalanceFactor = value
);
clusterSettings.initializeAndWatch(
BalancedShardsAllocator.DISK_USAGE_BALANCE_FACTOR_SETTING,
value -> this.diskUsageBalanceFactor = value
);
this.partitionedClusterFactory = partitionedClusterFactory;
}

/**
Expand All @@ -74,13 +64,14 @@ public Map<String, NodeAllocationStatsAndWeight> nodesAllocationStatsAndWeights(
// must not use licensed features when just starting up
writeLoadForecaster.refreshLicense();
}
var weightFunction = new WeightFunction(shardBalanceFactor, indexBalanceFactor, writeLoadBalanceFactor, diskUsageBalanceFactor);
final PartitionedCluster partitionedCluster = partitionedClusterFactory.create();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This doesn't change from call to call -- maybe it did once upon a time? Or does it somehow?

Shouldn't the PartitionedCluster (a global or tiered instance) be created once and locally save in the constructor, and then repeatedly use the saved PartitionedCluster instance?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As I mentioned above it's just about capturing the weights at this point so they remain static throughout the balancing. If we kept a single long-lived instance it would have to keep a reference to the dynamic weights and they may be updated mid-balance. It's probably not the end of the world but I was trying to keep the behaviour consistent with the existing implementation.

var avgShardsPerNode = WeightFunction.avgShardPerNode(metadata, routingNodes);
var avgWriteLoadPerNode = WeightFunction.avgWriteLoadPerNode(writeLoadForecaster, metadata, routingNodes);
var avgDiskUsageInBytesPerNode = WeightFunction.avgDiskUsageInBytesPerNode(clusterInfo, metadata, routingNodes);

var nodeAllocationStatsAndWeights = Maps.<String, NodeAllocationStatsAndWeight>newMapWithExpectedSize(routingNodes.size());
for (RoutingNode node : routingNodes) {
WeightFunction weightFunction = partitionedCluster.weightFunctionForNode(node);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
WeightFunction weightFunction = partitionedCluster.weightFunctionForNode(node);
WeightFunction nodeWeightFunction = partitionedCluster.weightFunctionForNode(node);

Nit. Since it's per node now, might be nice to reflect that.

int shards = 0;
int undesiredShards = 0;
double forecastedWriteLoad = 0.0;
Expand Down
Loading