
Commit 9f22685

JeremyDahlgren authored and omricohenn committed
Add cache support in TransportGetAllocationStatsAction (elastic#124898)
Adds a new cache and setting, TransportGetAllocationStatsAction.CACHE_TTL_SETTING ("cluster.routing.allocation.stats.cache.ttl"), to configure the maximum age of cached NodeAllocationStats on the master. The default value is currently 1 minute, per the suggestion in issue 110716. Closes elastic#110716
1 parent 9140dd1 commit 9f22685

File tree: 5 files changed, +192 −10 lines

docs/changelog/124898.yaml

Lines changed: 6 additions & 0 deletions
@@ -0,0 +1,6 @@
+pr: 124898
+summary: Add cache support in `TransportGetAllocationStatsAction`
+area: Allocation
+type: enhancement
+issues:
+ - 110716

docs/reference/elasticsearch/configuration-reference/cluster-level-shard-allocation-routing-settings.md

Lines changed: 5 additions & 1 deletion
@@ -19,6 +19,7 @@ There are a number of settings available to control the shard allocation process
 * [Disk-based shard allocation settings](#disk-based-shard-allocation) explains how Elasticsearch takes available disk space into account, and the related settings.
 * [Shard allocation awareness](docs-content://deploy-manage/distributed-architecture/shard-allocation-relocation-recovery/shard-allocation-awareness.md) and [Forced awareness](docs-content://deploy-manage/distributed-architecture/shard-allocation-relocation-recovery/shard-allocation-awareness.md#forced-awareness) control how shards can be distributed across different racks or availability zones.
 * [Cluster-level shard allocation filtering](#cluster-shard-allocation-filtering) allows certain nodes or groups of nodes excluded from allocation so that they can be decommissioned.
+* [Cluster-level node allocation stats cache settings](#node-allocation-stats-cache) control the node allocation statistics cache on the master node.

 Besides these, there are a few other [miscellaneous cluster-level settings](/reference/elasticsearch/configuration-reference/miscellaneous-cluster-settings.md).

@@ -233,7 +234,7 @@ You can use [custom node attributes](/reference/elasticsearch/configuration-refe
 : ([Dynamic](docs-content://deploy-manage/deploy/self-managed/configure-elasticsearch.md#dynamic-cluster-setting)) The shard allocation awareness values that must exist for shards to be reallocated in case of location failure. Learn more about [forced awareness](docs-content://deploy-manage/distributed-architecture/shard-allocation-relocation-recovery/shard-allocation-awareness.md#forced-awareness).


-## Cluster-level shard allocation filterin [cluster-shard-allocation-filtering]
+## Cluster-level shard allocation filtering [cluster-shard-allocation-filtering]

 You can use cluster-level shard allocation filters to control where {{es}} allocates shards from any index. These cluster wide filters are applied in conjunction with [per-index allocation filtering](/reference/elasticsearch/index-settings/shard-allocation.md) and [allocation awareness](docs-content://deploy-manage/distributed-architecture/shard-allocation-relocation-recovery/shard-allocation-awareness.md).

@@ -303,4 +304,7 @@ PUT _cluster/settings
 ```


+## Node Allocation Stats Cache [node-allocation-stats-cache]

+`cluster.routing.allocation.stats.cache.ttl`
+: ([Dynamic](docs-content://deploy-manage/deploy/self-managed/configure-elasticsearch.md#dynamic-cluster-setting)) Calculating the node allocation stats for a [Get node statistics API call](https://www.elastic.co/docs/api/doc/elasticsearch/operation/operation-nodes-stats) can become expensive on the master for clusters with a high number of nodes. To prevent overloading the master, the node allocation stats are cached on the master for 1 minute (`1m`) by default. This setting can be used to adjust the cache time-to-live value, if necessary, keeping in mind the tradeoff between the freshness of the statistics and the processing costs on the master. The cache can be disabled (not recommended) by setting the value to `0s` (the minimum value). The maximum value is 10 minutes (`10m`).
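Since the setting is marked Dynamic, it should be adjustable on a running cluster through the cluster settings API, in the same console style as the other examples in this docs file. A minimal sketch; the `5m` value is only an illustration, not something taken from this commit:

```
PUT _cluster/settings
{
  "persistent": {
    "cluster.routing.allocation.stats.cache.ttl": "5m"
  }
}
```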

server/src/main/java/org/elasticsearch/action/admin/cluster/allocation/TransportGetAllocationStatsAction.java

Lines changed: 65 additions & 4 deletions
@@ -30,6 +30,7 @@
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.common.settings.Setting;
 import org.elasticsearch.common.util.concurrent.EsExecutors;
 import org.elasticsearch.core.Nullable;
 import org.elasticsearch.core.TimeValue;
@@ -42,13 +43,25 @@
 import java.io.IOException;
 import java.util.EnumSet;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;

 public class TransportGetAllocationStatsAction extends TransportMasterNodeReadAction<
     TransportGetAllocationStatsAction.Request,
     TransportGetAllocationStatsAction.Response> {

     public static final ActionType<TransportGetAllocationStatsAction.Response> TYPE = new ActionType<>("cluster:monitor/allocation/stats");

+    public static final TimeValue DEFAULT_CACHE_TTL = TimeValue.timeValueMinutes(1);
+    public static final Setting<TimeValue> CACHE_TTL_SETTING = Setting.timeSetting(
+        "cluster.routing.allocation.stats.cache.ttl",
+        DEFAULT_CACHE_TTL,
+        TimeValue.ZERO,
+        TimeValue.timeValueMinutes(10),
+        Setting.Property.NodeScope,
+        Setting.Property.Dynamic
+    );
+
+    private final AllocationStatsCache allocationStatsCache;
     private final SingleResultDeduplicator<Map<String, NodeAllocationStats>> allocationStatsSupplier;
     private final DiskThresholdSettings diskThresholdSettings;

@@ -73,11 +86,22 @@ public TransportGetAllocationStatsAction(
             EsExecutors.DIRECT_EXECUTOR_SERVICE
         );
         final var managementExecutor = threadPool.executor(ThreadPool.Names.MANAGEMENT);
-        this.allocationStatsSupplier = new SingleResultDeduplicator<>(
-            threadPool.getThreadContext(),
-            l -> managementExecutor.execute(ActionRunnable.supply(l, allocationStatsService::stats))
-        );
+        this.allocationStatsCache = new AllocationStatsCache(threadPool, DEFAULT_CACHE_TTL);
+        this.allocationStatsSupplier = new SingleResultDeduplicator<>(threadPool.getThreadContext(), l -> {
+            final var cachedStats = allocationStatsCache.get();
+            if (cachedStats != null) {
+                l.onResponse(cachedStats);
+                return;
+            }
+
+            managementExecutor.execute(ActionRunnable.supply(l, () -> {
+                final var stats = allocationStatsService.stats();
+                allocationStatsCache.put(stats);
+                return stats;
+            }));
+        });
         this.diskThresholdSettings = new DiskThresholdSettings(clusterService.getSettings(), clusterService.getClusterSettings());
+        clusterService.getClusterSettings().initializeAndWatch(CACHE_TTL_SETTING, this.allocationStatsCache::setTTL);
     }

     @Override
@@ -184,4 +208,41 @@ public DiskThresholdSettings getDiskThresholdSettings() {
             return diskThresholdSettings;
         }
     }
+
+    private record CachedAllocationStats(Map<String, NodeAllocationStats> stats, long timestampMillis) {}
+
+    private static class AllocationStatsCache {
+        private volatile long ttlMillis;
+        private final ThreadPool threadPool;
+        private final AtomicReference<CachedAllocationStats> cachedStats;
+
+        AllocationStatsCache(ThreadPool threadPool, TimeValue ttl) {
+            this.threadPool = threadPool;
+            this.cachedStats = new AtomicReference<>();
+            setTTL(ttl);
+        }
+
+        void setTTL(TimeValue ttl) {
+            ttlMillis = ttl.millis();
+            if (ttlMillis == 0L) {
+                cachedStats.set(null);
+            }
+        }
+
+        Map<String, NodeAllocationStats> get() {
+            if (ttlMillis == 0L) {
+                return null;
+            }
+
+            // We don't set the atomic ref to null here upon expiration since we know it is about to be replaced with a fresh instance.
+            final var stats = cachedStats.get();
+            return stats == null || threadPool.relativeTimeInMillis() - stats.timestampMillis > ttlMillis ? null : stats.stats;
+        }
+
+        void put(Map<String, NodeAllocationStats> stats) {
+            if (ttlMillis > 0L) {
+                cachedStats.set(new CachedAllocationStats(stats, threadPool.relativeTimeInMillis()));
+            }
+        }
+    }
 }

server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java

Lines changed: 3 additions & 1 deletion
@@ -9,6 +9,7 @@
 package org.elasticsearch.common.settings;

 import org.apache.logging.log4j.LogManager;
+import org.elasticsearch.action.admin.cluster.allocation.TransportGetAllocationStatsAction;
 import org.elasticsearch.action.admin.cluster.configuration.TransportAddVotingConfigExclusionsAction;
 import org.elasticsearch.action.admin.indices.close.TransportCloseIndexAction;
 import org.elasticsearch.action.bulk.IncrementalBulkService;
@@ -633,6 +634,7 @@ public void apply(Settings value, Settings current, Settings previous) {
         DataStreamGlobalRetentionSettings.DATA_STREAMS_MAX_RETENTION_SETTING,
         ShardsAvailabilityHealthIndicatorService.REPLICA_UNASSIGNED_BUFFER_TIME,
         DataStream.isFailureStoreFeatureFlagEnabled() ? DataStreamFailureStoreSettings.DATA_STREAM_FAILURE_STORED_ENABLED_SETTING : null,
-        IndexingStatsSettings.RECENT_WRITE_LOAD_HALF_LIFE_SETTING
+        IndexingStatsSettings.RECENT_WRITE_LOAD_HALF_LIFE_SETTING,
+        TransportGetAllocationStatsAction.CACHE_TTL_SETTING
     ).filter(Objects::nonNull).collect(toSet());
 }

server/src/test/java/org/elasticsearch/action/admin/cluster/allocation/TransportGetAllocationStatsActionTests.java

Lines changed: 113 additions & 4 deletions
@@ -9,20 +9,27 @@

 package org.elasticsearch.action.admin.cluster.allocation;

+import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsRequestParameters.Metric;
 import org.elasticsearch.action.support.ActionFilters;
 import org.elasticsearch.action.support.PlainActionFuture;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.routing.allocation.AllocationStatsService;
+import org.elasticsearch.cluster.routing.allocation.NodeAllocationStats;
 import org.elasticsearch.cluster.routing.allocation.NodeAllocationStatsTests;
 import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.common.settings.ClusterSettings;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.core.CheckedConsumer;
 import org.elasticsearch.core.TimeValue;
+import org.elasticsearch.node.Node;
 import org.elasticsearch.tasks.Task;
 import org.elasticsearch.tasks.TaskId;
+import org.elasticsearch.telemetry.metric.MeterRegistry;
 import org.elasticsearch.test.ClusterServiceUtils;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.test.transport.CapturingTransport;
-import org.elasticsearch.threadpool.TestThreadPool;
+import org.elasticsearch.threadpool.DefaultBuiltInExecutorBuilders;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.TransportService;
 import org.junit.After;
@@ -35,6 +42,7 @@
 import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;

 import static org.hamcrest.Matchers.anEmptyMap;
 import static org.hamcrest.Matchers.containsString;
@@ -47,7 +55,9 @@

 public class TransportGetAllocationStatsActionTests extends ESTestCase {

-    private ThreadPool threadPool;
+    private long startTimeMillis;
+    private TimeValue allocationStatsCacheTTL;
+    private ControlledRelativeTimeThreadPool threadPool;
     private ClusterService clusterService;
     private TransportService transportService;
     private AllocationStatsService allocationStatsService;
@@ -58,8 +68,16 @@ public class TransportGetAllocationStatsActionTests extends ESTestCase {
     @Before
     public void setUp() throws Exception {
         super.setUp();
-        threadPool = new TestThreadPool(TransportClusterAllocationExplainActionTests.class.getName());
-        clusterService = ClusterServiceUtils.createClusterService(threadPool);
+        startTimeMillis = 0L;
+        allocationStatsCacheTTL = TimeValue.timeValueMinutes(1);
+        threadPool = new ControlledRelativeTimeThreadPool(TransportClusterAllocationExplainActionTests.class.getName(), startTimeMillis);
+        clusterService = ClusterServiceUtils.createClusterService(
+            threadPool,
+            new ClusterSettings(
+                Settings.builder().put(TransportGetAllocationStatsAction.CACHE_TTL_SETTING.getKey(), allocationStatsCacheTTL).build(),
+                ClusterSettings.BUILT_IN_CLUSTER_SETTINGS
+            )
+        );
         transportService = new CapturingTransport().createTransportService(
             clusterService.getSettings(),
             threadPool,
@@ -87,7 +105,17 @@ public void tearDown() throws Exception {
         transportService.close();
     }

+    private void disableAllocationStatsCache() {
+        setAllocationStatsCacheTTL(TimeValue.ZERO);
+    }
+
+    private void setAllocationStatsCacheTTL(TimeValue ttl) {
+        clusterService.getClusterSettings()
+            .applySettings(Settings.builder().put(TransportGetAllocationStatsAction.CACHE_TTL_SETTING.getKey(), ttl).build());
+    };
+
     public void testReturnsOnlyRequestedStats() throws Exception {
+        disableAllocationStatsCache();
         int expectedNumberOfStatsServiceCalls = 0;

         for (final var metrics : List.of(
@@ -129,6 +157,7 @@ public void testReturnsOnlyRequestedStats() throws Exception {
     }

     public void testDeduplicatesStatsComputations() throws InterruptedException {
+        disableAllocationStatsCache();
         final var requestCounter = new AtomicInteger();
         final var isExecuting = new AtomicBoolean();
         when(allocationStatsService.stats()).thenAnswer(invocation -> {
@@ -173,4 +202,84 @@ public void testDeduplicatesStatsComputations() throws InterruptedException {
             thread.join();
         }
     }
+
+    public void testGetStatsWithCachingEnabled() throws Exception {
+
+        final AtomicReference<Map<String, NodeAllocationStats>> allocationStats = new AtomicReference<>();
+        int numExpectedAllocationStatsServiceCalls = 0;
+
+        final Runnable resetExpectedAllocationStats = () -> {
+            final var stats = Map.of(randomIdentifier(), NodeAllocationStatsTests.randomNodeAllocationStats());
+            allocationStats.set(stats);
+            when(allocationStatsService.stats()).thenReturn(stats);
+        };
+
+        final CheckedConsumer<ActionListener<Void>, Exception> threadTask = l -> {
+            final var request = new TransportGetAllocationStatsAction.Request(
+                TEST_REQUEST_TIMEOUT,
+                new TaskId(randomIdentifier(), randomNonNegativeLong()),
+                EnumSet.of(Metric.ALLOCATIONS)
+            );
+
+            action.masterOperation(mock(Task.class), request, ClusterState.EMPTY_STATE, l.map(response -> {
+                assertSame("Expected the cached allocation stats to be returned", response.getNodeAllocationStats(), allocationStats.get());
+                return null;
+            }));
+        };
+
+        // Initial cache miss, all threads should get the same value.
+        resetExpectedAllocationStats.run();
+        ESTestCase.startInParallel(between(1, 5), threadNumber -> safeAwait(threadTask));
+        verify(allocationStatsService, times(++numExpectedAllocationStatsServiceCalls)).stats();
+
+        // Advance the clock to a time less than or equal to the TTL and verify we still get the cached stats.
+        threadPool.setCurrentTimeInMillis(startTimeMillis + between(0, (int) allocationStatsCacheTTL.millis()));
+        ESTestCase.startInParallel(between(1, 5), threadNumber -> safeAwait(threadTask));
+        verify(allocationStatsService, times(numExpectedAllocationStatsServiceCalls)).stats();
+
+        // Force the cached stats to expire.
+        threadPool.setCurrentTimeInMillis(startTimeMillis + allocationStatsCacheTTL.getMillis() + 1);
+
+        // Expect a single call to the stats service on the cache miss.
+        resetExpectedAllocationStats.run();
+        ESTestCase.startInParallel(between(1, 5), threadNumber -> safeAwait(threadTask));
+        verify(allocationStatsService, times(++numExpectedAllocationStatsServiceCalls)).stats();
+
+        // Update the TTL setting to disable the cache, we expect a service call each time.
+        setAllocationStatsCacheTTL(TimeValue.ZERO);
+        safeAwait(threadTask);
+        safeAwait(threadTask);
+        numExpectedAllocationStatsServiceCalls += 2;
+        verify(allocationStatsService, times(numExpectedAllocationStatsServiceCalls)).stats();
+
+        // Re-enable the cache, only one thread should call the stats service.
+        setAllocationStatsCacheTTL(TimeValue.timeValueMinutes(5));
+        resetExpectedAllocationStats.run();
+        ESTestCase.startInParallel(between(1, 5), threadNumber -> safeAwait(threadTask));
+        verify(allocationStatsService, times(++numExpectedAllocationStatsServiceCalls)).stats();
+    }
+
+    private static class ControlledRelativeTimeThreadPool extends ThreadPool {
+
+        private long currentTimeInMillis;
+
+        ControlledRelativeTimeThreadPool(String name, long startTimeMillis) {
+            super(
+                Settings.builder().put(Node.NODE_NAME_SETTING.getKey(), name).build(),
+                MeterRegistry.NOOP,
+                new DefaultBuiltInExecutorBuilders()
+            );
+            this.currentTimeInMillis = startTimeMillis;
+            stopCachedTimeThread();
+        }
+
+        @Override
+        public long relativeTimeInMillis() {
+            return currentTimeInMillis;
+        }
+
+        void setCurrentTimeInMillis(long currentTimeInMillis) {
+            this.currentTimeInMillis = currentTimeInMillis;
+        }
+    }
 }
