Skip to content

Commit 68d949c

Browse files
Add cache support in TransportGetAllocationStatsAction
Adds a new setting TransportGetAllocationStatsAction.CACHE_MAX_AGE_SETTING to configure the max age for cached AllocationStats on the master. The default value is currently 1 minute per the suggestion in issue 110716. Closes #110716
1 parent d387af8 commit 68d949c

File tree

3 files changed

+117
-2
lines changed

3 files changed

+117
-2
lines changed

server/src/main/java/org/elasticsearch/action/admin/cluster/allocation/TransportGetAllocationStatsAction.java

Lines changed: 67 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.elasticsearch.cluster.service.ClusterService;
3131
import org.elasticsearch.common.io.stream.StreamInput;
3232
import org.elasticsearch.common.io.stream.StreamOutput;
33+
import org.elasticsearch.common.settings.Setting;
3334
import org.elasticsearch.common.util.concurrent.EsExecutors;
3435
import org.elasticsearch.core.Nullable;
3536
import org.elasticsearch.core.TimeValue;
@@ -42,13 +43,24 @@
4243
import java.io.IOException;
4344
import java.util.EnumSet;
4445
import java.util.Map;
46+
import java.util.concurrent.atomic.AtomicReference;
4547

4648
public class TransportGetAllocationStatsAction extends TransportMasterNodeReadAction<
4749
TransportGetAllocationStatsAction.Request,
4850
TransportGetAllocationStatsAction.Response> {
4951

5052
public static final ActionType<TransportGetAllocationStatsAction.Response> TYPE = new ActionType<>("cluster:monitor/allocation/stats");
5153

54+
public static final long CACHE_DISABLED = 0L;
55+
public static final Setting<TimeValue> CACHE_MAX_AGE_SETTING = Setting.timeSetting(
56+
"cluster.transport.get.allocation.stats.action.cache.max_age",
57+
TimeValue.timeValueMinutes(1),
58+
TimeValue.timeValueMillis(CACHE_DISABLED),
59+
TimeValue.timeValueMinutes(10),
60+
Setting.Property.NodeScope
61+
);
62+
63+
private final AllocationStatsCache allocationStatsCache;
5264
private final SingleResultDeduplicator<Map<String, NodeAllocationStats>> allocationStatsSupplier;
5365
private final DiskThresholdSettings diskThresholdSettings;
5466

@@ -73,13 +85,29 @@ public TransportGetAllocationStatsAction(
7385
EsExecutors.DIRECT_EXECUTOR_SERVICE
7486
);
7587
final var managementExecutor = threadPool.executor(ThreadPool.Names.MANAGEMENT);
88+
this.allocationStatsCache = new AllocationStatsCache(clusterService.getClusterSettings().get(CACHE_MAX_AGE_SETTING).millis());
7689
this.allocationStatsSupplier = new SingleResultDeduplicator<>(
7790
threadPool.getThreadContext(),
78-
l -> managementExecutor.execute(ActionRunnable.supply(l, allocationStatsService::stats))
91+
l -> managementExecutor.execute(ActionRunnable.supply(l, () -> {
92+
final var cachedStats = allocationStatsCache.get();
93+
94+
if (cachedStats != null) {
95+
return cachedStats;
96+
}
97+
98+
final var stats = allocationStatsService.stats();
99+
allocationStatsCache.put(stats);
100+
return stats;
101+
}))
79102
);
80103
this.diskThresholdSettings = new DiskThresholdSettings(clusterService.getSettings(), clusterService.getClusterSettings());
81104
}
82105

106+
// Package access, intended for unit testing only.
107+
void setCacheMaxAge(TimeValue maxAge) {
108+
this.allocationStatsCache.setMaxAgeMsecs(maxAge.millis());
109+
}
110+
83111
@Override
84112
protected void doExecute(Task task, Request request, ActionListener<Response> listener) {
85113
if (clusterService.state().getMinTransportVersion().before(TransportVersions.V_8_14_0)) {
@@ -185,4 +213,42 @@ public DiskThresholdSettings getDiskThresholdSettings() {
185213
return diskThresholdSettings;
186214
}
187215
}
216+
217+
private record CachedAllocationStats(Map<String, NodeAllocationStats> stats, long timestampMsecs) {}
218+
219+
private static class AllocationStatsCache {
220+
private volatile long maxAgeMsecs;
221+
private final AtomicReference<CachedAllocationStats> cachedStats;
222+
223+
AllocationStatsCache(long maxAgeMsecs) {
224+
this.maxAgeMsecs = maxAgeMsecs;
225+
this.cachedStats = new AtomicReference<>();
226+
}
227+
228+
void setMaxAgeMsecs(long maxAgeMsecs) {
229+
this.maxAgeMsecs = maxAgeMsecs;
230+
}
231+
232+
Map<String, NodeAllocationStats> get() {
233+
234+
if (maxAgeMsecs == CACHE_DISABLED) {
235+
return null;
236+
}
237+
238+
final var stats = cachedStats.get();
239+
240+
if (stats == null || System.currentTimeMillis() - stats.timestampMsecs > maxAgeMsecs) {
241+
return null;
242+
}
243+
244+
return stats.stats;
245+
}
246+
247+
void put(Map<String, NodeAllocationStats> stats) {
248+
249+
if (maxAgeMsecs > CACHE_DISABLED) {
250+
cachedStats.set(new CachedAllocationStats(stats, System.currentTimeMillis()));
251+
}
252+
}
253+
}
188254
}

server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
package org.elasticsearch.common.settings;
1010

