@@ -87,21 +87,19 @@ public TransportGetAllocationStatsAction(
8787 );
8888 final var managementExecutor = threadPool .executor (ThreadPool .Names .MANAGEMENT );
8989 this .allocationStatsCache = new AllocationStatsCache (CACHE_MAX_AGE_SETTING .get (settings ).millis (), threadPool );
90- this .allocationStatsSupplier = new SingleResultDeduplicator <>(
91- threadPool .getThreadContext (),
92- l -> managementExecutor .execute (ActionRunnable .supply (l , () -> {
93- // Check the cache again here to prevent duplicate work when a thread has a cache miss and is just about to fork just as
94- // other threads are coming off a deduplicator call that is about to finish.
95- final var cachedStats = allocationStatsCache .get ();
96- if (cachedStats != null ) {
97- return cachedStats ;
98- }
90+ this .allocationStatsSupplier = new SingleResultDeduplicator <>(threadPool .getThreadContext (), l -> {
91+ final var cachedStats = allocationStatsCache .get ();
92+ if (cachedStats != null ) {
93+ l .onResponse (cachedStats );
94+ return ;
95+ }
9996
97+ managementExecutor .execute (ActionRunnable .supply (l , () -> {
10098 final var stats = allocationStatsService .stats ();
10199 allocationStatsCache .put (stats );
102100 return stats ;
103- }))
104- );
101+ }));
102+ } );
105103 this .diskThresholdSettings = new DiskThresholdSettings (clusterService .getSettings (), clusterService .getClusterSettings ());
106104 }
107105
@@ -119,16 +117,9 @@ protected void doExecute(Task task, Request request, ActionListener<Response> li
119117 protected void masterOperation (Task task , Request request , ClusterState state , ActionListener <Response > listener ) throws Exception {
120118 // NB we are still on a transport thread here - if adding more functionality here make sure to fork to a different pool
121119
122- SubscribableListener <Map <String , NodeAllocationStats >> allocationStatsStep ;
123-
124- if (request .metrics ().contains (Metric .ALLOCATIONS )) {
125- final var cachedStats = allocationStatsCache .get ();
126- allocationStatsStep = cachedStats != null
127- ? SubscribableListener .newSucceeded (cachedStats )
128- : SubscribableListener .newForked (allocationStatsSupplier ::execute );
129- } else {
130- allocationStatsStep = SubscribableListener .newSucceeded (Map .of ());
131- }
120+ final SubscribableListener <Map <String , NodeAllocationStats >> allocationStatsStep = request .metrics ().contains (Metric .ALLOCATIONS )
121+ ? SubscribableListener .newForked (allocationStatsSupplier ::execute )
122+ : SubscribableListener .newSucceeded (Map .of ());
132123
133124 allocationStatsStep .andThenApply (
134125 allocationStats -> new Response (allocationStats , request .metrics ().contains (Metric .FS ) ? diskThresholdSettings : null )
0 commit comments