Skip to content

Commit 012df24

Browse files
authored
Don't allow allocation of shards with zero/unknown write load to overloaded nodes (#138148)
1 parent fbee351 commit 012df24

File tree: 3 files changed (+130, −11 lines)

server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDecider.java

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -59,13 +59,6 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing
5959
return allocation.decision(Decision.YES, NAME, "Shard is unassigned. Decider takes no action.");
6060
}
6161

62-
// Check whether the shard being relocated has any write load estimate. If it does not, then this decider has no opinion.
63-
var allShardWriteLoads = allocation.clusterInfo().getShardWriteLoads();
64-
var shardWriteLoad = allShardWriteLoads.get(shardRouting.shardId());
65-
if (shardWriteLoad == null || shardWriteLoad == 0) {
66-
return allocation.decision(Decision.YES, NAME, "Shard has no estimated write load. Decider takes no action.");
67-
}
68-
6962
var allNodeUsageStats = allocation.clusterInfo().getNodeUsageStatsForThreadPools();
7063
var nodeUsageStatsForThreadPools = allNodeUsageStats.get(node.nodeId());
7164
if (nodeUsageStatsForThreadPools == null) {
@@ -98,6 +91,8 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing
9891
}
9992
}
10093

94+
var allShardWriteLoads = allocation.clusterInfo().getShardWriteLoads();
95+
var shardWriteLoad = allShardWriteLoads.getOrDefault(shardRouting.shardId(), 0.0);
10196
var newWriteThreadPoolUtilization = calculateShardMovementChange(nodeWriteThreadPoolStats, shardWriteLoad);
10297
if (newWriteThreadPoolUtilization >= nodeWriteThreadPoolLoadThreshold) {
10398
// The node's write thread pool usage would be raised above the high utilization threshold with assignment of the new shard.
@@ -195,7 +190,6 @@ private Double getShardWriteLoad(RoutingAllocation allocation, ShardRouting shar
195190
* Returns the percent thread pool utilization change.
196191
*/
197192
private float calculateShardMovementChange(ThreadPoolUsageStats nodeWriteThreadPoolStats, double shardWriteLoad) {
198-
assert shardWriteLoad > 0;
199193
return ShardMovementWriteLoadSimulator.updateNodeUtilizationWithShardMovements(
200194
nodeWriteThreadPoolStats.averageThreadPoolUtilization(),
201195
(float) shardWriteLoad,

server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceComputerTests.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -223,7 +223,8 @@ public void testNoInfiniteLoopBetweenHotspotMitigationAndBalancing() {
223223
final var input = new DesiredBalanceInput(42, routingAllocation, List.of());
224224
final var computer = createDesiredBalanceComputer(new BalancedShardsAllocator(settings));
225225
var balance = computer.compute(DesiredBalance.BECOME_MASTER_INITIAL, input, queue(), ignored -> true);
226-
assertThat(balance.getAssignment(new ShardId(index, 0)).nodeIds(), equalTo(Set.of("node-0")));
226+
// This is an unusual edge case that will result in both shards being moved to the non-hot-spotting node
227+
assertThat(balance.getAssignment(new ShardId(index, 0)).nodeIds(), equalTo(Set.of("node-1")));
227228
assertThat(balance.getAssignment(new ShardId(index, 1)).nodeIds(), equalTo(Set.of("node-1")));
228229
}
229230

server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDeciderTests.java

Lines changed: 126 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,12 +11,15 @@
1111

1212
import org.elasticsearch.action.support.replication.ClusterStateCreationUtils;
1313
import org.elasticsearch.cluster.ClusterInfo;
14+
import org.elasticsearch.cluster.ClusterInfoSimulator;
1415
import org.elasticsearch.cluster.ClusterState;
1516
import org.elasticsearch.cluster.ESAllocationTestCase;
1617
import org.elasticsearch.cluster.NodeUsageStatsForThreadPools;
1718
import org.elasticsearch.cluster.NodeUsageStatsForThreadPools.ThreadPoolUsageStats;
1819
import org.elasticsearch.cluster.metadata.IndexMetadata;
20+
import org.elasticsearch.cluster.metadata.ProjectId;
1921
import org.elasticsearch.cluster.node.DiscoveryNode;
22+
import org.elasticsearch.cluster.routing.RoutingChangesObserver;
2023
import org.elasticsearch.cluster.routing.RoutingNode;
2124
import org.elasticsearch.cluster.routing.RoutingNodes;
2225
import org.elasticsearch.cluster.routing.RoutingNodesHelper;
@@ -25,15 +28,22 @@
2528
import org.elasticsearch.cluster.routing.TestShardRouting;
2629
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
2730
import org.elasticsearch.cluster.routing.allocation.WriteLoadConstraintSettings;
31+
import org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator;
2832
import org.elasticsearch.common.Strings;
2933
import org.elasticsearch.common.regex.Regex;
3034
import org.elasticsearch.common.settings.ClusterSettings;
3135
import org.elasticsearch.common.settings.Settings;
36+
import org.elasticsearch.core.TimeValue;
3237
import org.elasticsearch.index.Index;
3338
import org.elasticsearch.index.shard.ShardId;
39+
import org.elasticsearch.snapshots.SnapshotShardSizeInfo;
3440
import org.elasticsearch.threadpool.ThreadPool;
3541

3642
import java.util.HashMap;
43+
import java.util.HashSet;
44+
import java.util.List;
45+
import java.util.Map;
46+
import java.util.stream.Collectors;
3747

3848
import static org.elasticsearch.common.settings.ClusterSettings.createBuiltInClusterSettings;
3949
import static org.mockito.ArgumentMatchers.any;
@@ -155,14 +165,25 @@ public void testWriteLoadDeciderCanAllocate() {
155165
"Shard [[test-index][0]] in index [[test-index]] can be assigned to node [*]. The node's utilization would become [*]"
156166
);
157167
assertDecisionMatches(
158-
"Assigning a new shard without a write load estimate should _not_ be blocked by lack of capacity",
168+
"Assigning a new shard without a write load estimate to an over-threshold node should be blocked",
159169
writeLoadDecider.canAllocate(
160170
testHarness.shardRoutingNoWriteLoad,
161171
testHarness.exceedingThresholdRoutingNode,
162172
testHarness.routingAllocation
163173
),
174+
Decision.Type.NOT_PREFERRED,
175+
"Node [*] with write thread pool utilization [0.99] already exceeds the high utilization threshold of "
176+
+ "[0.900000]. Cannot allocate shard [[test-index][2]] to node without risking increased write latencies."
177+
);
178+
assertDecisionMatches(
179+
"Assigning a new shard without a write load estimate to an under-threshold node should be allowed",
180+
writeLoadDecider.canAllocate(
181+
testHarness.shardRoutingNoWriteLoad,
182+
testHarness.belowThresholdRoutingNode,
183+
testHarness.routingAllocation
184+
),
164185
Decision.Type.YES,
165-
"Shard has no estimated write load. Decider takes no action."
186+
"Shard [[test-index][2]] in index [[test-index]] can be assigned to node [*]. The node's utilization would become [*]"
166187
);
167188
assertDecisionMatches(
168189
"Assigning a new shard that would cause the node to exceed capacity should fail",
@@ -247,6 +268,109 @@ public void testWriteLoadDeciderCanRemain() {
247268
);
248269
}
249270

271+
public void testWriteLoadDeciderShouldPreventBalancerMovingShardsBack() {
272+
final var indexName = randomIdentifier();
273+
final int numThreads = randomIntBetween(1, 10);
274+
final float highUtilizationThreshold = randomFloatBetween(0.5f, 0.9f, true);
275+
final long highLatencyThreshold = randomLongBetween(1000, 10000);
276+
final var settings = Settings.builder()
277+
.put(
278+
WriteLoadConstraintSettings.WRITE_LOAD_DECIDER_ENABLED_SETTING.getKey(),
279+
WriteLoadConstraintSettings.WriteLoadDeciderStatus.ENABLED
280+
)
281+
.put(WriteLoadConstraintSettings.WRITE_LOAD_DECIDER_HIGH_UTILIZATION_THRESHOLD_SETTING.getKey(), highUtilizationThreshold)
282+
.put(
283+
WriteLoadConstraintSettings.WRITE_LOAD_DECIDER_QUEUE_LATENCY_THRESHOLD_SETTING.getKey(),
284+
TimeValue.timeValueMillis(highLatencyThreshold)
285+
)
286+
.build();
287+
288+
final var state = ClusterStateCreationUtils.state(2, new String[] { indexName }, 4);
289+
final var balancedShardsAllocator = new BalancedShardsAllocator(settings);
290+
final var overloadedNode = randomFrom(state.nodes().getAllNodes());
291+
final var otherNode = state.nodes().stream().filter(node -> node != overloadedNode).findFirst().orElseThrow();
292+
final var clusterInfo = ClusterInfo.builder()
293+
.nodeUsageStatsForThreadPools(
294+
Map.of(
295+
overloadedNode.getId(),
296+
new NodeUsageStatsForThreadPools(
297+
overloadedNode.getId(),
298+
Map.of(
299+
ThreadPool.Names.WRITE,
300+
new ThreadPoolUsageStats(
301+
numThreads,
302+
randomFloatBetween(highUtilizationThreshold, 1.1f, false),
303+
randomLongBetween(highLatencyThreshold, highLatencyThreshold * 2)
304+
)
305+
)
306+
),
307+
otherNode.getId(),
308+
new NodeUsageStatsForThreadPools(
309+
otherNode.getId(),
310+
Map.of(
311+
ThreadPool.Names.WRITE,
312+
new ThreadPoolUsageStats(
313+
numThreads,
314+
randomFloatBetween(0.0f, highUtilizationThreshold / 2, true),
315+
randomLongBetween(0, highLatencyThreshold / 2)
316+
)
317+
)
318+
)
319+
)
320+
)
321+
// simulate all zero or missing shard write loads
322+
.shardWriteLoads(
323+
state.routingTable(ProjectId.DEFAULT)
324+
.allShards()
325+
.filter(ignored -> randomBoolean()) // some write-loads are missing altogether
326+
.collect(Collectors.toMap(ShardRouting::shardId, ignored -> 0.0d)) // the rest are zero
327+
)
328+
.build();
329+
330+
final var clusterSettings = createBuiltInClusterSettings(settings);
331+
final var writeLoadConstraintDecider = new WriteLoadConstraintDecider(clusterSettings);
332+
final var routingAllocation = new RoutingAllocation(
333+
new AllocationDeciders(List.of(writeLoadConstraintDecider)),
334+
state.getRoutingNodes().mutableCopy(),
335+
state,
336+
clusterInfo,
337+
SnapshotShardSizeInfo.EMPTY,
338+
randomLong()
339+
);
340+
341+
// This should move a shard in an attempt to resolve the hot-spot
342+
balancedShardsAllocator.allocate(routingAllocation);
343+
344+
assertEquals(1, routingAllocation.routingNodes().node(overloadedNode.getId()).numberOfOwningShards());
345+
assertEquals(3, routingAllocation.routingNodes().node(otherNode.getId()).numberOfOwningShards());
346+
347+
final var clusterInfoSimulator = new ClusterInfoSimulator(routingAllocation);
348+
final var movedShards = new HashSet<ShardRouting>();
349+
for (RoutingNode routingNode : routingAllocation.routingNodes()) {
350+
movedShards.addAll(routingNode.shardsWithState(ShardRoutingState.INITIALIZING).collect(Collectors.toSet()));
351+
}
352+
movedShards.forEach(shardRouting -> {
353+
routingAllocation.routingNodes().startShard(shardRouting, new RoutingChangesObserver() {
354+
}, randomNonNegativeLong());
355+
clusterInfoSimulator.simulateShardStarted(shardRouting);
356+
});
357+
358+
// This should run through the balancer without moving any shards back
359+
ClusterInfo simulatedClusterInfo = clusterInfoSimulator.getClusterInfo();
360+
balancedShardsAllocator.allocate(
361+
new RoutingAllocation(
362+
routingAllocation.deciders(),
363+
routingAllocation.routingNodes(),
364+
routingAllocation.getClusterState(),
365+
simulatedClusterInfo,
366+
routingAllocation.snapshotShardSizeInfo(),
367+
randomLong()
368+
)
369+
);
370+
assertEquals(1, routingAllocation.routingNodes().node(overloadedNode.getId()).numberOfOwningShards());
371+
assertEquals(3, routingAllocation.routingNodes().node(otherNode.getId()).numberOfOwningShards());
372+
}
373+
250374
private void assertDecisionMatches(String description, Decision decision, Decision.Type type, String explanationPattern) {
251375
assertEquals(description, type, decision.type());
252376
if (explanationPattern == null) {

Comments: 0 commit comments