Skip to content

Commit 79c44be

Browse files
Add cache support in TransportGetAllocationStatsAction (elastic#124898)
Adds a new cache and setting TransportGetAllocationStatsAction.CACHE_TTL_SETTING "cluster.routing.allocation.stats.cache.ttl" to configure the max age for cached NodeAllocationStats on the master. The default value is currently 1 minute per the suggestion in issue 110716. Closes elastic#110716
1 parent 48ab734 commit 79c44be

File tree

5 files changed

+197
-9
lines changed

5 files changed

+197
-9
lines changed

docs/changelog/124898.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
pr: 124898
2+
summary: Add cache support in `TransportGetAllocationStatsAction`
3+
area: Allocation
4+
type: enhancement
5+
issues:
6+
- 110716

docs/reference/modules/cluster.asciidoc

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ There are a number of settings available to control the shard allocation process
2121
* <<cluster-shard-allocation-filtering>> allows certain nodes or groups of
2222
nodes excluded from allocation so that they can be decommissioned.
2323

24+
* <<node-allocation-stats-cache>> control the node allocation statistics cache on the master node.
25+
2426
Besides these, there are a few other <<misc-cluster-settings,miscellaneous cluster-level settings>>.
2527

2628
include::cluster/shards_allocation.asciidoc[]
@@ -47,4 +49,11 @@ minimize the risk of losing all shard copies in the event of a failure. <<shard-
4749

4850
include::cluster/allocation_filtering.asciidoc[]
4951

52+
[[node-allocation-stats-cache]]
53+
==== Cluster-level node allocation stats cache settings
54+
55+
`cluster.routing.allocation.stats.cache.ttl`::
56+
(<<dynamic-cluster-setting,Dynamic>>)
57+
Calculating the node allocation stats for a <<cluster-nodes-stats,Get node statistics API call>> can become expensive on the master for clusters with a high number of nodes. To prevent overloading the master the node allocation stats are cached on the master for 1 minute `1m` by default. This setting can be used to adjust the cache time to live value, if necessary, keeping in mind the tradeoff between the freshness of the statistics and the processing costs on the master. The cache can be disabled (not recommended) by setting the value to `0s` (the minimum value). The maximum value is 10 minutes `10m`.
58+
5059
include::cluster/misc.asciidoc[]

server/src/main/java/org/elasticsearch/action/admin/cluster/allocation/TransportGetAllocationStatsAction.java

Lines changed: 65 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.elasticsearch.cluster.service.ClusterService;
3131
import org.elasticsearch.common.io.stream.StreamInput;
3232
import org.elasticsearch.common.io.stream.StreamOutput;
33+
import org.elasticsearch.common.settings.Setting;
3334
import org.elasticsearch.common.util.concurrent.EsExecutors;
3435
import org.elasticsearch.core.Nullable;
3536
import org.elasticsearch.core.TimeValue;
@@ -43,13 +44,25 @@
4344
import java.io.IOException;
4445
import java.util.EnumSet;
4546
import java.util.Map;
47+
import java.util.concurrent.atomic.AtomicReference;
4648

4749
public class TransportGetAllocationStatsAction extends TransportMasterNodeReadAction<
4850
TransportGetAllocationStatsAction.Request,
4951
TransportGetAllocationStatsAction.Response> {
5052

5153
public static final ActionType<TransportGetAllocationStatsAction.Response> TYPE = new ActionType<>("cluster:monitor/allocation/stats");
5254

55+
public static final TimeValue DEFAULT_CACHE_TTL = TimeValue.timeValueMinutes(1);
56+
public static final Setting<TimeValue> CACHE_TTL_SETTING = Setting.timeSetting(
57+
"cluster.routing.allocation.stats.cache.ttl",
58+
DEFAULT_CACHE_TTL,
59+
TimeValue.ZERO,
60+
TimeValue.timeValueMinutes(10),
61+
Setting.Property.NodeScope,
62+
Setting.Property.Dynamic
63+
);
64+
65+
private final AllocationStatsCache allocationStatsCache;
5366
private final SingleResultDeduplicator<Map<String, NodeAllocationStats>> allocationStatsSupplier;
5467
private final DiskThresholdSettings diskThresholdSettings;
5568
private final FeatureService featureService;
@@ -76,12 +89,23 @@ public TransportGetAllocationStatsAction(
7689
EsExecutors.DIRECT_EXECUTOR_SERVICE
7790
);
7891
final var managementExecutor = threadPool.executor(ThreadPool.Names.MANAGEMENT);
79-
this.allocationStatsSupplier = new SingleResultDeduplicator<>(
80-
threadPool.getThreadContext(),
81-
l -> managementExecutor.execute(ActionRunnable.supply(l, allocationStatsService::stats))
82-
);
92+
this.allocationStatsCache = new AllocationStatsCache(threadPool, DEFAULT_CACHE_TTL);
93+
this.allocationStatsSupplier = new SingleResultDeduplicator<>(threadPool.getThreadContext(), l -> {
94+
final var cachedStats = allocationStatsCache.get();
95+
if (cachedStats != null) {
96+
l.onResponse(cachedStats);
97+
return;
98+
}
99+
100+
managementExecutor.execute(ActionRunnable.supply(l, () -> {
101+
final var stats = allocationStatsService.stats();
102+
allocationStatsCache.put(stats);
103+
return stats;
104+
}));
105+
});
83106
this.diskThresholdSettings = new DiskThresholdSettings(clusterService.getSettings(), clusterService.getClusterSettings());
84107
this.featureService = featureService;
108+
clusterService.getClusterSettings().initializeAndWatch(CACHE_TTL_SETTING, this.allocationStatsCache::setTTL);
85109
}
86110

87111
@Override
@@ -195,4 +219,41 @@ public DiskThresholdSettings getDiskThresholdSettings() {
195219
return diskThresholdSettings;
196220
}
197221
}
222+
223+
private record CachedAllocationStats(Map<String, NodeAllocationStats> stats, long timestampMillis) {}
224+
225+
private static class AllocationStatsCache {
226+
private volatile long ttlMillis;
227+
private final ThreadPool threadPool;
228+
private final AtomicReference<CachedAllocationStats> cachedStats;
229+
230+
AllocationStatsCache(ThreadPool threadPool, TimeValue ttl) {
231+
this.threadPool = threadPool;
232+
this.cachedStats = new AtomicReference<>();
233+
setTTL(ttl);
234+
}
235+
236+
void setTTL(TimeValue ttl) {
237+
ttlMillis = ttl.millis();
238+
if (ttlMillis == 0L) {
239+
cachedStats.set(null);
240+
}
241+
}
242+
243+
Map<String, NodeAllocationStats> get() {
244+
if (ttlMillis == 0L) {
245+
return null;
246+
}
247+
248+
// We don't set the atomic ref to null here upon expiration since we know it is about to be replaced with a fresh instance.
249+
final var stats = cachedStats.get();
250+
return stats == null || threadPool.relativeTimeInMillis() - stats.timestampMillis > ttlMillis ? null : stats.stats;
251+
}
252+
253+
void put(Map<String, NodeAllocationStats> stats) {
254+
if (ttlMillis > 0L) {
255+
cachedStats.set(new CachedAllocationStats(stats, threadPool.relativeTimeInMillis()));
256+
}
257+
}
258+
}
198259
}

