1313import org .elasticsearch .action .ActionListener ;
1414import org .elasticsearch .action .ActionRequestValidationException ;
1515import org .elasticsearch .action .ActionResponse ;
16+ import org .elasticsearch .action .ActionRunnable ;
1617import org .elasticsearch .action .ActionType ;
18+ import org .elasticsearch .action .SingleResultDeduplicator ;
1719import org .elasticsearch .action .admin .cluster .node .stats .NodesStatsRequestParameters .Metric ;
1820import org .elasticsearch .action .support .ActionFilters ;
21+ import org .elasticsearch .action .support .SubscribableListener ;
1922import org .elasticsearch .action .support .master .MasterNodeReadRequest ;
2023import org .elasticsearch .action .support .master .TransportMasterNodeReadAction ;
2124import org .elasticsearch .cluster .ClusterState ;
2831import org .elasticsearch .cluster .service .ClusterService ;
2932import org .elasticsearch .common .io .stream .StreamInput ;
3033import org .elasticsearch .common .io .stream .StreamOutput ;
34+ import org .elasticsearch .common .util .concurrent .EsExecutors ;
3135import org .elasticsearch .core .Nullable ;
3236import org .elasticsearch .core .TimeValue ;
3337import org .elasticsearch .features .FeatureService ;
@@ -47,7 +51,7 @@ public class TransportGetAllocationStatsAction extends TransportMasterNodeReadAc
4751
4852 public static final ActionType <TransportGetAllocationStatsAction .Response > TYPE = new ActionType <>("cluster:monitor/allocation/stats" );
4953
50- private final AllocationStatsService allocationStatsService ;
54+ private final SingleResultDeduplicator < Map < String , NodeAllocationStats >> allocationStatsSupplier ;
5155 private final DiskThresholdSettings diskThresholdSettings ;
5256 private final FeatureService featureService ;
5357
@@ -69,9 +73,15 @@ public TransportGetAllocationStatsAction(
6973 actionFilters ,
7074 TransportGetAllocationStatsAction .Request ::new ,
7175 TransportGetAllocationStatsAction .Response ::new ,
72- threadPool .executor (ThreadPool .Names .MANAGEMENT )
76+ // DIRECT is ok here because we fork the allocation stats computation onto a MANAGEMENT thread if needed, or else we return
77+ // very cheaply.
78+ EsExecutors .DIRECT_EXECUTOR_SERVICE
79+ );
80+ final var managementExecutor = threadPool .executor (ThreadPool .Names .MANAGEMENT );
81+ this .allocationStatsSupplier = new SingleResultDeduplicator <>(
82+ threadPool .getThreadContext (),
83+ l -> managementExecutor .execute (ActionRunnable .supply (l , allocationStatsService ::stats ))
7384 );
74- this .allocationStatsService = allocationStatsService ;
7585 this .diskThresholdSettings = new DiskThresholdSettings (clusterService .getSettings (), clusterService .getClusterSettings ());
7686 this .featureService = featureService ;
7787 }
@@ -88,15 +98,21 @@ protected void doExecute(Task task, Request request, ActionListener<Response> li
8898
8999 @ Override
90100 protected void masterOperation (Task task , Request request , ClusterState state , ActionListener <Response > listener ) throws Exception {
91- listener .onResponse (
92- new Response (
93- request .metrics ().contains (Metric .ALLOCATIONS ) ? allocationStatsService .stats () : Map .of (),
101+ // NB we are still on a transport thread here - if adding more functionality here make sure to fork to a different pool
102+
103+ final SubscribableListener <Map <String , NodeAllocationStats >> allocationStatsStep = request .metrics ().contains (Metric .ALLOCATIONS )
104+ ? SubscribableListener .newForked (allocationStatsSupplier ::execute )
105+ : SubscribableListener .newSucceeded (Map .of ());
106+
107+ allocationStatsStep .andThenApply (
108+ allocationStats -> new Response (
109+ allocationStats ,
94110 request .metrics ().contains (Metric .FS )
95111 && featureService .clusterHasFeature (clusterService .state (), AllocationStatsFeatures .INCLUDE_DISK_THRESHOLD_SETTINGS )
96112 ? diskThresholdSettings
97113 : null
98114 )
99- );
115+ ). addListener ( listener ) ;
100116 }
101117
102118 @ Override
0 commit comments