1515import org .elasticsearch .action .ActionResponse ;
1616import org .elasticsearch .action .ActionRunnable ;
1717import org .elasticsearch .action .ActionType ;
18- import org .elasticsearch .action .SingleResultDeduplicator ;
1918import org .elasticsearch .action .admin .cluster .node .stats .NodesStatsRequestParameters .Metric ;
2019import org .elasticsearch .action .support .ActionFilters ;
20+ import org .elasticsearch .action .support .ContextPreservingActionListener ;
2121import org .elasticsearch .action .support .SubscribableListener ;
2222import org .elasticsearch .action .support .master .MasterNodeReadRequest ;
2323import org .elasticsearch .action .support .master .TransportMasterNodeReadAction ;
3535import org .elasticsearch .core .Nullable ;
3636import org .elasticsearch .core .TimeValue ;
3737import org .elasticsearch .injection .guice .Inject ;
38+ import org .elasticsearch .tasks .CancellableTask ;
3839import org .elasticsearch .tasks .Task ;
40+ import org .elasticsearch .tasks .TaskCancelledException ;
3941import org .elasticsearch .tasks .TaskId ;
4042import org .elasticsearch .threadpool .ThreadPool ;
4143import org .elasticsearch .transport .TransportService ;
4244
4345import java .io .IOException ;
46+ import java .util .ArrayList ;
4447import java .util .EnumSet ;
48+ import java .util .List ;
4549import java .util .Map ;
4650import java .util .concurrent .atomic .AtomicReference ;
51+ import java .util .function .Consumer ;
4752
4853public class TransportGetAllocationStatsAction extends TransportMasterNodeReadAction <
4954 TransportGetAllocationStatsAction .Request ,
@@ -62,8 +67,10 @@ public class TransportGetAllocationStatsAction extends TransportMasterNodeReadAc
6267 );
6368
6469 private final AllocationStatsCache allocationStatsCache ;
65- private final SingleResultDeduplicator < Map <String , NodeAllocationStats >> allocationStatsSupplier ;
70+ private final Consumer < ActionListener < Map <String , NodeAllocationStats > >> allocationStatsSupplier ;
6671 private final DiskThresholdSettings diskThresholdSettings ;
72+ private SubscribableListener <Response > waitingListeners ;
73+ private List <TaskListenerPair > tasksList ;
6774
6875 @ Inject
6976 public TransportGetAllocationStatsAction (
@@ -87,19 +94,19 @@ public TransportGetAllocationStatsAction(
8794 );
8895 final var managementExecutor = threadPool .executor (ThreadPool .Names .MANAGEMENT );
8996 this .allocationStatsCache = new AllocationStatsCache (threadPool , DEFAULT_CACHE_TTL );
90- this .allocationStatsSupplier = new SingleResultDeduplicator <>( threadPool . getThreadContext (), l -> {
97+ this .allocationStatsSupplier = l -> {
9198 final var cachedStats = allocationStatsCache .get ();
9299 if (cachedStats != null ) {
93100 l .onResponse (cachedStats );
94101 return ;
95102 }
96103
97104 managementExecutor .execute (ActionRunnable .supply (l , () -> {
98- final var stats = allocationStatsService .stats ();
105+ final var stats = allocationStatsService .stats (this :: ensureNotCancelled );
99106 allocationStatsCache .put (stats );
100107 return stats ;
101108 }));
102- }) ;
109+ };
103110 this .diskThresholdSettings = new DiskThresholdSettings (clusterService .getSettings (), clusterService .getClusterSettings ());
104111 clusterService .getClusterSettings ().initializeAndWatch (CACHE_TTL_SETTING , this .allocationStatsCache ::setTTL );
105112 }
@@ -118,13 +125,65 @@ protected void doExecute(Task task, Request request, ActionListener<Response> li
118125 protected void masterOperation (Task task , Request request , ClusterState state , ActionListener <Response > listener ) throws Exception {
119126 // NB we are still on a transport thread here - if adding more functionality here make sure to fork to a different pool
120127
121- final SubscribableListener <Map <String , NodeAllocationStats >> allocationStatsStep = request .metrics ().contains (Metric .ALLOCATIONS )
122- ? SubscribableListener .newForked (allocationStatsSupplier ::execute )
123- : SubscribableListener .newSucceeded (Map .of ());
128+ if (request .metrics ().contains (Metric .ALLOCATIONS ) == false ) {
129+ listener .onResponse (statsToResponse (Map .of (), request ));
130+ return ;
131+ }
132+ // Perform a cheap check for the cached stats up front.
133+ final var cachedStats = allocationStatsCache .get ();
134+ if (cachedStats != null ) {
135+ listener .onResponse (statsToResponse (cachedStats , request ));
136+ return ;
137+ }
138+
139+ assert task instanceof CancellableTask ;
140+ final var wrappedListener = ContextPreservingActionListener .wrapPreservingContext (listener , threadPool .getThreadContext ());
141+ final var taskListenerPair = new TaskListenerPair ((CancellableTask ) task , wrappedListener );
142+
143+ synchronized (this ) {
144+ if (waitingListeners != null ) {
145+ tasksList .add (taskListenerPair );
146+ waitingListeners .addListener (wrappedListener );
147+ return ;
148+ }
124149
125- allocationStatsStep .andThenApply (
126- allocationStats -> new Response (allocationStats , request .metrics ().contains (Metric .FS ) ? diskThresholdSettings : null )
127- ).addListener (listener );
150+ tasksList = new ArrayList <>();
151+ waitingListeners = new SubscribableListener <>();
152+ tasksList .add (taskListenerPair );
153+ waitingListeners .addListener (ActionListener .runBefore (wrappedListener , () -> {
154+ synchronized (this ) {
155+ waitingListeners = null ;
156+ tasksList = null ;
157+ }
158+ }));
159+ }
160+
161+ SubscribableListener .newForked (allocationStatsSupplier ::accept )
162+ .andThenApply (stats -> statsToResponse (stats , request ))
163+ .addListener (waitingListeners );
164+ }
165+
166+ private Response statsToResponse (Map <String , NodeAllocationStats > stats , Request request ) {
167+ return new Response (stats , request .metrics ().contains (Metric .FS ) ? diskThresholdSettings : null );
168+ }
169+
170+ private void ensureNotCancelled () {
171+ final int count ;
172+ synchronized (this ) {
173+ count = tasksList .size ();
174+ }
175+ boolean allTasksCancelled = true ;
176+ // Check each task to give each task a chance to invoke their listener (once) when cancelled.
177+ for (int i = 0 ; i < count ; ++i ) {
178+ final TaskListenerPair taskPair ;
179+ synchronized (this ) {
180+ taskPair = tasksList .get (i );
181+ }
182+ allTasksCancelled &= taskPair .isCancelled ();
183+ }
184+ if (allTasksCancelled ) {
185+ throw new TaskCancelledException ("task cancelled" );
186+ }
128187 }
129188
130189 @ Override
@@ -167,6 +226,11 @@ public EnumSet<Metric> metrics() {
167226 public ActionRequestValidationException validate () {
168227 return null ;
169228 }
229+
230+ @ Override
231+ public Task createTask (long id , String type , String action , TaskId parentTaskId , Map <String , String > headers ) {
232+ return new CancellableTask (id , type , action , "" , parentTaskId , headers );
233+ }
170234 }
171235
172236 public static class Response extends ActionResponse {
@@ -245,4 +309,24 @@ void put(Map<String, NodeAllocationStats> stats) {
245309 }
246310 }
247311 }
312+
313+ private static class TaskListenerPair {
314+ private final CancellableTask task ;
315+ private final ActionListener <Response > listener ;
316+ private boolean detectedCancellation ;
317+
318+ TaskListenerPair (CancellableTask task , ActionListener <Response > listener ) {
319+ this .task = task ;
320+ this .listener = listener ;
321+ this .detectedCancellation = false ;
322+ }
323+
324+ boolean isCancelled () {
325+ if (detectedCancellation == false && task .isCancelled ()) {
326+ detectedCancellation = true ;
327+ listener .onFailure (new TaskCancelledException ("task cancelled" ));
328+ }
329+ return task .isCancelled ();
330+ }
331+ }
248332}
0 commit comments