server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
package org.elasticsearch.common.settings;
1010

1111
import org.apache.logging.log4j.LogManager;
12+
import org.elasticsearch.action.admin.cluster.allocation.TransportGetAllocationStatsAction;
1213
import org.elasticsearch.action.admin.cluster.configuration.TransportAddVotingConfigExclusionsAction;
1314
import org.elasticsearch.action.admin.indices.close.TransportCloseIndexAction;
1415
import org.elasticsearch.action.bulk.IncrementalBulkService;
@@ -623,6 +624,7 @@ public void apply(Settings value, Settings current, Settings previous) {
623624
DataStreamGlobalRetentionSettings.DATA_STREAMS_DEFAULT_RETENTION_SETTING,
624625
DataStreamGlobalRetentionSettings.DATA_STREAMS_MAX_RETENTION_SETTING,
625626
ShardsAvailabilityHealthIndicatorService.REPLICA_UNASSIGNED_BUFFER_TIME,
626-
DataStream.isFailureStoreFeatureFlagEnabled() ? DataStreamFailureStoreSettings.DATA_STREAM_FAILURE_STORED_ENABLED_SETTING : null
627+
DataStream.isFailureStoreFeatureFlagEnabled() ? DataStreamFailureStoreSettings.DATA_STREAM_FAILURE_STORED_ENABLED_SETTING : null,
628+
TransportGetAllocationStatsAction.CACHE_TTL_SETTING
627629
).filter(Objects::nonNull).collect(toSet());
628630
}

server/src/test/java/org/elasticsearch/action/admin/cluster/allocation/TransportGetAllocationStatsActionTests.java

Lines changed: 114 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,21 +9,28 @@
99

1010
package org.elasticsearch.action.admin.cluster.allocation;
1111

