3030import org .elasticsearch .cluster .service .ClusterService ;
3131import org .elasticsearch .common .io .stream .StreamInput ;
3232import org .elasticsearch .common .io .stream .StreamOutput ;
33+ import org .elasticsearch .common .settings .Setting ;
3334import org .elasticsearch .common .util .concurrent .EsExecutors ;
3435import org .elasticsearch .core .Nullable ;
3536import org .elasticsearch .core .TimeValue ;
4243import java .io .IOException ;
4344import java .util .EnumSet ;
4445import java .util .Map ;
46+ import java .util .concurrent .atomic .AtomicReference ;
4547
4648public class TransportGetAllocationStatsAction extends TransportMasterNodeReadAction <
4749 TransportGetAllocationStatsAction .Request ,
4850 TransportGetAllocationStatsAction .Response > {
4951
5052 public static final ActionType <TransportGetAllocationStatsAction .Response > TYPE = new ActionType <>("cluster:monitor/allocation/stats" );
5153
54+ public static final long CACHE_DISABLED = 0L ;
55+ public static final Setting <TimeValue > CACHE_MAX_AGE_SETTING = Setting .timeSetting (
56+ "cluster.transport.get.allocation.stats.action.cache.max_age" ,
57+ TimeValue .timeValueMinutes (1 ),
58+ TimeValue .timeValueMillis (CACHE_DISABLED ),
59+ TimeValue .timeValueMinutes (10 ),
60+ Setting .Property .NodeScope
61+ );
62+
63+ private final AllocationStatsCache allocationStatsCache ;
5264 private final SingleResultDeduplicator <Map <String , NodeAllocationStats >> allocationStatsSupplier ;
5365 private final DiskThresholdSettings diskThresholdSettings ;
5466
@@ -73,13 +85,29 @@ public TransportGetAllocationStatsAction(
7385 EsExecutors .DIRECT_EXECUTOR_SERVICE
7486 );
7587 final var managementExecutor = threadPool .executor (ThreadPool .Names .MANAGEMENT );
88+ this .allocationStatsCache = new AllocationStatsCache (clusterService .getClusterSettings ().get (CACHE_MAX_AGE_SETTING ).millis ());
7689 this .allocationStatsSupplier = new SingleResultDeduplicator <>(
7790 threadPool .getThreadContext (),
78- l -> managementExecutor .execute (ActionRunnable .supply (l , allocationStatsService ::stats ))
91+ l -> managementExecutor .execute (ActionRunnable .supply (l , () -> {
92+ final var cachedStats = allocationStatsCache .get ();
93+
94+ if (cachedStats != null ) {
95+ return cachedStats ;
96+ }
97+
98+ final var stats = allocationStatsService .stats ();
99+ allocationStatsCache .put (stats );
100+ return stats ;
101+ }))
79102 );
80103 this .diskThresholdSettings = new DiskThresholdSettings (clusterService .getSettings (), clusterService .getClusterSettings ());
81104 }
82105
106+ // Package access, intended for unit testing only.
107+ void setCacheMaxAge (TimeValue maxAge ) {
108+ this .allocationStatsCache .setMaxAgeMsecs (maxAge .millis ());
109+ }
110+
83111 @ Override
84112 protected void doExecute (Task task , Request request , ActionListener <Response > listener ) {
85113 if (clusterService .state ().getMinTransportVersion ().before (TransportVersions .V_8_14_0 )) {
@@ -185,4 +213,42 @@ public DiskThresholdSettings getDiskThresholdSettings() {
185213 return diskThresholdSettings ;
186214 }
187215 }
216+
217+ private record CachedAllocationStats (Map <String , NodeAllocationStats > stats , long timestampMsecs ) {}
218+
219+ private static class AllocationStatsCache {
220+ private volatile long maxAgeMsecs ;
221+ private final AtomicReference <CachedAllocationStats > cachedStats ;
222+
223+ AllocationStatsCache (long maxAgeMsecs ) {
224+ this .maxAgeMsecs = maxAgeMsecs ;
225+ this .cachedStats = new AtomicReference <>();
226+ }
227+
228+ void setMaxAgeMsecs (long maxAgeMsecs ) {
229+ this .maxAgeMsecs = maxAgeMsecs ;
230+ }
231+
232+ Map <String , NodeAllocationStats > get () {
233+
234+ if (maxAgeMsecs == CACHE_DISABLED ) {
235+ return null ;
236+ }
237+
238+ final var stats = cachedStats .get ();
239+
240+ if (stats == null || System .currentTimeMillis () - stats .timestampMsecs > maxAgeMsecs ) {
241+ return null ;
242+ }
243+
244+ return stats .stats ;
245+ }
246+
247+ void put (Map <String , NodeAllocationStats > stats ) {
248+
249+ if (maxAgeMsecs > CACHE_DISABLED ) {
250+ cachedStats .set (new CachedAllocationStats (stats , System .currentTimeMillis ()));
251+ }
252+ }
253+ }
188254}
0 commit comments