diff --git a/server/src/internalClusterTest/java/org/elasticsearch/index/shard/IndexShardIT.java b/server/src/internalClusterTest/java/org/elasticsearch/index/shard/IndexShardIT.java index 6dafab431500e..25ae21964ba0e 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/index/shard/IndexShardIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/index/shard/IndexShardIT.java @@ -8,6 +8,8 @@ */ package org.elasticsearch.index.shard; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.apache.lucene.index.DirectoryReader; import org.apache.lucene.store.LockObtainFailedException; import org.apache.lucene.util.SetOnce; @@ -22,6 +24,8 @@ import org.elasticsearch.cluster.EstimatedHeapUsage; import org.elasticsearch.cluster.EstimatedHeapUsageCollector; import org.elasticsearch.cluster.InternalClusterInfoService; +import org.elasticsearch.cluster.NodeUsageStatsForThreadPools; +import org.elasticsearch.cluster.NodeUsageStatsForThreadPoolsCollector; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodeUtils; @@ -29,6 +33,7 @@ import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.ShardRoutingState; import org.elasticsearch.cluster.routing.UnassignedInfo; +import org.elasticsearch.cluster.routing.allocation.WriteLoadConstraintSettings; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Strings; import org.elasticsearch.common.UUIDs; @@ -73,6 +78,7 @@ import org.elasticsearch.test.ESSingleNodeTestCase; import org.elasticsearch.test.IndexSettingsModule; import org.elasticsearch.test.InternalSettingsPlugin; +import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xcontent.XContentType; import org.junit.Assert; @@ -85,6 +91,7 @@ import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Locale; import java.util.Map; @@ -117,14 +124,20 @@ import static org.hamcrest.Matchers.either; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.lessThanOrEqualTo; public class IndexShardIT extends ESSingleNodeTestCase { + private static final Logger logger = LogManager.getLogger(IndexShardIT.class); @Override protected Collection> getPlugins() { - return pluginList(InternalSettingsPlugin.class, BogusEstimatedHeapUsagePlugin.class); + return pluginList( + InternalSettingsPlugin.class, + BogusEstimatedHeapUsagePlugin.class, + BogusNodeUsageStatsForThreadPoolsCollectorPlugin.class + ); } public void testLockTryingToDelete() throws Exception { @@ -295,6 +308,53 @@ public void testHeapUsageEstimateIsPresent() { } } + public void testNodeWriteLoadsArePresent() { + InternalClusterInfoService clusterInfoService = (InternalClusterInfoService) getInstanceFromNode(ClusterInfoService.class); + ClusterInfoServiceUtils.refresh(clusterInfoService); + Map nodeThreadPoolStats = clusterInfoService.getClusterInfo() + .getNodeUsageStatsForThreadPools(); + assertNotNull(nodeThreadPoolStats); + /** Not collecting stats yet because allocation write load stats collection is disabled by default. 
+ * see {@link WriteLoadConstraintSettings.WRITE_LOAD_DECIDER_ENABLED_SETTING} */ + assertTrue(nodeThreadPoolStats.isEmpty()); + + // Enable collection for node write loads. + updateClusterSettings( + Settings.builder() + .put( + WriteLoadConstraintSettings.WRITE_LOAD_DECIDER_ENABLED_SETTING.getKey(), + WriteLoadConstraintSettings.WriteLoadDeciderStatus.ENABLED + ) + .build() + ); + try { + // Force a ClusterInfo refresh to run collection of the node thread pool usage stats. + ClusterInfoServiceUtils.refresh(clusterInfoService); + nodeThreadPoolStats = clusterInfoService.getClusterInfo().getNodeUsageStatsForThreadPools(); + + /** Verify that each node has usage stats reported. The test {@link BogusNodeUsageStatsForThreadPoolsCollector} implementation + * generates random usage values */ + ClusterState state = getInstanceFromNode(ClusterService.class).state(); + assertEquals(state.nodes().size(), nodeThreadPoolStats.size()); + for (DiscoveryNode node : state.nodes()) { + assertTrue(nodeThreadPoolStats.containsKey(node.getId())); + NodeUsageStatsForThreadPools nodeUsageStatsForThreadPools = nodeThreadPoolStats.get(node.getId()); + assertThat(nodeUsageStatsForThreadPools.nodeId(), equalTo(node.getId())); + NodeUsageStatsForThreadPools.ThreadPoolUsageStats writeThreadPoolStats = nodeUsageStatsForThreadPools + .threadPoolUsageStatsMap() + .get(ThreadPool.Names.WRITE); + assertNotNull(writeThreadPoolStats); + assertThat(writeThreadPoolStats.totalThreadPoolThreads(), greaterThanOrEqualTo(0)); + assertThat(writeThreadPoolStats.averageThreadPoolUtilization(), greaterThanOrEqualTo(0.0f)); + assertThat(writeThreadPoolStats.averageThreadPoolQueueLatencyMillis(), greaterThanOrEqualTo(0L)); + } + } finally { + updateClusterSettings( + Settings.builder().putNull(WriteLoadConstraintSettings.WRITE_LOAD_DECIDER_ENABLED_SETTING.getKey()).build() + ); + } + } + public void testIndexCanChangeCustomDataPath() throws Exception { final String index = "test-custom-data-path"; final Path sharedDataPath = getInstanceFromNode(Environment.class).sharedDataDir().resolve(randomAsciiLettersOfLength(10)); @@ -875,4 +935,61 @@ public ClusterService getClusterService() { return clusterService.get(); } } + + /** + * A simple {@link NodeUsageStatsForThreadPoolsCollector} implementation that creates and returns random + * {@link NodeUsageStatsForThreadPools} for each node in the cluster. + *
<p>
+ * Note: there's an 'org.elasticsearch.cluster.NodeUsageStatsForThreadPoolsCollector' file that declares this implementation so that the + * plugin system can pick it up and use it for the test set-up. + */ + public static class BogusNodeUsageStatsForThreadPoolsCollector implements NodeUsageStatsForThreadPoolsCollector { + + private final BogusNodeUsageStatsForThreadPoolsCollectorPlugin plugin; + + public BogusNodeUsageStatsForThreadPoolsCollector(BogusNodeUsageStatsForThreadPoolsCollectorPlugin plugin) { + this.plugin = plugin; + } + + @Override + public void collectUsageStats(ActionListener> listener) { + ActionListener.completeWith( + listener, + () -> plugin.getClusterService() + .state() + .nodes() + .stream() + .collect(Collectors.toUnmodifiableMap(DiscoveryNode::getId, node -> makeRandomNodeUsageStats(node.getId()))) + ); + } + + private NodeUsageStatsForThreadPools makeRandomNodeUsageStats(String nodeId) { + NodeUsageStatsForThreadPools.ThreadPoolUsageStats writeThreadPoolStats = new NodeUsageStatsForThreadPools.ThreadPoolUsageStats( + randomNonNegativeInt(), + randomFloat(), + randomNonNegativeLong() + ); + Map statsForThreadPools = new HashMap<>(); + statsForThreadPools.put(ThreadPool.Names.WRITE, writeThreadPoolStats); + return new NodeUsageStatsForThreadPools(nodeId, statsForThreadPools); + } + } + + /** + * Make a plugin to gain access to the {@link ClusterService} instance. + */ + public static class BogusNodeUsageStatsForThreadPoolsCollectorPlugin extends Plugin implements ClusterPlugin { + + private final SetOnce clusterService = new SetOnce<>(); + + @Override + public Collection createComponents(PluginServices services) { + clusterService.set(services.clusterService()); + return List.of(); + } + + public ClusterService getClusterService() { + return clusterService.get(); + } + } } diff --git a/server/src/internalClusterTest/resources/META-INF/services/org.elasticsearch.cluster.NodeUsageStatsForThreadPoolsCollector b/server/src/internalClusterTest/resources/META-INF/services/org.elasticsearch.cluster.NodeUsageStatsForThreadPoolsCollector new file mode 100644 index 0000000000000..787ce436c3ca6 --- /dev/null +++ b/server/src/internalClusterTest/resources/META-INF/services/org.elasticsearch.cluster.NodeUsageStatsForThreadPoolsCollector @@ -0,0 +1,10 @@ +# +# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +# or more contributor license agreements. Licensed under the "Elastic License +# 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side +# Public License v 1"; you may not use this file except in compliance with, at +# your election, the "Elastic License 2.0", the "GNU Affero General Public +# License v3.0 only", or the "Server Side Public License, v 1". 
+# + +org.elasticsearch.index.shard.IndexShardIT$BogusNodeUsageStatsForThreadPoolsCollector diff --git a/server/src/main/java/org/elasticsearch/TransportVersions.java b/server/src/main/java/org/elasticsearch/TransportVersions.java index 35f5423df0ffb..70727715a91c0 100644 --- a/server/src/main/java/org/elasticsearch/TransportVersions.java +++ b/server/src/main/java/org/elasticsearch/TransportVersions.java @@ -337,6 +337,7 @@ static TransportVersion def(int id) { public static final TransportVersion ML_INFERENCE_CUSTOM_SERVICE_EMBEDDING_TYPE = def(9_118_0_00); public static final TransportVersion ESQL_FIXED_INDEX_LIKE = def(9_119_0_00); public static final TransportVersion LOOKUP_JOIN_CCS = def(9_120_0_00); + public static final TransportVersion NODE_USAGE_STATS_FOR_THREAD_POOLS_IN_CLUSTER_INFO = def(9_121_0_00); /* * STOP! READ THIS FIRST! No, really, diff --git a/server/src/main/java/org/elasticsearch/cluster/ClusterInfo.java b/server/src/main/java/org/elasticsearch/cluster/ClusterInfo.java index e8cb0421979c5..7c627638e9b0b 100644 --- a/server/src/main/java/org/elasticsearch/cluster/ClusterInfo.java +++ b/server/src/main/java/org/elasticsearch/cluster/ClusterInfo.java @@ -58,9 +58,10 @@ public class ClusterInfo implements ChunkedToXContent, Writeable { final Map dataPath; final Map reservedSpace; final Map estimatedHeapUsages; + final Map nodeUsageStatsForThreadPools; protected ClusterInfo() { - this(Map.of(), Map.of(), Map.of(), Map.of(), Map.of(), Map.of(), Map.of()); + this(Map.of(), Map.of(), Map.of(), Map.of(), Map.of(), Map.of(), Map.of(), Map.of()); } /** @@ -73,6 +74,7 @@ protected ClusterInfo() { * @param dataPath the shard routing to datapath mapping * @param reservedSpace reserved space per shard broken down by node and data path * @param estimatedHeapUsages estimated heap usage broken down by node + * @param nodeUsageStatsForThreadPools node-level usage stats (operational load) broken down by node * @see #shardIdentifierFromRouting */ public ClusterInfo( @@ -82,7 +84,8 @@ public ClusterInfo( Map shardDataSetSizes, Map dataPath, Map reservedSpace, - Map estimatedHeapUsages + Map estimatedHeapUsages, + Map nodeUsageStatsForThreadPools ) { this.leastAvailableSpaceUsage = Map.copyOf(leastAvailableSpaceUsage); this.mostAvailableSpaceUsage = Map.copyOf(mostAvailableSpaceUsage); @@ -91,6 +94,7 @@ public ClusterInfo( this.dataPath = Map.copyOf(dataPath); this.reservedSpace = Map.copyOf(reservedSpace); this.estimatedHeapUsages = Map.copyOf(estimatedHeapUsages); + this.nodeUsageStatsForThreadPools = Map.copyOf(nodeUsageStatsForThreadPools); } public ClusterInfo(StreamInput in) throws IOException { @@ -107,6 +111,11 @@ public ClusterInfo(StreamInput in) throws IOException { } else { this.estimatedHeapUsages = Map.of(); } + if (in.getTransportVersion().onOrAfter(TransportVersions.NODE_USAGE_STATS_FOR_THREAD_POOLS_IN_CLUSTER_INFO)) { + this.nodeUsageStatsForThreadPools = in.readImmutableMap(NodeUsageStatsForThreadPools::new); + } else { + this.nodeUsageStatsForThreadPools = Map.of(); + } } @Override @@ -124,6 +133,9 @@ public void writeTo(StreamOutput out) throws IOException { if (out.getTransportVersion().onOrAfter(TransportVersions.HEAP_USAGE_IN_CLUSTER_INFO)) { out.writeMap(this.estimatedHeapUsages, StreamOutput::writeWriteable); } + if (out.getTransportVersion().onOrAfter(TransportVersions.NODE_USAGE_STATS_FOR_THREAD_POOLS_IN_CLUSTER_INFO)) { + out.writeMap(this.nodeUsageStatsForThreadPools, StreamOutput::writeWriteable); + } } /** @@ -204,8 +216,8 @@ public Iterator 
toXContentChunked(ToXContent.Params params return builder.endObject(); // NodeAndPath }), endArray() // end "reserved_sizes" - // NOTE: We don't serialize estimatedHeapUsages at this stage, to avoid - // committing to API payloads until the feature is settled + // NOTE: We don't serialize estimatedHeapUsages/nodeUsageStatsForThreadPools at this stage, to avoid + // committing to API payloads until the features are settled ); } @@ -220,6 +232,13 @@ public Map getEstimatedHeapUsages() { return estimatedHeapUsages; } + /** + * Returns a map containing thread pool usage stats for each node, keyed by node ID. + */ + public Map getNodeUsageStatsForThreadPools() { + return nodeUsageStatsForThreadPools; + } + /** * Returns a node id to disk usage mapping for the path that has the least available space on the node. * Note that this does not take account of reserved space: there may be another path with less available _and unreserved_ space. @@ -311,12 +330,21 @@ public boolean equals(Object o) { && shardSizes.equals(that.shardSizes) && shardDataSetSizes.equals(that.shardDataSetSizes) && dataPath.equals(that.dataPath) - && reservedSpace.equals(that.reservedSpace); + && reservedSpace.equals(that.reservedSpace) + && nodeUsageStatsForThreadPools.equals(that.nodeUsageStatsForThreadPools); } @Override public int hashCode() { - return Objects.hash(leastAvailableSpaceUsage, mostAvailableSpaceUsage, shardSizes, shardDataSetSizes, dataPath, reservedSpace); + return Objects.hash( + leastAvailableSpaceUsage, + mostAvailableSpaceUsage, + shardSizes, + shardDataSetSizes, + dataPath, + reservedSpace, + nodeUsageStatsForThreadPools + ); } @Override diff --git a/server/src/main/java/org/elasticsearch/cluster/ClusterInfoSimulator.java b/server/src/main/java/org/elasticsearch/cluster/ClusterInfoSimulator.java index b47b15f545ed8..7e995404191d6 100644 --- a/server/src/main/java/org/elasticsearch/cluster/ClusterInfoSimulator.java +++ b/server/src/main/java/org/elasticsearch/cluster/ClusterInfoSimulator.java @@ -34,6 +34,7 @@ public class ClusterInfoSimulator { private final Map shardDataSetSizes; private final Map dataPath; private final Map estimatedHeapUsages; + private final Map nodeThreadPoolUsageStats; public ClusterInfoSimulator(RoutingAllocation allocation) { this.allocation = allocation; @@ -43,6 +44,7 @@ public ClusterInfoSimulator(RoutingAllocation allocation) { this.shardDataSetSizes = Map.copyOf(allocation.clusterInfo().shardDataSetSizes); this.dataPath = Map.copyOf(allocation.clusterInfo().dataPath); this.estimatedHeapUsages = allocation.clusterInfo().getEstimatedHeapUsages(); + this.nodeThreadPoolUsageStats = allocation.clusterInfo().getNodeUsageStatsForThreadPools(); } /** @@ -156,7 +158,8 @@ public ClusterInfo getClusterInfo() { shardDataSetSizes, dataPath, Map.of(), - estimatedHeapUsages + estimatedHeapUsages, + nodeThreadPoolUsageStats ); } } diff --git a/server/src/main/java/org/elasticsearch/cluster/InternalClusterInfoService.java b/server/src/main/java/org/elasticsearch/cluster/InternalClusterInfoService.java index 066667dfaba84..89394c8fa8ba8 100644 --- a/server/src/main/java/org/elasticsearch/cluster/InternalClusterInfoService.java +++ b/server/src/main/java/org/elasticsearch/cluster/InternalClusterInfoService.java @@ -29,6 +29,7 @@ import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.allocation.DiskThresholdSettings; +import 
org.elasticsearch.cluster.routing.allocation.WriteLoadConstraintSettings.WriteLoadDeciderStatus; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Setting; @@ -50,6 +51,7 @@ import java.util.concurrent.CopyOnWriteArrayList; import java.util.function.Consumer; +import static org.elasticsearch.cluster.routing.allocation.WriteLoadConstraintSettings.WRITE_LOAD_DECIDER_ENABLED_SETTING; import static org.elasticsearch.core.Strings.format; /** @@ -92,6 +94,7 @@ public class InternalClusterInfoService implements ClusterInfoService, ClusterSt private volatile boolean diskThresholdEnabled; private volatile boolean estimatedHeapThresholdEnabled; + private volatile WriteLoadDeciderStatus writeLoadConstraintEnabled; private volatile TimeValue updateFrequency; private volatile TimeValue fetchTimeout; @@ -99,6 +102,7 @@ public class InternalClusterInfoService implements ClusterInfoService, ClusterSt private volatile Map mostAvailableSpaceUsages; private volatile Map maxHeapPerNode; private volatile Map estimatedHeapUsagePerNode; + private volatile Map nodeThreadPoolUsageStatsPerNode; private volatile IndicesStatsSummary indicesStatsSummary; private final ThreadPool threadPool; @@ -108,6 +112,7 @@ public class InternalClusterInfoService implements ClusterInfoService, ClusterSt private final Object mutex = new Object(); private final List> nextRefreshListeners = new ArrayList<>(); private final EstimatedHeapUsageCollector estimatedHeapUsageCollector; + private final NodeUsageStatsForThreadPoolsCollector nodeUsageStatsForThreadPoolsCollector; private AsyncRefresh currentRefresh; private RefreshScheduler refreshScheduler; @@ -118,16 +123,19 @@ public InternalClusterInfoService( ClusterService clusterService, ThreadPool threadPool, Client client, - EstimatedHeapUsageCollector estimatedHeapUsageCollector + EstimatedHeapUsageCollector estimatedHeapUsageCollector, + NodeUsageStatsForThreadPoolsCollector nodeUsageStatsForThreadPoolsCollector ) { this.leastAvailableSpaceUsages = Map.of(); this.mostAvailableSpaceUsages = Map.of(); this.maxHeapPerNode = Map.of(); this.estimatedHeapUsagePerNode = Map.of(); + this.nodeThreadPoolUsageStatsPerNode = Map.of(); this.indicesStatsSummary = IndicesStatsSummary.EMPTY; this.threadPool = threadPool; this.client = client; this.estimatedHeapUsageCollector = estimatedHeapUsageCollector; + this.nodeUsageStatsForThreadPoolsCollector = nodeUsageStatsForThreadPoolsCollector; this.updateFrequency = INTERNAL_CLUSTER_INFO_UPDATE_INTERVAL_SETTING.get(settings); this.fetchTimeout = INTERNAL_CLUSTER_INFO_TIMEOUT_SETTING.get(settings); this.diskThresholdEnabled = DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED_SETTING.get(settings); @@ -142,6 +150,8 @@ public InternalClusterInfoService( CLUSTER_ROUTING_ALLOCATION_ESTIMATED_HEAP_THRESHOLD_DECIDER_ENABLED, this::setEstimatedHeapThresholdEnabled ); + + clusterSettings.initializeAndWatch(WRITE_LOAD_DECIDER_ENABLED_SETTING, this::setWriteLoadConstraintEnabled); } private void setDiskThresholdEnabled(boolean diskThresholdEnabled) { @@ -152,6 +162,10 @@ private void setEstimatedHeapThresholdEnabled(boolean estimatedHeapThresholdEnab this.estimatedHeapThresholdEnabled = estimatedHeapThresholdEnabled; } + private void setWriteLoadConstraintEnabled(WriteLoadDeciderStatus writeLoadConstraintEnabled) { + this.writeLoadConstraintEnabled = writeLoadConstraintEnabled; + } + private void setFetchTimeout(TimeValue fetchTimeout) { 
this.fetchTimeout = fetchTimeout; } @@ -204,6 +218,7 @@ void execute() { maybeFetchIndicesStats(diskThresholdEnabled); maybeFetchNodeStats(diskThresholdEnabled || estimatedHeapThresholdEnabled); maybeFetchNodesEstimatedHeapUsage(estimatedHeapThresholdEnabled); + maybeFetchNodesUsageStatsForThreadPools(writeLoadConstraintEnabled); } } @@ -242,6 +257,32 @@ private void maybeFetchNodesEstimatedHeapUsage(boolean shouldFetch) { } } + private void maybeFetchNodesUsageStatsForThreadPools(WriteLoadDeciderStatus writeLoadConstraintEnabled) { + if (writeLoadConstraintEnabled != WriteLoadDeciderStatus.DISABLED) { + try (var ignored = threadPool.getThreadContext().clearTraceContext()) { + fetchNodesUsageStatsForThreadPools(); + } + } else { + logger.trace("skipping collecting shard/node write load estimates from cluster, feature currently disabled"); + nodeThreadPoolUsageStatsPerNode = Map.of(); + } + } + + private void fetchNodesUsageStatsForThreadPools() { + nodeUsageStatsForThreadPoolsCollector.collectUsageStats(ActionListener.releaseAfter(new ActionListener<>() { + @Override + public void onResponse(Map writeLoads) { + nodeThreadPoolUsageStatsPerNode = writeLoads; + } + + @Override + public void onFailure(Exception e) { + logger.warn("failed to fetch write load estimates for nodes", e); + nodeThreadPoolUsageStatsPerNode = Map.of(); + } + }, fetchRefs.acquire())); + } + private void fetchNodesEstimatedHeapUsage() { estimatedHeapUsageCollector.collectClusterHeapUsage(ActionListener.releaseAfter(new ActionListener<>() { @Override @@ -486,6 +527,8 @@ public ClusterInfo getClusterInfo() { estimatedHeapUsages.put(nodeId, new EstimatedHeapUsage(nodeId, maxHeapSize.getBytes(), estimatedHeapUsage)); } }); + final Map nodeThreadPoolUsageStats = new HashMap<>(); + nodeThreadPoolUsageStatsPerNode.forEach((nodeId, nodeWriteLoad) -> { nodeThreadPoolUsageStats.put(nodeId, nodeWriteLoad); }); return new ClusterInfo( leastAvailableSpaceUsages, mostAvailableSpaceUsages, @@ -493,7 +536,8 @@ public ClusterInfo getClusterInfo() { indicesStatsSummary.shardDataSetSizes, indicesStatsSummary.dataPath, indicesStatsSummary.reservedSpace, - estimatedHeapUsages + estimatedHeapUsages, + nodeThreadPoolUsageStats ); } diff --git a/server/src/main/java/org/elasticsearch/cluster/NodeUsageStatsForThreadPools.java b/server/src/main/java/org/elasticsearch/cluster/NodeUsageStatsForThreadPools.java new file mode 100644 index 0000000000000..5e84f29af8412 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/cluster/NodeUsageStatsForThreadPools.java @@ -0,0 +1,121 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.cluster; + +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; + +import java.io.IOException; +import java.util.Map; +import java.util.Objects; + +/** + * Record of a node's thread pool usage stats (operation load). Maps thread pool stats by thread pool name. + * + * @param nodeId The node ID. 
+ * @param threadPoolUsageStatsMap A map of thread pool name ({@link org.elasticsearch.threadpool.ThreadPool.Names}) to the thread pool's + * usage stats ({@link ThreadPoolUsageStats}). + */ +public record NodeUsageStatsForThreadPools(String nodeId, Map threadPoolUsageStatsMap) implements Writeable { + + public NodeUsageStatsForThreadPools(StreamInput in) throws IOException { + this(in.readString(), in.readMap(ThreadPoolUsageStats::new)); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeString(this.nodeId); + out.writeMap(threadPoolUsageStatsMap, StreamOutput::writeWriteable); + } + + @Override + public int hashCode() { + return Objects.hash(nodeId, threadPoolUsageStatsMap); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + NodeUsageStatsForThreadPools other = (NodeUsageStatsForThreadPools) o; + for (var entry : other.threadPoolUsageStatsMap.entrySet()) { + var loadStats = threadPoolUsageStatsMap.get(entry.getKey()); + if (loadStats == null || loadStats.equals(entry.getValue()) == false) { + return false; + } + } + return true; + } + + @Override + public String toString() { + StringBuilder builder = new StringBuilder(getClass().getSimpleName() + "{nodeId=" + nodeId + ", threadPoolUsageStatsMap=["); + for (var entry : threadPoolUsageStatsMap.entrySet()) { + builder.append("{ThreadPool.Names=" + entry.getKey() + ", ThreadPoolUsageStats=" + entry.getValue() + "}"); + } + builder.append("]}"); + return builder.toString(); + } + + /** + * Record of usage stats for a thread pool. + * + * @param totalThreadPoolThreads Total number of threads in the thread pool. + * @param averageThreadPoolUtilization Percent of thread pool threads that are in use, averaged over some period of time. + * @param averageThreadPoolQueueLatencyMillis How much time tasks spend in the thread pool queue. Zero if there is nothing being queued + * in the write thread pool. 
+ */ + public record ThreadPoolUsageStats( + int totalThreadPoolThreads, + float averageThreadPoolUtilization, + long averageThreadPoolQueueLatencyMillis + ) implements Writeable { + + public ThreadPoolUsageStats(StreamInput in) throws IOException { + this(in.readVInt(), in.readFloat(), in.readVLong()); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeVInt(this.totalThreadPoolThreads); + out.writeFloat(this.averageThreadPoolUtilization); + out.writeVLong(this.averageThreadPoolQueueLatencyMillis); + } + + @Override + public int hashCode() { + return Objects.hash(totalThreadPoolThreads, averageThreadPoolUtilization, averageThreadPoolQueueLatencyMillis); + } + + @Override + public String toString() { + return "[totalThreadPoolThreads=" + + totalThreadPoolThreads + + ", averageThreadPoolUtilization=" + + averageThreadPoolUtilization + + ", averageThreadPoolQueueLatencyMillis=" + + averageThreadPoolQueueLatencyMillis + + "]"; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + ThreadPoolUsageStats other = (ThreadPoolUsageStats) o; + return totalThreadPoolThreads == other.totalThreadPoolThreads + && averageThreadPoolUtilization == other.averageThreadPoolUtilization + && averageThreadPoolQueueLatencyMillis == other.averageThreadPoolQueueLatencyMillis; + } + + } // ThreadPoolUsageStats + +} diff --git a/server/src/main/java/org/elasticsearch/cluster/NodeUsageStatsForThreadPoolsCollector.java b/server/src/main/java/org/elasticsearch/cluster/NodeUsageStatsForThreadPoolsCollector.java new file mode 100644 index 0000000000000..e302a4abed559 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/cluster/NodeUsageStatsForThreadPoolsCollector.java @@ -0,0 +1,33 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.cluster; + +import org.elasticsearch.action.ActionListener; + +import java.util.Map; + +/** + * Collects the usage stats (like write thread pool load) estimations for each node in the cluster. + *
<p>
+ * Results are returned as a map of node ID to node usage stats. + */ +public interface NodeUsageStatsForThreadPoolsCollector { + /** + * This will be used when there is no NodeUsageLoadCollector available. + */ + NodeUsageStatsForThreadPoolsCollector EMPTY = listener -> listener.onResponse(Map.of()); + + /** + * Collects the write load estimates from the cluster. + * + * @param listener The listener to receive the write load results. + */ + void collectUsageStats(ActionListener> listener); +} diff --git a/server/src/main/java/org/elasticsearch/node/NodeServiceProvider.java b/server/src/main/java/org/elasticsearch/node/NodeServiceProvider.java index bfa8f0a01c661..326002c7d346c 100644 --- a/server/src/main/java/org/elasticsearch/node/NodeServiceProvider.java +++ b/server/src/main/java/org/elasticsearch/node/NodeServiceProvider.java @@ -14,6 +14,7 @@ import org.elasticsearch.cluster.ClusterInfoService; import org.elasticsearch.cluster.EstimatedHeapUsageCollector; import org.elasticsearch.cluster.InternalClusterInfoService; +import org.elasticsearch.cluster.NodeUsageStatsForThreadPoolsCollector; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.breaker.CircuitBreaker; @@ -79,12 +80,17 @@ ClusterInfoService newClusterInfoService( EstimatedHeapUsageCollector.class, () -> EstimatedHeapUsageCollector.EMPTY ); + final NodeUsageStatsForThreadPoolsCollector nodeUsageStatsForThreadPoolsCollector = pluginsService.loadSingletonServiceProvider( + NodeUsageStatsForThreadPoolsCollector.class, + () -> NodeUsageStatsForThreadPoolsCollector.EMPTY + ); final InternalClusterInfoService service = new InternalClusterInfoService( settings, clusterService, threadPool, client, - estimatedHeapUsageCollector + estimatedHeapUsageCollector, + nodeUsageStatsForThreadPoolsCollector ); if (DiscoveryNode.isMasterNode(settings)) { // listen for state changes (this node starts/stops being the elected master, or new nodes are added) diff --git a/server/src/test/java/org/elasticsearch/cluster/ClusterInfoTests.java b/server/src/test/java/org/elasticsearch/cluster/ClusterInfoTests.java index 25932c9e8b9f3..814aa102ce284 100644 --- a/server/src/test/java/org/elasticsearch/cluster/ClusterInfoTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/ClusterInfoTests.java @@ -13,6 +13,7 @@ import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.test.AbstractChunkedSerializingTestCase; import org.elasticsearch.test.AbstractWireSerializingTestCase; +import org.elasticsearch.threadpool.ThreadPool; import java.util.HashMap; import java.util.Map; @@ -42,7 +43,8 @@ public static ClusterInfo randomClusterInfo() { randomDataSetSizes(), randomRoutingToDataPath(), randomReservedSpace(), - randomNodeHeapUsage() + randomNodeHeapUsage(), + randomNodeUsageStatsForThreadPools() ); } @@ -62,6 +64,23 @@ private static Map randomNodeHeapUsage() { return nodeHeapUsage; } + private static Map randomNodeUsageStatsForThreadPools() { + int numEntries = randomIntBetween(0, 128); + Map nodeUsageStatsForThreadPools = new HashMap<>(numEntries); + for (int i = 0; i < numEntries; i++) { + String nodeIdKey = randomAlphaOfLength(32); + NodeUsageStatsForThreadPools.ThreadPoolUsageStats writeThreadPoolUsageStats = + new NodeUsageStatsForThreadPools.ThreadPoolUsageStats(/* totalThreadPoolThreads= */ randomIntBetween(1, 16), + /* averageThreadPoolUtilization= */ randomFloat(), + /* averageThreadPoolQueueLatencyMillis= */ randomLongBetween(0, 50000) + ); 
+ Map usageStatsForThreadPools = new HashMap<>(); + usageStatsForThreadPools.put(ThreadPool.Names.WRITE, writeThreadPoolUsageStats); + nodeUsageStatsForThreadPools.put(nodeIdKey, new NodeUsageStatsForThreadPools(nodeIdKey, usageStatsForThreadPools)); + } + return nodeUsageStatsForThreadPools; + } + private static Map randomDiskUsage() { int numEntries = randomIntBetween(0, 128); Map builder = new HashMap<>(numEntries); diff --git a/server/src/test/java/org/elasticsearch/cluster/InternalClusterInfoServiceSchedulingTests.java b/server/src/test/java/org/elasticsearch/cluster/InternalClusterInfoServiceSchedulingTests.java index ea9c940793778..6e80e0d087993 100644 --- a/server/src/test/java/org/elasticsearch/cluster/InternalClusterInfoServiceSchedulingTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/InternalClusterInfoServiceSchedulingTests.java @@ -21,6 +21,7 @@ import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodeUtils; import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.routing.allocation.WriteLoadConstraintSettings; import org.elasticsearch.cluster.service.ClusterApplierService; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.cluster.service.FakeThreadPoolMasterService; @@ -55,7 +56,11 @@ public void testScheduling() { final Settings.Builder settingsBuilder = Settings.builder() .put(Node.NODE_NAME_SETTING.getKey(), discoveryNode.getName()) - .put(InternalClusterInfoService.CLUSTER_ROUTING_ALLOCATION_ESTIMATED_HEAP_THRESHOLD_DECIDER_ENABLED.getKey(), true); + .put(InternalClusterInfoService.CLUSTER_ROUTING_ALLOCATION_ESTIMATED_HEAP_THRESHOLD_DECIDER_ENABLED.getKey(), true) + .put( + WriteLoadConstraintSettings.WRITE_LOAD_DECIDER_ENABLED_SETTING.getKey(), + WriteLoadConstraintSettings.WriteLoadDeciderStatus.ENABLED + ); if (randomBoolean()) { settingsBuilder.put(INTERNAL_CLUSTER_INFO_UPDATE_INTERVAL_SETTING.getKey(), randomIntBetween(10000, 60000) + "ms"); } @@ -79,12 +84,16 @@ protected PrioritizedEsThreadPoolExecutor createThreadPoolExecutor() { final FakeClusterInfoServiceClient client = new FakeClusterInfoServiceClient(threadPool); final EstimatedHeapUsageCollector mockEstimatedHeapUsageCollector = spy(new StubEstimatedEstimatedHeapUsageCollector()); + final NodeUsageStatsForThreadPoolsCollector mockNodeUsageStatsForThreadPoolsCollector = spy( + new StubNodeUsageStatsForThreadPoolsCollector() + ); final InternalClusterInfoService clusterInfoService = new InternalClusterInfoService( settings, clusterService, threadPool, client, - mockEstimatedHeapUsageCollector + mockEstimatedHeapUsageCollector, + mockNodeUsageStatsForThreadPoolsCollector ); clusterService.addListener(clusterInfoService); clusterInfoService.addListener(ignored -> {}); @@ -122,12 +131,14 @@ for (int i = 0; i < 3; i++) { Mockito.clearInvocations(mockEstimatedHeapUsageCollector); + Mockito.clearInvocations(mockNodeUsageStatsForThreadPoolsCollector); final int initialRequestCount = client.requestCount; final long duration = INTERNAL_CLUSTER_INFO_UPDATE_INTERVAL_SETTING.get(settings).millis(); runFor(deterministicTaskQueue, duration); deterministicTaskQueue.runAllRunnableTasks(); assertThat(client.requestCount, equalTo(initialRequestCount + 2)); // should have run two client requests per interval verify(mockEstimatedHeapUsageCollector).collectClusterHeapUsage(any()); // Should poll for heap usage once per
interval + verify(mockNodeUsageStatsForThreadPoolsCollector).collectUsageStats(any()); } final AtomicBoolean failMaster2 = new AtomicBoolean(); @@ -152,6 +163,17 @@ public void collectClusterHeapUsage(ActionListener> listener) } } + /** + * Simple for test {@link NodeUsageStatsForThreadPoolsCollector} implementation that returns an empty map of nodeId string to + * {@link NodeUsageStatsForThreadPools}. + */ + private static class StubNodeUsageStatsForThreadPoolsCollector implements NodeUsageStatsForThreadPoolsCollector { + @Override + public void collectUsageStats(ActionListener> listener) { + listener.onResponse(Map.of()); + } + } + private static void runFor(DeterministicTaskQueue deterministicTaskQueue, long duration) { final long endTime = deterministicTaskQueue.getCurrentTimeMillis() + duration; while (deterministicTaskQueue.getCurrentTimeMillis() < endTime diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/ExpectedShardSizeEstimatorTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/ExpectedShardSizeEstimatorTests.java index 754b4d2b22d0d..96e277284a659 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/ExpectedShardSizeEstimatorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/ExpectedShardSizeEstimatorTests.java @@ -206,6 +206,7 @@ private static ClusterInfo createClusterInfo(ShardRouting shard, Long size) { Map.of(), Map.of(), Map.of(), + Map.of(), Map.of() ); } diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/AllocationStatsServiceTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/AllocationStatsServiceTests.java index 14e0aaa253749..8b54cecb29580 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/AllocationStatsServiceTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/AllocationStatsServiceTests.java @@ -79,6 +79,7 @@ public void testShardStats() { Map.of(), Map.of(), Map.of(), + Map.of(), Map.of() ); diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdMonitorTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdMonitorTests.java index df0fa875a7249..170677ff3632f 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdMonitorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdMonitorTests.java @@ -1580,7 +1580,7 @@ private static ClusterInfo clusterInfo( Map diskUsages, Map reservedSpace ) { - return new ClusterInfo(diskUsages, Map.of(), Map.of(), Map.of(), Map.of(), reservedSpace, Map.of()); + return new ClusterInfo(diskUsages, Map.of(), Map.of(), Map.of(), Map.of(), reservedSpace, Map.of(), Map.of()); } private static DiscoveryNode newFrozenOnlyNode(String nodeId) { diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/ExpectedShardSizeAllocationTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/ExpectedShardSizeAllocationTests.java index f1a2b4b1358fe..5ab57a2bba607 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/ExpectedShardSizeAllocationTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/ExpectedShardSizeAllocationTests.java @@ -259,11 +259,12 @@ private static ClusterInfo createClusterInfoWith(ShardId shardId, long size) { Map.of(), Map.of(), Map.of(), + Map.of(), Map.of() ); } private static ClusterInfo 
createClusterInfo(Map diskUsage, Map shardSizes) { - return new ClusterInfo(diskUsage, diskUsage, shardSizes, Map.of(), Map.of(), Map.of(), Map.of()); + return new ClusterInfo(diskUsage, diskUsage, shardSizes, Map.of(), Map.of(), Map.of(), Map.of(), Map.of()); } } diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocatorTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocatorTests.java index 8ab031aa53fe1..50ace093019ed 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocatorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocatorTests.java @@ -609,6 +609,7 @@ public void testShardSizeDiscrepancyWithinIndex() { Map.of(), Map.of(), Map.of(), + Map.of(), Map.of() ) ); @@ -705,7 +706,7 @@ private RoutingAllocation createRoutingAllocation(ClusterState clusterState) { } private static ClusterInfo createClusterInfo(Map indexSizes) { - return new ClusterInfo(Map.of(), Map.of(), indexSizes, Map.of(), Map.of(), Map.of(), Map.of()); + return new ClusterInfo(Map.of(), Map.of(), indexSizes, Map.of(), Map.of(), Map.of(), Map.of(), Map.of()); } private static IndexMetadata.Builder anIndex(String name) { diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/ClusterAllocationSimulationTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/ClusterAllocationSimulationTests.java index 277521c5832a1..44e9eb7ff5232 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/ClusterAllocationSimulationTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/ClusterAllocationSimulationTests.java @@ -561,7 +561,7 @@ public ClusterInfo getClusterInfo() { dataPath.put(new ClusterInfo.NodeAndShard(shardRouting.currentNodeId(), shardRouting.shardId()), "/data"); } - return new ClusterInfo(diskSpaceUsage, diskSpaceUsage, shardSizes, Map.of(), dataPath, Map.of(), Map.of()); + return new ClusterInfo(diskSpaceUsage, diskSpaceUsage, shardSizes, Map.of(), dataPath, Map.of(), Map.of(), Map.of()); } } diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/ClusterBalanceStatsTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/ClusterBalanceStatsTests.java index 80fe603488fd3..73d0d7ac00796 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/ClusterBalanceStatsTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/ClusterBalanceStatsTests.java @@ -346,6 +346,7 @@ private ClusterInfo createClusterInfo(List> shardSizes) { Map.of(), Map.of(), Map.of(), + Map.of(), Map.of() ); } diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/ClusterInfoSimulatorTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/ClusterInfoSimulatorTests.java index b67e248999ced..921e9046a57e5 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/ClusterInfoSimulatorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/ClusterInfoSimulatorTests.java @@ -697,6 +697,7 @@ public ClusterInfo build() { Map.of(), Map.of(), reservedSpace, + Map.of(), Map.of() ); } diff --git 
a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceComputerTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceComputerTests.java index a0d28ce124584..9c8147f507961 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceComputerTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceComputerTests.java @@ -690,7 +690,7 @@ public void testDesiredBalanceShouldConvergeInABigCluster() { .stream() .collect(toMap(Map.Entry::getKey, it -> new DiskUsage(it.getKey(), it.getKey(), "/data", diskSize, diskSize - it.getValue()))); - var clusterInfo = new ClusterInfo(diskUsage, diskUsage, shardSizes, Map.of(), dataPath, Map.of(), Map.of()); + var clusterInfo = new ClusterInfo(diskUsage, diskUsage, shardSizes, Map.of(), dataPath, Map.of(), Map.of(), Map.of()); var settings = Settings.EMPTY; @@ -1196,7 +1196,7 @@ public ClusterInfoTestBuilder withReservedSpace(String nodeId, long size, ShardI } public ClusterInfo build() { - return new ClusterInfo(diskUsage, diskUsage, shardSizes, Map.of(), Map.of(), reservedSpace, Map.of()); + return new ClusterInfo(diskUsage, diskUsage, shardSizes, Map.of(), Map.of(), reservedSpace, Map.of(), Map.of()); } } diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceReconcilerTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceReconcilerTests.java index 844912cba4c17..37646d376f8fd 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceReconcilerTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceReconcilerTests.java @@ -620,6 +620,7 @@ public void testUnassignedAllocationPredictsDiskUsage() { ImmutableOpenMap.of(), ImmutableOpenMap.of(), ImmutableOpenMap.of(), + ImmutableOpenMap.of(), ImmutableOpenMap.of() ); diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderTests.java index 5467d313834b8..f85c2678e04e7 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderTests.java @@ -1406,7 +1406,7 @@ static class DevNullClusterInfo extends ClusterInfo { Map shardSizes, Map reservedSpace ) { - super(leastAvailableSpaceUsage, mostAvailableSpaceUsage, shardSizes, Map.of(), Map.of(), reservedSpace, Map.of()); + super(leastAvailableSpaceUsage, mostAvailableSpaceUsage, shardSizes, Map.of(), Map.of(), reservedSpace, Map.of(), Map.of()); } @Override diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderUnitTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderUnitTests.java index 7da75f61da801..dc98cf5349c09 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderUnitTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderUnitTests.java @@ -109,6 +109,7 @@ public void testCanAllocateUsesMaxAvailableSpace() { Map.of(), Map.of(), Map.of(), + Map.of(), Map.of() ); RoutingAllocation 
allocation = new RoutingAllocation( @@ -181,6 +182,7 @@ private void doTestCannotAllocateDueToLackOfDiskResources(boolean testMaxHeadroo Map.of(), Map.of(), Map.of(), + Map.of(), Map.of() ); RoutingAllocation allocation = new RoutingAllocation( @@ -327,6 +329,7 @@ private void doTestCanRemainUsesLeastAvailableSpace(boolean testMaxHeadroom) { Map.of(), shardRoutingMap, Map.of(), + Map.of(), Map.of() ); RoutingAllocation allocation = new RoutingAllocation( @@ -847,6 +850,7 @@ public void testDecidesYesIfWatermarksIgnored() { Map.of(), Map.of(), Map.of(), + Map.of(), Map.of() ); RoutingAllocation allocation = new RoutingAllocation( @@ -915,6 +919,7 @@ public void testCannotForceAllocateOver100PercentUsage() { Map.of(), Map.of(), Map.of(), + Map.of(), Map.of() ); RoutingAllocation allocation = new RoutingAllocation( diff --git a/test/framework/src/main/java/org/elasticsearch/cluster/MockInternalClusterInfoService.java b/test/framework/src/main/java/org/elasticsearch/cluster/MockInternalClusterInfoService.java index 01b11ce97460a..6b6136c6c861b 100644 --- a/test/framework/src/main/java/org/elasticsearch/cluster/MockInternalClusterInfoService.java +++ b/test/framework/src/main/java/org/elasticsearch/cluster/MockInternalClusterInfoService.java @@ -43,7 +43,7 @@ public static class TestPlugin extends Plugin {} private volatile BiFunction diskUsageFunction; public MockInternalClusterInfoService(Settings settings, ClusterService clusterService, ThreadPool threadPool, NodeClient client) { - super(settings, clusterService, threadPool, client, EstimatedHeapUsageCollector.EMPTY); + super(settings, clusterService, threadPool, client, EstimatedHeapUsageCollector.EMPTY, NodeUsageStatsForThreadPoolsCollector.EMPTY); } public void setDiskUsageFunctionAndRefresh(BiFunction diskUsageFn) { diff --git a/x-pack/plugin/autoscaling/src/main/java/org/elasticsearch/xpack/autoscaling/storage/ReactiveStorageDeciderService.java b/x-pack/plugin/autoscaling/src/main/java/org/elasticsearch/xpack/autoscaling/storage/ReactiveStorageDeciderService.java index e451b1d45817d..6c4066a447b67 100644 --- a/x-pack/plugin/autoscaling/src/main/java/org/elasticsearch/xpack/autoscaling/storage/ReactiveStorageDeciderService.java +++ b/x-pack/plugin/autoscaling/src/main/java/org/elasticsearch/xpack/autoscaling/storage/ReactiveStorageDeciderService.java @@ -960,6 +960,7 @@ private ExtendedClusterInfo(Map extraShardSizes, ClusterInfo info) Map.of(), Map.of(), Map.of(), + Map.of(), Map.of() ); this.delegate = info; diff --git a/x-pack/plugin/autoscaling/src/test/java/org/elasticsearch/xpack/autoscaling/capacity/AutoscalingCalculateCapacityServiceTests.java b/x-pack/plugin/autoscaling/src/test/java/org/elasticsearch/xpack/autoscaling/capacity/AutoscalingCalculateCapacityServiceTests.java index 12f7dde103c9c..a5ce2ff894817 100644 --- a/x-pack/plugin/autoscaling/src/test/java/org/elasticsearch/xpack/autoscaling/capacity/AutoscalingCalculateCapacityServiceTests.java +++ b/x-pack/plugin/autoscaling/src/test/java/org/elasticsearch/xpack/autoscaling/capacity/AutoscalingCalculateCapacityServiceTests.java @@ -262,7 +262,7 @@ public void testContext() { } } state = ClusterState.builder(ClusterName.DEFAULT).nodes(nodes).build(); - info = new ClusterInfo(leastUsages, mostUsages, Map.of(), Map.of(), Map.of(), Map.of(), Map.of()); + info = new ClusterInfo(leastUsages, mostUsages, Map.of(), Map.of(), Map.of(), Map.of(), Map.of(), Map.of()); context = new AutoscalingCalculateCapacityService.DefaultAutoscalingDeciderContext( roleNames, state, @@ -311,7 +311,7 
@@ public void testContext() { ) ); - info = new ClusterInfo(leastUsages, mostUsages, Map.of(), Map.of(), Map.of(), Map.of(), Map.of()); + info = new ClusterInfo(leastUsages, mostUsages, Map.of(), Map.of(), Map.of(), Map.of(), Map.of(), Map.of()); context = new AutoscalingCalculateCapacityService.DefaultAutoscalingDeciderContext( roleNames, state, diff --git a/x-pack/plugin/autoscaling/src/test/java/org/elasticsearch/xpack/autoscaling/storage/FrozenStorageDeciderServiceTests.java b/x-pack/plugin/autoscaling/src/test/java/org/elasticsearch/xpack/autoscaling/storage/FrozenStorageDeciderServiceTests.java index 37295ebf44208..e28f2b7ba2ec8 100644 --- a/x-pack/plugin/autoscaling/src/test/java/org/elasticsearch/xpack/autoscaling/storage/FrozenStorageDeciderServiceTests.java +++ b/x-pack/plugin/autoscaling/src/test/java/org/elasticsearch/xpack/autoscaling/storage/FrozenStorageDeciderServiceTests.java @@ -109,7 +109,7 @@ public Tuple sizeAndClusterInfo(IndexMetadata indexMetadata) // add irrelevant shards noise for completeness (should not happen IRL). sizes.put(new ShardId(index, i), randomLongBetween(0, Integer.MAX_VALUE)); } - ClusterInfo info = new ClusterInfo(Map.of(), Map.of(), Map.of(), sizes, Map.of(), Map.of(), Map.of()); + ClusterInfo info = new ClusterInfo(Map.of(), Map.of(), Map.of(), sizes, Map.of(), Map.of(), Map.of(), Map.of()); return Tuple.tuple(totalSize, info); } } diff --git a/x-pack/plugin/autoscaling/src/test/java/org/elasticsearch/xpack/autoscaling/storage/ProactiveStorageDeciderServiceTests.java b/x-pack/plugin/autoscaling/src/test/java/org/elasticsearch/xpack/autoscaling/storage/ProactiveStorageDeciderServiceTests.java index 8c1f18e84a619..dec4e3bd146c8 100644 --- a/x-pack/plugin/autoscaling/src/test/java/org/elasticsearch/xpack/autoscaling/storage/ProactiveStorageDeciderServiceTests.java +++ b/x-pack/plugin/autoscaling/src/test/java/org/elasticsearch/xpack/autoscaling/storage/ProactiveStorageDeciderServiceTests.java @@ -408,7 +408,7 @@ private ClusterInfo randomClusterInfo(ProjectState projectState) { for (var id : projectState.cluster().nodes().getDataNodes().keySet()) { diskUsage.put(id, new DiskUsage(id, id, "/test", Long.MAX_VALUE, Long.MAX_VALUE)); } - return new ClusterInfo(diskUsage, diskUsage, shardSizes, Map.of(), Map.of(), Map.of(), Map.of()); + return new ClusterInfo(diskUsage, diskUsage, shardSizes, Map.of(), Map.of(), Map.of(), Map.of(), Map.of()); } private ProjectMetadata applyCreatedDates(ProjectMetadata project, DataStream ds, long last, long decrement) { diff --git a/x-pack/plugin/autoscaling/src/test/java/org/elasticsearch/xpack/autoscaling/storage/ReactiveStorageDeciderServiceTests.java b/x-pack/plugin/autoscaling/src/test/java/org/elasticsearch/xpack/autoscaling/storage/ReactiveStorageDeciderServiceTests.java index 2ee94340f6d2c..94364e3e90c27 100644 --- a/x-pack/plugin/autoscaling/src/test/java/org/elasticsearch/xpack/autoscaling/storage/ReactiveStorageDeciderServiceTests.java +++ b/x-pack/plugin/autoscaling/src/test/java/org/elasticsearch/xpack/autoscaling/storage/ReactiveStorageDeciderServiceTests.java @@ -379,7 +379,7 @@ public void validateSizeOf(ClusterState clusterState, ShardRouting subjectShard, } private ReactiveStorageDeciderService.AllocationState createAllocationState(Map shardSize, ClusterState clusterState) { - ClusterInfo info = new ClusterInfo(Map.of(), Map.of(), shardSize, Map.of(), Map.of(), Map.of(), Map.of()); + ClusterInfo info = new ClusterInfo(Map.of(), Map.of(), shardSize, Map.of(), Map.of(), Map.of(), Map.of(), Map.of()); 
ReactiveStorageDeciderService.AllocationState allocationState = new ReactiveStorageDeciderService.AllocationState( clusterState, null, @@ -544,7 +544,7 @@ public void testUnmovableSize() { } var diskUsages = Map.of(nodeId, new DiskUsage(nodeId, null, null, ByteSizeUnit.KB.toBytes(100), ByteSizeUnit.KB.toBytes(5))); - ClusterInfo info = new ClusterInfo(diskUsages, diskUsages, shardSize, Map.of(), Map.of(), Map.of(), Map.of()); + ClusterInfo info = new ClusterInfo(diskUsages, diskUsages, shardSize, Map.of(), Map.of(), Map.of(), Map.of(), Map.of()); ReactiveStorageDeciderService.AllocationState allocationState = new ReactiveStorageDeciderService.AllocationState( clusterState, diff --git a/x-pack/plugin/deprecation/src/test/java/org/elasticsearch/xpack/deprecation/TransportNodeDeprecationCheckActionTests.java b/x-pack/plugin/deprecation/src/test/java/org/elasticsearch/xpack/deprecation/TransportNodeDeprecationCheckActionTests.java index 40a564088aee6..905dd93c3ff1b 100644 --- a/x-pack/plugin/deprecation/src/test/java/org/elasticsearch/xpack/deprecation/TransportNodeDeprecationCheckActionTests.java +++ b/x-pack/plugin/deprecation/src/test/java/org/elasticsearch/xpack/deprecation/TransportNodeDeprecationCheckActionTests.java @@ -174,6 +174,7 @@ public void testCheckDiskLowWatermark() { Map.of(), Map.of(), Map.of(), + Map.of(), Map.of() ); DeprecationIssue issue = TransportNodeDeprecationCheckAction.checkDiskLowWatermark(
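For illustration only, and not part of this patch: a minimal sketch of a collector that a plugin outside the test framework could register through the same META-INF/services/org.elasticsearch.cluster.NodeUsageStatsForThreadPoolsCollector file mechanism used by the test above. The package and class name below are hypothetical; the interface, the NodeUsageStatsForThreadPools record, and the collectUsageStats signature are the ones introduced in this change, and NodeServiceProvider loads at most one such provider, falling back to NodeUsageStatsForThreadPoolsCollector.EMPTY when none is registered.

package org.elasticsearch.example.usagestats; // hypothetical package

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.NodeUsageStatsForThreadPools;
import org.elasticsearch.cluster.NodeUsageStatsForThreadPoolsCollector;
import org.elasticsearch.threadpool.ThreadPool;

import java.util.Map;

// Hypothetical example: reports a fixed entry for a single node. A real implementation would fan out to
// the data nodes and aggregate their write thread pool usage before completing the listener.
public class ExampleNodeUsageStatsForThreadPoolsCollector implements NodeUsageStatsForThreadPoolsCollector {

    @Override
    public void collectUsageStats(ActionListener<Map<String, NodeUsageStatsForThreadPools>> listener) {
        NodeUsageStatsForThreadPools.ThreadPoolUsageStats writeStats = new NodeUsageStatsForThreadPools.ThreadPoolUsageStats(
            8,     // totalThreadPoolThreads
            0.25f, // averageThreadPoolUtilization
            0L     // averageThreadPoolQueueLatencyMillis
        );
        listener.onResponse(
            Map.of("example-node-id", new NodeUsageStatsForThreadPools("example-node-id", Map.of(ThreadPool.Names.WRITE, writeStats)))
        );
    }
}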