3131import org .elasticsearch .common .io .stream .StreamInput ;
3232import org .elasticsearch .common .io .stream .StreamOutput ;
3333import org .elasticsearch .common .settings .Setting ;
34+ import org .elasticsearch .common .settings .Settings ;
3435import org .elasticsearch .common .util .concurrent .EsExecutors ;
3536import org .elasticsearch .core .Nullable ;
3637import org .elasticsearch .core .TimeValue ;
@@ -51,11 +52,10 @@ public class TransportGetAllocationStatsAction extends TransportMasterNodeReadAc
5152
5253 public static final ActionType <TransportGetAllocationStatsAction .Response > TYPE = new ActionType <>("cluster:monitor/allocation/stats" );
5354
54- public static final long CACHE_DISABLED = 0L ;
5555 public static final Setting <TimeValue > CACHE_MAX_AGE_SETTING = Setting .timeSetting (
56- "cluster.transport.get. allocation.stats.action. cache.max_age " ,
56+ "cluster.routing. allocation.stats.cache.ttl " ,
5757 TimeValue .timeValueMinutes (1 ),
58- TimeValue .timeValueMillis ( CACHE_DISABLED ) ,
58+ TimeValue .ZERO ,
5959 TimeValue .timeValueMinutes (10 ),
6060 Setting .Property .NodeScope
6161 );
@@ -66,6 +66,7 @@ public class TransportGetAllocationStatsAction extends TransportMasterNodeReadAc
6666
6767 @ Inject
6868 public TransportGetAllocationStatsAction (
69+ Settings settings ,
6970 TransportService transportService ,
7071 ClusterService clusterService ,
7172 ThreadPool threadPool ,
@@ -85,12 +86,13 @@ public TransportGetAllocationStatsAction(
8586 EsExecutors .DIRECT_EXECUTOR_SERVICE
8687 );
8788 final var managementExecutor = threadPool .executor (ThreadPool .Names .MANAGEMENT );
88- this .allocationStatsCache = new AllocationStatsCache (clusterService . getClusterSettings (). get (CACHE_MAX_AGE_SETTING ).millis ());
89+ this .allocationStatsCache = new AllocationStatsCache (CACHE_MAX_AGE_SETTING . get (settings ).millis (), threadPool );
8990 this .allocationStatsSupplier = new SingleResultDeduplicator <>(
9091 threadPool .getThreadContext (),
9192 l -> managementExecutor .execute (ActionRunnable .supply (l , () -> {
93+ // Check the cache again here to prevent duplicate work when a thread has a cache miss and is just about to fork just as
94+ // other threads are coming off a deduplicator call that is about to finish.
9295 final var cachedStats = allocationStatsCache .get ();
93-
9496 if (cachedStats != null ) {
9597 return cachedStats ;
9698 }
@@ -103,11 +105,6 @@ public TransportGetAllocationStatsAction(
103105 this .diskThresholdSettings = new DiskThresholdSettings (clusterService .getSettings (), clusterService .getClusterSettings ());
104106 }
105107
106- // Package access, intended for unit testing only.
107- void setCacheMaxAge (TimeValue maxAge ) {
108- this .allocationStatsCache .setMaxAgeMsecs (maxAge .millis ());
109- }
110-
111108 @ Override
112109 protected void doExecute (Task task , Request request , ActionListener <Response > listener ) {
113110 if (clusterService .state ().getMinTransportVersion ().before (TransportVersions .V_8_14_0 )) {
@@ -122,9 +119,16 @@ protected void doExecute(Task task, Request request, ActionListener<Response> li
122119 protected void masterOperation (Task task , Request request , ClusterState state , ActionListener <Response > listener ) throws Exception {
123120 // NB we are still on a transport thread here - if adding more functionality here make sure to fork to a different pool
124121
125- final SubscribableListener <Map <String , NodeAllocationStats >> allocationStatsStep = request .metrics ().contains (Metric .ALLOCATIONS )
126- ? SubscribableListener .newForked (allocationStatsSupplier ::execute )
127- : SubscribableListener .newSucceeded (Map .of ());
122+ SubscribableListener <Map <String , NodeAllocationStats >> allocationStatsStep ;
123+
124+ if (request .metrics ().contains (Metric .ALLOCATIONS )) {
125+ final var cachedStats = allocationStatsCache .get ();
126+ allocationStatsStep = cachedStats != null
127+ ? SubscribableListener .newSucceeded (cachedStats )
128+ : SubscribableListener .newForked (allocationStatsSupplier ::execute );
129+ } else {
130+ allocationStatsStep = SubscribableListener .newSucceeded (Map .of ());
131+ }
128132
129133 allocationStatsStep .andThenApply (
130134 allocationStats -> new Response (allocationStats , request .metrics ().contains (Metric .FS ) ? diskThresholdSettings : null )
@@ -216,27 +220,25 @@ public DiskThresholdSettings getDiskThresholdSettings() {
216220 private record CachedAllocationStats (Map <String , NodeAllocationStats > stats , long timestampMsecs ) {}
217221
218222 private static class AllocationStatsCache {
219- private volatile long maxAgeMsecs ;
223+ private final long maxAgeMillis ;
224+ private final ThreadPool threadPool ;
220225 private final AtomicReference <CachedAllocationStats > cachedStats ;
221226
222- AllocationStatsCache (long maxAgeMsecs ) {
223- this .maxAgeMsecs = maxAgeMsecs ;
227+ AllocationStatsCache (long maxAgeMillis , ThreadPool threadPool ) {
228+ this .maxAgeMillis = maxAgeMillis ;
229+ this .threadPool = threadPool ;
224230 this .cachedStats = new AtomicReference <>();
225231 }
226232
227- void setMaxAgeMsecs (long maxAgeMsecs ) {
228- this .maxAgeMsecs = maxAgeMsecs ;
229- }
230-
231233 Map <String , NodeAllocationStats > get () {
232234
233- if (maxAgeMsecs == CACHE_DISABLED ) {
235+ if (maxAgeMillis == 0L ) {
234236 return null ;
235237 }
236238
237239 final var stats = cachedStats .get ();
238240
239- if (stats == null || System . currentTimeMillis () - stats .timestampMsecs > maxAgeMsecs ) {
241+ if (stats == null || threadPool . relativeTimeInMillis () - stats .timestampMsecs > maxAgeMillis ) {
240242 return null ;
241243 }
242244
@@ -245,8 +247,8 @@ Map<String, NodeAllocationStats> get() {
245247
246248 void put (Map <String , NodeAllocationStats > stats ) {
247249
248- if (maxAgeMsecs > CACHE_DISABLED ) {
249- cachedStats .set (new CachedAllocationStats (stats , System . currentTimeMillis ()));
250+ if (maxAgeMillis > 0L ) {
251+ cachedStats .set (new CachedAllocationStats (stats , threadPool . relativeTimeInMillis ()));
250252 }
251253 }
252254 }
0 commit comments