diff --git a/docs/changelog/125588.yaml b/docs/changelog/125588.yaml new file mode 100644 index 0000000000000..323198282095e --- /dev/null +++ b/docs/changelog/125588.yaml @@ -0,0 +1,6 @@ +pr: 125588 +summary: Add cache support in `TransportGetAllocationStatsAction` +area: Allocation +type: enhancement +issues: + - 110716 diff --git a/docs/reference/modules/cluster.asciidoc b/docs/reference/modules/cluster.asciidoc index cf8e97de5e188..5bcbe9bd383eb 100644 --- a/docs/reference/modules/cluster.asciidoc +++ b/docs/reference/modules/cluster.asciidoc @@ -21,6 +21,8 @@ There are a number of settings available to control the shard allocation process * <> allows certain nodes or groups of nodes excluded from allocation so that they can be decommissioned. +* <> control the node allocation statistics cache on the master node. + Besides these, there are a few other <>. include::cluster/shards_allocation.asciidoc[] @@ -47,4 +49,11 @@ minimize the risk of losing all shard copies in the event of a failure. <>) + Calculating the node allocation stats for a <> can become expensive on the master for clusters with a high number of nodes. To prevent overloading the master the node allocation stats are cached on the master for 1 minute `1m` by default. This setting can be used to adjust the cache time to live value, if necessary, keeping in mind the tradeoff between the freshness of the statistics and the processing costs on the master. The cache can be disabled (not recommended) by setting the value to `0s` (the minimum value). The maximum value is 10 minutes `10m`. + include::cluster/misc.asciidoc[] diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/allocation/TransportGetAllocationStatsAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/allocation/TransportGetAllocationStatsAction.java index b1fdbe26db465..20ec16fb9d352 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/allocation/TransportGetAllocationStatsAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/allocation/TransportGetAllocationStatsAction.java @@ -30,6 +30,7 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.core.Nullable; import org.elasticsearch.core.TimeValue; @@ -43,6 +44,7 @@ import java.io.IOException; import java.util.EnumSet; import java.util.Map; +import java.util.concurrent.atomic.AtomicReference; public class TransportGetAllocationStatsAction extends TransportMasterNodeReadAction< TransportGetAllocationStatsAction.Request, @@ -50,6 +52,17 @@ public class TransportGetAllocationStatsAction extends TransportMasterNodeReadAc public static final ActionType TYPE = new ActionType<>("cluster:monitor/allocation/stats"); + public static final TimeValue DEFAULT_CACHE_TTL = TimeValue.timeValueMinutes(1); + public static final Setting CACHE_TTL_SETTING = Setting.timeSetting( + "cluster.routing.allocation.stats.cache.ttl", + DEFAULT_CACHE_TTL, + TimeValue.ZERO, + TimeValue.timeValueMinutes(10), + Setting.Property.NodeScope, + Setting.Property.Dynamic + ); + + private final AllocationStatsCache allocationStatsCache; private final SingleResultDeduplicator> allocationStatsSupplier; private final DiskThresholdSettings diskThresholdSettings; private final FeatureService featureService; @@ -76,12 +89,23 @@ public TransportGetAllocationStatsAction( EsExecutors.DIRECT_EXECUTOR_SERVICE ); final var managementExecutor = threadPool.executor(ThreadPool.Names.MANAGEMENT); - this.allocationStatsSupplier = new SingleResultDeduplicator<>( - threadPool.getThreadContext(), - l -> managementExecutor.execute(ActionRunnable.supply(l, allocationStatsService::stats)) - ); + this.allocationStatsCache = new AllocationStatsCache(threadPool, DEFAULT_CACHE_TTL); + this.allocationStatsSupplier = new SingleResultDeduplicator<>(threadPool.getThreadContext(), l -> { + final var cachedStats = allocationStatsCache.get(); + if (cachedStats != null) { + l.onResponse(cachedStats); + return; + } + + managementExecutor.execute(ActionRunnable.supply(l, () -> { + final var stats = allocationStatsService.stats(); + allocationStatsCache.put(stats); + return stats; + })); + }); this.diskThresholdSettings = new DiskThresholdSettings(clusterService.getSettings(), clusterService.getClusterSettings()); this.featureService = featureService; + clusterService.getClusterSettings().initializeAndWatch(CACHE_TTL_SETTING, this.allocationStatsCache::setTTL); } @Override @@ -195,4 +219,41 @@ public DiskThresholdSettings getDiskThresholdSettings() { return diskThresholdSettings; } } + + private record CachedAllocationStats(Map stats, long timestampMillis) {} + + private static class AllocationStatsCache { + private volatile long ttlMillis; + private final ThreadPool threadPool; + private final AtomicReference cachedStats; + + AllocationStatsCache(ThreadPool threadPool, TimeValue ttl) { + this.threadPool = threadPool; + this.cachedStats = new AtomicReference<>(); + setTTL(ttl); + } + + void setTTL(TimeValue ttl) { + ttlMillis = ttl.millis(); + if (ttlMillis == 0L) { + cachedStats.set(null); + } + } + + Map get() { + if (ttlMillis == 0L) { + return null; + } + + // We don't set the atomic ref to null here upon expiration since we know it is about to be replaced with a fresh instance. + final var stats = cachedStats.get(); + return stats == null || threadPool.relativeTimeInMillis() - stats.timestampMillis > ttlMillis ? null : stats.stats; + } + + void put(Map stats) { + if (ttlMillis > 0L) { + cachedStats.set(new CachedAllocationStats(stats, threadPool.relativeTimeInMillis())); + } + } + } } diff --git a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java index ae8c264d382ac..831c94eaa803c 100644 --- a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java @@ -9,6 +9,7 @@ package org.elasticsearch.common.settings; import org.apache.logging.log4j.LogManager; +import org.elasticsearch.action.admin.cluster.allocation.TransportGetAllocationStatsAction; import org.elasticsearch.action.admin.cluster.configuration.TransportAddVotingConfigExclusionsAction; import org.elasticsearch.action.admin.indices.close.TransportCloseIndexAction; import org.elasticsearch.action.bulk.IncrementalBulkService; @@ -623,6 +624,7 @@ public void apply(Settings value, Settings current, Settings previous) { DataStreamGlobalRetentionSettings.DATA_STREAMS_DEFAULT_RETENTION_SETTING, DataStreamGlobalRetentionSettings.DATA_STREAMS_MAX_RETENTION_SETTING, ShardsAvailabilityHealthIndicatorService.REPLICA_UNASSIGNED_BUFFER_TIME, - DataStream.isFailureStoreFeatureFlagEnabled() ? DataStreamFailureStoreSettings.DATA_STREAM_FAILURE_STORED_ENABLED_SETTING : null + DataStream.isFailureStoreFeatureFlagEnabled() ? DataStreamFailureStoreSettings.DATA_STREAM_FAILURE_STORED_ENABLED_SETTING : null, + TransportGetAllocationStatsAction.CACHE_TTL_SETTING ).filter(Objects::nonNull).collect(toSet()); } diff --git a/server/src/test/java/org/elasticsearch/action/admin/cluster/allocation/TransportGetAllocationStatsActionTests.java b/server/src/test/java/org/elasticsearch/action/admin/cluster/allocation/TransportGetAllocationStatsActionTests.java index 0e7cf3d613321..0c0d4b112c35c 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/cluster/allocation/TransportGetAllocationStatsActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/cluster/allocation/TransportGetAllocationStatsActionTests.java @@ -9,21 +9,28 @@ package org.elasticsearch.action.admin.cluster.allocation; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsRequestParameters.Metric; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.routing.allocation.AllocationStatsService; +import org.elasticsearch.cluster.routing.allocation.NodeAllocationStats; import org.elasticsearch.cluster.routing.allocation.NodeAllocationStatsTests; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.settings.ClusterSettings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.core.CheckedConsumer; import org.elasticsearch.core.TimeValue; import org.elasticsearch.features.FeatureService; +import org.elasticsearch.node.Node; import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.TaskId; +import org.elasticsearch.telemetry.metric.MeterRegistry; import org.elasticsearch.test.ClusterServiceUtils; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.transport.CapturingTransport; -import org.elasticsearch.threadpool.TestThreadPool; +import org.elasticsearch.threadpool.DefaultBuiltInExecutorBuilders; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; import org.junit.After; @@ -35,6 +42,7 @@ import java.util.concurrent.CyclicBarrier; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import static org.hamcrest.Matchers.anEmptyMap; import static org.hamcrest.Matchers.containsString; @@ -44,12 +52,15 @@ import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; public class TransportGetAllocationStatsActionTests extends ESTestCase { - private ThreadPool threadPool; + private long startTimeMillis; + private TimeValue allocationStatsCacheTTL; + private ControlledRelativeTimeThreadPool threadPool; private ClusterService clusterService; private TransportService transportService; private AllocationStatsService allocationStatsService; @@ -61,8 +72,16 @@ public class TransportGetAllocationStatsActionTests extends ESTestCase { @Before public void setUp() throws Exception { super.setUp(); - threadPool = new TestThreadPool(TransportClusterAllocationExplainActionTests.class.getName()); - clusterService = ClusterServiceUtils.createClusterService(threadPool); + startTimeMillis = 0L; + allocationStatsCacheTTL = TimeValue.timeValueMinutes(1); + threadPool = new ControlledRelativeTimeThreadPool(TransportClusterAllocationExplainActionTests.class.getName(), startTimeMillis); + clusterService = ClusterServiceUtils.createClusterService( + threadPool, + new ClusterSettings( + Settings.builder().put(TransportGetAllocationStatsAction.CACHE_TTL_SETTING.getKey(), allocationStatsCacheTTL).build(), + ClusterSettings.BUILT_IN_CLUSTER_SETTINGS + ) + ); transportService = new CapturingTransport().createTransportService( clusterService.getSettings(), threadPool, @@ -92,7 +111,17 @@ public void tearDown() throws Exception { transportService.close(); } + private void disableAllocationStatsCache() { + setAllocationStatsCacheTTL(TimeValue.ZERO); + } + + private void setAllocationStatsCacheTTL(TimeValue ttl) { + clusterService.getClusterSettings() + .applySettings(Settings.builder().put(TransportGetAllocationStatsAction.CACHE_TTL_SETTING.getKey(), ttl).build()); + }; + public void testReturnsOnlyRequestedStats() throws Exception { + disableAllocationStatsCache(); var metrics = EnumSet.copyOf(randomSubsetOf(Metric.values().length, Metric.values())); @@ -126,6 +155,7 @@ public void testReturnsOnlyRequestedStats() throws Exception { } public void testDeduplicatesStatsComputations() throws InterruptedException { + disableAllocationStatsCache(); final var requestCounter = new AtomicInteger(); final var isExecuting = new AtomicBoolean(); when(allocationStatsService.stats()).thenAnswer(invocation -> { @@ -170,4 +200,84 @@ public void testDeduplicatesStatsComputations() throws InterruptedException { thread.join(); } } + + public void testGetStatsWithCachingEnabled() throws Exception { + + final AtomicReference> allocationStats = new AtomicReference<>(); + int numExpectedAllocationStatsServiceCalls = 0; + + final Runnable resetExpectedAllocationStats = () -> { + final var stats = Map.of(randomIdentifier(), NodeAllocationStatsTests.randomNodeAllocationStats()); + allocationStats.set(stats); + when(allocationStatsService.stats()).thenReturn(stats); + }; + + final CheckedConsumer, Exception> threadTask = l -> { + final var request = new TransportGetAllocationStatsAction.Request( + TEST_REQUEST_TIMEOUT, + new TaskId(randomIdentifier(), randomNonNegativeLong()), + EnumSet.of(Metric.ALLOCATIONS) + ); + + action.masterOperation(mock(Task.class), request, ClusterState.EMPTY_STATE, l.map(response -> { + assertSame("Expected the cached allocation stats to be returned", response.getNodeAllocationStats(), allocationStats.get()); + return null; + })); + }; + + // Initial cache miss, all threads should get the same value. + resetExpectedAllocationStats.run(); + ESTestCase.startInParallel(between(1, 5), threadNumber -> safeAwait(threadTask)); + verify(allocationStatsService, times(++numExpectedAllocationStatsServiceCalls)).stats(); + + // Advance the clock to a time less than or equal to the TTL and verify we still get the cached stats. + threadPool.setCurrentTimeInMillis(startTimeMillis + between(0, (int) allocationStatsCacheTTL.millis())); + ESTestCase.startInParallel(between(1, 5), threadNumber -> safeAwait(threadTask)); + verify(allocationStatsService, times(numExpectedAllocationStatsServiceCalls)).stats(); + + // Force the cached stats to expire. + threadPool.setCurrentTimeInMillis(startTimeMillis + allocationStatsCacheTTL.getMillis() + 1); + + // Expect a single call to the stats service on the cache miss. + resetExpectedAllocationStats.run(); + ESTestCase.startInParallel(between(1, 5), threadNumber -> safeAwait(threadTask)); + verify(allocationStatsService, times(++numExpectedAllocationStatsServiceCalls)).stats(); + + // Update the TTL setting to disable the cache, we expect a service call each time. + setAllocationStatsCacheTTL(TimeValue.ZERO); + safeAwait(threadTask); + safeAwait(threadTask); + numExpectedAllocationStatsServiceCalls += 2; + verify(allocationStatsService, times(numExpectedAllocationStatsServiceCalls)).stats(); + + // Re-enable the cache, only one thread should call the stats service. + setAllocationStatsCacheTTL(TimeValue.timeValueMinutes(5)); + resetExpectedAllocationStats.run(); + ESTestCase.startInParallel(between(1, 5), threadNumber -> safeAwait(threadTask)); + verify(allocationStatsService, times(++numExpectedAllocationStatsServiceCalls)).stats(); + } + + private static class ControlledRelativeTimeThreadPool extends ThreadPool { + + private long currentTimeInMillis; + + ControlledRelativeTimeThreadPool(String name, long startTimeMillis) { + super( + Settings.builder().put(Node.NODE_NAME_SETTING.getKey(), name).build(), + MeterRegistry.NOOP, + new DefaultBuiltInExecutorBuilders() + ); + this.currentTimeInMillis = startTimeMillis; + stopCachedTimeThread(); + } + + @Override + public long relativeTimeInMillis() { + return currentTimeInMillis; + } + + void setCurrentTimeInMillis(long currentTimeInMillis) { + this.currentTimeInMillis = currentTimeInMillis; + } + } }