1313import org .elasticsearch .action .ActionListener ;
1414import org .elasticsearch .action .ActionRequestValidationException ;
1515import org .elasticsearch .action .ActionResponse ;
16+ import org .elasticsearch .action .ActionRunnable ;
1617import org .elasticsearch .action .ActionType ;
18+ import org .elasticsearch .action .SingleResultDeduplicator ;
1719import org .elasticsearch .action .admin .cluster .node .stats .NodesStatsRequestParameters .Metric ;
1820import org .elasticsearch .action .support .ActionFilters ;
21+ import org .elasticsearch .action .support .SubscribableListener ;
1922import org .elasticsearch .action .support .master .MasterNodeReadRequest ;
2023import org .elasticsearch .action .support .master .TransportMasterNodeReadAction ;
2124import org .elasticsearch .cluster .ClusterState ;
2831import org .elasticsearch .cluster .service .ClusterService ;
2932import org .elasticsearch .common .io .stream .StreamInput ;
3033import org .elasticsearch .common .io .stream .StreamOutput ;
34+ import org .elasticsearch .common .util .concurrent .EsExecutors ;
3135import org .elasticsearch .core .Nullable ;
3236import org .elasticsearch .core .TimeValue ;
3337import org .elasticsearch .features .FeatureService ;
@@ -47,7 +51,7 @@ public class TransportGetAllocationStatsAction extends TransportMasterNodeReadAc
4751
4852 public static final ActionType <TransportGetAllocationStatsAction .Response > TYPE = new ActionType <>("cluster:monitor/allocation/stats" );
4953
50- private final AllocationStatsService allocationStatsService ;
54+ private final SingleResultDeduplicator < Map < String , NodeAllocationStats >> allocationStatsSupplier ;
5155 private final DiskThresholdSettings diskThresholdSettings ;
5256 private final FeatureService featureService ;
5357
@@ -70,9 +74,15 @@ public TransportGetAllocationStatsAction(
7074 TransportGetAllocationStatsAction .Request ::new ,
7175 indexNameExpressionResolver ,
7276 TransportGetAllocationStatsAction .Response ::new ,
73- threadPool .executor (ThreadPool .Names .MANAGEMENT )
77+ // DIRECT is ok here because we fork the allocation stats computation onto a MANAGEMENT thread if needed, or else we return
78+ // very cheaply.
79+ EsExecutors .DIRECT_EXECUTOR_SERVICE
80+ );
81+ final var managementExecutor = threadPool .executor (ThreadPool .Names .MANAGEMENT );
82+ this .allocationStatsSupplier = new SingleResultDeduplicator <>(
83+ threadPool .getThreadContext (),
84+ l -> managementExecutor .execute (ActionRunnable .supply (l , allocationStatsService ::stats ))
7485 );
75- this .allocationStatsService = allocationStatsService ;
7686 this .diskThresholdSettings = new DiskThresholdSettings (clusterService .getSettings (), clusterService .getClusterSettings ());
7787 this .featureService = featureService ;
7888 }
@@ -89,15 +99,21 @@ protected void doExecute(Task task, Request request, ActionListener<Response> li
8999
90100 @ Override
91101 protected void masterOperation (Task task , Request request , ClusterState state , ActionListener <Response > listener ) throws Exception {
92- listener .onResponse (
93- new Response (
94- request .metrics ().contains (Metric .ALLOCATIONS ) ? allocationStatsService .stats () : Map .of (),
102+ // NB we are still on a transport thread here - if adding more functionality here make sure to fork to a different pool
103+
104+ final SubscribableListener <Map <String , NodeAllocationStats >> allocationStatsStep = request .metrics ().contains (Metric .ALLOCATIONS )
105+ ? SubscribableListener .newForked (allocationStatsSupplier ::execute )
106+ : SubscribableListener .newSucceeded (Map .of ());
107+
108+ allocationStatsStep .andThenApply (
109+ allocationStats -> new Response (
110+ allocationStats ,
95111 request .metrics ().contains (Metric .FS )
96112 && featureService .clusterHasFeature (clusterService .state (), AllocationStatsFeatures .INCLUDE_DISK_THRESHOLD_SETTINGS )
97113 ? diskThresholdSettings
98114 : null
99115 )
100- );
116+ ). addListener ( listener ) ;
101117 }
102118
103119 @ Override
0 commit comments