Skip to content

Commit 3930dcd

Browse files
Add cache support in TransportGetAllocationStatsAction
Adds a new setting TransportGetAllocationStatsAction.CACHE_MAX_AGE_SETTING to configure the max age for cached AllocationStats on the master. The default value is currently 1 minute per the suggestion in issue 110716. Closes #110716
1 parent 12c2dd5 commit 3930dcd

File tree

3 files changed

+117
-2
lines changed

3 files changed

+117
-2
lines changed

server/src/main/java/org/elasticsearch/action/admin/cluster/allocation/TransportGetAllocationStatsAction.java

Lines changed: 67 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.elasticsearch.cluster.service.ClusterService;
3131
import org.elasticsearch.common.io.stream.StreamInput;
3232
import org.elasticsearch.common.io.stream.StreamOutput;
33+
import org.elasticsearch.common.settings.Setting;
3334
import org.elasticsearch.common.util.concurrent.EsExecutors;
3435
import org.elasticsearch.core.Nullable;
3536
import org.elasticsearch.core.TimeValue;
@@ -42,13 +43,24 @@
4243
import java.io.IOException;
4344
import java.util.EnumSet;
4445
import java.util.Map;
46+
import java.util.concurrent.atomic.AtomicReference;
4547

4648
public class TransportGetAllocationStatsAction extends TransportMasterNodeReadAction<
4749
TransportGetAllocationStatsAction.Request,
4850
TransportGetAllocationStatsAction.Response> {
4951

5052
public static final ActionType<TransportGetAllocationStatsAction.Response> TYPE = new ActionType<>("cluster:monitor/allocation/stats");
5153

54+
public static final long CACHE_DISABLED = 0L;
55+
public static final Setting<TimeValue> CACHE_MAX_AGE_SETTING = Setting.timeSetting(
56+
"cluster.transport.get.allocation.stats.action.cache.max_age",
57+
TimeValue.timeValueMinutes(1),
58+
TimeValue.timeValueMillis(CACHE_DISABLED),
59+
TimeValue.timeValueMinutes(10),
60+
Setting.Property.NodeScope
61+
);
62+
63+
private final AllocationStatsCache allocationStatsCache;
5264
private final SingleResultDeduplicator<Map<String, NodeAllocationStats>> allocationStatsSupplier;
5365
private final DiskThresholdSettings diskThresholdSettings;
5466

@@ -73,13 +85,29 @@ public TransportGetAllocationStatsAction(
7385
EsExecutors.DIRECT_EXECUTOR_SERVICE
7486
);
7587
final var managementExecutor = threadPool.executor(ThreadPool.Names.MANAGEMENT);
88+
this.allocationStatsCache = new AllocationStatsCache(clusterService.getClusterSettings().get(CACHE_MAX_AGE_SETTING).millis());
7689
this.allocationStatsSupplier = new SingleResultDeduplicator<>(
7790
threadPool.getThreadContext(),
78-
l -> managementExecutor.execute(ActionRunnable.supply(l, allocationStatsService::stats))
91+
l -> managementExecutor.execute(ActionRunnable.supply(l, () -> {
92+
final var cachedStats = allocationStatsCache.get();
93+
94+
if (cachedStats != null) {
95+
return cachedStats;
96+
}
97+
98+
final var stats = allocationStatsService.stats();
99+
allocationStatsCache.put(stats);
100+
return stats;
101+
}))
79102
);
80103
this.diskThresholdSettings = new DiskThresholdSettings(clusterService.getSettings(), clusterService.getClusterSettings());
81104
}
82105

106+
// Package access, intended for unit testing only.
107+
void setCacheMaxAge(TimeValue maxAge) {
108+
this.allocationStatsCache.setMaxAgeMsecs(maxAge.millis());
109+
}
110+
83111
@Override
84112
protected void doExecute(Task task, Request request, ActionListener<Response> listener) {
85113
if (clusterService.state().getMinTransportVersion().before(TransportVersions.V_8_14_0)) {
@@ -184,4 +212,42 @@ public DiskThresholdSettings getDiskThresholdSettings() {
184212
return diskThresholdSettings;
185213
}
186214
}
215+
216+
private record CachedAllocationStats(Map<String, NodeAllocationStats> stats, long timestampMsecs) {}
217+
218+
private static class AllocationStatsCache {
219+
private volatile long maxAgeMsecs;
220+
private final AtomicReference<CachedAllocationStats> cachedStats;
221+
222+
AllocationStatsCache(long maxAgeMsecs) {
223+
this.maxAgeMsecs = maxAgeMsecs;
224+
this.cachedStats = new AtomicReference<>();
225+
}
226+
227+
void setMaxAgeMsecs(long maxAgeMsecs) {
228+
this.maxAgeMsecs = maxAgeMsecs;
229+
}
230+
231+
Map<String, NodeAllocationStats> get() {
232+
233+
if (maxAgeMsecs == CACHE_DISABLED) {
234+
return null;
235+
}
236+
237+
final var stats = cachedStats.get();
238+
239+
if (stats == null || System.currentTimeMillis() - stats.timestampMsecs > maxAgeMsecs) {
240+
return null;
241+
}
242+
243+
return stats.stats;
244+
}
245+
246+
void put(Map<String, NodeAllocationStats> stats) {
247+
248+
if (maxAgeMsecs > CACHE_DISABLED) {
249+
cachedStats.set(new CachedAllocationStats(stats, System.currentTimeMillis()));
250+
}
251+
}
252+
}
187253
}

server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
package org.elasticsearch.common.settings;
1010

