Merged
6 changes: 6 additions & 0 deletions docs/changelog/124898.yaml
@@ -0,0 +1,6 @@
pr: 124898
summary: Add cache support in `TransportGetAllocationStatsAction`
area: Allocation
type: enhancement
issues:
- 110716
cluster-level-shard-allocation-routing-settings.md
@@ -19,6 +19,7 @@ There are a number of settings available to control the shard allocation process
* [Disk-based shard allocation settings](#disk-based-shard-allocation) explains how Elasticsearch takes available disk space into account, and the related settings.
* [Shard allocation awareness](docs-content://deploy-manage/distributed-architecture/shard-allocation-relocation-recovery/shard-allocation-awareness.md) and [Forced awareness](docs-content://deploy-manage/distributed-architecture/shard-allocation-relocation-recovery/shard-allocation-awareness.md#forced-awareness) control how shards can be distributed across different racks or availability zones.
* [Cluster-level shard allocation filtering](#cluster-shard-allocation-filtering) allows certain nodes or groups of nodes to be excluded from allocation so that they can be decommissioned.
* [Cluster-level node allocation stats cache settings](#node-allocation-stats-cache) control the node allocation statistics cache on the master node.

Besides these, there are a few other [miscellaneous cluster-level settings](/reference/elasticsearch/configuration-reference/miscellaneous-cluster-settings.md).

@@ -233,7 +234,7 @@ You can use [custom node attributes](/reference/elasticsearch/configuration-refe
: ([Dynamic](docs-content://deploy-manage/deploy/self-managed/configure-elasticsearch.md#dynamic-cluster-setting)) The shard allocation awareness values that must exist for shards to be reallocated in case of location failure. Learn more about [forced awareness](docs-content://deploy-manage/distributed-architecture/shard-allocation-relocation-recovery/shard-allocation-awareness.md#forced-awareness).
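
For example, forced awareness across two zones might be configured as follows. This is an illustrative request only; `zone`, `zone1`, and `zone2` are placeholder attribute names and values:

```console
PUT _cluster/settings
{
  "persistent": {
    "cluster.routing.allocation.awareness.attributes": "zone",
    "cluster.routing.allocation.awareness.force.zone.values": "zone1,zone2"
  }
}
```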


## Cluster-level shard allocation filterin [cluster-shard-allocation-filtering]
## Cluster-level shard allocation filtering [cluster-shard-allocation-filtering]

You can use cluster-level shard allocation filters to control where {{es}} allocates shards from any index. These cluster wide filters are applied in conjunction with [per-index allocation filtering](/reference/elasticsearch/index-settings/shard-allocation.md) and [allocation awareness](docs-content://deploy-manage/distributed-architecture/shard-allocation-relocation-recovery/shard-allocation-awareness.md).
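
For example, shards can be moved off a node before decommissioning it by excluding it from allocation. This is an illustrative request only; the IP address is a placeholder value:

```console
PUT _cluster/settings
{
  "persistent": {
    "cluster.routing.allocation.exclude._ip": "10.0.0.1"
  }
}
```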

@@ -303,4 +304,7 @@ PUT _cluster/settings
```


## Node allocation stats cache [node-allocation-stats-cache]

`cluster.routing.allocation.stats.cache.ttl`
: ([Dynamic](docs-content://deploy-manage/deploy/self-managed/configure-elasticsearch.md#dynamic-cluster-setting)) Calculating the node allocation stats for a [Get node statistics API call](https://www.elastic.co/docs/api/doc/elasticsearch/operation/operation-nodes-stats) can become expensive on the master for clusters with a high number of nodes. To prevent overloading the master, the node allocation stats are cached on the master node for one minute (`1m`) by default. This setting can be used to adjust the cache time-to-live, if necessary, keeping in mind the trade-off between the freshness of the statistics and the processing cost on the master. The cache can be disabled (not recommended) by setting the value to `0s` (the minimum value). The maximum value is ten minutes (`10m`).
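
For example, the TTL can be adjusted at runtime with a cluster settings update. This is an illustrative request only; `30s` is an arbitrary example value within the allowed `0s`–`10m` range:

```console
PUT _cluster/settings
{
  "persistent": {
    "cluster.routing.allocation.stats.cache.ttl": "30s"
  }
}
```
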
TransportGetAllocationStatsAction.java
@@ -30,6 +30,7 @@
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.TimeValue;
@@ -42,13 +43,25 @@
import java.io.IOException;
import java.util.EnumSet;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

public class TransportGetAllocationStatsAction extends TransportMasterNodeReadAction<
TransportGetAllocationStatsAction.Request,
TransportGetAllocationStatsAction.Response> {

public static final ActionType<TransportGetAllocationStatsAction.Response> TYPE = new ActionType<>("cluster:monitor/allocation/stats");

public static final TimeValue DEFAULT_CACHE_TTL = TimeValue.timeValueMinutes(1);
public static final Setting<TimeValue> CACHE_TTL_SETTING = Setting.timeSetting(
"cluster.routing.allocation.stats.cache.ttl",
Comment on lines +55 to +56

Contributor Author: What visibility/exposure should this setting have? I saw other cluster.routing.allocation.* settings in docs in cluster-level-shard-allocation-routing-settings.md. Should it be documented there?

Contributor: Yes, we need to add documentation, either in this PR or in follow up. Both are fine.

Contributor: ++ would prefer to add the docs in the same PR, otherwise it tends to get forgotten

DEFAULT_CACHE_TTL,
TimeValue.ZERO,
TimeValue.timeValueMinutes(10),
Setting.Property.NodeScope,
Setting.Property.Dynamic
);

private final AllocationStatsCache allocationStatsCache;
private final SingleResultDeduplicator<Map<String, NodeAllocationStats>> allocationStatsSupplier;
private final DiskThresholdSettings diskThresholdSettings;

@@ -73,11 +86,22 @@ public TransportGetAllocationStatsAction(
EsExecutors.DIRECT_EXECUTOR_SERVICE
);
final var managementExecutor = threadPool.executor(ThreadPool.Names.MANAGEMENT);
this.allocationStatsSupplier = new SingleResultDeduplicator<>(
threadPool.getThreadContext(),
l -> managementExecutor.execute(ActionRunnable.supply(l, allocationStatsService::stats))
);
this.allocationStatsCache = new AllocationStatsCache(threadPool, DEFAULT_CACHE_TTL);
this.allocationStatsSupplier = new SingleResultDeduplicator<>(threadPool.getThreadContext(), l -> {
final var cachedStats = allocationStatsCache.get();
if (cachedStats != null) {
l.onResponse(cachedStats);
return;
}

managementExecutor.execute(ActionRunnable.supply(l, () -> {
final var stats = allocationStatsService.stats();
allocationStatsCache.put(stats);
return stats;
}));
});
this.diskThresholdSettings = new DiskThresholdSettings(clusterService.getSettings(), clusterService.getClusterSettings());
clusterService.getClusterSettings().initializeAndWatch(CACHE_TTL_SETTING, this.allocationStatsCache::setTTL);
}

@Override
@@ -184,4 +208,41 @@ public DiskThresholdSettings getDiskThresholdSettings() {
return diskThresholdSettings;
}
}

private record CachedAllocationStats(Map<String, NodeAllocationStats> stats, long timestampMillis) {}

private static class AllocationStatsCache {
private volatile long ttlMillis;
private final ThreadPool threadPool;
private final AtomicReference<CachedAllocationStats> cachedStats;

AllocationStatsCache(ThreadPool threadPool, TimeValue ttl) {
this.threadPool = threadPool;
this.cachedStats = new AtomicReference<>();
setTTL(ttl);
}

void setTTL(TimeValue ttl) {
ttlMillis = ttl.millis();
if (ttlMillis == 0L) {
cachedStats.set(null);
}
}

Map<String, NodeAllocationStats> get() {
if (ttlMillis == 0L) {
return null;
}

// We don't set the atomic ref to null here upon expiration since we know it is about to be replaced with a fresh instance.
final var stats = cachedStats.get();
return stats == null || threadPool.relativeTimeInMillis() - stats.timestampMillis > ttlMillis ? null : stats.stats;
}

void put(Map<String, NodeAllocationStats> stats) {
if (ttlMillis > 0L) {
cachedStats.set(new CachedAllocationStats(stats, threadPool.relativeTimeInMillis()));
}
}
}
}
ClusterSettings.java
@@ -9,6 +9,7 @@
package org.elasticsearch.common.settings;

import org.apache.logging.log4j.LogManager;
import org.elasticsearch.action.admin.cluster.allocation.TransportGetAllocationStatsAction;
import org.elasticsearch.action.admin.cluster.configuration.TransportAddVotingConfigExclusionsAction;
import org.elasticsearch.action.admin.indices.close.TransportCloseIndexAction;
import org.elasticsearch.action.bulk.IncrementalBulkService;
@@ -633,6 +634,7 @@ public void apply(Settings value, Settings current, Settings previous) {
DataStreamGlobalRetentionSettings.DATA_STREAMS_MAX_RETENTION_SETTING,
ShardsAvailabilityHealthIndicatorService.REPLICA_UNASSIGNED_BUFFER_TIME,
DataStream.isFailureStoreFeatureFlagEnabled() ? DataStreamFailureStoreSettings.DATA_STREAM_FAILURE_STORED_ENABLED_SETTING : null,
IndexingStatsSettings.RECENT_WRITE_LOAD_HALF_LIFE_SETTING
IndexingStatsSettings.RECENT_WRITE_LOAD_HALF_LIFE_SETTING,
TransportGetAllocationStatsAction.CACHE_TTL_SETTING
).filter(Objects::nonNull).collect(toSet());
}
TransportGetAllocationStatsActionTests.java
@@ -9,20 +9,27 @@

package org.elasticsearch.action.admin.cluster.allocation;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsRequestParameters.Metric;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.routing.allocation.AllocationStatsService;
import org.elasticsearch.cluster.routing.allocation.NodeAllocationStats;
import org.elasticsearch.cluster.routing.allocation.NodeAllocationStatsTests;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.CheckedConsumer;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.node.Node;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.telemetry.metric.MeterRegistry;
import org.elasticsearch.test.ClusterServiceUtils;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.transport.CapturingTransport;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.DefaultBuiltInExecutorBuilders;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.junit.After;
@@ -35,6 +42,7 @@
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

import static org.hamcrest.Matchers.anEmptyMap;
import static org.hamcrest.Matchers.containsString;
@@ -47,7 +55,9 @@

public class TransportGetAllocationStatsActionTests extends ESTestCase {

private ThreadPool threadPool;
private long startTimeMillis;
private TimeValue allocationStatsCacheTTL;
private ControlledRelativeTimeThreadPool threadPool;
private ClusterService clusterService;
private TransportService transportService;
private AllocationStatsService allocationStatsService;
@@ -58,8 +68,16 @@ public class TransportGetAllocationStatsActionTests extends ESTestCase {
@Before
public void setUp() throws Exception {
super.setUp();
threadPool = new TestThreadPool(TransportClusterAllocationExplainActionTests.class.getName());
clusterService = ClusterServiceUtils.createClusterService(threadPool);
startTimeMillis = 0L;
allocationStatsCacheTTL = TimeValue.timeValueMinutes(1);
threadPool = new ControlledRelativeTimeThreadPool(TransportClusterAllocationExplainActionTests.class.getName(), startTimeMillis);
clusterService = ClusterServiceUtils.createClusterService(
threadPool,
new ClusterSettings(
Settings.builder().put(TransportGetAllocationStatsAction.CACHE_TTL_SETTING.getKey(), allocationStatsCacheTTL).build(),
ClusterSettings.BUILT_IN_CLUSTER_SETTINGS
)
);
transportService = new CapturingTransport().createTransportService(
clusterService.getSettings(),
threadPool,
@@ -87,7 +105,17 @@ public void tearDown() throws Exception {
transportService.close();
}

private void disableAllocationStatsCache() {
setAllocationStatsCacheTTL(TimeValue.ZERO);
}

private void setAllocationStatsCacheTTL(TimeValue ttl) {
clusterService.getClusterSettings()
.applySettings(Settings.builder().put(TransportGetAllocationStatsAction.CACHE_TTL_SETTING.getKey(), ttl).build());
};

public void testReturnsOnlyRequestedStats() throws Exception {
disableAllocationStatsCache();
int expectedNumberOfStatsServiceCalls = 0;

for (final var metrics : List.of(
@@ -129,6 +157,7 @@ public void testReturnsOnlyRequestedStats() throws Exception {
}

public void testDeduplicatesStatsComputations() throws InterruptedException {
disableAllocationStatsCache();
final var requestCounter = new AtomicInteger();
final var isExecuting = new AtomicBoolean();
when(allocationStatsService.stats()).thenAnswer(invocation -> {
@@ -173,4 +202,84 @@ public void testDeduplicatesStatsComputations() throws InterruptedException {
thread.join();
}
}

public void testGetStatsWithCachingEnabled() throws Exception {

final AtomicReference<Map<String, NodeAllocationStats>> allocationStats = new AtomicReference<>();
int numExpectedAllocationStatsServiceCalls = 0;

final Runnable resetExpectedAllocationStats = () -> {
final var stats = Map.of(randomIdentifier(), NodeAllocationStatsTests.randomNodeAllocationStats());
allocationStats.set(stats);
when(allocationStatsService.stats()).thenReturn(stats);
};

final CheckedConsumer<ActionListener<Void>, Exception> threadTask = l -> {
final var request = new TransportGetAllocationStatsAction.Request(
TEST_REQUEST_TIMEOUT,
new TaskId(randomIdentifier(), randomNonNegativeLong()),
EnumSet.of(Metric.ALLOCATIONS)
);

action.masterOperation(mock(Task.class), request, ClusterState.EMPTY_STATE, l.map(response -> {
assertSame("Expected the cached allocation stats to be returned", response.getNodeAllocationStats(), allocationStats.get());
return null;
}));
};

// Initial cache miss, all threads should get the same value.
resetExpectedAllocationStats.run();
ESTestCase.startInParallel(between(1, 5), threadNumber -> safeAwait(threadTask));
verify(allocationStatsService, times(++numExpectedAllocationStatsServiceCalls)).stats();

// Advance the clock to a time less than or equal to the TTL and verify we still get the cached stats.
threadPool.setCurrentTimeInMillis(startTimeMillis + between(0, (int) allocationStatsCacheTTL.millis()));
ESTestCase.startInParallel(between(1, 5), threadNumber -> safeAwait(threadTask));
verify(allocationStatsService, times(numExpectedAllocationStatsServiceCalls)).stats();

// Force the cached stats to expire.
threadPool.setCurrentTimeInMillis(startTimeMillis + allocationStatsCacheTTL.getMillis() + 1);

// Expect a single call to the stats service on the cache miss.
resetExpectedAllocationStats.run();
ESTestCase.startInParallel(between(1, 5), threadNumber -> safeAwait(threadTask));
verify(allocationStatsService, times(++numExpectedAllocationStatsServiceCalls)).stats();

// Update the TTL setting to disable the cache, we expect a service call each time.
setAllocationStatsCacheTTL(TimeValue.ZERO);
safeAwait(threadTask);
safeAwait(threadTask);
numExpectedAllocationStatsServiceCalls += 2;
verify(allocationStatsService, times(numExpectedAllocationStatsServiceCalls)).stats();

// Re-enable the cache, only one thread should call the stats service.
setAllocationStatsCacheTTL(TimeValue.timeValueMinutes(5));
resetExpectedAllocationStats.run();
ESTestCase.startInParallel(between(1, 5), threadNumber -> safeAwait(threadTask));
verify(allocationStatsService, times(++numExpectedAllocationStatsServiceCalls)).stats();
}

private static class ControlledRelativeTimeThreadPool extends ThreadPool {

private long currentTimeInMillis;

ControlledRelativeTimeThreadPool(String name, long startTimeMillis) {
super(
Settings.builder().put(Node.NODE_NAME_SETTING.getKey(), name).build(),
MeterRegistry.NOOP,
new DefaultBuiltInExecutorBuilders()
);
this.currentTimeInMillis = startTimeMillis;
stopCachedTimeThread();
}

@Override
public long relativeTimeInMillis() {
return currentTimeInMillis;
}

void setCurrentTimeInMillis(long currentTimeInMillis) {
this.currentTimeInMillis = currentTimeInMillis;
}
}
}