Skip to content

Commit 68be3df

Browse files
check that new shard assignment won't exceed threshold
improvements and *Tests are complete
1 parent 606cadc commit 68be3df

File tree

3 files changed

+103
-78
lines changed

3 files changed

+103
-78
lines changed

server/src/main/java/org/elasticsearch/cluster/routing/ShardMovementWriteLoadSimulator.java

Lines changed: 24 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -96,16 +96,34 @@ private static NodeUsageStatsForThreadPools.ThreadPoolUsageStats replaceWritePoo
9696
) {
9797
final NodeUsageStatsForThreadPools.ThreadPoolUsageStats writeThreadPoolStats = value.threadPoolUsageStatsMap()
9898
.get(ThreadPool.Names.WRITE);
99-
var newAverageThreadPoolUtilization = (writeThreadPoolStats.averageThreadPoolUtilization() + (writeLoadDelta / writeThreadPoolStats
100-
.totalThreadPoolThreads()));
101-
assert newAverageThreadPoolUtilization < 1.00;
10299
return new NodeUsageStatsForThreadPools.ThreadPoolUsageStats(
103100
writeThreadPoolStats.totalThreadPoolThreads(),
104-
(float) Math.max(
105-
(writeThreadPoolStats.averageThreadPoolUtilization() + (writeLoadDelta / writeThreadPoolStats.totalThreadPoolThreads())),
106-
0.0
101+
updateNodeUtilizationWithShardMovements(
102+
writeThreadPoolStats.averageThreadPoolUtilization(),
103+
(float) writeLoadDelta,
104+
writeThreadPoolStats.totalThreadPoolThreads()
107105
),
108106
writeThreadPoolStats.maxThreadPoolQueueLatencyMillis()
109107
);
110108
}
109+
110+
/**
111+
* The {@code nodeUtilization} is the average utilization per thread for some duration of time. The {@code shardWriteLoadDelta} is the
112+
* sum of shards' total execution time. Dividing the shards total execution time by the number of threads provides the average
113+
* utilization of each write thread for those shards. The change in shard load can then be added to the node utilization.
114+
*
115+
* @param nodeUtilization The current node-level write load percent utilization.
116+
* @param shardWriteLoadDelta The change in shard(s) execution time across all threads. This can be positive or negative depending on
117+
* whether shards were moved onto the node or off of the node.
118+
* @param numberOfWriteThreads The number of threads available in the node's write thread pool.
119+
* @return The new node-level write load percent utilization after adding the shard write load delta.
120+
*/
121+
public static float updateNodeUtilizationWithShardMovements(
122+
float nodeUtilization,
123+
float shardWriteLoadDelta,
124+
int numberOfWriteThreads
125+
) {
126+
float newNodeUtilization = nodeUtilization + (shardWriteLoadDelta / numberOfWriteThreads);
127+
return (float) Math.max(newNodeUtilization, 0.0);
128+
}
111129
}

server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDecider.java

Lines changed: 30 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,12 @@
1414
import org.elasticsearch.cluster.NodeUsageStatsForThreadPools.ThreadPoolUsageStats;
1515
import org.elasticsearch.cluster.metadata.IndexMetadata;
1616
import org.elasticsearch.cluster.routing.RoutingNode;
17+
import org.elasticsearch.cluster.routing.ShardMovementWriteLoadSimulator;
1718
import org.elasticsearch.cluster.routing.ShardRouting;
1819
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
1920
import org.elasticsearch.cluster.routing.allocation.WriteLoadConstraintSettings;
2021
import org.elasticsearch.common.settings.ClusterSettings;
22+
import org.elasticsearch.core.Strings;
2123
import org.elasticsearch.threadpool.ThreadPool;
2224