12+
import org.elasticsearch.action.ActionListener;
1213
import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsRequestParameters.Metric;
1314
import org.elasticsearch.action.support.ActionFilters;
1415
import org.elasticsearch.action.support.PlainActionFuture;
1516
import org.elasticsearch.cluster.ClusterState;
1617
import org.elasticsearch.cluster.routing.allocation.AllocationStatsService;
18+
import org.elasticsearch.cluster.routing.allocation.NodeAllocationStats;
1719
import org.elasticsearch.cluster.routing.allocation.NodeAllocationStatsTests;
1820
import org.elasticsearch.cluster.service.ClusterService;
21+
import org.elasticsearch.common.settings.ClusterSettings;
22+
import org.elasticsearch.common.settings.Settings;
23+
import org.elasticsearch.core.CheckedConsumer;
1924
import org.elasticsearch.core.TimeValue;
2025
import org.elasticsearch.features.FeatureService;
26+
import org.elasticsearch.node.Node;
2127
import org.elasticsearch.tasks.Task;
2228
import org.elasticsearch.tasks.TaskId;
29+
import org.elasticsearch.telemetry.metric.MeterRegistry;
2330
import org.elasticsearch.test.ClusterServiceUtils;
2431
import org.elasticsearch.test.ESTestCase;
2532
import org.elasticsearch.test.transport.CapturingTransport;
26-
import org.elasticsearch.threadpool.TestThreadPool;
33+
import org.elasticsearch.threadpool.DefaultBuiltInExecutorBuilders;
2734
import org.elasticsearch.threadpool.ThreadPool;
2835
import org.elasticsearch.transport.TransportService;
2936
import org.junit.After;
@@ -35,6 +42,7 @@
3542
import java.util.concurrent.CyclicBarrier;
3643
import java.util.concurrent.atomic.AtomicBoolean;
3744
import java.util.concurrent.atomic.AtomicInteger;
45+
import java.util.concurrent.atomic.AtomicReference;
3846

3947
import static org.hamcrest.Matchers.anEmptyMap;
4048
import static org.hamcrest.Matchers.containsString;
@@ -44,12 +52,15 @@
4452
import static org.mockito.ArgumentMatchers.eq;
4553
import static org.mockito.Mockito.mock;
4654
import static org.mockito.Mockito.never;
55+
import static org.mockito.Mockito.times;
4756
import static org.mockito.Mockito.verify;
4857
import static org.mockito.Mockito.when;
4958

