|
26 | 26 | import org.elasticsearch.cluster.metadata.MetadataMappingService; |
27 | 27 | import org.elasticsearch.cluster.metadata.NodesShutdownMetadata; |
28 | 28 | import org.elasticsearch.cluster.metadata.RepositoriesMetadata; |
29 | | -import org.elasticsearch.cluster.node.DiscoveryNode; |
30 | 29 | import org.elasticsearch.cluster.project.ProjectResolver; |
31 | 30 | import org.elasticsearch.cluster.routing.DelayedAllocationService; |
32 | 31 | import org.elasticsearch.cluster.routing.ShardRouting; |
|
44 | 43 | import org.elasticsearch.cluster.routing.allocation.allocator.DesiredBalanceShardsAllocator.DesiredBalanceReconcilerAction; |
45 | 44 | import org.elasticsearch.cluster.routing.allocation.allocator.GlobalBalancingWeightsFactory; |
46 | 45 | import org.elasticsearch.cluster.routing.allocation.allocator.ShardsAllocator; |
47 | | -import org.elasticsearch.cluster.routing.allocation.allocator.StatelessBalancingWeightsFactory; |
48 | 46 | import org.elasticsearch.cluster.routing.allocation.decider.AllocationDecider; |
49 | 47 | import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders; |
50 | 48 | import org.elasticsearch.cluster.routing.allocation.decider.AwarenessAllocationDecider; |
@@ -150,11 +148,11 @@ public ClusterModule( |
150 | 148 | this.deciderList = createAllocationDeciders(settings, clusterService.getClusterSettings(), clusterPlugins); |
151 | 149 | this.allocationDeciders = new AllocationDeciders(deciderList); |
152 | 150 | final BalancerSettings balancerSettings = new BalancerSettings(clusterService.getClusterSettings()); |
153 | | - // I'm aware that the following is an anti-pattern and will implement as an SPI provider or plugin |
154 | | - // if we decide to go ahead with this. |
155 | | - final BalancingWeightsFactory balancingWeightsFactory = DiscoveryNode.isStateless(settings) |
156 | | - ? new StatelessBalancingWeightsFactory(balancerSettings, clusterService.getClusterSettings()) |
157 | | - : new GlobalBalancingWeightsFactory(balancerSettings); |
| 151 | + final BalancingWeightsFactory balancingWeightsFactory = getBalancingWeightsFactory( |
| 152 | + clusterPlugins, |
| 153 | + balancerSettings, |
| 154 | + clusterService.getClusterSettings() |
| 155 | + ); |
158 | 156 | var nodeAllocationStatsAndWeightsCalculator = new NodeAllocationStatsAndWeightsCalculator( |
159 | 157 | writeLoadForecaster, |
160 | 158 | balancingWeightsFactory |
@@ -216,6 +214,22 @@ public ShardRouting.Role newEmptyRole(int copyIndex) { |
216 | 214 | }; |
217 | 215 | } |
218 | 216 |
|
| 217 | + static BalancingWeightsFactory getBalancingWeightsFactory( |
| 218 | + List<ClusterPlugin> clusterPlugins, |
| 219 | + BalancerSettings balancerSettings, |
| 220 | + ClusterSettings clusterSettings |
| 221 | + ) { |
| 222 | + final var strategies = clusterPlugins.stream() |
| 223 | + .map(pl -> pl.getBalancingWeightsFactory(balancerSettings, clusterSettings)) |
| 224 | + .filter(Objects::nonNull) |
| 225 | + .toList(); |
| 226 | + return switch (strategies.size()) { |
| 227 | + case 0 -> new GlobalBalancingWeightsFactory(balancerSettings); |
| 228 | + case 1 -> strategies.getFirst(); |
| 229 | + default -> throw new IllegalArgumentException("multiple plugins define balancing weights factories, which is not permitted"); |
| 230 | + }; |
| 231 | + } |
| 232 | + |
219 | 233 | private ClusterState reconcile(ClusterState clusterState, RerouteStrategy rerouteStrategy) { |
220 | 234 | return allocationService.executeWithRoutingAllocation(clusterState, "reconcile-desired-balance", rerouteStrategy); |
221 | 235 | } |
|
0 commit comments