3838import org .elasticsearch .cluster .routing .allocation .WriteLoadForecaster ;
3939import org .elasticsearch .cluster .routing .allocation .allocator .BalancedShardsAllocator ;
4040import org .elasticsearch .cluster .routing .allocation .allocator .BalancerSettings ;
41+ import org .elasticsearch .cluster .routing .allocation .allocator .BalancingWeightsFactory ;
4142import org .elasticsearch .cluster .routing .allocation .allocator .DesiredBalanceShardsAllocator ;
4243import org .elasticsearch .cluster .routing .allocation .allocator .DesiredBalanceShardsAllocator .DesiredBalanceReconcilerAction ;
44+ import org .elasticsearch .cluster .routing .allocation .allocator .GlobalBalancingWeightsFactory ;
4345import org .elasticsearch .cluster .routing .allocation .allocator .ShardsAllocator ;
4446import org .elasticsearch .cluster .routing .allocation .decider .AllocationDecider ;
4547import org .elasticsearch .cluster .routing .allocation .decider .AllocationDeciders ;
@@ -146,11 +148,20 @@ public ClusterModule(
146148 this .deciderList = createAllocationDeciders (settings , clusterService .getClusterSettings (), clusterPlugins );
147149 this .allocationDeciders = new AllocationDeciders (deciderList );
148150 final BalancerSettings balancerSettings = new BalancerSettings (clusterService .getClusterSettings ());
149- var nodeAllocationStatsAndWeightsCalculator = new NodeAllocationStatsAndWeightsCalculator (writeLoadForecaster , balancerSettings );
151+ final BalancingWeightsFactory balancingWeightsFactory = getBalancingWeightsFactory (
152+ clusterPlugins ,
153+ balancerSettings ,
154+ clusterService .getClusterSettings ()
155+ );
156+ var nodeAllocationStatsAndWeightsCalculator = new NodeAllocationStatsAndWeightsCalculator (
157+ writeLoadForecaster ,
158+ balancingWeightsFactory
159+ );
150160 this .shardsAllocator = createShardsAllocator (
151161 settings ,
152162 clusterService .getClusterSettings (),
153163 balancerSettings ,
164+ balancingWeightsFactory ,
154165 threadPool ,
155166 clusterPlugins ,
156167 clusterService ,
@@ -203,6 +214,22 @@ public ShardRouting.Role newEmptyRole(int copyIndex) {
203214 };
204215 }
205216
217+ static BalancingWeightsFactory getBalancingWeightsFactory (
218+ List <ClusterPlugin > clusterPlugins ,
219+ BalancerSettings balancerSettings ,
220+ ClusterSettings clusterSettings
221+ ) {
222+ final var strategies = clusterPlugins .stream ()
223+ .map (pl -> pl .getBalancingWeightsFactory (balancerSettings , clusterSettings ))
224+ .filter (Objects ::nonNull )
225+ .toList ();
226+ return switch (strategies .size ()) {
227+ case 0 -> new GlobalBalancingWeightsFactory (balancerSettings );
228+ case 1 -> strategies .getFirst ();
229+ default -> throw new IllegalArgumentException ("multiple plugins define balancing weights factories, which is not permitted" );
230+ };
231+ }
232+
206233 private ClusterState reconcile (ClusterState clusterState , RerouteStrategy rerouteStrategy ) {
207234 return allocationService .executeWithRoutingAllocation (clusterState , "reconcile-desired-balance" , rerouteStrategy );
208235 }
@@ -439,6 +466,7 @@ private static ShardsAllocator createShardsAllocator(
439466 Settings settings ,
440467 ClusterSettings clusterSettings ,
441468 BalancerSettings balancerSettings ,
469+ BalancingWeightsFactory balancingWeightsFactory ,
442470 ThreadPool threadPool ,
443471 List <ClusterPlugin > clusterPlugins ,
444472 ClusterService clusterService ,
@@ -448,12 +476,15 @@ private static ShardsAllocator createShardsAllocator(
448476 NodeAllocationStatsAndWeightsCalculator nodeAllocationStatsAndWeightsCalculator
449477 ) {
450478 Map <String , Supplier <ShardsAllocator >> allocators = new HashMap <>();
451- allocators .put (BALANCED_ALLOCATOR , () -> new BalancedShardsAllocator (balancerSettings , writeLoadForecaster ));
479+ allocators .put (
480+ BALANCED_ALLOCATOR ,
481+ () -> new BalancedShardsAllocator (balancerSettings , writeLoadForecaster , balancingWeightsFactory )
482+ );
452483 allocators .put (
453484 DESIRED_BALANCE_ALLOCATOR ,
454485 () -> new DesiredBalanceShardsAllocator (
455486 clusterSettings ,
456- new BalancedShardsAllocator (balancerSettings , writeLoadForecaster ),
487+ new BalancedShardsAllocator (balancerSettings , writeLoadForecaster , balancingWeightsFactory ),
457488 threadPool ,
458489 clusterService ,
459490 reconciler ,
0 commit comments