5059
public class TransportGetAllocationStatsActionTests extends ESTestCase {
5160

52-
private ThreadPool threadPool;
61+
private long startTimeMillis;
62+
private TimeValue allocationStatsCacheTTL;
63+
private ControlledRelativeTimeThreadPool threadPool;
5364
private ClusterService clusterService;
5465
private TransportService transportService;
5566
private AllocationStatsService allocationStatsService;
@@ -61,8 +72,16 @@ public class TransportGetAllocationStatsActionTests extends ESTestCase {
6172
@Before
6273
public void setUp() throws Exception {
6374
super.setUp();
64-
threadPool = new TestThreadPool(TransportClusterAllocationExplainActionTests.class.getName());
65-
clusterService = ClusterServiceUtils.createClusterService(threadPool);
75+
startTimeMillis = 0L;
76+
allocationStatsCacheTTL = TimeValue.timeValueMinutes(1);
77+
threadPool = new ControlledRelativeTimeThreadPool(TransportClusterAllocationExplainActionTests.class.getName(), startTimeMillis);
78+
clusterService = ClusterServiceUtils.createClusterService(
79+
threadPool,
80+
new ClusterSettings(
81+
Settings.builder().put(TransportGetAllocationStatsAction.CACHE_TTL_SETTING.getKey(), allocationStatsCacheTTL).build(),
82+
ClusterSettings.BUILT_IN_CLUSTER_SETTINGS
83+
)
84+
);
6685
transportService = new CapturingTransport().createTransportService(
6786
clusterService.getSettings(),
6887
threadPool,
@@ -92,7 +111,17 @@ public void tearDown() throws Exception {
92111
transportService.close();
93112
}
94113

114+
private void disableAllocationStatsCache() {
115+
setAllocationStatsCacheTTL(TimeValue.ZERO);
116+
}
117+
118+
private void setAllocationStatsCacheTTL(TimeValue ttl) {
119+
clusterService.getClusterSettings()
120+
.applySettings(Settings.builder().put(TransportGetAllocationStatsAction.CACHE_TTL_SETTING.getKey(), ttl).build());
121+
};
122+
95123
public void testReturnsOnlyRequestedStats() throws Exception {
124+
disableAllocationStatsCache();
96125

97126
var metrics = EnumSet.copyOf(randomSubsetOf(Metric.values().length, Metric.values()));
98127

@@ -126,6 +155,7 @@ public void testReturnsOnlyRequestedStats() throws Exception {
126155
}
127156

128157
public void testDeduplicatesStatsComputations() throws InterruptedException {
158+
disableAllocationStatsCache();
129159
final var requestCounter = new AtomicInteger();
130160
final var isExecuting = new AtomicBoolean();
131161
when(allocationStatsService.stats()).thenAnswer(invocation -> {
@@ -170,4 +200,84 @@ public void testDeduplicatesStatsComputations() throws InterruptedException {
170200
thread.join();
171201
}
172202
}
203+
204+
public void testGetStatsWithCachingEnabled() throws Exception {
205+
206+
final AtomicReference<Map<String, NodeAllocationStats>> allocationStats = new AtomicReference<>();
207+
int numExpectedAllocationStatsServiceCalls = 0;
208+
209+
final Runnable resetExpectedAllocationStats = () -> {
210+
final var stats = Map.of(randomIdentifier(), NodeAllocationStatsTests.randomNodeAllocationStats());
211+
allocationStats.set(stats);
212+
when(allocationStatsService.stats()).thenReturn(stats);
213+
};
214+
215+
final CheckedConsumer<ActionListener<Void>, Exception> threadTask = l -> {
216+
final var request = new TransportGetAllocationStatsAction.Request(
217+
TEST_REQUEST_TIMEOUT,
218+
new TaskId(randomIdentifier(), randomNonNegativeLong()),
219+
EnumSet.of(Metric.ALLOCATIONS)
220+
);
221+
222+
action.masterOperation(mock(Task.class), request, ClusterState.EMPTY_STATE, l.map(response -> {
223+
assertSame("Expected the cached allocation stats to be returned", response.getNodeAllocationStats(), allocationStats.get());
224+
return null;
225+
}));
226+
};
227+
228+
// Initial cache miss, all threads should get the same value.
229+
resetExpectedAllocationStats.run();
230+
ESTestCase.startInParallel(between(1, 5), threadNumber -> safeAwait(threadTask));
231+
verify(allocationStatsService, times(++numExpectedAllocationStatsServiceCalls)).stats();
232+
233+
// Advance the clock to a time less than or equal to the TTL and verify we still get the cached stats.
234+
threadPool.setCurrentTimeInMillis(startTimeMillis + between(0, (int) allocationStatsCacheTTL.millis()));
235+
ESTestCase.startInParallel(between(1, 5), threadNumber -> safeAwait(threadTask));
236+
verify(allocationStatsService, times(numExpectedAllocationStatsServiceCalls)).stats();
237+
238+
// Force the cached stats to expire.
239+
threadPool.setCurrentTimeInMillis(startTimeMillis + allocationStatsCacheTTL.getMillis() + 1);
240+
241+
// Expect a single call to the stats service on the cache miss.
242+
resetExpectedAllocationStats.run();
243+
ESTestCase.startInParallel(between(1, 5), threadNumber -> safeAwait(threadTask));
244+
verify(allocationStatsService, times(++numExpectedAllocationStatsServiceCalls)).stats();
245+
246+
// Update the TTL setting to disable the cache, we expect a service call each time.
247+
setAllocationStatsCacheTTL(TimeValue.ZERO);
248+
safeAwait(threadTask);
249+
safeAwait(threadTask);
250+
numExpectedAllocationStatsServiceCalls += 2;
251+
verify(allocationStatsService, times(numExpectedAllocationStatsServiceCalls)).stats();
252+
253+
// Re-enable the cache, only one thread should call the stats service.
254+
setAllocationStatsCacheTTL(TimeValue.timeValueMinutes(5));
255+
resetExpectedAllocationStats.run();
256+
ESTestCase.startInParallel(between(1, 5), threadNumber -> safeAwait(threadTask));
257+
verify(allocationStatsService, times(++numExpectedAllocationStatsServiceCalls)).stats();
258+
}
259+
260+
private static class ControlledRelativeTimeThreadPool extends ThreadPool {
261+
262+
private long currentTimeInMillis;
263+
264+
ControlledRelativeTimeThreadPool(String name, long startTimeMillis) {
265+
super(
266+
Settings.builder().put(Node.NODE_NAME_SETTING.getKey(), name).build(),
267+
MeterRegistry.NOOP,
268+
new DefaultBuiltInExecutorBuilders()
269+
);
270+
this.currentTimeInMillis = startTimeMillis;
271+
stopCachedTimeThread();
272+
}
273+
274+
@Override
275+
public long relativeTimeInMillis() {
276+
return currentTimeInMillis;
277+
}
278+
279+
void setCurrentTimeInMillis(long currentTimeInMillis) {
280+
this.currentTimeInMillis = currentTimeInMillis;
281+
}
282+
}
173283
}

0 commit comments

Comments
 (0)