2325
/**
@@ -38,57 +40,59 @@ public WriteLoadConstraintDecider(ClusterSettings clusterSettings) {
3840
@Override
3941
public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
4042
if (writeLoadConstraintSettings.getWriteLoadConstraintEnabled() != WriteLoadConstraintSettings.WriteLoadDeciderStatus.ENABLED) {
41-
return Decision.YES;
43+
return Decision.single(Decision.Type.YES, NAME, "Decider is disabled");
4244
}
4345

4446
// Check whether the shard being relocated has any write load estimate. If it does not, then this decider has no opinion.
4547
var allShardWriteLoads = allocation.clusterInfo().getShardWriteLoads();
4648
var shardWriteLoad = allShardWriteLoads.get(shardRouting.shardId());
4749
if (shardWriteLoad == null || shardWriteLoad == 0) {
48-
return Decision.YES;
50+
return Decision.single(Decision.Type.YES, NAME, "Shard has no estimated write load. Decider takes no action.");
4951
}
5052

5153
var allNodeUsageStats = allocation.clusterInfo().getNodeUsageStatsForThreadPools();
5254
var nodeUsageStatsForThreadPools = allNodeUsageStats.get(node.nodeId());
5355
if (nodeUsageStatsForThreadPools == null) {
5456
// No node-level thread pool usage stats were reported for this node. Let's assume this is OK and that the simulator will handle
5557
// setting a node-level write load for this node after this shard is assigned.
56-
return Decision.YES;
58+
return Decision.single(Decision.Type.YES, NAME, "The node has no write load estimate. Decider takes no action.");
5759
}
5860

59-
// NOMERGE: should create a utility class (eventually, maybe duplicate code for this class, for now, if simulator does have bug)
60-
// to calculate the change node's usage change with assignment of the new shard
6161
assert nodeUsageStatsForThreadPools.threadPoolUsageStatsMap().isEmpty() == false;
6262
assert nodeUsageStatsForThreadPools.threadPoolUsageStatsMap().get(ThreadPool.Names.WRITE) != null;
6363
var nodeWriteThreadPoolStats = nodeUsageStatsForThreadPools.threadPoolUsageStatsMap().get(ThreadPool.Names.WRITE);
6464
var nodeWriteThreadPoolLoadThreshold = writeLoadConstraintSettings.getWriteThreadPoolHighUtilizationThresholdSetting();
6565
if (nodeWriteThreadPoolStats.averageThreadPoolUtilization() >= nodeWriteThreadPoolLoadThreshold) {
6666
// The node's write thread pool usage stats already show high utilization above the threshold for accepting new shards.
67-
logger.debug(
68-
"The high utilization threshold of {} has already been reached on node {}. Cannot allocate shard {} to node {} "
69-
+ "without risking increased write latencies.",
70-
nodeWriteThreadPoolLoadThreshold,
67+
String debugMsg = Strings.format(
68+
"Node [%s] with write thread pool utilization [%f] already exceeds the high utilization threshold of [%f]. Cannot allocate "
69+
+ "shard [%s] to node without risking increased write latencies.",
7170
node.nodeId(),
72-
shardRouting.shardId(),
73-
node.nodeId()
71+
nodeWriteThreadPoolStats.averageThreadPoolUtilization(),
72+
nodeWriteThreadPoolLoadThreshold,
73+
shardRouting.shardId()
7474
);
75-
return Decision.NO;
75+
logger.debug(debugMsg);
76+
return Decision.single(Decision.Type.NO, NAME, debugMsg);
7677
}
7778

78-
if (nodeWriteThreadPoolStats.averageThreadPoolUtilization() + calculateShardMovementChange(nodeWriteThreadPoolStats, shardWriteLoad) >= nodeWriteThreadPoolLoadThreshold) {
79-
// The node's write thread pool usage would be raised above the high utilization threshold. This could lead to a hot spot on
80-
// this node and is undesirable.
81-
logger.debug(
82-
"The high utilization threshold of {} would be exceeded on node {} if shard {} with estimated write load {} were "
83-
+ "assigned to it. Cannot allocate shard {} to node {} without risking increased write latencies.",
79+
if (calculateShardMovementChange(nodeWriteThreadPoolStats, shardWriteLoad) >= nodeWriteThreadPoolLoadThreshold) {
80+
// The node's write thread pool usage would be raised above the high utilization threshold with assignment of the new shard.
81+
// This could lead to a hot spot on this node and is undesirable.
82+
String debugMsg = Strings.format(
83+
"The high utilization threshold of [%f] would be exceeded on node [%s] with utilization [%f] if shard [%s] with estimated "
84+
+ "write load [%f] (execution time [%f] / threads [%d]) were assigned to it. Cannot allocate shard to node without "
85+
+ "risking increased write latencies.",
8486
nodeWriteThreadPoolLoadThreshold,
8587
node.nodeId(),
88+
nodeWriteThreadPoolStats.averageThreadPoolUtilization(),
8689
shardRouting.shardId(),
90+
shardWriteLoad / nodeWriteThreadPoolStats.totalThreadPoolThreads(),
8791
shardWriteLoad,
88-
shardRouting.shardId(),
89-
node.nodeId()
92+
nodeWriteThreadPoolStats.totalThreadPoolThreads()
9093
);
91-
return Decision.NO;
94+
logger.debug(debugMsg);
95+
return Decision.single(Decision.Type.NO, NAME, debugMsg);
9296
}
9397

9498
return Decision.YES;
@@ -109,8 +113,10 @@ public Decision canRemain(IndexMetadata indexMetadata, ShardRouting shardRouting
109113
*/
110114
private float calculateShardMovementChange(ThreadPoolUsageStats nodeWriteThreadPoolStats, double shardWriteLoad) {
111115
assert shardWriteLoad > 0;
112-
// NOMERGE: move this into an utility class, should be commonly accessible with the simulator.
113-
// TODO: implement..
114-
return 0;
116+
return ShardMovementWriteLoadSimulator.updateNodeUtilizationWithShardMovements(
117+
nodeWriteThreadPoolStats.averageThreadPoolUtilization(),
118+
(float) shardWriteLoad,
119+
nodeWriteThreadPoolStats.totalThreadPoolThreads()
120+
);
115121
}
116122
}

