6 changes: 6 additions & 0 deletions docs/changelog/124898.yaml
@@ -0,0 +1,6 @@
pr: 124898
summary: Add cache support in `TransportGetAllocationStatsAction`
area: Allocation
type: enhancement
issues:
- 110716
6 changes: 6 additions & 0 deletions docs/changelog/125588.yaml
@@ -0,0 +1,6 @@
pr: 125588
summary: Add cache support in `TransportGetAllocationStatsAction`
area: Allocation
type: enhancement
issues:
- 110716
9 changes: 9 additions & 0 deletions docs/reference/modules/cluster.asciidoc
@@ -21,6 +21,8 @@ There are a number of settings available to control the shard allocation process
* <<cluster-shard-allocation-filtering>> allows certain nodes or groups of
nodes to be excluded from allocation so that they can be decommissioned.

* <<node-allocation-stats-cache>> controls the node allocation statistics cache on the master node.

Besides these, there are a few other <<misc-cluster-settings,miscellaneous cluster-level settings>>.

include::cluster/shards_allocation.asciidoc[]
@@ -47,4 +49,11 @@ minimize the risk of losing all shard copies in the event of a failure. <<shard-

include::cluster/allocation_filtering.asciidoc[]

[[node-allocation-stats-cache]]
==== Cluster-level node allocation stats cache settings

`cluster.routing.allocation.stats.cache.ttl`::
(<<dynamic-cluster-setting,Dynamic>>)
Calculating the node allocation stats for a <<cluster-nodes-stats,Get node statistics API call>> can become expensive on the master for clusters with a large number of nodes. To prevent overloading the master, the node allocation stats are cached on the master for one minute (`1m`) by default. This setting can be used to adjust the cache time-to-live; when tuning it, keep in mind the trade-off between the freshness of the statistics and the processing cost on the master. The cache can be disabled (not recommended) by setting the value to `0s`, the minimum value. The maximum value is ten minutes (`10m`).
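For example, a minimal sketch of adjusting the TTL at runtime through the cluster update settings API (the `5m` value is illustrative only):

[source,console]
----
PUT _cluster/settings
{
  "persistent": {
    "cluster.routing.allocation.stats.cache.ttl": "5m"
  }
}
----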

include::cluster/misc.asciidoc[]
server/src/main/java/org/elasticsearch/action/admin/cluster/allocation/TransportGetAllocationStatsAction.java
@@ -30,6 +30,7 @@
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.TimeValue;
@@ -43,13 +44,25 @@
import java.io.IOException;
import java.util.EnumSet;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

public class TransportGetAllocationStatsAction extends TransportMasterNodeReadAction<
TransportGetAllocationStatsAction.Request,
TransportGetAllocationStatsAction.Response> {

public static final ActionType<TransportGetAllocationStatsAction.Response> TYPE = new ActionType<>("cluster:monitor/allocation/stats");

public static final TimeValue DEFAULT_CACHE_TTL = TimeValue.timeValueMinutes(1);
public static final Setting<TimeValue> CACHE_TTL_SETTING = Setting.timeSetting(
"cluster.routing.allocation.stats.cache.ttl",
DEFAULT_CACHE_TTL,
TimeValue.ZERO,
TimeValue.timeValueMinutes(10),
Setting.Property.NodeScope,
Setting.Property.Dynamic
);

private final AllocationStatsCache allocationStatsCache;
private final SingleResultDeduplicator<Map<String, NodeAllocationStats>> allocationStatsSupplier;
private final DiskThresholdSettings diskThresholdSettings;
private final FeatureService featureService;
@@ -76,12 +89,23 @@ public TransportGetAllocationStatsAction(
EsExecutors.DIRECT_EXECUTOR_SERVICE
);
final var managementExecutor = threadPool.executor(ThreadPool.Names.MANAGEMENT);
this.allocationStatsSupplier = new SingleResultDeduplicator<>(
threadPool.getThreadContext(),
l -> managementExecutor.execute(ActionRunnable.supply(l, allocationStatsService::stats))
);
this.allocationStatsCache = new AllocationStatsCache(threadPool, DEFAULT_CACHE_TTL);
this.allocationStatsSupplier = new SingleResultDeduplicator<>(threadPool.getThreadContext(), l -> {
final var cachedStats = allocationStatsCache.get();
if (cachedStats != null) {
l.onResponse(cachedStats);
return;
}

managementExecutor.execute(ActionRunnable.supply(l, () -> {
final var stats = allocationStatsService.stats();
allocationStatsCache.put(stats);
return stats;
}));
});
this.diskThresholdSettings = new DiskThresholdSettings(clusterService.getSettings(), clusterService.getClusterSettings());
this.featureService = featureService;
clusterService.getClusterSettings().initializeAndWatch(CACHE_TTL_SETTING, this.allocationStatsCache::setTTL);
}

@Override
@@ -195,4 +219,41 @@ public DiskThresholdSettings getDiskThresholdSettings() {
return diskThresholdSettings;
}
}

private record CachedAllocationStats(Map<String, NodeAllocationStats> stats, long timestampMillis) {}

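/**
 * Caches the most recently computed node allocation stats for the configured TTL. A TTL of zero disables caching.
 */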
private static class AllocationStatsCache {
private volatile long ttlMillis;
private final ThreadPool threadPool;
private final AtomicReference<CachedAllocationStats> cachedStats;

AllocationStatsCache(ThreadPool threadPool, TimeValue ttl) {
this.threadPool = threadPool;
this.cachedStats = new AtomicReference<>();
setTTL(ttl);
}

void setTTL(TimeValue ttl) {
ttlMillis = ttl.millis();
if (ttlMillis == 0L) {
cachedStats.set(null);
}
}

Map<String, NodeAllocationStats> get() {
if (ttlMillis == 0L) {
return null;
}

// We don't set the atomic ref to null here upon expiration since we know it is about to be replaced with a fresh instance.
final var stats = cachedStats.get();
return stats == null || threadPool.relativeTimeInMillis() - stats.timestampMillis > ttlMillis ? null : stats.stats;
}

void put(Map<String, NodeAllocationStats> stats) {
if (ttlMillis > 0L) {
cachedStats.set(new CachedAllocationStats(stats, threadPool.relativeTimeInMillis()));
}
}
}
}
server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java
@@ -9,6 +9,7 @@
package org.elasticsearch.common.settings;

import org.apache.logging.log4j.LogManager;
import org.elasticsearch.action.admin.cluster.allocation.TransportGetAllocationStatsAction;
import org.elasticsearch.action.admin.cluster.configuration.TransportAddVotingConfigExclusionsAction;
import org.elasticsearch.action.admin.indices.close.TransportCloseIndexAction;
import org.elasticsearch.action.bulk.IncrementalBulkService;
@@ -623,6 +624,7 @@ public void apply(Settings value, Settings current, Settings previous) {
DataStreamGlobalRetentionSettings.DATA_STREAMS_DEFAULT_RETENTION_SETTING,
DataStreamGlobalRetentionSettings.DATA_STREAMS_MAX_RETENTION_SETTING,
ShardsAvailabilityHealthIndicatorService.REPLICA_UNASSIGNED_BUFFER_TIME,
DataStream.isFailureStoreFeatureFlagEnabled() ? DataStreamFailureStoreSettings.DATA_STREAM_FAILURE_STORED_ENABLED_SETTING : null
DataStream.isFailureStoreFeatureFlagEnabled() ? DataStreamFailureStoreSettings.DATA_STREAM_FAILURE_STORED_ENABLED_SETTING : null,
TransportGetAllocationStatsAction.CACHE_TTL_SETTING
).filter(Objects::nonNull).collect(toSet());
}
server/src/test/java/org/elasticsearch/action/admin/cluster/allocation/TransportGetAllocationStatsActionTests.java
@@ -9,21 +9,28 @@

package org.elasticsearch.action.admin.cluster.allocation;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsRequestParameters.Metric;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.routing.allocation.AllocationStatsService;
import org.elasticsearch.cluster.routing.allocation.NodeAllocationStats;
import org.elasticsearch.cluster.routing.allocation.NodeAllocationStatsTests;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.CheckedConsumer;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.features.FeatureService;
import org.elasticsearch.node.Node;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.telemetry.metric.MeterRegistry;
import org.elasticsearch.test.ClusterServiceUtils;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.transport.CapturingTransport;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.DefaultBuiltInExecutorBuilders;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.junit.After;
@@ -35,6 +42,7 @@
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

import static org.hamcrest.Matchers.anEmptyMap;
import static org.hamcrest.Matchers.containsString;
@@ -44,12 +52,15 @@
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

public class TransportGetAllocationStatsActionTests extends ESTestCase {

private ThreadPool threadPool;
private long startTimeMillis;
private TimeValue allocationStatsCacheTTL;
private ControlledRelativeTimeThreadPool threadPool;
private ClusterService clusterService;
private TransportService transportService;
private AllocationStatsService allocationStatsService;
@@ -61,8 +72,16 @@ public class TransportGetAllocationStatsActionTests extends ESTestCase {
@Before
public void setUp() throws Exception {
super.setUp();
threadPool = new TestThreadPool(TransportClusterAllocationExplainActionTests.class.getName());
clusterService = ClusterServiceUtils.createClusterService(threadPool);
startTimeMillis = 0L;
allocationStatsCacheTTL = TimeValue.timeValueMinutes(1);
threadPool = new ControlledRelativeTimeThreadPool(TransportClusterAllocationExplainActionTests.class.getName(), startTimeMillis);
clusterService = ClusterServiceUtils.createClusterService(
threadPool,
new ClusterSettings(
Settings.builder().put(TransportGetAllocationStatsAction.CACHE_TTL_SETTING.getKey(), allocationStatsCacheTTL).build(),
ClusterSettings.BUILT_IN_CLUSTER_SETTINGS
)
);
transportService = new CapturingTransport().createTransportService(
clusterService.getSettings(),
threadPool,
@@ -92,7 +111,17 @@ public void tearDown() throws Exception {
transportService.close();
}

private void disableAllocationStatsCache() {
setAllocationStatsCacheTTL(TimeValue.ZERO);
}

private void setAllocationStatsCacheTTL(TimeValue ttl) {
clusterService.getClusterSettings()
.applySettings(Settings.builder().put(TransportGetAllocationStatsAction.CACHE_TTL_SETTING.getKey(), ttl).build());
}

public void testReturnsOnlyRequestedStats() throws Exception {
disableAllocationStatsCache();

var metrics = EnumSet.copyOf(randomSubsetOf(Metric.values().length, Metric.values()));

@@ -126,6 +155,7 @@ public void testReturnsOnlyRequestedStats() throws Exception {
}

public void testDeduplicatesStatsComputations() throws InterruptedException {
disableAllocationStatsCache();
final var requestCounter = new AtomicInteger();
final var isExecuting = new AtomicBoolean();
when(allocationStatsService.stats()).thenAnswer(invocation -> {
@@ -170,4 +200,84 @@ public void testDeduplicatesStatsComputations() throws InterruptedException {
thread.join();
}
}

public void testGetStatsWithCachingEnabled() throws Exception {

final AtomicReference<Map<String, NodeAllocationStats>> allocationStats = new AtomicReference<>();
int numExpectedAllocationStatsServiceCalls = 0;

final Runnable resetExpectedAllocationStats = () -> {
final var stats = Map.of(randomIdentifier(), NodeAllocationStatsTests.randomNodeAllocationStats());
allocationStats.set(stats);
when(allocationStatsService.stats()).thenReturn(stats);
};

final CheckedConsumer<ActionListener<Void>, Exception> threadTask = l -> {
final var request = new TransportGetAllocationStatsAction.Request(
TEST_REQUEST_TIMEOUT,
new TaskId(randomIdentifier(), randomNonNegativeLong()),
EnumSet.of(Metric.ALLOCATIONS)
);

action.masterOperation(mock(Task.class), request, ClusterState.EMPTY_STATE, l.map(response -> {
assertSame("Expected the cached allocation stats to be returned", response.getNodeAllocationStats(), allocationStats.get());
return null;
}));
};

// Initial cache miss, all threads should get the same value.
resetExpectedAllocationStats.run();
ESTestCase.startInParallel(between(1, 5), threadNumber -> safeAwait(threadTask));
verify(allocationStatsService, times(++numExpectedAllocationStatsServiceCalls)).stats();

// Advance the clock to a time less than or equal to the TTL and verify we still get the cached stats.
threadPool.setCurrentTimeInMillis(startTimeMillis + between(0, (int) allocationStatsCacheTTL.millis()));
ESTestCase.startInParallel(between(1, 5), threadNumber -> safeAwait(threadTask));
verify(allocationStatsService, times(numExpectedAllocationStatsServiceCalls)).stats();

// Force the cached stats to expire.
threadPool.setCurrentTimeInMillis(startTimeMillis + allocationStatsCacheTTL.getMillis() + 1);

// Expect a single call to the stats service on the cache miss.
resetExpectedAllocationStats.run();
ESTestCase.startInParallel(between(1, 5), threadNumber -> safeAwait(threadTask));
verify(allocationStatsService, times(++numExpectedAllocationStatsServiceCalls)).stats();

// Update the TTL setting to disable the cache, we expect a service call each time.
setAllocationStatsCacheTTL(TimeValue.ZERO);
safeAwait(threadTask);
safeAwait(threadTask);
numExpectedAllocationStatsServiceCalls += 2;
verify(allocationStatsService, times(numExpectedAllocationStatsServiceCalls)).stats();

// Re-enable the cache, only one thread should call the stats service.
setAllocationStatsCacheTTL(TimeValue.timeValueMinutes(5));
resetExpectedAllocationStats.run();
ESTestCase.startInParallel(between(1, 5), threadNumber -> safeAwait(threadTask));
verify(allocationStatsService, times(++numExpectedAllocationStatsServiceCalls)).stats();
}

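/**
 * A test {@link ThreadPool} whose relative clock is advanced manually, so cache expiry can be exercised deterministically.
 */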
private static class ControlledRelativeTimeThreadPool extends ThreadPool {

private long currentTimeInMillis;

ControlledRelativeTimeThreadPool(String name, long startTimeMillis) {
super(
Settings.builder().put(Node.NODE_NAME_SETTING.getKey(), name).build(),
MeterRegistry.NOOP,
new DefaultBuiltInExecutorBuilders()
);
this.currentTimeInMillis = startTimeMillis;
stopCachedTimeThread();
}

@Override
public long relativeTimeInMillis() {
return currentTimeInMillis;
}

void setCurrentTimeInMillis(long currentTimeInMillis) {
this.currentTimeInMillis = currentTimeInMillis;
}
}
}