diff --git a/server/src/internalClusterTest/java/org/elasticsearch/cluster/ClusterInfoServiceIT.java b/server/src/internalClusterTest/java/org/elasticsearch/cluster/ClusterInfoServiceIT.java
index 958bfd8756dac..e138a2d0b5f6d 100644
--- a/server/src/internalClusterTest/java/org/elasticsearch/cluster/ClusterInfoServiceIT.java
+++ b/server/src/internalClusterTest/java/org/elasticsearch/cluster/ClusterInfoServiceIT.java
@@ -347,7 +347,9 @@ public void testClusterInfoIncludesNodeUsageStatsForThreadPools() {
         var settings = Settings.builder()
             .put(
                 WriteLoadConstraintSettings.WRITE_LOAD_DECIDER_ENABLED_SETTING.getKey(),
-                WriteLoadConstraintSettings.WriteLoadDeciderStatus.ENABLED
+                randomBoolean()
+                    ? WriteLoadConstraintSettings.WriteLoadDeciderStatus.ENABLED
+                    : WriteLoadConstraintSettings.WriteLoadDeciderStatus.LOW_THRESHOLD_ONLY
             )
             // Manually control cluster info refreshes
             .put(InternalClusterInfoService.INTERNAL_CLUSTER_INFO_UPDATE_INTERVAL_SETTING.getKey(), "60m")
diff --git a/server/src/internalClusterTest/java/org/elasticsearch/index/shard/IndexShardIT.java b/server/src/internalClusterTest/java/org/elasticsearch/index/shard/IndexShardIT.java
index f752650e3f3d7..a7821b5bf1f13 100644
--- a/server/src/internalClusterTest/java/org/elasticsearch/index/shard/IndexShardIT.java
+++ b/server/src/internalClusterTest/java/org/elasticsearch/index/shard/IndexShardIT.java
@@ -319,7 +319,9 @@ public void testNodeWriteLoadsArePresent() {
             Settings.builder()
                 .put(
                     WriteLoadConstraintSettings.WRITE_LOAD_DECIDER_ENABLED_SETTING.getKey(),
-                    WriteLoadConstraintSettings.WriteLoadDeciderStatus.ENABLED
+                    randomBoolean()
+                        ? WriteLoadConstraintSettings.WriteLoadDeciderStatus.ENABLED
+                        : WriteLoadConstraintSettings.WriteLoadDeciderStatus.LOW_THRESHOLD_ONLY
                 )
                 .build()
         );
@@ -376,7 +378,9 @@ public void testShardWriteLoadsArePresent() {
             Settings.builder()
                 .put(
                     WriteLoadConstraintSettings.WRITE_LOAD_DECIDER_ENABLED_SETTING.getKey(),
-                    WriteLoadConstraintSettings.WriteLoadDeciderStatus.ENABLED
+                    randomBoolean()
+                        ? WriteLoadConstraintSettings.WriteLoadDeciderStatus.ENABLED
+                        : WriteLoadConstraintSettings.WriteLoadDeciderStatus.LOW_THRESHOLD_ONLY
                 )
                 .build()
         );
diff --git a/server/src/main/java/org/elasticsearch/cluster/ClusterModule.java b/server/src/main/java/org/elasticsearch/cluster/ClusterModule.java
index 5633bd8b89e1e..1d3b79a0dc1af 100644
--- a/server/src/main/java/org/elasticsearch/cluster/ClusterModule.java
+++ b/server/src/main/java/org/elasticsearch/cluster/ClusterModule.java
@@ -66,6 +66,7 @@
 import org.elasticsearch.cluster.routing.allocation.decider.ShardsLimitAllocationDecider;
 import org.elasticsearch.cluster.routing.allocation.decider.SnapshotInProgressAllocationDecider;
 import org.elasticsearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider;
+import org.elasticsearch.cluster.routing.allocation.decider.WriteLoadConstraintDecider;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.io.stream.NamedWriteable;
 import org.elasticsearch.common.io.stream.NamedWriteableRegistry.Entry;
@@ -446,6 +447,7 @@ public static Collection<AllocationDecider> createAllocationDeciders(
         addAllocationDecider(deciders, new SnapshotInProgressAllocationDecider());
         addAllocationDecider(deciders, new RestoreInProgressAllocationDecider());
         addAllocationDecider(deciders, new NodeShutdownAllocationDecider());
+        addAllocationDecider(deciders, new WriteLoadConstraintDecider(clusterSettings));
         addAllocationDecider(deciders, new NodeReplacementAllocationDecider());
         addAllocationDecider(deciders, new FilterAllocationDecider(settings, clusterSettings));
         addAllocationDecider(deciders, new SameShardAllocationDecider(clusterSettings));
diff --git a/server/src/main/java/org/elasticsearch/cluster/InternalClusterInfoService.java b/server/src/main/java/org/elasticsearch/cluster/InternalClusterInfoService.java
index 6de0640f4422f..bd947c117f0e5 100644
--- a/server/src/main/java/org/elasticsearch/cluster/InternalClusterInfoService.java
+++ b/server/src/main/java/org/elasticsearch/cluster/InternalClusterInfoService.java
@@ -219,7 +219,7 @@ void execute() {
             logger.trace("starting async refresh");
 
             try (var ignoredRefs = fetchRefs) {
-                maybeFetchIndicesStats(diskThresholdEnabled || writeLoadConstraintEnabled == WriteLoadDeciderStatus.ENABLED);
+                maybeFetchIndicesStats(diskThresholdEnabled || writeLoadConstraintEnabled.atLeastLowThresholdEnabled());
                 maybeFetchNodeStats(diskThresholdEnabled || estimatedHeapThresholdEnabled);
                 maybeFetchNodesEstimatedHeapUsage(estimatedHeapThresholdEnabled);
                 maybeFetchNodesUsageStatsForThreadPools(writeLoadConstraintEnabled);
@@ -262,7 +262,7 @@ private void maybeFetchNodesEstimatedHeapUsage(boolean shouldFetch) {
     }
 
     private void maybeFetchNodesUsageStatsForThreadPools(WriteLoadDeciderStatus writeLoadConstraintEnabled) {
-        if (writeLoadConstraintEnabled != WriteLoadDeciderStatus.DISABLED) {
+        if (writeLoadConstraintEnabled.atLeastLowThresholdEnabled()) {
             try (var ignored = threadPool.getThreadContext().clearTraceContext()) {
                 fetchNodesUsageStatsForThreadPools();
             }
@@ -313,7 +313,7 @@ private void fetchIndicesStats() {
             // This returns the shard sizes on disk
             indicesStatsRequest.store(true);
         }
-        if (writeLoadConstraintEnabled == WriteLoadDeciderStatus.ENABLED) {
+        if (writeLoadConstraintEnabled.atLeastLowThresholdEnabled()) {
             // This returns the shard write-loads
             indicesStatsRequest.indexing(true);
         }
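For context on the three gating changes above: they all go through the new WriteLoadDeciderStatus helpers added in WriteLoadConstraintSettings further down in this diff. A minimal sketch of the intended mapping, assuming the Elasticsearch server classpath; the class name below is illustrative and not part of the change:

import org.elasticsearch.cluster.routing.allocation.WriteLoadConstraintSettings.WriteLoadDeciderStatus;

// Illustrative only: DISABLED triggers no extra collection, while LOW_THRESHOLD_ONLY and ENABLED
// both trigger the shard write-load (indices stats) and thread-pool usage fetches.
class WriteLoadFetchGatingSketch {
    public static void main(String[] args) {
        for (WriteLoadDeciderStatus status : WriteLoadDeciderStatus.values()) {
            System.out.println(status + " -> fetches write-load stats: " + status.atLeastLowThresholdEnabled());
        }
    }
}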
diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/ShardMovementWriteLoadSimulator.java b/server/src/main/java/org/elasticsearch/cluster/routing/ShardMovementWriteLoadSimulator.java
index dda98184b5f5a..a807e3d9b6427 100644
--- a/server/src/main/java/org/elasticsearch/cluster/routing/ShardMovementWriteLoadSimulator.java
+++ b/server/src/main/java/org/elasticsearch/cluster/routing/ShardMovementWriteLoadSimulator.java
@@ -88,11 +88,32 @@ private static NodeUsageStatsForThreadPools.ThreadPoolUsageStats replaceWritePoo
             .get(ThreadPool.Names.WRITE);
         return new NodeUsageStatsForThreadPools.ThreadPoolUsageStats(
             writeThreadPoolStats.totalThreadPoolThreads(),
-            (float) Math.max(
-                (writeThreadPoolStats.averageThreadPoolUtilization() + (writeLoadDelta / writeThreadPoolStats.totalThreadPoolThreads())),
-                0.0
+            updateNodeUtilizationWithShardMovements(
+                writeThreadPoolStats.averageThreadPoolUtilization(),
+                (float) writeLoadDelta,
+                writeThreadPoolStats.totalThreadPoolThreads()
             ),
             writeThreadPoolStats.maxThreadPoolQueueLatencyMillis()
         );
     }
+
+    /**
+     * The {@code nodeUtilization} is the average utilization per thread for some duration of time. The {@code shardWriteLoadDelta} is the
+     * sum of the shards' total execution time. Dividing the shards' total execution time by the number of threads provides the average
+     * utilization of each write thread for those shards. The change in shard load can then be added to the node utilization.
+     *
+     * @param nodeUtilization The current node-level write load percent utilization.
+     * @param shardWriteLoadDelta The change in shard(s) execution time across all threads. This can be positive or negative depending on
+     *                            whether shards were moved onto the node or off of the node.
+     * @param numberOfWriteThreads The number of threads available in the node's write thread pool.
+     * @return The new node-level write load percent utilization after adding the shard write load delta.
+     */
+    public static float updateNodeUtilizationWithShardMovements(
+        float nodeUtilization,
+        float shardWriteLoadDelta,
+        int numberOfWriteThreads
+    ) {
+        float newNodeUtilization = nodeUtilization + (shardWriteLoadDelta / numberOfWriteThreads);
+        return (float) Math.max(newNodeUtilization, 0.0);
+    }
 }
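To make the javadoc arithmetic above concrete, here is a small worked example against the new public helper; the wrapper class and the numbers are illustrative, not taken from the change:

import org.elasticsearch.cluster.routing.ShardMovementWriteLoadSimulator;

class UtilizationDeltaSketch {
    public static void main(String[] args) {
        // A node whose 8-thread write pool averages 50% utilization gains a shard with an estimated
        // write load of 0.8 (summed execution time across threads): the per-thread delta is
        // 0.8 / 8 = 0.1, so the simulated node utilization becomes 0.60.
        float movedOnto = ShardMovementWriteLoadSimulator.updateNodeUtilizationWithShardMovements(0.50f, 0.8f, 8);

        // Moving a shard away passes a negative delta, and the result is clamped at zero.
        float movedAway = ShardMovementWriteLoadSimulator.updateNodeUtilizationWithShardMovements(0.05f, -0.8f, 8);

        System.out.println(movedOnto + " " + movedAway); // prints 0.6 0.0
    }
}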
diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/WriteLoadConstraintSettings.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/WriteLoadConstraintSettings.java
index 23e1cb563f9fd..3ee0702b13192 100644
--- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/WriteLoadConstraintSettings.java
+++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/WriteLoadConstraintSettings.java
@@ -30,13 +30,30 @@ public enum WriteLoadDeciderStatus {
          */
         DISABLED,
         /**
-         * Only the low-threshold is enabled (write-load will not trigger rebalance)
+         * Only the low threshold is enabled: the decider tries to avoid allocating shards to a node exceeding
+         * {@link #WRITE_LOAD_DECIDER_HIGH_UTILIZATION_THRESHOLD_SETTING}. Write-load hot-spots will not trigger rebalancing.
          */
-        LOW_ONLY,
+        LOW_THRESHOLD_ONLY,
         /**
-         * The decider is enabled
+         * All write load decider functionality is enabled.
          */
-        ENABLED
+        ENABLED;
+
+        public boolean fullyEnabled() {
+            return this == ENABLED;
+        }
+
+        public boolean notFullyEnabled() {
+            return this != ENABLED;
+        }
+
+        public boolean atLeastLowThresholdEnabled() {
+            return this != DISABLED;
+        }
+
+        public boolean disabled() {
+            return this == DISABLED;
+        }
     }
 
     public static final Setting<WriteLoadDeciderStatus> WRITE_LOAD_DECIDER_ENABLED_SETTING = Setting.enumSetting(
@@ -102,10 +119,16 @@ public enum WriteLoadDeciderStatus {
 
     WriteLoadDeciderStatus writeLoadDeciderStatus;
     TimeValue writeLoadDeciderRerouteIntervalSetting;
+    double writeThreadPoolHighUtilizationThresholdSetting;
 
-    WriteLoadConstraintSettings(ClusterSettings clusterSettings) {
+    public WriteLoadConstraintSettings(ClusterSettings clusterSettings) {
         clusterSettings.initializeAndWatch(WRITE_LOAD_DECIDER_ENABLED_SETTING, this::setWriteLoadConstraintEnabled);
         clusterSettings.initializeAndWatch(WRITE_LOAD_DECIDER_REROUTE_INTERVAL_SETTING, this::setWriteLoadDeciderRerouteIntervalSetting);
+        clusterSettings.initializeAndWatch(
+            WRITE_LOAD_DECIDER_HIGH_UTILIZATION_THRESHOLD_SETTING,
+            this::setWriteThreadPoolHighUtilizationThresholdSetting
+        );
+
     };
 
     private void setWriteLoadConstraintEnabled(WriteLoadDeciderStatus status) {
@@ -120,7 +143,15 @@ public TimeValue getWriteLoadDeciderRerouteIntervalSetting() {
         return this.writeLoadDeciderRerouteIntervalSetting;
     }
 
+    public double getWriteThreadPoolHighUtilizationThresholdSetting() {
+        return this.writeThreadPoolHighUtilizationThresholdSetting;
+    }
+
     private void setWriteLoadDeciderRerouteIntervalSetting(TimeValue timeValue) {
         this.writeLoadDeciderRerouteIntervalSetting = timeValue;
     }
+
+    private void setWriteThreadPoolHighUtilizationThresholdSetting(RatioValue percent) {
+        this.writeThreadPoolHighUtilizationThresholdSetting = percent.getAsRatio();
+    }
 }
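A minimal sketch of how a consumer reads the new threshold through the now-public wrapper, assuming the high-utilization setting is registered with the built-in cluster settings (as the new unit test relies on); the sketch class is illustrative, not part of the change:

import org.elasticsearch.cluster.routing.allocation.WriteLoadConstraintSettings;
import org.elasticsearch.cluster.routing.allocation.WriteLoadConstraintSettings.WriteLoadDeciderStatus;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;

class WriteLoadSettingsSketch {
    public static void main(String[] args) {
        // The public constructor registers watchers, so the wrapper tracks dynamic setting updates.
        var writeLoadSettings = new WriteLoadConstraintSettings(ClusterSettings.createBuiltInClusterSettings(Settings.EMPTY));

        // The threshold is exposed as a ratio in [0, 1] (via RatioValue#getAsRatio), not as a raw percentage.
        double highUtilizationThreshold = writeLoadSettings.getWriteThreadPoolHighUtilizationThresholdSetting();

        // The enum helpers partition the three statuses: only DISABLED is disabled(), both
        // LOW_THRESHOLD_ONLY and ENABLED are atLeastLowThresholdEnabled(), and only ENABLED is fullyEnabled().
        System.out.println(highUtilizationThreshold + " " + WriteLoadDeciderStatus.LOW_THRESHOLD_ONLY.atLeastLowThresholdEnabled());
    }
}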
diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDecider.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDecider.java
new file mode 100644
index 0000000000000..ef24760f02a6b
--- /dev/null
+++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDecider.java
@@ -0,0 +1,124 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the "Elastic License
+ * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
+ * Public License v 1"; you may not use this file except in compliance with, at
+ * your election, the "Elastic License 2.0", the "GNU Affero General Public
+ * License v3.0 only", or the "Server Side Public License, v 1".
+ */
+
+package org.elasticsearch.cluster.routing.allocation.decider;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.elasticsearch.cluster.NodeUsageStatsForThreadPools.ThreadPoolUsageStats;
+import org.elasticsearch.cluster.metadata.IndexMetadata;
+import org.elasticsearch.cluster.routing.RoutingNode;
+import org.elasticsearch.cluster.routing.ShardMovementWriteLoadSimulator;
+import org.elasticsearch.cluster.routing.ShardRouting;
+import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
+import org.elasticsearch.cluster.routing.allocation.WriteLoadConstraintSettings;
+import org.elasticsearch.common.settings.ClusterSettings;
+import org.elasticsearch.core.Strings;
+import org.elasticsearch.threadpool.ThreadPool;
+
+/**
+ * Decides whether shards can be allocated to cluster nodes, or can remain on cluster nodes, based on the target node's current write thread
+ * pool usage stats and any candidate shard's write load estimate.
+ */
+public class WriteLoadConstraintDecider extends AllocationDecider {
+    private static final Logger logger = LogManager.getLogger(WriteLoadConstraintDecider.class);
+
+    public static final String NAME = "write_load";
+
+    private final WriteLoadConstraintSettings writeLoadConstraintSettings;
+
+    public WriteLoadConstraintDecider(ClusterSettings clusterSettings) {
+        this.writeLoadConstraintSettings = new WriteLoadConstraintSettings(clusterSettings);
+    }
+
+    @Override
+    public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
+        if (writeLoadConstraintSettings.getWriteLoadConstraintEnabled().disabled()) {
+            return Decision.single(Decision.Type.YES, NAME, "Decider is disabled");
+        }
+
+        // Check whether the shard being relocated has any write load estimate. If it does not, then this decider has no opinion.
+        var allShardWriteLoads = allocation.clusterInfo().getShardWriteLoads();
+        var shardWriteLoad = allShardWriteLoads.get(shardRouting.shardId());
+        if (shardWriteLoad == null || shardWriteLoad == 0) {
+            return Decision.single(Decision.Type.YES, NAME, "Shard has no estimated write load. Decider takes no action.");
+        }
+
+        var allNodeUsageStats = allocation.clusterInfo().getNodeUsageStatsForThreadPools();
+        var nodeUsageStatsForThreadPools = allNodeUsageStats.get(node.nodeId());
+        if (nodeUsageStatsForThreadPools == null) {
+            // No node-level thread pool usage stats were reported for this node. Let's assume this is OK and that the simulator will
+            // handle setting a node-level write load for this node after this shard is assigned.
+            return Decision.single(Decision.Type.YES, NAME, "The node has no write load estimate. Decider takes no action.");
+        }
+
+        assert nodeUsageStatsForThreadPools.threadPoolUsageStatsMap().isEmpty() == false;
+        assert nodeUsageStatsForThreadPools.threadPoolUsageStatsMap().get(ThreadPool.Names.WRITE) != null;
+        var nodeWriteThreadPoolStats = nodeUsageStatsForThreadPools.threadPoolUsageStatsMap().get(ThreadPool.Names.WRITE);
+        var nodeWriteThreadPoolLoadThreshold = writeLoadConstraintSettings.getWriteThreadPoolHighUtilizationThresholdSetting();
+        if (nodeWriteThreadPoolStats.averageThreadPoolUtilization() >= nodeWriteThreadPoolLoadThreshold) {
+            // The node's write thread pool usage stats already show high utilization above the threshold for accepting new shards.
+            String explain = Strings.format(
+                "Node [%s] with write thread pool utilization [%.2f] already exceeds the high utilization threshold of [%f]. Cannot "
+                    + "allocate shard [%s] to node without risking increased write latencies.",
+                node.nodeId(),
+                nodeWriteThreadPoolStats.averageThreadPoolUtilization(),
+                nodeWriteThreadPoolLoadThreshold,
+                shardRouting.shardId()
+            );
+            logger.debug(explain);
+            return Decision.single(Decision.Type.NO, NAME, explain);
+        }
+
+        if (calculateShardMovementChange(nodeWriteThreadPoolStats, shardWriteLoad) >= nodeWriteThreadPoolLoadThreshold) {
+            // The node's write thread pool usage would be raised above the high utilization threshold with assignment of the new shard.
+            // This could lead to a hot spot on this node and is undesirable.
+            String explain = Strings.format(
+                "The high utilization threshold of [%f] would be exceeded on node [%s] with utilization [%.2f] if shard [%s] with "
+                    + "estimated additional utilization [%.5f] (write load [%.5f] / threads [%d]) were assigned to it. Cannot allocate "
+                    + "shard to node without risking increased write latencies.",
+                nodeWriteThreadPoolLoadThreshold,
+                node.nodeId(),
+                nodeWriteThreadPoolStats.averageThreadPoolUtilization(),
+                shardRouting.shardId(),
+                shardWriteLoad / nodeWriteThreadPoolStats.totalThreadPoolThreads(),
+                shardWriteLoad,
+                nodeWriteThreadPoolStats.totalThreadPoolThreads()
+            );
+            logger.debug(explain);
+            return Decision.single(Decision.Type.NO, NAME, explain);
+        }
+
+        return Decision.YES;
+    }
+
+    @Override
+    public Decision canRemain(IndexMetadata indexMetadata, ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
+        if (writeLoadConstraintSettings.getWriteLoadConstraintEnabled().notFullyEnabled()) {
+            return Decision.single(Decision.Type.YES, NAME, "canRemain() is not enabled");
+        }
+
+        // TODO: implement
+
+        return Decision.single(Decision.Type.YES, NAME, "canRemain() is not yet implemented");
+    }
+
+    /**
+     * Calculates the change to the node's write thread pool utilization percentage if the shard is added to the node.
+     * Returns the percent thread pool utilization change.
+     */
+    private float calculateShardMovementChange(ThreadPoolUsageStats nodeWriteThreadPoolStats, double shardWriteLoad) {
+        assert shardWriteLoad > 0;
+        return ShardMovementWriteLoadSimulator.updateNodeUtilizationWithShardMovements(
+            nodeWriteThreadPoolStats.averageThreadPoolUtilization(),
+            (float) shardWriteLoad,
+            nodeWriteThreadPoolStats.totalThreadPoolThreads()
+        );
+    }
+}
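To illustrate the two NO paths in canAllocate with concrete numbers (mirroring the 8-thread nodes at 0.99, 0.50 and 0.89 utilization and the 0.5 shard write load used in WriteLoadConstraintDeciderTests further below; the 0.90 threshold is an assumed value for illustration, the real one comes from WRITE_LOAD_DECIDER_HIGH_UTILIZATION_THRESHOLD_SETTING):

import org.elasticsearch.cluster.routing.ShardMovementWriteLoadSimulator;

class CanAllocateSketch {
    public static void main(String[] args) {
        double threshold = 0.90;      // assumed high-utilization threshold (ratio form)
        int writeThreads = 8;         // node write thread pool size
        float shardWriteLoad = 0.5f;  // candidate shard's estimated write load

        // First NO path: a node already at 0.99 utilization is at or above the threshold,
        // so the shard is rejected regardless of its write load.
        System.out.println(0.99 >= threshold); // true

        // Second NO path: a node at 0.89 is below the threshold, but adding the shard projects
        // 0.89 + 0.5 / 8 = 0.9525, which crosses it.
        float projected = ShardMovementWriteLoadSimulator.updateNodeUtilizationWithShardMovements(0.89f, shardWriteLoad, writeThreads);
        System.out.println(projected >= threshold); // true

        // YES path: a node at 0.50 projects to 0.5625 and stays well below the threshold.
        System.out.println(ShardMovementWriteLoadSimulator.updateNodeUtilizationWithShardMovements(0.50f, shardWriteLoad, writeThreads) < threshold); // true
    }
}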
Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.cluster.routing.allocation.decider; + +import org.elasticsearch.action.support.replication.ClusterStateCreationUtils; +import org.elasticsearch.cluster.ClusterInfo; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ESAllocationTestCase; +import org.elasticsearch.cluster.NodeUsageStatsForThreadPools; +import org.elasticsearch.cluster.NodeUsageStatsForThreadPools.ThreadPoolUsageStats; +import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.routing.RoutingNode; +import org.elasticsearch.cluster.routing.RoutingNodes; +import org.elasticsearch.cluster.routing.RoutingNodesHelper; +import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.routing.ShardRoutingState; +import org.elasticsearch.cluster.routing.TestShardRouting; +import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; +import org.elasticsearch.cluster.routing.allocation.WriteLoadConstraintSettings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.Index; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.threadpool.ThreadPool; + +import java.util.HashMap; + +import static org.elasticsearch.common.settings.ClusterSettings.createBuiltInClusterSettings; + +public class WriteLoadConstraintDeciderTests extends ESAllocationTestCase { + + /** + * Test the write load decider behavior when disabled + */ + public void testWriteLoadDeciderDisabled() { + String indexName = "test-index"; + var testHarness = createClusterStateAndRoutingAllocation(indexName); + + // The write load decider is disabled by default. + + var writeLoadDecider = createWriteLoadConstraintDecider(Settings.builder().build()); + + assertEquals( + Decision.Type.YES, + writeLoadDecider.canAllocate( + testHarness.shardRouting2, + testHarness.exceedingThresholdRoutingNode, + testHarness.routingAllocation + ).type() + ); + assertEquals( + Decision.Type.YES, + writeLoadDecider.canAllocate(testHarness.shardRouting1, testHarness.belowThresholdRoutingNode, testHarness.routingAllocation) + .type() + ); + assertEquals( + Decision.Type.YES, + writeLoadDecider.canAllocate(testHarness.shardRouting1, testHarness.nearThresholdRoutingNode, testHarness.routingAllocation) + .type() + ); + assertEquals( + Decision.Type.YES, + writeLoadDecider.canAllocate( + testHarness.thirdRoutingNoWriteLoad, + testHarness.exceedingThresholdRoutingNode, + testHarness.routingAllocation + ).type() + ); + + assertEquals( + Decision.Type.YES, + writeLoadDecider.canRemain( + testHarness.clusterState.metadata().getProject().index(indexName), + testHarness.shardRouting1, + testHarness.exceedingThresholdRoutingNode, + testHarness.routingAllocation + ).type() + ); + } + + /** + * Test the {@link WriteLoadConstraintDecider#canAllocate} implementation. 
+ */ + public void testWriteLoadDeciderCanAllocate() { + String indexName = "test-index"; + var testHarness = createClusterStateAndRoutingAllocation(indexName); + + var writeLoadDecider = createWriteLoadConstraintDecider( + Settings.builder() + .put( + WriteLoadConstraintSettings.WRITE_LOAD_DECIDER_ENABLED_SETTING.getKey(), + randomBoolean() + ? WriteLoadConstraintSettings.WriteLoadDeciderStatus.ENABLED + : WriteLoadConstraintSettings.WriteLoadDeciderStatus.LOW_THRESHOLD_ONLY + ) + .build() + ); + assertEquals( + "Assigning a new shard to a node that is above the threshold should fail", + Decision.Type.NO, + writeLoadDecider.canAllocate( + testHarness.shardRouting2, + testHarness.exceedingThresholdRoutingNode, + testHarness.routingAllocation + ).type() + ); + assertEquals( + "Assigning a new shard to a node that has capacity should succeed", + Decision.Type.YES, + writeLoadDecider.canAllocate(testHarness.shardRouting1, testHarness.belowThresholdRoutingNode, testHarness.routingAllocation) + .type() + ); + assertEquals( + "Assigning a new shard without a write load estimate should _not_ be blocked by lack of capacity", + Decision.Type.YES, + writeLoadDecider.canAllocate( + testHarness.thirdRoutingNoWriteLoad, + testHarness.exceedingThresholdRoutingNode, + testHarness.routingAllocation + ).type() + ); + assertEquals( + "Assigning a new shard that would cause the node to exceed capacity should fail", + Decision.Type.NO, + writeLoadDecider.canAllocate(testHarness.shardRouting1, testHarness.nearThresholdRoutingNode, testHarness.routingAllocation) + .type() + ); + } + + /** + * Carries all the cluster state objects needed for testing after {@link #createClusterStateAndRoutingAllocation} sets them up. + */ + private record TestHarness( + ClusterState clusterState, + RoutingAllocation routingAllocation, + RoutingNode exceedingThresholdRoutingNode, + RoutingNode belowThresholdRoutingNode, + RoutingNode nearThresholdRoutingNode, + ShardRouting shardRouting1, + ShardRouting shardRouting2, + ShardRouting thirdRoutingNoWriteLoad + ) {} + + /** + * Creates all the cluster state and objects needed to test the {@link WriteLoadConstraintDecider}. + */ + private TestHarness createClusterStateAndRoutingAllocation(String indexName) { + /** + * Create the ClusterState for multiple nodes and multiple index shards. + */ + + ClusterState clusterState = ClusterStateCreationUtils.stateWithAssignedPrimariesAndReplicas(new String[] { indexName }, 3, 1); + // The number of data nodes the util method above creates is numberOfReplicas+1, and three data nodes are needed for this test. + assertEquals(3, clusterState.nodes().size()); + assertEquals(1, clusterState.metadata().getTotalNumberOfIndices()); + + /** + * Fetch references to the nodes and index shards from the generated ClusterState, so the ClusterInfo can be created from them. 
+ */ + + var discoveryNodeIterator = clusterState.nodes().iterator(); + assertTrue(discoveryNodeIterator.hasNext()); + var exceedingThresholdDiscoveryNode = discoveryNodeIterator.next(); + assertTrue(discoveryNodeIterator.hasNext()); + var belowThresholdDiscoveryNode2 = discoveryNodeIterator.next(); + assertTrue(discoveryNodeIterator.hasNext()); + var nearThresholdDiscoveryNode3 = discoveryNodeIterator.next(); + assertFalse(discoveryNodeIterator.hasNext()); + + var indexIterator = clusterState.metadata().indicesAllProjects().iterator(); + assertTrue(indexIterator.hasNext()); + IndexMetadata testIndexMetadata = indexIterator.next(); + assertFalse(indexIterator.hasNext()); + Index testIndex = testIndexMetadata.getIndex(); + assertEquals(3, testIndexMetadata.getNumberOfShards()); + ShardId testShardId1 = new ShardId(testIndex, 0); + ShardId testShardId2 = new ShardId(testIndex, 1); + ShardId testShardId3NoWriteLoad = new ShardId(testIndex, 2); + + /** + * Create a ClusterInfo that includes the node and shard level write load estimates for a variety of node capacity situations. + */ + + var nodeThreadPoolStatsWithWriteExceedingThreshold = createNodeUsageStatsForThreadPools( + exceedingThresholdDiscoveryNode, + 8, + 0.99f, + 0 + ); + var nodeThreadPoolStatsWithWriteBelowThreshold = createNodeUsageStatsForThreadPools(belowThresholdDiscoveryNode2, 8, 0.50f, 0); + var nodeThreadPoolStatsWithWriteNearThreshold = createNodeUsageStatsForThreadPools(nearThresholdDiscoveryNode3, 8, 0.89f, 0); + + // Create a map of usage per node. + var nodeIdToNodeUsageStatsForThreadPools = new HashMap(); + nodeIdToNodeUsageStatsForThreadPools.put(exceedingThresholdDiscoveryNode.getId(), nodeThreadPoolStatsWithWriteExceedingThreshold); + nodeIdToNodeUsageStatsForThreadPools.put(belowThresholdDiscoveryNode2.getId(), nodeThreadPoolStatsWithWriteBelowThreshold); + nodeIdToNodeUsageStatsForThreadPools.put(nearThresholdDiscoveryNode3.getId(), nodeThreadPoolStatsWithWriteNearThreshold); + + // Create a map of usage per shard. + var shardIdToWriteLoadEstimate = new HashMap(); + shardIdToWriteLoadEstimate.put(testShardId1, 0.5); + shardIdToWriteLoadEstimate.put(testShardId2, 0.5); + shardIdToWriteLoadEstimate.put(testShardId3NoWriteLoad, 0d); + + ClusterInfo clusterInfo = ClusterInfo.builder() + .nodeUsageStatsForThreadPools(nodeIdToNodeUsageStatsForThreadPools) + .shardWriteLoads(shardIdToWriteLoadEstimate) + .build(); + + /** + * Create the RoutingAllocation from the ClusterState and ClusterInfo above, and set up the other input for the WriteLoadDecider. 
+ */ + + var routingAllocation = new RoutingAllocation( + null, + RoutingNodes.immutable(clusterState.globalRoutingTable(), clusterState.nodes()), + clusterState, + clusterInfo, + null, + System.nanoTime() + ); + + ShardRouting shardRouting1 = TestShardRouting.newShardRouting( + testShardId1, + exceedingThresholdDiscoveryNode.getId(), + null, + true, + ShardRoutingState.STARTED + ); + ShardRouting shardRouting2 = TestShardRouting.newShardRouting( + testShardId2, + belowThresholdDiscoveryNode2.getId(), + null, + true, + ShardRoutingState.STARTED + ); + ShardRouting thirdRoutingNoWriteLoad = TestShardRouting.newShardRouting( + testShardId3NoWriteLoad, + belowThresholdDiscoveryNode2.getId(), + null, + true, + ShardRoutingState.STARTED + ); + + RoutingNode exceedingThresholdRoutingNode = RoutingNodesHelper.routingNode( + exceedingThresholdDiscoveryNode.getId(), + exceedingThresholdDiscoveryNode, + shardRouting1 + ); + RoutingNode belowThresholdRoutingNode = RoutingNodesHelper.routingNode( + belowThresholdDiscoveryNode2.getId(), + belowThresholdDiscoveryNode2, + shardRouting2 + ); + RoutingNode nearThresholdRoutingNode = RoutingNodesHelper.routingNode( + nearThresholdDiscoveryNode3.getId(), + nearThresholdDiscoveryNode3, + new ShardRouting[] {} + ); + + return new TestHarness( + clusterState, + routingAllocation, + exceedingThresholdRoutingNode, + belowThresholdRoutingNode, + nearThresholdRoutingNode, + shardRouting1, + shardRouting2, + thirdRoutingNoWriteLoad + ); + } + + private WriteLoadConstraintDecider createWriteLoadConstraintDecider(Settings settings) { + return new WriteLoadConstraintDecider(createBuiltInClusterSettings(settings)); + } + + /** + * Helper to create a {@link NodeUsageStatsForThreadPools} for the given node with the given WRITE thread pool usage stats. + */ + private NodeUsageStatsForThreadPools createNodeUsageStatsForThreadPools( + DiscoveryNode discoveryNode, + int totalWriteThreadPoolThreads, + float averageWriteThreadPoolUtilization, + long averageWriteThreadPoolQueueLatencyMillis + ) { + + // Create thread pool usage stats map for node1. + var writeThreadPoolUsageStats = new ThreadPoolUsageStats( + totalWriteThreadPoolThreads, + averageWriteThreadPoolUtilization, + averageWriteThreadPoolQueueLatencyMillis + ); + var threadPoolUsageMap = new HashMap(); + threadPoolUsageMap.put(ThreadPool.Names.WRITE, writeThreadPoolUsageStats); + + // Create the node's thread pool usage map + return new NodeUsageStatsForThreadPools(discoveryNode.getId(), threadPoolUsageMap); + } +}