3030import org .elasticsearch .cluster .service .ClusterService ;
3131import org .elasticsearch .common .io .stream .StreamInput ;
3232import org .elasticsearch .common .io .stream .StreamOutput ;
33+ import org .elasticsearch .common .settings .Setting ;
3334import org .elasticsearch .common .util .concurrent .EsExecutors ;
3435import org .elasticsearch .core .Nullable ;
3536import org .elasticsearch .core .TimeValue ;
4243import java .io .IOException ;
4344import java .util .EnumSet ;
4445import java .util .Map ;
46+ import java .util .concurrent .atomic .AtomicReference ;
4547
4648public class TransportGetAllocationStatsAction extends TransportMasterNodeReadAction <
4749 TransportGetAllocationStatsAction .Request ,
4850 TransportGetAllocationStatsAction .Response > {
4951
5052 public static final ActionType <TransportGetAllocationStatsAction .Response > TYPE = new ActionType <>("cluster:monitor/allocation/stats" );
5153
54+ public static final long CACHE_DISABLED = 0L ;
55+ public static final Setting <TimeValue > CACHE_MAX_AGE_SETTING = Setting .timeSetting (
56+ "cluster.transport.get.allocation.stats.action.cache.max_age" ,
57+ TimeValue .timeValueMinutes (1 ),
58+ TimeValue .timeValueMillis (CACHE_DISABLED ),
59+ TimeValue .timeValueMinutes (10 ),
60+ Setting .Property .NodeScope
61+ );
62+
63+ private final AllocationStatsCache allocationStatsCache ;
5264 private final SingleResultDeduplicator <Map <String , NodeAllocationStats >> allocationStatsSupplier ;
5365 private final DiskThresholdSettings diskThresholdSettings ;
5466
@@ -73,13 +85,29 @@ public TransportGetAllocationStatsAction(
7385 EsExecutors .DIRECT_EXECUTOR_SERVICE
7486 );
7587 final var managementExecutor = threadPool .executor (ThreadPool .Names .MANAGEMENT );
88+ this .allocationStatsCache = new AllocationStatsCache (clusterService .getClusterSettings ().get (CACHE_MAX_AGE_SETTING ).millis ());
7689 this .allocationStatsSupplier = new SingleResultDeduplicator <>(
7790 threadPool .getThreadContext (),
78- l -> managementExecutor .execute (ActionRunnable .supply (l , allocationStatsService ::stats ))
91+ l -> managementExecutor .execute (ActionRunnable .supply (l , () -> {
92+ final var cachedStats = allocationStatsCache .get ();
93+
94+ if (cachedStats != null ) {
95+ return cachedStats ;
96+ }
97+
98+ final var stats = allocationStatsService .stats ();
99+ allocationStatsCache .put (stats );
100+ return stats ;
101+ }))
79102 );
80103 this .diskThresholdSettings = new DiskThresholdSettings (clusterService .getSettings (), clusterService .getClusterSettings ());
81104 }
82105
106+ // Package access, intended for unit testing only.
107+ void setCacheMaxAge (TimeValue maxAge ) {
108+ this .allocationStatsCache .setMaxAgeMsecs (maxAge .millis ());
109+ }
110+
83111 @ Override
84112 protected void doExecute (Task task , Request request , ActionListener <Response > listener ) {
85113 if (clusterService .state ().getMinTransportVersion ().before (TransportVersions .V_8_14_0 )) {
@@ -184,4 +212,42 @@ public DiskThresholdSettings getDiskThresholdSettings() {
184212 return diskThresholdSettings ;
185213 }
186214 }
215+
216+ private record CachedAllocationStats (Map <String , NodeAllocationStats > stats , long timestampMsecs ) {}
217+
218+ private static class AllocationStatsCache {
219+ private volatile long maxAgeMsecs ;
220+ private final AtomicReference <CachedAllocationStats > cachedStats ;
221+
222+ AllocationStatsCache (long maxAgeMsecs ) {
223+ this .maxAgeMsecs = maxAgeMsecs ;
224+ this .cachedStats = new AtomicReference <>();
225+ }
226+
227+ void setMaxAgeMsecs (long maxAgeMsecs ) {
228+ this .maxAgeMsecs = maxAgeMsecs ;
229+ }
230+
231+ Map <String , NodeAllocationStats > get () {
232+
233+ if (maxAgeMsecs == CACHE_DISABLED ) {
234+ return null ;
235+ }
236+
237+ final var stats = cachedStats .get ();
238+
239+ if (stats == null || System .currentTimeMillis () - stats .timestampMsecs > maxAgeMsecs ) {
240+ return null ;
241+ }
242+
243+ return stats .stats ;
244+ }
245+
246+ void put (Map <String , NodeAllocationStats > stats ) {
247+
248+ if (maxAgeMsecs > CACHE_DISABLED ) {
249+ cachedStats .set (new CachedAllocationStats (stats , System .currentTimeMillis ()));
250+ }
251+ }
252+ }
187253}
0 commit comments