From e50820621c26f04872b4cf103f2431ef5c1fda3f Mon Sep 17 00:00:00 2001 From: Dianna Hohensee Date: Mon, 28 Jul 2025 15:53:03 -0400 Subject: [PATCH 01/16] WriteLoadConstraintDecider PoC --- .../decider/WriteLoadConstraintDeciderIT.java | 29 +++++++ .../elasticsearch/cluster/ClusterModule.java | 2 + .../WriteLoadConstraintSettings.java | 16 +++- .../decider/WriteLoadConstraintDecider.java | 87 +++++++++++++++++++ .../WriteLoadConstraintDeciderTests.java | 35 ++++++++ 5 files changed, 168 insertions(+), 1 deletion(-) create mode 100644 server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDeciderIT.java create mode 100644 server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDecider.java create mode 100644 server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDeciderTests.java diff --git a/server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDeciderIT.java b/server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDeciderIT.java new file mode 100644 index 0000000000000..3e35db734da18 --- /dev/null +++ b/server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDeciderIT.java @@ -0,0 +1,29 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". 
+ */ + +package org.elasticsearch.cluster.routing.allocation.decider; + +import org.elasticsearch.cluster.routing.allocation.WriteLoadConstraintSettings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.test.ESIntegTestCase; + +public class WriteLoadConstraintDeciderIT extends ESIntegTestCase { + @Override + protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) { + return Settings.builder() + .put(super.nodeSettings(nodeOrdinal, otherSettings)) + .put( + WriteLoadConstraintSettings.WRITE_LOAD_DECIDER_ENABLED_SETTING.getKey(), + WriteLoadConstraintSettings.WriteLoadDeciderStatus.ENABLED + ) + .build(); + } + + // TODO: integration testing to see that the components work together. +} diff --git a/server/src/main/java/org/elasticsearch/cluster/ClusterModule.java b/server/src/main/java/org/elasticsearch/cluster/ClusterModule.java index 5633bd8b89e1e..f8faf648eaf4b 100644 --- a/server/src/main/java/org/elasticsearch/cluster/ClusterModule.java +++ b/server/src/main/java/org/elasticsearch/cluster/ClusterModule.java @@ -66,6 +66,7 @@ import org.elasticsearch.cluster.routing.allocation.decider.ShardsLimitAllocationDecider; import org.elasticsearch.cluster.routing.allocation.decider.SnapshotInProgressAllocationDecider; import org.elasticsearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider; +import org.elasticsearch.cluster.routing.allocation.decider.WriteLoadConstraintDecider; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.io.stream.NamedWriteable; import org.elasticsearch.common.io.stream.NamedWriteableRegistry.Entry; @@ -453,6 +454,7 @@ public static Collection createAllocationDeciders( addAllocationDecider(deciders, new ThrottlingAllocationDecider(clusterSettings)); addAllocationDecider(deciders, new ShardsLimitAllocationDecider(clusterSettings)); addAllocationDecider(deciders, new AwarenessAllocationDecider(settings, clusterSettings)); + 
addAllocationDecider(deciders, new WriteLoadConstraintDecider(clusterSettings)); clusterPlugins.stream() .flatMap(p -> p.createAllocationDeciders(settings, clusterSettings).stream()) diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/WriteLoadConstraintSettings.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/WriteLoadConstraintSettings.java index 23e1cb563f9fd..c20a4db5bede3 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/WriteLoadConstraintSettings.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/WriteLoadConstraintSettings.java @@ -102,10 +102,16 @@ public enum WriteLoadDeciderStatus { WriteLoadDeciderStatus writeLoadDeciderStatus; TimeValue writeLoadDeciderRerouteIntervalSetting; + double writeThreadPoolHighUtilizationThresholdSetting; - WriteLoadConstraintSettings(ClusterSettings clusterSettings) { + public WriteLoadConstraintSettings(ClusterSettings clusterSettings) { clusterSettings.initializeAndWatch(WRITE_LOAD_DECIDER_ENABLED_SETTING, this::setWriteLoadConstraintEnabled); clusterSettings.initializeAndWatch(WRITE_LOAD_DECIDER_REROUTE_INTERVAL_SETTING, this::setWriteLoadDeciderRerouteIntervalSetting); + clusterSettings.initializeAndWatch( + WRITE_LOAD_DECIDER_HIGH_UTILIZATION_THRESHOLD_SETTING, + this::setWriteThreadPoolHighUtilizationThresholdSetting + ); + }; private void setWriteLoadConstraintEnabled(WriteLoadDeciderStatus status) { @@ -120,7 +126,15 @@ public TimeValue getWriteLoadDeciderRerouteIntervalSetting() { return this.writeLoadDeciderRerouteIntervalSetting; } + public double getWriteThreadPoolHighUtilizationThresholdSetting() { + return this.writeThreadPoolHighUtilizationThresholdSetting; + } + private void setWriteLoadDeciderRerouteIntervalSetting(TimeValue timeValue) { this.writeLoadDeciderRerouteIntervalSetting = timeValue; } + + private void setWriteThreadPoolHighUtilizationThresholdSetting(RatioValue percent) { + 
this.writeThreadPoolHighUtilizationThresholdSetting = percent.getAsRatio(); + } } diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDecider.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDecider.java new file mode 100644 index 0000000000000..7a021c7779b98 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDecider.java @@ -0,0 +1,87 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.cluster.routing.allocation.decider; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.routing.RoutingNode; +import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; +import org.elasticsearch.cluster.routing.allocation.WriteLoadConstraintSettings; +import org.elasticsearch.common.settings.ClusterSettings; +import org.elasticsearch.threadpool.ThreadPool; + +/** + * Decides whether shards can be allocated to cluster nodes, or can remain on cluster nodes, based on the target node's current write thread + * pool usage stats and any candidate shard's write load estimate. 
+ */ +public class WriteLoadConstraintDecider extends AllocationDecider { + private static final Logger logger = LogManager.getLogger(WriteLoadConstraintDecider.class); + + public static final String NAME = "write_load"; + + private final WriteLoadConstraintSettings writeLoadConstraintSettings; + + public WriteLoadConstraintDecider(ClusterSettings clusterSettings) { + this.writeLoadConstraintSettings = new WriteLoadConstraintSettings(clusterSettings); + } + + @Override + public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) { + if (writeLoadConstraintSettings.getWriteLoadConstraintEnabled() != WriteLoadConstraintSettings.WriteLoadDeciderStatus.ENABLED) { + return Decision.YES; + } + + // Check whether the shard being relocated has any write load estimate. If it does not, then this decider has no opinion. + var allShardWriteLoads = allocation.clusterInfo().getShardWriteLoads(); + var shardWriteLoad = allShardWriteLoads.get(shardRouting.shardId()); + if (shardWriteLoad == null || shardWriteLoad == 0) { + return Decision.YES; + } + + var allNodeUsageStats = allocation.clusterInfo().getNodeUsageStatsForThreadPools(); + var nodeUsageStatsForThreadPools = allNodeUsageStats.get(node.nodeId()); + if (nodeUsageStatsForThreadPools == null) { + // No node-level thread pool usage stats were reported for this node. Let's assume this is OK and that the simulator will handle + // setting a node-level write load for this node after this shard is assigned. 
+ return Decision.YES; + } + + assert nodeUsageStatsForThreadPools.threadPoolUsageStatsMap().isEmpty() == false; + assert nodeUsageStatsForThreadPools.threadPoolUsageStatsMap().get(ThreadPool.Names.WRITE) != null; + var nodeWriteThreadPoolStats = nodeUsageStatsForThreadPools.threadPoolUsageStatsMap().get(ThreadPool.Names.WRITE); + var nodeWriteThreadPoolLoadThreshold = writeLoadConstraintSettings.getWriteThreadPoolHighUtilizationThresholdSetting(); + if (nodeWriteThreadPoolStats.averageThreadPoolUtilization() >= nodeWriteThreadPoolLoadThreshold) { + // The node's write thread pool usage stats already show high utilization above the threshold for accepting new shards. + logger.debug( + "The high utilization threshold of {} has already been reached on node {}. Cannot allocate shard {} to node {} " + + "without risking increased write latencies.", + nodeWriteThreadPoolLoadThreshold, + node.nodeId(), + shardRouting.shardId(), + node.nodeId() + ); + return Decision.NO; + } + + return Decision.YES; + } + + @Override + public Decision canRemain(IndexMetadata indexMetadata, ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) { + if (writeLoadConstraintSettings.getWriteLoadConstraintEnabled() != WriteLoadConstraintSettings.WriteLoadDeciderStatus.ENABLED) { + return Decision.YES; + } + + return Decision.YES; + } + +} diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDeciderTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDeciderTests.java new file mode 100644 index 0000000000000..796c284d0846d --- /dev/null +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDeciderTests.java @@ -0,0 +1,35 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. 
Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.cluster.routing.allocation.decider; + +import org.elasticsearch.cluster.routing.allocation.WriteLoadConstraintSettings; +import org.elasticsearch.common.settings.Settings; + +public class WriteLoadConstraintDeciderTests { + + public void testWriteLoadDeciderIsDisabled() { + // TODO + } + + public void testShardWithNoWriteLoadEstimateIsAlwaysYES() { + Settings writeLoadConstraintSettings = Settings.builder() + .put( + WriteLoadConstraintSettings.WRITE_LOAD_DECIDER_ENABLED_SETTING.getKey(), + WriteLoadConstraintSettings.WriteLoadDeciderStatus.ENABLED + ) + .build(); + // TODO + } + + public void testShardWithWriteLoadEstimate() { + // TODO: test successful re-assignment and rejected re-assignment due to threshold + } + +} From 72471c917754065db2516878dc97e47e63baee60 Mon Sep 17 00:00:00 2001 From: Dianna Hohensee Date: Mon, 28 Jul 2025 17:53:42 -0400 Subject: [PATCH 02/16] add test extension --- .../allocation/decider/WriteLoadConstraintDeciderTests.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDeciderTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDeciderTests.java index 796c284d0846d..dd582ead2118d 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDeciderTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDeciderTests.java @@ -9,10 +9,11 @@ package org.elasticsearch.cluster.routing.allocation.decider; +import 
org.elasticsearch.cluster.ESAllocationTestCase; import org.elasticsearch.cluster.routing.allocation.WriteLoadConstraintSettings; import org.elasticsearch.common.settings.Settings; -public class WriteLoadConstraintDeciderTests { +public class WriteLoadConstraintDeciderTests extends ESAllocationTestCase { public void testWriteLoadDeciderIsDisabled() { // TODO From 121d4deea916fe3172d30813976ea385a0e20cb1 Mon Sep 17 00:00:00 2001 From: Dianna Hohensee Date: Wed, 30 Jul 2025 20:41:29 -0400 Subject: [PATCH 03/16] WIP testing -- can't run until internet returns --- .../ShardMovementWriteLoadSimulator.java | 13 ++ .../decider/WriteLoadConstraintDecider.java | 2 + .../WriteLoadConstraintDeciderTests.java | 190 +++++++++++++++++- 3 files changed, 204 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/ShardMovementWriteLoadSimulator.java b/server/src/main/java/org/elasticsearch/cluster/routing/ShardMovementWriteLoadSimulator.java index 1a729a992583c..1cf5d070d8ad0 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/ShardMovementWriteLoadSimulator.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/ShardMovementWriteLoadSimulator.java @@ -80,12 +80,25 @@ public Map simulatedNodeUsageStatsForThrea return Collections.unmodifiableMap(adjustedNodeUsageStatsForThreadPools); } + /** + * averageThreadPoolUtilization + * total shard execution time (sum of shard write loads) / total execution time (totalThreadPoolThreads x duration) + * (30 + 20 + 40) / (4 x 30s) = 90 / 120 = .75 + * + * writeLoadDelta / totalThreadPoolThreads + * 20 / 4 = 5 + * + */ + private static NodeUsageStatsForThreadPools.ThreadPoolUsageStats replaceWritePoolStats( NodeUsageStatsForThreadPools value, double writeLoadDelta ) { final NodeUsageStatsForThreadPools.ThreadPoolUsageStats writeThreadPoolStats = value.threadPoolUsageStatsMap() .get(ThreadPool.Names.WRITE); + var newAverageThreadPoolUtilization = 
(writeThreadPoolStats.averageThreadPoolUtilization() + (writeLoadDelta / writeThreadPoolStats + .totalThreadPoolThreads())); + assert newAverageThreadPoolUtilization < 1.00; return new NodeUsageStatsForThreadPools.ThreadPoolUsageStats( writeThreadPoolStats.totalThreadPoolThreads(), (float) Math.max( diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDecider.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDecider.java index 7a021c7779b98..06e16025eaafd 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDecider.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDecider.java @@ -55,6 +55,8 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing return Decision.YES; } + // NOMERGE: should create a utility class (eventually, maybe duplicate code for this class, for now, if simulator does have bug) + // to calculate the change node's usage change with assignment of the new shard assert nodeUsageStatsForThreadPools.threadPoolUsageStatsMap().isEmpty() == false; assert nodeUsageStatsForThreadPools.threadPoolUsageStatsMap().get(ThreadPool.Names.WRITE) != null; var nodeWriteThreadPoolStats = nodeUsageStatsForThreadPools.threadPoolUsageStatsMap().get(ThreadPool.Names.WRITE); diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDeciderTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDeciderTests.java index dd582ead2118d..273b007b656d6 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDeciderTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDeciderTests.java @@ -9,14 +9,176 @@ package 
org.elasticsearch.cluster.routing.allocation.decider; +import org.elasticsearch.action.support.replication.ClusterStateCreationUtils; +import org.elasticsearch.cluster.ClusterInfo; +import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ESAllocationTestCase; +import org.elasticsearch.cluster.NodeUsageStatsForThreadPools; +import org.elasticsearch.cluster.NodeUsageStatsForThreadPools.ThreadPoolUsageStats; +import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.routing.RoutingNode; +import org.elasticsearch.cluster.routing.RoutingNodes; +import org.elasticsearch.cluster.routing.RoutingNodesHelper; +import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.routing.ShardRoutingState; +import org.elasticsearch.cluster.routing.TestShardRouting; +import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; import org.elasticsearch.cluster.routing.allocation.WriteLoadConstraintSettings; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.Index; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.threadpool.ThreadPool; + +import java.util.HashMap; + +import static org.elasticsearch.common.settings.ClusterSettings.createBuiltInClusterSettings; public class WriteLoadConstraintDeciderTests extends ESAllocationTestCase { + // NOMERGE: I wonder if we don't want a node utilization percent, but rather a total node execution time used and a total node execution + // time possible. The math would be a whole lot easier on us... public void testWriteLoadDeciderIsDisabled() { - // TODO + String indexName = "test-index"; + + // Set up multiple nodes and an index with multiple shards. 
+ // DiscoveryNode discoveryNode1 = newNode("node1"); + // DiscoveryNode discoveryNode2 = newNode("node2"); + // ShardId shardId1 = new ShardId(indexName, IndexMetadata.INDEX_UUID_NA_VALUE, 0); + // ShardId shardId2 = new ShardId(indexName, IndexMetadata.INDEX_UUID_NA_VALUE, 1); + + /** + * Create the ClusterState + */ + + ClusterState clusterState = ClusterStateCreationUtils.stateWithAssignedPrimariesAndReplicas(new String[] { indexName }, 3, 0); + assertEquals(2, clusterState.nodes().size()); + assertEquals(1, clusterState.metadata().getTotalNumberOfIndices()); + + /** + * Fetch the nodes and index shards from the generated ClusterState. + */ + + var discoveryNodeIterator = clusterState.nodes().iterator(); + assertTrue(discoveryNodeIterator.hasNext()); + var exceedingThresholdDiscoveryNode = discoveryNodeIterator.next(); + assertTrue(discoveryNodeIterator.hasNext()); + var belowThresholdDiscoveryNode2 = discoveryNodeIterator.next(); + assertFalse(discoveryNodeIterator.hasNext()); + + var indexIterator = clusterState.metadata().indicesAllProjects().iterator(); + assertTrue(indexIterator.hasNext()); + IndexMetadata testIndexMetadata = indexIterator.next(); + assertFalse(indexIterator.hasNext()); + Index testIndex = testIndexMetadata.getIndex(); + assertEquals(3, testIndexMetadata.getNumberOfShards()); + ShardId testShardId1 = new ShardId(testIndex, 0); + ShardId testShardId2 = new ShardId(testIndex, 1); + ShardId testShardId3NoWriteLoad = new ShardId(testIndex, 1); + + /** + * Create the ClusterInfo that includes the node and shard level write load estimates. + */ + + var nodeThreadPoolStatsWithWriteExceedingThreshold = createNodeUsageStatsForThreadPools( + exceedingThresholdDiscoveryNode, + 8, + 0.99f, + 0 + ); + var nodeThreadPoolStatsWithWriteBelowThreshold = createNodeUsageStatsForThreadPools(belowThresholdDiscoveryNode2, 8, 0.50f, 0); + + // Create a map of usage per node. 
+ var nodeIdToNodeUsageStatsForThreadPools = new HashMap(); + nodeIdToNodeUsageStatsForThreadPools.put(exceedingThresholdDiscoveryNode.getId(), nodeThreadPoolStatsWithWriteExceedingThreshold); + nodeIdToNodeUsageStatsForThreadPools.put(belowThresholdDiscoveryNode2.getId(), nodeThreadPoolStatsWithWriteBelowThreshold); + + var shardIdToWriteLoadEstimate = new HashMap(); + shardIdToWriteLoadEstimate.put(testShardId1, 1.5); + shardIdToWriteLoadEstimate.put(testShardId2, 1.5); + shardIdToWriteLoadEstimate.put(testShardId3NoWriteLoad, 0d); + + ClusterInfo clusterInfo = ClusterInfo.builder() + .nodeUsageStatsForThreadPools(nodeIdToNodeUsageStatsForThreadPools) + .shardWriteLoads(shardIdToWriteLoadEstimate) + .build(); + + /** + * Create the RoutingAllocation + */ + + var routingAllocation = new RoutingAllocation( + null, + RoutingNodes.immutable(clusterState.globalRoutingTable(), clusterState.nodes()), + clusterState, + clusterInfo, + null, + System.nanoTime() + ); + + ShardRouting shardRouting1 = TestShardRouting.newShardRouting( + testShardId1, + exceedingThresholdDiscoveryNode.getId(), + null, + true, + ShardRoutingState.STARTED + ); + ShardRouting shardRouting2 = TestShardRouting.newShardRouting( + testShardId2, + belowThresholdDiscoveryNode2.getId(), + null, + true, + ShardRoutingState.STARTED + ); + ShardRouting thirdRoutingNoWriteLoad = TestShardRouting.newShardRouting( + testShardId3NoWriteLoad, + belowThresholdDiscoveryNode2.getId(), + null, + true, + ShardRoutingState.STARTED + ); + + assertTrue(discoveryNodeIterator.hasNext()); + RoutingNode exceedingThresholdRoutingNode = RoutingNodesHelper.routingNode( + exceedingThresholdDiscoveryNode.getId(), + discoveryNodeIterator.next(), + shardRouting1 + ); + assertTrue(discoveryNodeIterator.hasNext()); + RoutingNode belowThresholdRoutingNode = RoutingNodesHelper.routingNode( + belowThresholdDiscoveryNode2.getId(), + discoveryNodeIterator.next(), + shardRouting2 + ); + assertFalse(discoveryNodeIterator.hasNext()); + + 
/** + * Test the write load decider + */ + + // The write load decider is disabled by default. + var writeLoadDecider = createWriteLoadConstraintDecider(Settings.builder().build()); + + assertEquals(Decision.YES, writeLoadDecider.canAllocate(shardRouting2, exceedingThresholdRoutingNode, routingAllocation)); + assertEquals(Decision.YES, writeLoadDecider.canAllocate(shardRouting1, belowThresholdRoutingNode, routingAllocation)); + assertEquals(Decision.YES, writeLoadDecider.canAllocate(thirdRoutingNoWriteLoad, exceedingThresholdRoutingNode, routingAllocation)); + + // Check that the answers change when enabled. + writeLoadDecider = createWriteLoadConstraintDecider( + Settings.builder() + .put( + WriteLoadConstraintSettings.WRITE_LOAD_DECIDER_ENABLED_SETTING.getKey(), + WriteLoadConstraintSettings.WriteLoadDeciderStatus.ENABLED + ) + .build() + ); + + assertEquals(Decision.NO, writeLoadDecider.canAllocate(shardRouting2, exceedingThresholdRoutingNode, routingAllocation)); + assertEquals(Decision.YES, writeLoadDecider.canAllocate(shardRouting1, belowThresholdRoutingNode, routingAllocation)); + assertEquals(Decision.YES, writeLoadDecider.canAllocate(thirdRoutingNoWriteLoad, exceedingThresholdRoutingNode, routingAllocation)); + + // NOMERGE: test that adding a shard is rejected if it would overflow the utilization threshold? + // Need to implement the logic in the decider, I don't check right now. } public void testShardWithNoWriteLoadEstimateIsAlwaysYES() { @@ -33,4 +195,30 @@ public void testShardWithWriteLoadEstimate() { // TODO: test successful re-assignment and rejected re-assignment due to threshold } + private WriteLoadConstraintDecider createWriteLoadConstraintDecider(Settings settings) { + return new WriteLoadConstraintDecider(createBuiltInClusterSettings(settings)); + } + + /** + * Helper to create a {@link NodeUsageStatsForThreadPools} for the given node with the given WRITE thread pool usage stats. 
+ */ + private NodeUsageStatsForThreadPools createNodeUsageStatsForThreadPools( + DiscoveryNode discoveryNode, + int totalWriteThreadPoolThreads, + float averageWriteThreadPoolUtilization, + long averageWriteThreadPoolQueueLatencyMillis + ) { + + // Create thread pool usage stats map for node1. + var writeThreadPoolUsageStats = new ThreadPoolUsageStats( + totalWriteThreadPoolThreads, + averageWriteThreadPoolUtilization, + averageWriteThreadPoolQueueLatencyMillis + ); + var threadPoolUsageMap = new HashMap(); + threadPoolUsageMap.put(ThreadPool.Names.WRITE, writeThreadPoolUsageStats); + + // Create the node's thread pool usage map + return new NodeUsageStatsForThreadPools(discoveryNode.getId(), threadPoolUsageMap); + } } From 3dcb94dacad73f1d0eb28453dad9a707d5ad2279 Mon Sep 17 00:00:00 2001 From: Dianna Hohensee Date: Thu, 31 Jul 2025 14:42:39 -0700 Subject: [PATCH 04/16] wip --- .../decider/WriteLoadConstraintDecider.java | 27 +++++++++++++++++++ 1 file changed, 27 insertions(+) diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDecider.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDecider.java index 06e16025eaafd..b5005d46b96d9 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDecider.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDecider.java @@ -11,6 +11,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.elasticsearch.cluster.NodeUsageStatsForThreadPools.ThreadPoolUsageStats; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.routing.RoutingNode; import org.elasticsearch.cluster.routing.ShardRouting; @@ -74,6 +75,22 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing return Decision.NO; } + if 
(nodeWriteThreadPoolStats.averageThreadPoolUtilization() + calculateShardMovementChange(nodeWriteThreadPoolStats, shardWriteLoad) >= nodeWriteThreadPoolLoadThreshold) { + // The node's write thread pool usage would be raised above the high utilization threshold. This could lead to a hot spot on + // this node and is undesirable. + logger.debug( + "The high utilization threshold of {} would be exceeded on node {} if shard {} with estimated write load {} were " + + "assigned to it. Cannot allocate shard {} to node {} without risking increased write latencies.", + nodeWriteThreadPoolLoadThreshold, + node.nodeId(), + shardRouting.shardId(), + shardWriteLoad, + shardRouting.shardId(), + node.nodeId() + ); + return Decision.NO; + } + return Decision.YES; } @@ -86,4 +103,14 @@ public Decision canRemain(IndexMetadata indexMetadata, ShardRouting shardRouting return Decision.YES; } + /** + * Calculates the change to the node's write thread pool utilization percentage if the shard is added to the node. + * Returns the percent thread pool utilization change. + */ + private float calculateShardMovementChange(ThreadPoolUsageStats nodeWriteThreadPoolStats, double shardWriteLoad) { + assert shardWriteLoad > 0; + // NOMERGE: move this into an utility class, should be commonly accessible with the simulator. + // TODO: implement.. 
+ return 0; + } } From 6d85655cf883ef826e3649d04a4028a122ec6e3f Mon Sep 17 00:00:00 2001 From: Dianna Hohensee Date: Fri, 1 Aug 2025 15:38:17 -0700 Subject: [PATCH 05/16] log msg --- .../java/org/elasticsearch/index/shard/IndexShardIT.java | 1 + 1 file changed, 1 insertion(+) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/index/shard/IndexShardIT.java b/server/src/internalClusterTest/java/org/elasticsearch/index/shard/IndexShardIT.java index f752650e3f3d7..a0c13580a6dcd 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/index/shard/IndexShardIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/index/shard/IndexShardIT.java @@ -394,6 +394,7 @@ public void testShardWriteLoadsArePresent() { for (int i = 0; i < indexMetadata.getNumberOfShards(); i++) { final ShardId shardId = new ShardId(indexMetadata.getIndex(), i); assertTrue(shardWriteLoads.containsKey(shardId)); + logger.info("~~~shardWriteLoads: " + shardWriteLoads.get(shardId)); maximumLoadRecorded = Math.max(shardWriteLoads.get(shardId), maximumLoadRecorded); } // Each index should have seen some write-load From 68be3dfac624af6db45225191885e1b3816695cb Mon Sep 17 00:00:00 2001 From: Dianna Hohensee Date: Mon, 11 Aug 2025 12:48:30 -0700 Subject: [PATCH 06/16] check that new shard assignment won't exceed threshold improvements and *Tests are complete --- .../ShardMovementWriteLoadSimulator.java | 30 ++++-- .../decider/WriteLoadConstraintDecider.java | 54 ++++++----- .../WriteLoadConstraintDeciderTests.java | 97 ++++++++++--------- 3 files changed, 103 insertions(+), 78 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/ShardMovementWriteLoadSimulator.java b/server/src/main/java/org/elasticsearch/cluster/routing/ShardMovementWriteLoadSimulator.java index b8d42f0f1cde1..58befedd6443b 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/ShardMovementWriteLoadSimulator.java +++ 
b/server/src/main/java/org/elasticsearch/cluster/routing/ShardMovementWriteLoadSimulator.java @@ -96,16 +96,34 @@ private static NodeUsageStatsForThreadPools.ThreadPoolUsageStats replaceWritePoo ) { final NodeUsageStatsForThreadPools.ThreadPoolUsageStats writeThreadPoolStats = value.threadPoolUsageStatsMap() .get(ThreadPool.Names.WRITE); - var newAverageThreadPoolUtilization = (writeThreadPoolStats.averageThreadPoolUtilization() + (writeLoadDelta / writeThreadPoolStats - .totalThreadPoolThreads())); - assert newAverageThreadPoolUtilization < 1.00; return new NodeUsageStatsForThreadPools.ThreadPoolUsageStats( writeThreadPoolStats.totalThreadPoolThreads(), - (float) Math.max( - (writeThreadPoolStats.averageThreadPoolUtilization() + (writeLoadDelta / writeThreadPoolStats.totalThreadPoolThreads())), - 0.0 + updateNodeUtilizationWithShardMovements( + writeThreadPoolStats.averageThreadPoolUtilization(), + (float) writeLoadDelta, + writeThreadPoolStats.totalThreadPoolThreads() ), writeThreadPoolStats.maxThreadPoolQueueLatencyMillis() ); } + + /** + * The {@code nodeUtilization} is the average utilization per thread for some duration of time. The {@code shardWriteLoadDelta} is the + * sum of shards' total execution time. Dividing the shards total execution time by the number of threads provides the average + * utilization of each write thread for those shards. The change in shard load can then be added to the node utilization. + * + * @param nodeUtilization The current node-level write load percent utilization. + * @param shardWriteLoadDelta The change in shard(s) execution time across all threads. This can be positive or negative depending on + * whether shards were moved onto the node or off of the node. + * @param numberOfWriteThreads The number of threads available in the node's write thread pool. + * @return The new node-level write load percent utilization after adding the shard write load delta. 
+ */ + public static float updateNodeUtilizationWithShardMovements( + float nodeUtilization, + float shardWriteLoadDelta, + int numberOfWriteThreads + ) { + float newNodeUtilization = nodeUtilization + (shardWriteLoadDelta / numberOfWriteThreads); + return (float) Math.max(newNodeUtilization, 0.0); + } } diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDecider.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDecider.java index b5005d46b96d9..08287fd387514 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDecider.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDecider.java @@ -14,10 +14,12 @@ import org.elasticsearch.cluster.NodeUsageStatsForThreadPools.ThreadPoolUsageStats; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.routing.RoutingNode; +import org.elasticsearch.cluster.routing.ShardMovementWriteLoadSimulator; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; import org.elasticsearch.cluster.routing.allocation.WriteLoadConstraintSettings; import org.elasticsearch.common.settings.ClusterSettings; +import org.elasticsearch.core.Strings; import org.elasticsearch.threadpool.ThreadPool; /** @@ -38,14 +40,14 @@ public WriteLoadConstraintDecider(ClusterSettings clusterSettings) { @Override public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) { if (writeLoadConstraintSettings.getWriteLoadConstraintEnabled() != WriteLoadConstraintSettings.WriteLoadDeciderStatus.ENABLED) { - return Decision.YES; + return Decision.single(Decision.Type.YES, NAME, "Decider is disabled"); } // Check whether the shard being relocated has any write load estimate. If it does not, then this decider has no opinion. 
var allShardWriteLoads = allocation.clusterInfo().getShardWriteLoads(); var shardWriteLoad = allShardWriteLoads.get(shardRouting.shardId()); if (shardWriteLoad == null || shardWriteLoad == 0) { - return Decision.YES; + return Decision.single(Decision.Type.YES, NAME, "Shard has no estimated write load. Decider takes no action."); } var allNodeUsageStats = allocation.clusterInfo().getNodeUsageStatsForThreadPools(); @@ -53,42 +55,44 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing if (nodeUsageStatsForThreadPools == null) { // No node-level thread pool usage stats were reported for this node. Let's assume this is OK and that the simulator will handle // setting a node-level write load for this node after this shard is assigned. - return Decision.YES; + return Decision.single(Decision.Type.YES, NAME, "The node has no write load estimate. Decider takes no action."); } - // NOMERGE: should create a utility class (eventually, maybe duplicate code for this class, for now, if simulator does have bug) - // to calculate the change node's usage change with assignment of the new shard assert nodeUsageStatsForThreadPools.threadPoolUsageStatsMap().isEmpty() == false; assert nodeUsageStatsForThreadPools.threadPoolUsageStatsMap().get(ThreadPool.Names.WRITE) != null; var nodeWriteThreadPoolStats = nodeUsageStatsForThreadPools.threadPoolUsageStatsMap().get(ThreadPool.Names.WRITE); var nodeWriteThreadPoolLoadThreshold = writeLoadConstraintSettings.getWriteThreadPoolHighUtilizationThresholdSetting(); if (nodeWriteThreadPoolStats.averageThreadPoolUtilization() >= nodeWriteThreadPoolLoadThreshold) { // The node's write thread pool usage stats already show high utilization above the threshold for accepting new shards. - logger.debug( - "The high utilization threshold of {} has already been reached on node {}. 
Cannot allocate shard {} to node {} " - + "without risking increased write latencies.", - nodeWriteThreadPoolLoadThreshold, + String debugMsg = Strings.format( + "Node [%s] with write thread pool utilization [%f] already exceeds the high utilization threshold of [%f]. Cannot allocate " + + "shard [%s] to node without risking increased write latencies.", node.nodeId(), - shardRouting.shardId(), - node.nodeId() + nodeWriteThreadPoolStats.averageThreadPoolUtilization(), + nodeWriteThreadPoolLoadThreshold, + shardRouting.shardId() ); - return Decision.NO; + logger.debug(debugMsg); + return Decision.single(Decision.Type.NO, NAME, debugMsg); } - if (nodeWriteThreadPoolStats.averageThreadPoolUtilization() + calculateShardMovementChange(nodeWriteThreadPoolStats, shardWriteLoad) >= nodeWriteThreadPoolLoadThreshold) { - // The node's write thread pool usage would be raised above the high utilization threshold. This could lead to a hot spot on - // this node and is undesirable. - logger.debug( - "The high utilization threshold of {} would be exceeded on node {} if shard {} with estimated write load {} were " - + "assigned to it. Cannot allocate shard {} to node {} without risking increased write latencies.", + if (calculateShardMovementChange(nodeWriteThreadPoolStats, shardWriteLoad) >= nodeWriteThreadPoolLoadThreshold) { + // The node's write thread pool usage would be raised above the high utilization threshold with assignment of the new shard. + // This could lead to a hot spot on this node and is undesirable. + String debugMsg = Strings.format( + "The high utilization threshold of [%f] would be exceeded on node [%s] with utilization [%f] if shard [%s] with estimated " + + "write load [%f] (execution time [%f] / threads [%d]) were assigned to it. 
Cannot allocate shard to node without " + + "risking increased write latencies.", nodeWriteThreadPoolLoadThreshold, node.nodeId(), + nodeWriteThreadPoolStats.averageThreadPoolUtilization(), shardRouting.shardId(), + shardWriteLoad / nodeWriteThreadPoolStats.totalThreadPoolThreads(), shardWriteLoad, - shardRouting.shardId(), - node.nodeId() + nodeWriteThreadPoolStats.totalThreadPoolThreads() ); - return Decision.NO; + logger.debug(debugMsg); + return Decision.single(Decision.Type.NO, NAME, debugMsg); } return Decision.YES; @@ -109,8 +113,10 @@ public Decision canRemain(IndexMetadata indexMetadata, ShardRouting shardRouting */ private float calculateShardMovementChange(ThreadPoolUsageStats nodeWriteThreadPoolStats, double shardWriteLoad) { assert shardWriteLoad > 0; - // NOMERGE: move this into an utility class, should be commonly accessible with the simulator. - // TODO: implement.. - return 0; + return ShardMovementWriteLoadSimulator.updateNodeUtilizationWithShardMovements( + nodeWriteThreadPoolStats.averageThreadPoolUtilization(), + (float) shardWriteLoad, + nodeWriteThreadPoolStats.totalThreadPoolThreads() + ); } } diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDeciderTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDeciderTests.java index 273b007b656d6..0dc7694fcf1a3 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDeciderTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDeciderTests.java @@ -36,27 +36,20 @@ public class WriteLoadConstraintDeciderTests extends ESAllocationTestCase { - // NOMERGE: I wonder if we don't want a node utilization percent, but rather a total node execution time used and a total node execution - // time possible. The math would be a whole lot easier on us... 
- public void testWriteLoadDeciderIsDisabled() { + public void testWriteLoadDecider() { String indexName = "test-index"; - // Set up multiple nodes and an index with multiple shards. - // DiscoveryNode discoveryNode1 = newNode("node1"); - // DiscoveryNode discoveryNode2 = newNode("node2"); - // ShardId shardId1 = new ShardId(indexName, IndexMetadata.INDEX_UUID_NA_VALUE, 0); - // ShardId shardId2 = new ShardId(indexName, IndexMetadata.INDEX_UUID_NA_VALUE, 1); - /** - * Create the ClusterState + * Create the ClusterState for multiple nodes and multiple index shards. */ - ClusterState clusterState = ClusterStateCreationUtils.stateWithAssignedPrimariesAndReplicas(new String[] { indexName }, 3, 0); - assertEquals(2, clusterState.nodes().size()); + ClusterState clusterState = ClusterStateCreationUtils.stateWithAssignedPrimariesAndReplicas(new String[] { indexName }, 3, 1); + // The number of data nodes the util method above creates is numberOfReplicas+1. + assertEquals(3, clusterState.nodes().size()); assertEquals(1, clusterState.metadata().getTotalNumberOfIndices()); /** - * Fetch the nodes and index shards from the generated ClusterState. + * Fetch references to the nodes and index shards from the generated ClusterState, so the ClusterInfo can be created from them. 
*/ var discoveryNodeIterator = clusterState.nodes().iterator(); @@ -64,6 +57,8 @@ public void testWriteLoadDeciderIsDisabled() { var exceedingThresholdDiscoveryNode = discoveryNodeIterator.next(); assertTrue(discoveryNodeIterator.hasNext()); var belowThresholdDiscoveryNode2 = discoveryNodeIterator.next(); + assertTrue(discoveryNodeIterator.hasNext()); + var nearThresholdDiscoveryNode3 = discoveryNodeIterator.next(); assertFalse(discoveryNodeIterator.hasNext()); var indexIterator = clusterState.metadata().indicesAllProjects().iterator(); @@ -74,10 +69,10 @@ public void testWriteLoadDeciderIsDisabled() { assertEquals(3, testIndexMetadata.getNumberOfShards()); ShardId testShardId1 = new ShardId(testIndex, 0); ShardId testShardId2 = new ShardId(testIndex, 1); - ShardId testShardId3NoWriteLoad = new ShardId(testIndex, 1); + ShardId testShardId3NoWriteLoad = new ShardId(testIndex, 2); /** - * Create the ClusterInfo that includes the node and shard level write load estimates. + * Create a ClusterInfo that includes the node and shard level write load estimates for a variety of node capacity situations. */ var nodeThreadPoolStatsWithWriteExceedingThreshold = createNodeUsageStatsForThreadPools( @@ -87,15 +82,18 @@ public void testWriteLoadDeciderIsDisabled() { 0 ); var nodeThreadPoolStatsWithWriteBelowThreshold = createNodeUsageStatsForThreadPools(belowThresholdDiscoveryNode2, 8, 0.50f, 0); + var nodeThreadPoolStatsWithWriteNearThreshold = createNodeUsageStatsForThreadPools(nearThresholdDiscoveryNode3, 8, 0.89f, 0); // Create a map of usage per node. 
var nodeIdToNodeUsageStatsForThreadPools = new HashMap(); nodeIdToNodeUsageStatsForThreadPools.put(exceedingThresholdDiscoveryNode.getId(), nodeThreadPoolStatsWithWriteExceedingThreshold); nodeIdToNodeUsageStatsForThreadPools.put(belowThresholdDiscoveryNode2.getId(), nodeThreadPoolStatsWithWriteBelowThreshold); + nodeIdToNodeUsageStatsForThreadPools.put(nearThresholdDiscoveryNode3.getId(), nodeThreadPoolStatsWithWriteNearThreshold); + // Create a map of usage per shard. var shardIdToWriteLoadEstimate = new HashMap(); - shardIdToWriteLoadEstimate.put(testShardId1, 1.5); - shardIdToWriteLoadEstimate.put(testShardId2, 1.5); + shardIdToWriteLoadEstimate.put(testShardId1, 0.5); + shardIdToWriteLoadEstimate.put(testShardId2, 0.5); shardIdToWriteLoadEstimate.put(testShardId3NoWriteLoad, 0d); ClusterInfo clusterInfo = ClusterInfo.builder() @@ -104,7 +102,7 @@ public void testWriteLoadDeciderIsDisabled() { .build(); /** - * Create the RoutingAllocation + * Create the RoutingAllocation from the ClusterState and ClusterInfo above, and set up the other input for the WriteLoadDecider. 
*/ var routingAllocation = new RoutingAllocation( @@ -138,32 +136,36 @@ public void testWriteLoadDeciderIsDisabled() { ShardRoutingState.STARTED ); - assertTrue(discoveryNodeIterator.hasNext()); RoutingNode exceedingThresholdRoutingNode = RoutingNodesHelper.routingNode( exceedingThresholdDiscoveryNode.getId(), - discoveryNodeIterator.next(), + exceedingThresholdDiscoveryNode, shardRouting1 ); - assertTrue(discoveryNodeIterator.hasNext()); RoutingNode belowThresholdRoutingNode = RoutingNodesHelper.routingNode( belowThresholdDiscoveryNode2.getId(), - discoveryNodeIterator.next(), + belowThresholdDiscoveryNode2, shardRouting2 ); - assertFalse(discoveryNodeIterator.hasNext()); + RoutingNode nearThresholdRoutingNode = RoutingNodesHelper.routingNode( + nearThresholdDiscoveryNode3.getId(), + nearThresholdDiscoveryNode3, + new ShardRouting[] {} + ); /** * Test the write load decider */ // The write load decider is disabled by default. - var writeLoadDecider = createWriteLoadConstraintDecider(Settings.builder().build()); - assertEquals(Decision.YES, writeLoadDecider.canAllocate(shardRouting2, exceedingThresholdRoutingNode, routingAllocation)); - assertEquals(Decision.YES, writeLoadDecider.canAllocate(shardRouting1, belowThresholdRoutingNode, routingAllocation)); - assertEquals(Decision.YES, writeLoadDecider.canAllocate(thirdRoutingNoWriteLoad, exceedingThresholdRoutingNode, routingAllocation)); + var writeLoadDecider = createWriteLoadConstraintDecider(Settings.builder().build()); + assertEquals(Decision.Type.YES, writeLoadDecider.canAllocate(shardRouting2, exceedingThresholdRoutingNode, routingAllocation).type()); + assertEquals(Decision.Type.YES, writeLoadDecider.canAllocate(shardRouting1, belowThresholdRoutingNode, routingAllocation).type()); + assertEquals(Decision.Type.YES, writeLoadDecider.canAllocate(shardRouting1, nearThresholdRoutingNode, routingAllocation).type()); + assertEquals(Decision.Type.YES, writeLoadDecider.canAllocate(thirdRoutingNoWriteLoad, 
exceedingThresholdRoutingNode, routingAllocation).type()); // Check that the answers change when enabled. + writeLoadDecider = createWriteLoadConstraintDecider( Settings.builder() .put( @@ -172,27 +174,26 @@ public void testWriteLoadDeciderIsDisabled() { ) .build() ); - - assertEquals(Decision.NO, writeLoadDecider.canAllocate(shardRouting2, exceedingThresholdRoutingNode, routingAllocation)); - assertEquals(Decision.YES, writeLoadDecider.canAllocate(shardRouting1, belowThresholdRoutingNode, routingAllocation)); - assertEquals(Decision.YES, writeLoadDecider.canAllocate(thirdRoutingNoWriteLoad, exceedingThresholdRoutingNode, routingAllocation)); - - // NOMERGE: test that adding a shard is rejected if it would overflow the utilization threshold? - // Need to implement the logic in the decider, I don't check right now. - } - - public void testShardWithNoWriteLoadEstimateIsAlwaysYES() { - Settings writeLoadConstraintSettings = Settings.builder() - .put( - WriteLoadConstraintSettings.WRITE_LOAD_DECIDER_ENABLED_SETTING.getKey(), - WriteLoadConstraintSettings.WriteLoadDeciderStatus.ENABLED - ) - .build(); - // TODO - } - - public void testShardWithWriteLoadEstimate() { - // TODO: test successful re-assignment and rejected re-assignment due to threshold + assertEquals( + "Assigning a new shard to a node that is above the threshold should fail", + Decision.Type.NO, + writeLoadDecider.canAllocate(shardRouting2, exceedingThresholdRoutingNode, routingAllocation).type() + ); + assertEquals( + "Assigning a new shard to a node that has capacity should succeed", + Decision.Type.YES, + writeLoadDecider.canAllocate(shardRouting1, belowThresholdRoutingNode, routingAllocation).type() + ); + assertEquals( + "Assigning a new shard without a write load estimate should _not_ be blocked by lack of capacity", + Decision.Type.YES, + writeLoadDecider.canAllocate(thirdRoutingNoWriteLoad, exceedingThresholdRoutingNode, routingAllocation).type() + ); + assertEquals( + "Assigning a new shard that 
would cause the node to exceed capacity should fail", + Decision.Type.NO, + writeLoadDecider.canAllocate(shardRouting1, nearThresholdRoutingNode, routingAllocation).type() + ); } private WriteLoadConstraintDecider createWriteLoadConstraintDecider(Settings settings) { From 6244ef80ca4f679eb89c32705dd4d13297151581 Mon Sep 17 00:00:00 2001 From: Dianna Hohensee Date: Mon, 11 Aug 2025 15:36:06 -0700 Subject: [PATCH 07/16] cleanup --- .../org/elasticsearch/index/shard/IndexShardIT.java | 1 - .../routing/ShardMovementWriteLoadSimulator.java | 10 ---------- .../allocation/decider/WriteLoadConstraintDecider.java | 2 +- 3 files changed, 1 insertion(+), 12 deletions(-) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/index/shard/IndexShardIT.java b/server/src/internalClusterTest/java/org/elasticsearch/index/shard/IndexShardIT.java index a0c13580a6dcd..f752650e3f3d7 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/index/shard/IndexShardIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/index/shard/IndexShardIT.java @@ -394,7 +394,6 @@ public void testShardWriteLoadsArePresent() { for (int i = 0; i < indexMetadata.getNumberOfShards(); i++) { final ShardId shardId = new ShardId(indexMetadata.getIndex(), i); assertTrue(shardWriteLoads.containsKey(shardId)); - logger.info("~~~shardWriteLoads: " + shardWriteLoads.get(shardId)); maximumLoadRecorded = Math.max(shardWriteLoads.get(shardId), maximumLoadRecorded); } // Each index should have seen some write-load diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/ShardMovementWriteLoadSimulator.java b/server/src/main/java/org/elasticsearch/cluster/routing/ShardMovementWriteLoadSimulator.java index 58befedd6443b..a807e3d9b6427 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/ShardMovementWriteLoadSimulator.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/ShardMovementWriteLoadSimulator.java @@ -80,16 +80,6 @@ public Map 
simulatedNodeUsageStatsForThrea return Collections.unmodifiableMap(adjustedNodeUsageStatsForThreadPools); } - /** - * averageThreadPoolUtilization - * total shard execution time (sum of shard write loads) / total execution time (totalThreadPoolThreads x duration) - * (30 + 20 + 40) / (4 x 30s) = 90 / 120 = .75 - * - * writeLoadDelta / totalThreadPoolThreads - * 20 / 4 = 5 - * - */ - private static NodeUsageStatsForThreadPools.ThreadPoolUsageStats replaceWritePoolStats( NodeUsageStatsForThreadPools value, double writeLoadDelta diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDecider.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDecider.java index 08287fd387514..f9ec125a51539 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDecider.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDecider.java @@ -39,7 +39,7 @@ public WriteLoadConstraintDecider(ClusterSettings clusterSettings) { @Override public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) { - if (writeLoadConstraintSettings.getWriteLoadConstraintEnabled() != WriteLoadConstraintSettings.WriteLoadDeciderStatus.ENABLED) { + if (writeLoadConstraintSettings.getWriteLoadConstraintEnabled() == WriteLoadConstraintSettings.WriteLoadDeciderStatus.DISABLED) { return Decision.single(Decision.Type.YES, NAME, "Decider is disabled"); } From da441c3c0f13b5402a523a11340661d795403412 Mon Sep 17 00:00:00 2001 From: elasticsearchmachine Date: Mon, 11 Aug 2025 22:42:42 +0000 Subject: [PATCH 08/16] [CI] Auto commit changes from spotless --- .../decider/WriteLoadConstraintDeciderTests.java | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git 
a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDeciderTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDeciderTests.java index 0dc7694fcf1a3..4cb95efe23264 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDeciderTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDeciderTests.java @@ -159,10 +159,16 @@ public void testWriteLoadDecider() { // The write load decider is disabled by default. var writeLoadDecider = createWriteLoadConstraintDecider(Settings.builder().build()); - assertEquals(Decision.Type.YES, writeLoadDecider.canAllocate(shardRouting2, exceedingThresholdRoutingNode, routingAllocation).type()); + assertEquals( + Decision.Type.YES, + writeLoadDecider.canAllocate(shardRouting2, exceedingThresholdRoutingNode, routingAllocation).type() + ); assertEquals(Decision.Type.YES, writeLoadDecider.canAllocate(shardRouting1, belowThresholdRoutingNode, routingAllocation).type()); assertEquals(Decision.Type.YES, writeLoadDecider.canAllocate(shardRouting1, nearThresholdRoutingNode, routingAllocation).type()); - assertEquals(Decision.Type.YES, writeLoadDecider.canAllocate(thirdRoutingNoWriteLoad, exceedingThresholdRoutingNode, routingAllocation).type()); + assertEquals( + Decision.Type.YES, + writeLoadDecider.canAllocate(thirdRoutingNoWriteLoad, exceedingThresholdRoutingNode, routingAllocation).type() + ); // Check that the answers change when enabled. 
From 7ce1e59806e563515fe198d3f194c1f8176fe3e8 Mon Sep 17 00:00:00 2001 From: Dianna Hohensee Date: Mon, 11 Aug 2025 15:49:26 -0700 Subject: [PATCH 09/16] clear IT test for later --- .../decider/WriteLoadConstraintDeciderIT.java | 29 ------------------- 1 file changed, 29 deletions(-) delete mode 100644 server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDeciderIT.java diff --git a/server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDeciderIT.java b/server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDeciderIT.java deleted file mode 100644 index 3e35db734da18..0000000000000 --- a/server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDeciderIT.java +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the "Elastic License - * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side - * Public License v 1"; you may not use this file except in compliance with, at - * your election, the "Elastic License 2.0", the "GNU Affero General Public - * License v3.0 only", or the "Server Side Public License, v 1". 
- */ - -package org.elasticsearch.cluster.routing.allocation.decider; - -import org.elasticsearch.cluster.routing.allocation.WriteLoadConstraintSettings; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.test.ESIntegTestCase; - -public class WriteLoadConstraintDeciderIT extends ESIntegTestCase { - @Override - protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) { - return Settings.builder() - .put(super.nodeSettings(nodeOrdinal, otherSettings)) - .put( - WriteLoadConstraintSettings.WRITE_LOAD_DECIDER_ENABLED_SETTING.getKey(), - WriteLoadConstraintSettings.WriteLoadDeciderStatus.ENABLED - ) - .build(); - } - - // TODO: integration testing to see that the components work together. -} From 74b38e1bd43ab6f14363e3b2616cf554fa78f6bf Mon Sep 17 00:00:00 2001 From: Dianna Hohensee Date: Wed, 13 Aug 2025 15:13:05 -0700 Subject: [PATCH 10/16] fix allocation decider call order --- .../src/main/java/org/elasticsearch/cluster/ClusterModule.java | 2 +- .../test/java/org/elasticsearch/cluster/ClusterModuleTests.java | 2 ++ 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/ClusterModule.java b/server/src/main/java/org/elasticsearch/cluster/ClusterModule.java index f8faf648eaf4b..1d3b79a0dc1af 100644 --- a/server/src/main/java/org/elasticsearch/cluster/ClusterModule.java +++ b/server/src/main/java/org/elasticsearch/cluster/ClusterModule.java @@ -447,6 +447,7 @@ public static Collection createAllocationDeciders( addAllocationDecider(deciders, new SnapshotInProgressAllocationDecider()); addAllocationDecider(deciders, new RestoreInProgressAllocationDecider()); addAllocationDecider(deciders, new NodeShutdownAllocationDecider()); + addAllocationDecider(deciders, new WriteLoadConstraintDecider(clusterSettings)); addAllocationDecider(deciders, new NodeReplacementAllocationDecider()); addAllocationDecider(deciders, new FilterAllocationDecider(settings, clusterSettings)); 
addAllocationDecider(deciders, new SameShardAllocationDecider(clusterSettings)); @@ -454,7 +455,6 @@ public static Collection createAllocationDeciders( addAllocationDecider(deciders, new ThrottlingAllocationDecider(clusterSettings)); addAllocationDecider(deciders, new ShardsLimitAllocationDecider(clusterSettings)); addAllocationDecider(deciders, new AwarenessAllocationDecider(settings, clusterSettings)); - addAllocationDecider(deciders, new WriteLoadConstraintDecider(clusterSettings)); clusterPlugins.stream() .flatMap(p -> p.createAllocationDeciders(settings, clusterSettings).stream()) diff --git a/server/src/test/java/org/elasticsearch/cluster/ClusterModuleTests.java b/server/src/test/java/org/elasticsearch/cluster/ClusterModuleTests.java index 2908bff995340..70df5f78615ca 100644 --- a/server/src/test/java/org/elasticsearch/cluster/ClusterModuleTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/ClusterModuleTests.java @@ -36,6 +36,7 @@ import org.elasticsearch.cluster.routing.allocation.decider.ShardsLimitAllocationDecider; import org.elasticsearch.cluster.routing.allocation.decider.SnapshotInProgressAllocationDecider; import org.elasticsearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider; +import org.elasticsearch.cluster.routing.allocation.decider.WriteLoadConstraintDecider; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.IndexScopedSettings; @@ -278,6 +279,7 @@ public void testAllocationDeciderOrder() { SnapshotInProgressAllocationDecider.class, RestoreInProgressAllocationDecider.class, NodeShutdownAllocationDecider.class, + WriteLoadConstraintDecider.class, NodeReplacementAllocationDecider.class, FilterAllocationDecider.class, SameShardAllocationDecider.class, From 8b955e62cbbe97d6e05884c2fce00824d932f2ef Mon Sep 17 00:00:00 2001 From: Dianna Hohensee Date: Wed, 13 Aug 2025 15:13:39 -0700 Subject: [PATCH 11/16] update 
WriteLoadDeciderStatus callers with enum helper methods --- .../cluster/InternalClusterInfoService.java | 6 ++--- .../WriteLoadConstraintSettings.java | 25 ++++++++++++++++--- .../decider/WriteLoadConstraintDecider.java | 6 +++-- 3 files changed, 28 insertions(+), 9 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/InternalClusterInfoService.java b/server/src/main/java/org/elasticsearch/cluster/InternalClusterInfoService.java index 6de0640f4422f..bd947c117f0e5 100644 --- a/server/src/main/java/org/elasticsearch/cluster/InternalClusterInfoService.java +++ b/server/src/main/java/org/elasticsearch/cluster/InternalClusterInfoService.java @@ -219,7 +219,7 @@ void execute() { logger.trace("starting async refresh"); try (var ignoredRefs = fetchRefs) { - maybeFetchIndicesStats(diskThresholdEnabled || writeLoadConstraintEnabled == WriteLoadDeciderStatus.ENABLED); + maybeFetchIndicesStats(diskThresholdEnabled || writeLoadConstraintEnabled.atLeastLowThresholdEnabled()); maybeFetchNodeStats(diskThresholdEnabled || estimatedHeapThresholdEnabled); maybeFetchNodesEstimatedHeapUsage(estimatedHeapThresholdEnabled); maybeFetchNodesUsageStatsForThreadPools(writeLoadConstraintEnabled); @@ -262,7 +262,7 @@ private void maybeFetchNodesEstimatedHeapUsage(boolean shouldFetch) { } private void maybeFetchNodesUsageStatsForThreadPools(WriteLoadDeciderStatus writeLoadConstraintEnabled) { - if (writeLoadConstraintEnabled != WriteLoadDeciderStatus.DISABLED) { + if (writeLoadConstraintEnabled.atLeastLowThresholdEnabled()) { try (var ignored = threadPool.getThreadContext().clearTraceContext()) { fetchNodesUsageStatsForThreadPools(); } @@ -313,7 +313,7 @@ private void fetchIndicesStats() { // This returns the shard sizes on disk indicesStatsRequest.store(true); } - if (writeLoadConstraintEnabled == WriteLoadDeciderStatus.ENABLED) { + if (writeLoadConstraintEnabled.atLeastLowThresholdEnabled()) { // This returns the shard write-loads indicesStatsRequest.indexing(true); } diff 
--git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/WriteLoadConstraintSettings.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/WriteLoadConstraintSettings.java index c20a4db5bede3..3ee0702b13192 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/WriteLoadConstraintSettings.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/WriteLoadConstraintSettings.java @@ -30,13 +30,30 @@ public enum WriteLoadDeciderStatus { */ DISABLED, /** - * Only the low-threshold is enabled (write-load will not trigger rebalance) + * Only the low write-load threshold is enabled, to try to avoid allocating to a node exceeding + * {@link #WRITE_LOAD_DECIDER_HIGH_UTILIZATION_THRESHOLD_SETTING}. Write-load hot-spots will not trigger rebalancing. */ - LOW_ONLY, + LOW_THRESHOLD_ONLY, /** - * The decider is enabled + * All write load decider development work is turned on. */ - ENABLED + ENABLED; + + public boolean fullyEnabled() { + return this == ENABLED; + } + + public boolean notFullyEnabled() { + return this != ENABLED; + } + + public boolean atLeastLowThresholdEnabled() { + return this != DISABLED; + } + + public boolean disabled() { + return this == DISABLED; + } } public static final Setting WRITE_LOAD_DECIDER_ENABLED_SETTING = Setting.enumSetting( diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDecider.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDecider.java index f9ec125a51539..f3d4765975a41 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDecider.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDecider.java @@ -39,7 +39,7 @@ public WriteLoadConstraintDecider(ClusterSettings clusterSettings) { @Override public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, 
RoutingAllocation allocation) { - if (writeLoadConstraintSettings.getWriteLoadConstraintEnabled() == WriteLoadConstraintSettings.WriteLoadDeciderStatus.DISABLED) { + if (writeLoadConstraintSettings.getWriteLoadConstraintEnabled().disabled()) { return Decision.single(Decision.Type.YES, NAME, "Decider is disabled"); } @@ -100,10 +100,12 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing @Override public Decision canRemain(IndexMetadata indexMetadata, ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) { - if (writeLoadConstraintSettings.getWriteLoadConstraintEnabled() != WriteLoadConstraintSettings.WriteLoadDeciderStatus.ENABLED) { + if (writeLoadConstraintSettings.getWriteLoadConstraintEnabled().notFullyEnabled()) { return Decision.YES; } + // TODO: implement + return Decision.YES; } From fc934dc2a68986339096ceb1a8ed6d5939defcfb Mon Sep 17 00:00:00 2001 From: Dianna Hohensee Date: Wed, 13 Aug 2025 15:22:54 -0700 Subject: [PATCH 12/16] change debugMsg to explain, to reflect that the output goes into the allocation explain response --- .../decider/WriteLoadConstraintDecider.java | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDecider.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDecider.java index f3d4765975a41..a499dc12cb71f 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDecider.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDecider.java @@ -64,7 +64,7 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing var nodeWriteThreadPoolLoadThreshold = writeLoadConstraintSettings.getWriteThreadPoolHighUtilizationThresholdSetting(); if (nodeWriteThreadPoolStats.averageThreadPoolUtilization() >= 
nodeWriteThreadPoolLoadThreshold) { // The node's write thread pool usage stats already show high utilization above the threshold for accepting new shards. - String debugMsg = Strings.format( + String explain = Strings.format( "Node [%s] with write thread pool utilization [%f] already exceeds the high utilization threshold of [%f]. Cannot allocate " + "shard [%s] to node without risking increased write latencies.", node.nodeId(), @@ -72,14 +72,14 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing nodeWriteThreadPoolLoadThreshold, shardRouting.shardId() ); - logger.debug(debugMsg); - return Decision.single(Decision.Type.NO, NAME, debugMsg); + logger.debug(explain); + return Decision.single(Decision.Type.NO, NAME, explain); } if (calculateShardMovementChange(nodeWriteThreadPoolStats, shardWriteLoad) >= nodeWriteThreadPoolLoadThreshold) { // The node's write thread pool usage would be raised above the high utilization threshold with assignment of the new shard. // This could lead to a hot spot on this node and is undesirable. - String debugMsg = Strings.format( + String explain = Strings.format( "The high utilization threshold of [%f] would be exceeded on node [%s] with utilization [%f] if shard [%s] with estimated " + "write load [%f] (execution time [%f] / threads [%d]) were assigned to it. 
Cannot allocate shard to node without " + "risking increased write latencies.", @@ -91,8 +91,8 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing shardWriteLoad, nodeWriteThreadPoolStats.totalThreadPoolThreads() ); - logger.debug(debugMsg); - return Decision.single(Decision.Type.NO, NAME, debugMsg); + logger.debug(explain); + return Decision.single(Decision.Type.NO, NAME, explain); } return Decision.YES; From a0e61e0c9f08c1dddef93afef4532b84f17b6fe7 Mon Sep 17 00:00:00 2001 From: Dianna Hohensee Date: Wed, 13 Aug 2025 15:34:33 -0700 Subject: [PATCH 13/16] randomize testing to select ENABLED or LOW_THRESHOLD_ONLY --- .../org/elasticsearch/cluster/ClusterInfoServiceIT.java | 4 +++- .../java/org/elasticsearch/index/shard/IndexShardIT.java | 8 ++++++-- .../InternalClusterInfoServiceSchedulingTests.java | 4 +++- .../decider/WriteLoadConstraintDeciderTests.java | 6 ++++-- 4 files changed, 16 insertions(+), 6 deletions(-) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/cluster/ClusterInfoServiceIT.java b/server/src/internalClusterTest/java/org/elasticsearch/cluster/ClusterInfoServiceIT.java index 6fd3133686b64..e992a264cc6f3 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/cluster/ClusterInfoServiceIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/cluster/ClusterInfoServiceIT.java @@ -344,7 +344,9 @@ public void testClusterInfoIncludesNodeUsageStatsForThreadPools() { var settings = Settings.builder() .put( WriteLoadConstraintSettings.WRITE_LOAD_DECIDER_ENABLED_SETTING.getKey(), - WriteLoadConstraintSettings.WriteLoadDeciderStatus.ENABLED + randomBoolean() + ? 
WriteLoadConstraintSettings.WriteLoadDeciderStatus.ENABLED + : WriteLoadConstraintSettings.WriteLoadDeciderStatus.LOW_THRESHOLD_ONLY ) .build(); var masterName = internalCluster().startMasterOnlyNode(settings); diff --git a/server/src/internalClusterTest/java/org/elasticsearch/index/shard/IndexShardIT.java b/server/src/internalClusterTest/java/org/elasticsearch/index/shard/IndexShardIT.java index f752650e3f3d7..a7821b5bf1f13 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/index/shard/IndexShardIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/index/shard/IndexShardIT.java @@ -319,7 +319,9 @@ public void testNodeWriteLoadsArePresent() { Settings.builder() .put( WriteLoadConstraintSettings.WRITE_LOAD_DECIDER_ENABLED_SETTING.getKey(), - WriteLoadConstraintSettings.WriteLoadDeciderStatus.ENABLED + randomBoolean() + ? WriteLoadConstraintSettings.WriteLoadDeciderStatus.ENABLED + : WriteLoadConstraintSettings.WriteLoadDeciderStatus.LOW_THRESHOLD_ONLY ) .build() ); @@ -376,7 +378,9 @@ public void testShardWriteLoadsArePresent() { Settings.builder() .put( WriteLoadConstraintSettings.WRITE_LOAD_DECIDER_ENABLED_SETTING.getKey(), - WriteLoadConstraintSettings.WriteLoadDeciderStatus.ENABLED + randomBoolean() + ? 
WriteLoadConstraintSettings.WriteLoadDeciderStatus.ENABLED + : WriteLoadConstraintSettings.WriteLoadDeciderStatus.LOW_THRESHOLD_ONLY ) .build() ); diff --git a/server/src/test/java/org/elasticsearch/cluster/InternalClusterInfoServiceSchedulingTests.java b/server/src/test/java/org/elasticsearch/cluster/InternalClusterInfoServiceSchedulingTests.java index 72eb5a6a3b764..1e91f69f47573 100644 --- a/server/src/test/java/org/elasticsearch/cluster/InternalClusterInfoServiceSchedulingTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/InternalClusterInfoServiceSchedulingTests.java @@ -62,7 +62,9 @@ public void testScheduling() { .put(InternalClusterInfoService.CLUSTER_ROUTING_ALLOCATION_ESTIMATED_HEAP_THRESHOLD_DECIDER_ENABLED.getKey(), true) .put( WriteLoadConstraintSettings.WRITE_LOAD_DECIDER_ENABLED_SETTING.getKey(), - WriteLoadConstraintSettings.WriteLoadDeciderStatus.ENABLED + randomBoolean() + ? WriteLoadConstraintSettings.WriteLoadDeciderStatus.ENABLED + : WriteLoadConstraintSettings.WriteLoadDeciderStatus.LOW_THRESHOLD_ONLY ); if (randomBoolean()) { settingsBuilder.put(INTERNAL_CLUSTER_INFO_UPDATE_INTERVAL_SETTING.getKey(), randomIntBetween(10000, 60000) + "ms"); diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDeciderTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDeciderTests.java index 4cb95efe23264..fb3424d961a8b 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDeciderTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDeciderTests.java @@ -44,7 +44,7 @@ public void testWriteLoadDecider() { */ ClusterState clusterState = ClusterStateCreationUtils.stateWithAssignedPrimariesAndReplicas(new String[] { indexName }, 3, 1); - // The number of data nodes the util method above creates is numberOfReplicas+1. 
+ // The number of data nodes the util method above creates is numberOfReplicas+1, and three data nodes are needed for this test. assertEquals(3, clusterState.nodes().size()); assertEquals(1, clusterState.metadata().getTotalNumberOfIndices()); @@ -176,7 +176,9 @@ public void testWriteLoadDecider() { Settings.builder() .put( WriteLoadConstraintSettings.WRITE_LOAD_DECIDER_ENABLED_SETTING.getKey(), - WriteLoadConstraintSettings.WriteLoadDeciderStatus.ENABLED + randomBoolean() + ? WriteLoadConstraintSettings.WriteLoadDeciderStatus.ENABLED + : WriteLoadConstraintSettings.WriteLoadDeciderStatus.LOW_THRESHOLD_ONLY ) .build() ); From 3014d883ff1e447796b35c6372e5376553ac0491 Mon Sep 17 00:00:00 2001 From: Dianna Hohensee Date: Wed, 13 Aug 2025 16:06:29 -0700 Subject: [PATCH 14/16] modularize test set up --- .../WriteLoadConstraintDeciderTests.java | 173 +++++++++++++----- 1 file changed, 123 insertions(+), 50 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDeciderTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDeciderTests.java index fb3424d961a8b..12bfd8a0a4789 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDeciderTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDeciderTests.java @@ -36,9 +36,122 @@ public class WriteLoadConstraintDeciderTests extends ESAllocationTestCase { - public void testWriteLoadDecider() { + /** + * Test the write load decider behavior when disabled + */ + public void testWriteLoadDeciderDisabled() { + String indexName = "test-index"; + var testHarness = createClusterStateAndRoutingAllocation(indexName); + + // The write load decider is disabled by default. 
+ + var writeLoadDecider = createWriteLoadConstraintDecider(Settings.builder().build()); + + assertEquals( + Decision.Type.YES, + writeLoadDecider.canAllocate( + testHarness.shardRouting2, + testHarness.exceedingThresholdRoutingNode, + testHarness.routingAllocation + ).type() + ); + assertEquals( + Decision.Type.YES, + writeLoadDecider.canAllocate(testHarness.shardRouting1, testHarness.belowThresholdRoutingNode, testHarness.routingAllocation) + .type() + ); + assertEquals( + Decision.Type.YES, + writeLoadDecider.canAllocate(testHarness.shardRouting1, testHarness.nearThresholdRoutingNode, testHarness.routingAllocation) + .type() + ); + assertEquals( + Decision.Type.YES, + writeLoadDecider.canAllocate( + testHarness.thirdRoutingNoWriteLoad, + testHarness.exceedingThresholdRoutingNode, + testHarness.routingAllocation + ).type() + ); + + assertEquals( + Decision.Type.YES, + writeLoadDecider.canRemain( + testHarness.clusterState.metadata().getProject().index(indexName), + testHarness.shardRouting1, + testHarness.exceedingThresholdRoutingNode, + testHarness.routingAllocation + ).type() + ); + } + + /** + * Test the {@link WriteLoadConstraintDecider#canAllocate} implementation. + */ + public void testWriteLoadDeciderCanAllocate() { String indexName = "test-index"; + var testHarness = createClusterStateAndRoutingAllocation(indexName); + var writeLoadDecider = createWriteLoadConstraintDecider( + Settings.builder() + .put( + WriteLoadConstraintSettings.WRITE_LOAD_DECIDER_ENABLED_SETTING.getKey(), + randomBoolean() + ? 
WriteLoadConstraintSettings.WriteLoadDeciderStatus.ENABLED + : WriteLoadConstraintSettings.WriteLoadDeciderStatus.LOW_THRESHOLD_ONLY + ) + .build() + ); + assertEquals( + "Assigning a new shard to a node that is above the threshold should fail", + Decision.Type.NO, + writeLoadDecider.canAllocate( + testHarness.shardRouting2, + testHarness.exceedingThresholdRoutingNode, + testHarness.routingAllocation + ).type() + ); + assertEquals( + "Assigning a new shard to a node that has capacity should succeed", + Decision.Type.YES, + writeLoadDecider.canAllocate(testHarness.shardRouting1, testHarness.belowThresholdRoutingNode, testHarness.routingAllocation) + .type() + ); + assertEquals( + "Assigning a new shard without a write load estimate should _not_ be blocked by lack of capacity", + Decision.Type.YES, + writeLoadDecider.canAllocate( + testHarness.thirdRoutingNoWriteLoad, + testHarness.exceedingThresholdRoutingNode, + testHarness.routingAllocation + ).type() + ); + assertEquals( + "Assigning a new shard that would cause the node to exceed capacity should fail", + Decision.Type.NO, + writeLoadDecider.canAllocate(testHarness.shardRouting1, testHarness.nearThresholdRoutingNode, testHarness.routingAllocation) + .type() + ); + } + + /** + * Carries all the cluster state objects needed for testing after {@link #createClusterStateAndRoutingAllocation} sets them up. + */ + private record TestHarness( + ClusterState clusterState, + RoutingAllocation routingAllocation, + RoutingNode exceedingThresholdRoutingNode, + RoutingNode belowThresholdRoutingNode, + RoutingNode nearThresholdRoutingNode, + ShardRouting shardRouting1, + ShardRouting shardRouting2, + ShardRouting thirdRoutingNoWriteLoad + ) {} + + /** + * Creates all the cluster state and objects needed to test the {@link WriteLoadConstraintDecider}. + */ + private TestHarness createClusterStateAndRoutingAllocation(String indexName) { /** * Create the ClusterState for multiple nodes and multiple index shards. 
*/ @@ -152,55 +265,15 @@ public void testWriteLoadDecider() { new ShardRouting[] {} ); - /** - * Test the write load decider - */ - - // The write load decider is disabled by default. - - var writeLoadDecider = createWriteLoadConstraintDecider(Settings.builder().build()); - assertEquals( - Decision.Type.YES, - writeLoadDecider.canAllocate(shardRouting2, exceedingThresholdRoutingNode, routingAllocation).type() - ); - assertEquals(Decision.Type.YES, writeLoadDecider.canAllocate(shardRouting1, belowThresholdRoutingNode, routingAllocation).type()); - assertEquals(Decision.Type.YES, writeLoadDecider.canAllocate(shardRouting1, nearThresholdRoutingNode, routingAllocation).type()); - assertEquals( - Decision.Type.YES, - writeLoadDecider.canAllocate(thirdRoutingNoWriteLoad, exceedingThresholdRoutingNode, routingAllocation).type() - ); - - // Check that the answers change when enabled. - - writeLoadDecider = createWriteLoadConstraintDecider( - Settings.builder() - .put( - WriteLoadConstraintSettings.WRITE_LOAD_DECIDER_ENABLED_SETTING.getKey(), - randomBoolean() - ? 
WriteLoadConstraintSettings.WriteLoadDeciderStatus.ENABLED - : WriteLoadConstraintSettings.WriteLoadDeciderStatus.LOW_THRESHOLD_ONLY - ) - .build() - ); - assertEquals( - "Assigning a new shard to a node that is above the threshold should fail", - Decision.Type.NO, - writeLoadDecider.canAllocate(shardRouting2, exceedingThresholdRoutingNode, routingAllocation).type() - ); - assertEquals( - "Assigning a new shard to a node that has capacity should succeed", - Decision.Type.YES, - writeLoadDecider.canAllocate(shardRouting1, belowThresholdRoutingNode, routingAllocation).type() - ); - assertEquals( - "Assigning a new shard without a write load estimate should _not_ be blocked by lack of capacity", - Decision.Type.YES, - writeLoadDecider.canAllocate(thirdRoutingNoWriteLoad, exceedingThresholdRoutingNode, routingAllocation).type() - ); - assertEquals( - "Assigning a new shard that would cause the node to exceed capacity should fail", - Decision.Type.NO, - writeLoadDecider.canAllocate(shardRouting1, nearThresholdRoutingNode, routingAllocation).type() + return new TestHarness( + clusterState, + routingAllocation, + exceedingThresholdRoutingNode, + belowThresholdRoutingNode, + nearThresholdRoutingNode, + shardRouting1, + shardRouting2, + thirdRoutingNoWriteLoad ); } From 0824b59c59e6b50d5c60dc03bd610a2c1e694383 Mon Sep 17 00:00:00 2001 From: Dianna Hohensee Date: Thu, 14 Aug 2025 11:00:45 -0700 Subject: [PATCH 15/16] improve explanation messages: limit decimal points and improve wording --- .../allocation/decider/WriteLoadConstraintDecider.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDecider.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDecider.java index a499dc12cb71f..1fdf80ea9b4cb 100644 --- 
a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDecider.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDecider.java @@ -65,8 +65,8 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing if (nodeWriteThreadPoolStats.averageThreadPoolUtilization() >= nodeWriteThreadPoolLoadThreshold) { // The node's write thread pool usage stats already show high utilization above the threshold for accepting new shards. String explain = Strings.format( - "Node [%s] with write thread pool utilization [%f] already exceeds the high utilization threshold of [%f]. Cannot allocate " - + "shard [%s] to node without risking increased write latencies.", + "Node [%s] with write thread pool utilization [%.2f] already exceeds the high utilization threshold of [%f]. Cannot " + + "allocate shard [%s] to node without risking increased write latencies.", node.nodeId(), nodeWriteThreadPoolStats.averageThreadPoolUtilization(), nodeWriteThreadPoolLoadThreshold, @@ -80,9 +80,9 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing // The node's write thread pool usage would be raised above the high utilization threshold with assignment of the new shard. // This could lead to a hot spot on this node and is undesirable. String explain = Strings.format( - "The high utilization threshold of [%f] would be exceeded on node [%s] with utilization [%f] if shard [%s] with estimated " - + "write load [%f] (execution time [%f] / threads [%d]) were assigned to it. Cannot allocate shard to node without " - + "risking increased write latencies.", + "The high utilization threshold of [%f] would be exceeded on node [%s] with utilization [%.2f] if shard [%s] with " + + "estimated additional utilisation [%.5f] (write load [%.5f] / threads [%d]) were assigned to it. 
Cannot allocate " + + "shard to node without risking increased write latencies.", nodeWriteThreadPoolLoadThreshold, node.nodeId(), nodeWriteThreadPoolStats.averageThreadPoolUtilization(), From 85f444872209f1bb1150c15e67cedb7217dc6f94 Mon Sep 17 00:00:00 2001 From: Dianna Hohensee Date: Thu, 14 Aug 2025 15:16:43 -0700 Subject: [PATCH 16/16] add explanation to canRemain method to fix testing --- .../allocation/decider/WriteLoadConstraintDecider.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDecider.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDecider.java index 1fdf80ea9b4cb..ef24760f02a6b 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDecider.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDecider.java @@ -101,12 +101,12 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing @Override public Decision canRemain(IndexMetadata indexMetadata, ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) { if (writeLoadConstraintSettings.getWriteLoadConstraintEnabled().notFullyEnabled()) { - return Decision.YES; + return Decision.single(Decision.Type.YES, NAME, "canRemain() is not enabled"); } // TODO: implement - return Decision.YES; + return Decision.single(Decision.Type.YES, NAME, "canRemain() is not yet implemented"); } /**