server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDeciderTests.java

Lines changed: 49 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -36,34 +36,29 @@
3636

3737
public class WriteLoadConstraintDeciderTests extends ESAllocationTestCase {
3838

39-
// NOMERGE: I wonder if we don't want a node utilization percent, but rather a total node execution time used and a total node execution
40-
// time possible. The math would be a whole lot easier on us...
41-
public void testWriteLoadDeciderIsDisabled() {
39+
public void testWriteLoadDecider() {
4240
String indexName = "test-index";
4341

44-
// Set up multiple nodes and an index with multiple shards.
45-
// DiscoveryNode discoveryNode1 = newNode("node1");
46-
// DiscoveryNode discoveryNode2 = newNode("node2");
47-
// ShardId shardId1 = new ShardId(indexName, IndexMetadata.INDEX_UUID_NA_VALUE, 0);
48-
// ShardId shardId2 = new ShardId(indexName, IndexMetadata.INDEX_UUID_NA_VALUE, 1);
49-
5042
/**
51-
* Create the ClusterState
43+
* Create the ClusterState for multiple nodes and multiple index shards.
5244
*/
5345

54-
ClusterState clusterState = ClusterStateCreationUtils.stateWithAssignedPrimariesAndReplicas(new String[] { indexName }, 3, 0);
55-
assertEquals(2, clusterState.nodes().size());
46+
ClusterState clusterState = ClusterStateCreationUtils.stateWithAssignedPrimariesAndReplicas(new String[] { indexName }, 3, 1);
47+
// The util method above creates numberOfReplicas+1 data nodes; the resulting cluster state holds numberOfReplicas+2 nodes in total.
48+
assertEquals(3, clusterState.nodes().size());
5649
assertEquals(1, clusterState.metadata().getTotalNumberOfIndices());
5750

5851
/**
59-
* Fetch the nodes and index shards from the generated ClusterState.
52+
* Fetch references to the nodes and index shards from the generated ClusterState, so the ClusterInfo can be created from them.
6053
*/
6154

6255
var discoveryNodeIterator = clusterState.nodes().iterator();
6356
assertTrue(discoveryNodeIterator.hasNext());
6457
var exceedingThresholdDiscoveryNode = discoveryNodeIterator.next();
6558
assertTrue(discoveryNodeIterator.hasNext());
6659
var belowThresholdDiscoveryNode2 = discoveryNodeIterator.next();
60+
assertTrue(discoveryNodeIterator.hasNext());
61+
var nearThresholdDiscoveryNode3 = discoveryNodeIterator.next();
6762
assertFalse(discoveryNodeIterator.hasNext());
6863

6964
var indexIterator = clusterState.metadata().indicesAllProjects().iterator();
@@ -74,10 +69,10 @@ public void testWriteLoadDeciderIsDisabled() {
7469
assertEquals(3, testIndexMetadata.getNumberOfShards());
7570
ShardId testShardId1 = new ShardId(testIndex, 0);
7671
ShardId testShardId2 = new ShardId(testIndex, 1);
77-
ShardId testShardId3NoWriteLoad = new ShardId(testIndex, 1);
72+
ShardId testShardId3NoWriteLoad = new ShardId(testIndex, 2);
7873

7974
/**
80-
* Create the ClusterInfo that includes the node and shard level write load estimates.
75+
* Create a ClusterInfo that includes the node and shard level write load estimates for a variety of node capacity situations.
8176
*/
8277

8378
var nodeThreadPoolStatsWithWriteExceedingThreshold = createNodeUsageStatsForThreadPools(
@@ -87,15 +82,18 @@ public void testWriteLoadDeciderIsDisabled() {
8782
0
8883
);
8984
var nodeThreadPoolStatsWithWriteBelowThreshold = createNodeUsageStatsForThreadPools(belowThresholdDiscoveryNode2, 8, 0.50f, 0);
85+
var nodeThreadPoolStatsWithWriteNearThreshold = createNodeUsageStatsForThreadPools(nearThresholdDiscoveryNode3, 8, 0.89f, 0);
9086

9187
// Create a map of usage per node.
9288
var nodeIdToNodeUsageStatsForThreadPools = new HashMap<String, NodeUsageStatsForThreadPools>();
9389
nodeIdToNodeUsageStatsForThreadPools.put(exceedingThresholdDiscoveryNode.getId(), nodeThreadPoolStatsWithWriteExceedingThreshold);
9490
nodeIdToNodeUsageStatsForThreadPools.put(belowThresholdDiscoveryNode2.getId(), nodeThreadPoolStatsWithWriteBelowThreshold);
91+
nodeIdToNodeUsageStatsForThreadPools.put(nearThresholdDiscoveryNode3.getId(), nodeThreadPoolStatsWithWriteNearThreshold);
9592

93+
// Create a map of usage per shard.
9694
var shardIdToWriteLoadEstimate = new HashMap<ShardId, Double>();
97-
shardIdToWriteLoadEstimate.put(testShardId1, 1.5);
98-
shardIdToWriteLoadEstimate.put(testShardId2, 1.5);
95+
shardIdToWriteLoadEstimate.put(testShardId1, 0.5);
96+
shardIdToWriteLoadEstimate.put(testShardId2, 0.5);
9997
shardIdToWriteLoadEstimate.put(testShardId3NoWriteLoad, 0d);
10098

10199
ClusterInfo clusterInfo = ClusterInfo.builder()
@@ -104,7 +102,7 @@ public void testWriteLoadDeciderIsDisabled() {
104102
.build();
105103

106104
/**
107-
* Create the RoutingAllocation
105+
* Create the RoutingAllocation from the ClusterState and ClusterInfo above, and set up the other input for the WriteLoadDecider.
108106
*/
109107

110108
var routingAllocation = new RoutingAllocation(
@@ -138,32 +136,36 @@ public void testWriteLoadDeciderIsDisabled() {
138136
ShardRoutingState.STARTED
139137
);
140138

141-
assertTrue(discoveryNodeIterator.hasNext());
142139
RoutingNode exceedingThresholdRoutingNode = RoutingNodesHelper.routingNode(
143140
exceedingThresholdDiscoveryNode.getId(),
144-
discoveryNodeIterator.next(),
141+
exceedingThresholdDiscoveryNode,
145142
shardRouting1
146143
);
147-
assertTrue(discoveryNodeIterator.hasNext());
148144
RoutingNode belowThresholdRoutingNode = RoutingNodesHelper.routingNode(
149145
belowThresholdDiscoveryNode2.getId(),
150-
discoveryNodeIterator.next(),
146+
belowThresholdDiscoveryNode2,
151147
shardRouting2
152148
);
153-
assertFalse(discoveryNodeIterator.hasNext());
149+
RoutingNode nearThresholdRoutingNode = RoutingNodesHelper.routingNode(
150+
nearThresholdDiscoveryNode3.getId(),
151+
nearThresholdDiscoveryNode3,
152+
new ShardRouting[] {}
153+
);
154154

155155
/**
156156
* Test the write load decider
157157
*/
158158

159159
// The write load decider is disabled by default.
160-
var writeLoadDecider = createWriteLoadConstraintDecider(Settings.builder().build());
161160

162-
assertEquals(Decision.YES, writeLoadDecider.canAllocate(shardRouting2, exceedingThresholdRoutingNode, routingAllocation));
163-
assertEquals(Decision.YES, writeLoadDecider.canAllocate(shardRouting1, belowThresholdRoutingNode, routingAllocation));
164-
assertEquals(Decision.YES, writeLoadDecider.canAllocate(thirdRoutingNoWriteLoad, exceedingThresholdRoutingNode, routingAllocation));
161+
var writeLoadDecider = createWriteLoadConstraintDecider(Settings.builder().build());
162+
assertEquals(Decision.Type.YES, writeLoadDecider.canAllocate(shardRouting2, exceedingThresholdRoutingNode, routingAllocation).type());
163+
assertEquals(Decision.Type.YES, writeLoadDecider.canAllocate(shardRouting1, belowThresholdRoutingNode, routingAllocation).type());
164+
assertEquals(Decision.Type.YES, writeLoadDecider.canAllocate(shardRouting1, nearThresholdRoutingNode, routingAllocation).type());
165+
assertEquals(Decision.Type.YES, writeLoadDecider.canAllocate(thirdRoutingNoWriteLoad, exceedingThresholdRoutingNode, routingAllocation).type());
165166

166167
// Check that the answers change when enabled.
168+
167169
writeLoadDecider = createWriteLoadConstraintDecider(
168170
Settings.builder()
169171
.put(
@@ -172,27 +174,26 @@ public void testWriteLoadDeciderIsDisabled() {
172174
)
173175
.build()
174176
);
175-
176-
assertEquals(Decision.NO, writeLoadDecider.canAllocate(shardRouting2, exceedingThresholdRoutingNode, routingAllocation));
177-
assertEquals(Decision.YES, writeLoadDecider.canAllocate(shardRouting1, belowThresholdRoutingNode, routingAllocation));
178-
assertEquals(Decision.YES, writeLoadDecider.canAllocate(thirdRoutingNoWriteLoad, exceedingThresholdRoutingNode, routingAllocation));
179-
180-
// NOMERGE: test that adding a shard is rejected if it would overflow the utilization threshold?
181-
// Need to implement the logic in the decider, I don't check right now.
182-
}
183-
184-
public void testShardWithNoWriteLoadEstimateIsAlwaysYES() {
185-
Settings writeLoadConstraintSettings = Settings.builder()
186-
.put(
187-
WriteLoadConstraintSettings.WRITE_LOAD_DECIDER_ENABLED_SETTING.getKey(),
188-
WriteLoadConstraintSettings.WriteLoadDeciderStatus.ENABLED
189-
)
190-
.build();
191-
// TODO
192-
}
193-
194-
public void testShardWithWriteLoadEstimate() {
195-
// TODO: test successful re-assignment and rejected re-assignment due to threshold
177+
assertEquals(
178+
"Assigning a new shard to a node that is above the threshold should fail",
179+
Decision.Type.NO,
180+
writeLoadDecider.canAllocate(shardRouting2, exceedingThresholdRoutingNode, routingAllocation).type()
181+
);
182+
assertEquals(
183+
"Assigning a new shard to a node that has capacity should succeed",
184+
Decision.Type.YES,
185+
writeLoadDecider.canAllocate(shardRouting1, belowThresholdRoutingNode, routingAllocation).type()
186+
);
187+
assertEquals(
188+
"Assigning a new shard without a write load estimate should _not_ be blocked by lack of capacity",
189+
Decision.Type.YES,
190+
writeLoadDecider.canAllocate(thirdRoutingNoWriteLoad, exceedingThresholdRoutingNode, routingAllocation).type()
191+
);
192+
assertEquals(
193+
"Assigning a new shard that would cause the node to exceed capacity should fail",
194+
Decision.Type.NO,
195+
writeLoadDecider.canAllocate(shardRouting1, nearThresholdRoutingNode, routingAllocation).type()
196+
);
196197
}
197198

198199
private WriteLoadConstraintDecider createWriteLoadConstraintDecider(Settings settings) {

0 commit comments

Comments
 (0)