1111
import org.apache.logging.log4j.LogManager;
12+
import org.elasticsearch.action.admin.cluster.allocation.TransportGetAllocationStatsAction;
1213
import org.elasticsearch.action.admin.cluster.configuration.TransportAddVotingConfigExclusionsAction;
1314
import org.elasticsearch.action.admin.indices.close.TransportCloseIndexAction;
1415
import org.elasticsearch.action.bulk.IncrementalBulkService;
@@ -629,6 +630,7 @@ public void apply(Settings value, Settings current, Settings previous) {
629630
DataStreamGlobalRetentionSettings.DATA_STREAMS_DEFAULT_RETENTION_SETTING,
630631
DataStreamGlobalRetentionSettings.DATA_STREAMS_MAX_RETENTION_SETTING,
631632
ShardsAvailabilityHealthIndicatorService.REPLICA_UNASSIGNED_BUFFER_TIME,
632-
DataStream.isFailureStoreFeatureFlagEnabled() ? DataStreamFailureStoreSettings.DATA_STREAM_FAILURE_STORED_ENABLED_SETTING : null
633+
DataStream.isFailureStoreFeatureFlagEnabled() ? DataStreamFailureStoreSettings.DATA_STREAM_FAILURE_STORED_ENABLED_SETTING : null,
634+
TransportGetAllocationStatsAction.CACHE_MAX_AGE_SETTING
633635
).filter(Objects::nonNull).collect(toSet());
634636
}

server/src/test/java/org/elasticsearch/action/admin/cluster/allocation/TransportGetAllocationStatsActionTests.java

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,13 +9,16 @@
99

1010
package org.elasticsearch.action.admin.cluster.allocation;
1111

12+
import org.elasticsearch.action.ActionListener;
1213
import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsRequestParameters.Metric;
1314
import org.elasticsearch.action.support.ActionFilters;
1415
import org.elasticsearch.action.support.PlainActionFuture;
1516
import org.elasticsearch.cluster.ClusterState;
1617
import org.elasticsearch.cluster.routing.allocation.AllocationStatsService;
18+
import org.elasticsearch.cluster.routing.allocation.NodeAllocationStats;
1719
import org.elasticsearch.cluster.routing.allocation.NodeAllocationStatsTests;
1820
import org.elasticsearch.cluster.service.ClusterService;
21+
import org.elasticsearch.core.CheckedConsumer;
1922
import org.elasticsearch.core.TimeValue;
2023
import org.elasticsearch.tasks.Task;
2124
import org.elasticsearch.tasks.TaskId;
@@ -34,13 +37,15 @@
3437
import java.util.concurrent.CyclicBarrier;
3538
import java.util.concurrent.atomic.AtomicBoolean;
3639
import java.util.concurrent.atomic.AtomicInteger;
40+
import java.util.concurrent.atomic.AtomicReference;
3741

3842
import static org.hamcrest.Matchers.anEmptyMap;
3943
import static org.hamcrest.Matchers.containsString;
4044
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
4145
import static org.hamcrest.Matchers.not;
4246
import static org.mockito.Mockito.mock;
4347
import static org.mockito.Mockito.never;
48+
import static org.mockito.Mockito.times;
4449
import static org.mockito.Mockito.verify;
4550
import static org.mockito.Mockito.when;
4651

@@ -162,4 +167,46 @@ public void testDeduplicatesStatsComputations() throws InterruptedException {
162167
thread.join();
163168
}
164169
}
170+
171+
public void testGetStatsWithCachingEnabled() throws Exception {
172+
173+
final AtomicReference<Map<String, NodeAllocationStats>> allocationStats = new AtomicReference<>();
174+
allocationStats.set(Map.of(randomIdentifier(), NodeAllocationStatsTests.randomNodeAllocationStats()));
175+
when(allocationStatsService.stats()).thenReturn(allocationStats.get());
176+
177+
final CheckedConsumer<ActionListener<Void>, Exception> threadTask = l -> {
178+
final var request = new TransportGetAllocationStatsAction.Request(
179+
TimeValue.ONE_MINUTE,
180+
new TaskId(randomIdentifier(), randomNonNegativeLong()),
181+
EnumSet.of(Metric.ALLOCATIONS)
182+
);
183+
184+
final var future = new PlainActionFuture<TransportGetAllocationStatsAction.Response>();
185+
action.masterOperation(mock(Task.class), request, ClusterState.EMPTY_STATE, future);
186+
final var response = future.get();
187+
assertSame("Expected the cached allocation stats to be returned", response.getNodeAllocationStats(), allocationStats.get());
188+
189+
l.onResponse(null);
190+
};
191+
192+
// Start with a high cache max age, all threads should get the same value.
193+
action.setCacheMaxAge(TimeValue.timeValueMinutes(5));
194+
ESTestCase.startInParallel(between(1, 5), threadNumber -> safeAwait(threadTask));
195+
verify(allocationStatsService, times(1)).stats();
196+
197+
// Force the cached stats to expire.
198+
action.setCacheMaxAge(TimeValue.timeValueMillis(1));
199+
Thread.sleep(2L);
200+
201+
// Expect a call to the stats service.
202+
allocationStats.set(Map.of(randomIdentifier(), NodeAllocationStatsTests.randomNodeAllocationStats()));
203+
when(allocationStatsService.stats()).thenReturn(allocationStats.get());
204+
threadTask.accept(ActionListener.noop());
205+
verify(allocationStatsService, times(2)).stats();
206+
207+
// Set a large max age for the cache again so all the threads in the next run can get the cached value.
208+
action.setCacheMaxAge(TimeValue.timeValueMinutes(5));
209+
ESTestCase.startInParallel(between(1, 5), threadNumber -> safeAwait(threadTask));
210+
verify(allocationStatsService, times(2)).stats();
211+
}
165212
}

0 commit comments

Comments
 (0)