@@ -100,13 +100,6 @@ public class InternalClusterInfoService implements ClusterInfoService, ClusterSt
100
100
private volatile TimeValue updateFrequency ;
101
101
private volatile TimeValue fetchTimeout ;
102
102
103
- private volatile Map <String , DiskUsage > leastAvailableSpaceUsages ;
104
- private volatile Map <String , DiskUsage > mostAvailableSpaceUsages ;
105
- private volatile Map <String , ByteSizeValue > maxHeapPerNode ;
106
- private volatile Map <String , Long > estimatedHeapUsagePerNode ;
107
- private volatile Map <String , NodeUsageStatsForThreadPools > nodeThreadPoolUsageStatsPerNode ;
108
- private volatile IndicesStatsSummary indicesStatsSummary ;
109
-
110
103
private final ThreadPool threadPool ;
111
104
private final Client client ;
112
105
private final Supplier <ClusterState > clusterStateSupplier ;
@@ -131,12 +124,6 @@ public InternalClusterInfoService(
131
124
EstimatedHeapUsageCollector estimatedHeapUsageCollector ,
132
125
NodeUsageStatsForThreadPoolsCollector nodeUsageStatsForThreadPoolsCollector
133
126
) {
134
- this .leastAvailableSpaceUsages = Map .of ();
135
- this .mostAvailableSpaceUsages = Map .of ();
136
- this .maxHeapPerNode = Map .of ();
137
- this .estimatedHeapUsagePerNode = Map .of ();
138
- this .nodeThreadPoolUsageStatsPerNode = Map .of ();
139
- this .indicesStatsSummary = IndicesStatsSummary .EMPTY ;
140
127
this .threadPool = threadPool ;
141
128
this .client = client ;
142
129
this .estimatedHeapUsageCollector = estimatedHeapUsageCollector ;
@@ -210,6 +197,13 @@ public void clusterChanged(ClusterChangedEvent event) {
210
197
211
198
private class AsyncRefresh {
212
199
200
+ private volatile Map <String , DiskUsage > leastAvailableSpaceUsages ;
201
+ private volatile Map <String , DiskUsage > mostAvailableSpaceUsages ;
202
+ private volatile Map <String , ByteSizeValue > maxHeapPerNode ;
203
+ private volatile Map <String , Long > estimatedHeapUsagePerNode ;
204
+ private volatile Map <String , NodeUsageStatsForThreadPools > nodeThreadPoolUsageStatsPerNode ;
205
+ private volatile IndicesStatsSummary indicesStatsSummary ;
206
+
213
207
private final List <ActionListener <ClusterInfo >> thisRefreshListeners ;
214
208
private final RefCountingRunnable fetchRefs = new RefCountingRunnable (this ::callListeners );
215
209
@@ -475,6 +469,32 @@ private void callListeners() {
475
469
onRefreshComplete (this );
476
470
}
477
471
}
472
+
473
+ private ClusterInfo updateAndGetCurrentClusterInfo () {
474
+ final IndicesStatsSummary indicesStatsSummary = this .indicesStatsSummary ; // single volatile read
475
+ final Map <String , EstimatedHeapUsage > estimatedHeapUsages = new HashMap <>();
476
+ final var currentMaxHeapPerNode = this .maxHeapPerNode ; // Make sure we use a consistent view
477
+ currentMaxHeapPerNode .forEach ((nodeId , maxHeapSize ) -> {
478
+ final Long estimatedHeapUsage = estimatedHeapUsagePerNode .get (nodeId );
479
+ if (estimatedHeapUsage != null ) {
480
+ estimatedHeapUsages .put (nodeId , new EstimatedHeapUsage (nodeId , maxHeapSize .getBytes (), estimatedHeapUsage ));
481
+ }
482
+ });
483
+ final var newClusterInfo = new ClusterInfo (
484
+ leastAvailableSpaceUsages ,
485
+ mostAvailableSpaceUsages ,
486
+ indicesStatsSummary .shardSizes ,
487
+ indicesStatsSummary .shardDataSetSizes ,
488
+ indicesStatsSummary .dataPath ,
489
+ indicesStatsSummary .reservedSpace ,
490
+ estimatedHeapUsages ,
491
+ nodeThreadPoolUsageStatsPerNode ,
492
+ indicesStatsSummary .shardWriteLoads (),
493
+ currentMaxHeapPerNode
494
+ );
495
+ currentClusterInfo = newClusterInfo ;
496
+ return newClusterInfo ;
497
+ }
478
498
}
479
499
480
500
private void onRefreshComplete (AsyncRefresh completedRefresh ) {
@@ -542,39 +562,6 @@ public ClusterInfo getClusterInfo() {
542
562
return currentClusterInfo ;
543
563
}
544
564
545
- /**
546
- * Compute and return a new ClusterInfo from the most recently fetched stats and update {@link #currentClusterInfo} to it.
547
- * Note the method is called when a {@link AsyncRefresh} has received all the stats it requested. Since there can only be
548
- * a single AsyncRefresh at a time, the various stats used to compose the final results are guaranteed to be from a single
549
- * refresh cycle for consistency. Note that users of this class must call {@link #getClusterInfo()} to get the latest
550
- * computed and cached ClusterInfo and avoid accessing individual stats directly.
551
- */
552
- private ClusterInfo updateAndGetCurrentClusterInfo () {
553
- final IndicesStatsSummary indicesStatsSummary = this .indicesStatsSummary ; // single volatile read
554
- final Map <String , EstimatedHeapUsage > estimatedHeapUsages = new HashMap <>();
555
- final var currentMaxHeapPerNode = this .maxHeapPerNode ; // Make sure we use a consistent view
556
- currentMaxHeapPerNode .forEach ((nodeId , maxHeapSize ) -> {
557
- final Long estimatedHeapUsage = estimatedHeapUsagePerNode .get (nodeId );
558
- if (estimatedHeapUsage != null ) {
559
- estimatedHeapUsages .put (nodeId , new EstimatedHeapUsage (nodeId , maxHeapSize .getBytes (), estimatedHeapUsage ));
560
- }
561
- });
562
- final var newClusterInfo = new ClusterInfo (
563
- leastAvailableSpaceUsages ,
564
- mostAvailableSpaceUsages ,
565
- indicesStatsSummary .shardSizes ,
566
- indicesStatsSummary .shardDataSetSizes ,
567
- indicesStatsSummary .dataPath ,
568
- indicesStatsSummary .reservedSpace ,
569
- estimatedHeapUsages ,
570
- nodeThreadPoolUsageStatsPerNode ,
571
- indicesStatsSummary .shardWriteLoads (),
572
- currentMaxHeapPerNode
573
- );
574
- currentClusterInfo = newClusterInfo ;
575
- return newClusterInfo ;
576
- }
577
-
578
565
// allow tests to adjust the node stats on receipt
579
566
List <NodeStats > adjustNodesStats (List <NodeStats > nodeStats ) {
580
567
return nodeStats ;
0 commit comments