53
53
import java .util .Iterator ;
54
54
import java .util .List ;
55
55
import java .util .Map ;
56
- import java .util .OptionalLong ;
57
56
import java .util .Set ;
58
57
import java .util .function .BiFunction ;
59
58
import java .util .stream .StreamSupport ;
@@ -321,26 +320,26 @@ float minWeightDelta(Balancer balancer, String index) {
321
320
* A {@link Balancer}
322
321
*/
323
322
public static class Balancer {
324
- private final Map <String , ModelNode > nodes ;
325
323
private final WriteLoadForecaster writeLoadForecaster ;
326
324
private final RoutingAllocation allocation ;
327
325
private final RoutingNodes routingNodes ;
326
+ private final Metadata metadata ;
328
327
private final WeightFunction weight ;
329
328
330
329
private final float threshold ;
331
- private final Metadata metadata ;
332
330
private final float avgShardsPerNode ;
333
331
private final double avgWriteLoadPerNode ;
334
332
private final double avgDiskUsageInBytesPerNode ;
333
+ private final Map <String , ModelNode > nodes ;
335
334
private final NodeSorter sorter ;
336
335
337
336
public Balancer (WriteLoadForecaster writeLoadForecaster , RoutingAllocation allocation , WeightFunction weight , float threshold ) {
338
337
this .writeLoadForecaster = writeLoadForecaster ;
339
338
this .allocation = allocation ;
340
- this .weight = weight ;
341
- this .threshold = threshold ;
342
339
this .routingNodes = allocation .routingNodes ();
343
340
this .metadata = allocation .metadata ();
341
+ this .weight = weight ;
342
+ this .threshold = threshold ;
344
343
avgShardsPerNode = ((float ) metadata .getTotalNumberOfShards ()) / routingNodes .size ();
345
344
avgWriteLoadPerNode = getTotalWriteLoad (writeLoadForecaster , metadata ) / routingNodes .size ();
346
345
avgDiskUsageInBytesPerNode = ((double ) getTotalDiskUsageInBytes (allocation .clusterInfo (), metadata ) / routingNodes .size ());
@@ -371,15 +370,10 @@ private static long getTotalDiskUsageInBytes(ClusterInfo clusterInfo, Metadata m
371
370
372
371
// Visible for testing
373
372
static long getIndexDiskUsageInBytes (ClusterInfo clusterInfo , IndexMetadata indexMetadata ) {
374
- OptionalLong forecastedShardSizeInBytes = indexMetadata .getForecastedShardSizeInBytes ();
375
- final long indexDiskUsageInBytes ;
376
- if (forecastedShardSizeInBytes .isPresent ()) {
377
- int i = numberOfCopies (indexMetadata );
378
- indexDiskUsageInBytes = forecastedShardSizeInBytes .getAsLong () * i ;
379
- } else {
380
- indexDiskUsageInBytes = getIndexDiskUsageInBytesFromClusterInfo (clusterInfo , indexMetadata );
381
- }
382
- return indexDiskUsageInBytes ;
373
+ var forecastedShardSizeInBytes = indexMetadata .getForecastedShardSizeInBytes ();
374
+ return forecastedShardSizeInBytes .isPresent ()
375
+ ? forecastedShardSizeInBytes .getAsLong () * numberOfCopies (indexMetadata )
376
+ : getIndexDiskUsageInBytesFromClusterInfo (clusterInfo , indexMetadata );
383
377
}
384
378
385
379
private static long getIndexDiskUsageInBytesFromClusterInfo (ClusterInfo clusterInfo , IndexMetadata indexMetadata ) {
@@ -408,6 +402,10 @@ private static long getIndexDiskUsageInBytesFromClusterInfo(ClusterInfo clusterI
408
402
return shardCount == 0 ? 0 : (totalSizeInBytes / shardCount ) * numberOfCopies (indexMetadata );
409
403
}
410
404
405
+ private static long getShardDiskUsageInBytes (ShardRouting shardRouting , IndexMetadata indexMetadata , ClusterInfo clusterInfo ) {
406
+ return indexMetadata .getForecastedShardSizeInBytes ().orElseGet (() -> clusterInfo .getShardSize (shardRouting , 0L ));
407
+ }
408
+
411
409
private static int numberOfCopies (IndexMetadata indexMetadata ) {
412
410
return indexMetadata .getNumberOfShards () * (1 + indexMetadata .getNumberOfReplicas ());
413
411
}
@@ -416,6 +414,14 @@ private double getShardWriteLoad(String index) {
416
414
return writeLoadForecaster .getForecastedWriteLoad (metadata .index (index )).orElse (0.0 );
417
415
}
418
416
417
+ private double diskUsageInBytesPerShard (String index ) {
418
+ var indexMetadata = metadata .index (index );
419
+ var forecastedShardSizeInBytes = indexMetadata .getForecastedShardSizeInBytes ();
420
+ return forecastedShardSizeInBytes .isPresent ()
421
+ ? forecastedShardSizeInBytes .getAsLong ()
422
+ : (double ) getIndexDiskUsageInBytesFromClusterInfo (allocation .clusterInfo (), indexMetadata ) / numberOfCopies (indexMetadata );
423
+ }
424
+
419
425
/**
420
426
* Returns an array view on the nodes in the balancer. Nodes should not be removed from this list.
421
427
*/
@@ -445,10 +451,6 @@ public double avgDiskUsageInBytesPerNode() {
445
451
return avgDiskUsageInBytesPerNode ;
446
452
}
447
453
448
- public double diskUsageInBytesPerShard (String index ) {
449
- return metadata .index (index ).getForecastedShardSizeInBytes ().orElse (0 );
450
- }
451
-
452
454
/**
453
455
* Returns a new {@link NodeSorter} that sorts the nodes based on their
454
456
* current weight with respect to the index passed to the sorter. The
@@ -962,7 +964,7 @@ private Decision decideCanForceAllocateForVacate(ShardRouting shardRouting, Rout
962
964
private Map <String , ModelNode > buildModelFromAssigned () {
963
965
Map <String , ModelNode > nodes = Maps .newMapWithExpectedSize (routingNodes .size ());
964
966
for (RoutingNode rn : routingNodes ) {
965
- ModelNode node = new ModelNode (writeLoadForecaster , metadata , rn );
967
+ ModelNode node = new ModelNode (writeLoadForecaster , metadata , allocation . clusterInfo (), rn );
966
968
nodes .put (rn .nodeId (), node );
967
969
for (ShardRouting shard : rn ) {
968
970
assert rn .nodeId ().equals (shard .currentNodeId ());
@@ -1254,12 +1256,14 @@ static class ModelNode implements Iterable<ModelIndex> {
1254
1256
private double diskUsageInBytes = 0.0 ;
1255
1257
private final WriteLoadForecaster writeLoadForecaster ;
1256
1258
private final Metadata metadata ;
1259
+ private final ClusterInfo clusterInfo ;
1257
1260
private final RoutingNode routingNode ;
1258
1261
private final Map <String , ModelIndex > indices ;
1259
1262
1260
- ModelNode (WriteLoadForecaster writeLoadForecaster , Metadata metadata , RoutingNode routingNode ) {
1263
+ ModelNode (WriteLoadForecaster writeLoadForecaster , Metadata metadata , ClusterInfo clusterInfo , RoutingNode routingNode ) {
1261
1264
this .writeLoadForecaster = writeLoadForecaster ;
1262
1265
this .metadata = metadata ;
1266
+ this .clusterInfo = clusterInfo ;
1263
1267
this .routingNode = routingNode ;
1264
1268
this .indices = Maps .newMapWithExpectedSize (routingNode .size () + 10 );// some extra to account for shard movements
1265
1269
}
@@ -1305,7 +1309,7 @@ public void addShard(ShardRouting shard) {
1305
1309
indices .computeIfAbsent (shard .getIndexName (), t -> new ModelIndex ()).addShard (shard );
1306
1310
IndexMetadata indexMetadata = metadata .index (shard .index ());
1307
1311
writeLoad += writeLoadForecaster .getForecastedWriteLoad (indexMetadata ).orElse (0.0 );
1308
- diskUsageInBytes += indexMetadata . getForecastedShardSizeInBytes (). orElse ( 0 );
1312
+ diskUsageInBytes += Balancer . getShardDiskUsageInBytes ( shard , indexMetadata , clusterInfo );
1309
1313
numShards ++;
1310
1314
}
1311
1315
@@ -1319,7 +1323,7 @@ public void removeShard(ShardRouting shard) {
1319
1323
}
1320
1324
IndexMetadata indexMetadata = metadata .index (shard .index ());
1321
1325
writeLoad -= writeLoadForecaster .getForecastedWriteLoad (indexMetadata ).orElse (0.0 );
1322
- diskUsageInBytes -= indexMetadata . getForecastedShardSizeInBytes (). orElse ( 0 );
1326
+ diskUsageInBytes -= Balancer . getShardDiskUsageInBytes ( shard , indexMetadata , clusterInfo );
1323
1327
numShards --;
1324
1328
}
1325
1329
0 commit comments