1111
import org.apache.logging.log4j.LogManager;
12+
import org.elasticsearch.action.admin.cluster.allocation.TransportGetAllocationStatsAction;
1213
import org.elasticsearch.action.admin.cluster.configuration.TransportAddVotingConfigExclusionsAction;
1314
import org.elasticsearch.action.admin.indices.close.TransportCloseIndexAction;
1415
import org.elasticsearch.action.bulk.IncrementalBulkService;
@@ -631,6 +632,7 @@ public void apply(Settings value, Settings current, Settings previous) {
631632
DataStreamGlobalRetentionSettings.DATA_STREAMS_DEFAULT_RETENTION_SETTING,
632633
DataStreamGlobalRetentionSettings.DATA_STREAMS_MAX_RETENTION_SETTING,
633634
ShardsAvailabilityHealthIndicatorService.REPLICA_UNASSIGNED_BUFFER_TIME,
634-
DataStream.isFailureStoreFeatureFlagEnabled() ? DataStreamFailureStoreSettings.DATA_STREAM_FAILURE_STORED_ENABLED_SETTING : null
635+
DataStream.isFailureStoreFeatureFlagEnabled() ? DataStreamFailureStoreSettings.DATA_STREAM_FAILURE_STORED_ENABLED_SETTING : null,
636+
TransportGetAllocationStatsAction.CACHE_MAX_AGE_SETTING
635637
).filter(Objects::nonNull).collect(toSet());
636638
}

server/src/test/java/org/elasticsearch/action/admin/cluster/allocation/TransportGetAllocationStatsActionTests.java

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,13 +9,16 @@
99

1010
package org.elasticsearch.action.admin.cluster.allocation;
1111

12+
import org.elasticsearch.action.ActionListener;
1213
import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsRequestParameters.Metric;
1314
import org.elasticsearch.action.support.ActionFilters;
1415
import org.elasticsearch.action.support.PlainActionFuture;
1516
import org.elasticsearch.cluster.ClusterState;
1617
import org.elasticsearch.cluster.routing.allocation.AllocationStatsService;
18+
import org.elasticsearch.cluster.routing.allocation.NodeAllocationStats;
1719
import org.elasticsearch.cluster.routing.allocation.NodeAllocationStatsTests;
1820
import org.elasticsearch.cluster.service.ClusterService;
21+
import org.elasticsearch.core.CheckedConsumer;
1922
import org.elasticsearch.core.TimeValue;
2023
import org.elasticsearch.tasks.Task;
2124
import org.elasticsearch.tasks.TaskId;
@@ -35,12 +38,14 @@
3538
import java.util.concurrent.CyclicBarrier;
3639
import java.util.concurrent.atomic.AtomicBoolean;
3740
import java.util.concurrent.atomic.AtomicInteger;
41+
import java.util.concurrent.atomic.AtomicReference;
3842

3943
import static org.hamcrest.Matchers.anEmptyMap;
4044
import static org.hamcrest.Matchers.containsString;
4145
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
4246
import static org.hamcrest.Matchers.not;
4347
import static org.mockito.Mockito.mock;
48+
4449
import static org.mockito.Mockito.times;
4550
import static org.mockito.Mockito.verify;
4651
import static org.mockito.Mockito.when;
@@ -173,4 +178,46 @@ public void testDeduplicatesStatsComputations() throws InterruptedException {
173178
thread.join();
174179
}
175180
}
181+
182+
public void testGetStatsWithCachingEnabled() throws Exception {
183+
184+
final AtomicReference<Map<String, NodeAllocationStats>> allocationStats = new AtomicReference<>();
185+
allocationStats.set(Map.of(randomIdentifier(), NodeAllocationStatsTests.randomNodeAllocationStats()));
186+
when(allocationStatsService.stats()).thenReturn(allocationStats.get());
187+
188+
final CheckedConsumer<ActionListener<Void>, Exception> threadTask = l -> {
189+
final var request = new TransportGetAllocationStatsAction.Request(
190+
TimeValue.ONE_MINUTE,
191+
new TaskId(randomIdentifier(), randomNonNegativeLong()),
192+
EnumSet.of(Metric.ALLOCATIONS)
193+
);
194+
195+
final var future = new PlainActionFuture<TransportGetAllocationStatsAction.Response>();
196+
action.masterOperation(mock(Task.class), request, ClusterState.EMPTY_STATE, future);
197+
final var response = future.get();
198+
assertSame("Expected the cached allocation stats to be returned", response.getNodeAllocationStats(), allocationStats.get());
199+
200+
l.onResponse(null);
201+
};
202+
203+
// Start with a high cache max age, all threads should get the same value.
204+
action.setCacheMaxAge(TimeValue.timeValueMinutes(5));
205+
ESTestCase.startInParallel(between(1, 5), threadNumber -> safeAwait(threadTask));
206+
verify(allocationStatsService, times(1)).stats();
207+
208+
// Force the cached stats to expire.
209+
action.setCacheMaxAge(TimeValue.timeValueMillis(1));
210+
Thread.sleep(2L);
211+
212+
// Expect a call to the stats service.
213+
allocationStats.set(Map.of(randomIdentifier(), NodeAllocationStatsTests.randomNodeAllocationStats()));
214+
when(allocationStatsService.stats()).thenReturn(allocationStats.get());
215+
threadTask.accept(ActionListener.noop());
216+
verify(allocationStatsService, times(2)).stats();
217+
218+
// Set a large max age for the cache again so all the threads in the next run can get the cached value.
219+
action.setCacheMaxAge(TimeValue.timeValueMinutes(5));
220+
ESTestCase.startInParallel(between(1, 5), threadNumber -> safeAwait(threadTask));
221+
verify(allocationStatsService, times(2)).stats();
222+
}
176223
}

0 commit comments

Comments
 (0)