Skip to content

Commit a31485f

Browse files
authored
Add simple queue latency simulation by shard movemment (#133197)
Any momving away shard reduces queue latency to zero, effectively "fixes' hot-spotting. Rinse and repeat when the next ClusterInfo arrives. Resolves: ES-12622
1 parent 21471a1 commit a31485f

File tree

2 files changed

+85
-18
lines changed

2 files changed

+85
-18
lines changed

server/src/main/java/org/elasticsearch/cluster/routing/ShardMovementWriteLoadSimulator.java

Lines changed: 28 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,9 @@
1919
import org.elasticsearch.threadpool.ThreadPool;
2020

2121
import java.util.Collections;
22+
import java.util.HashSet;
2223
import java.util.Map;
24+
import java.util.Set;
2325

2426
/**
2527
* Simulates the impact to each node's write-load in response to the movement of individual
@@ -30,11 +32,14 @@ public class ShardMovementWriteLoadSimulator {
3032
private final Map<String, NodeUsageStatsForThreadPools> originalNodeUsageStatsForThreadPools;
3133
private final ObjectDoubleMap<String> simulatedNodeWriteLoadDeltas;
3234
private final Map<ShardId, Double> writeLoadsPerShard;
35+
// The set to track whether a node has seen a shard move away from it
36+
private final Set<String> nodesWithMovedAwayShard;
3337

3438
public ShardMovementWriteLoadSimulator(RoutingAllocation routingAllocation) {
3539
this.originalNodeUsageStatsForThreadPools = routingAllocation.clusterInfo().getNodeUsageStatsForThreadPools();
3640
this.writeLoadsPerShard = routingAllocation.clusterInfo().getShardWriteLoads();
3741
this.simulatedNodeWriteLoadDeltas = new ObjectDoubleHashMap<>();
42+
this.nodesWithMovedAwayShard = new HashSet<>();
3843
}
3944

4045
public void simulateShardStarted(ShardRouting shardRouting) {
@@ -46,6 +51,7 @@ public void simulateShardStarted(ShardRouting shardRouting) {
4651
// This is a shard being relocated
4752
simulatedNodeWriteLoadDeltas.addTo(shardRouting.relocatingNodeId(), -1 * writeLoadForShard);
4853
simulatedNodeWriteLoadDeltas.addTo(shardRouting.currentNodeId(), writeLoadForShard);
54+
nodesWithMovedAwayShard.add(shardRouting.relocatingNodeId());
4955
} else {
5056
// This is a new shard starting, it's unlikely we'll have a write-load value for a new
5157
// shard, but we may be able to estimate if the new shard is created as part of a datastream
@@ -69,7 +75,11 @@ public Map<String, NodeUsageStatsForThreadPools> simulatedNodeUsageStatsForThrea
6975
Maps.copyMapWithAddedOrReplacedEntry(
7076
entry.getValue().threadPoolUsageStatsMap(),
7177
ThreadPool.Names.WRITE,
72-
replaceWritePoolStats(entry.getValue(), simulatedNodeWriteLoadDeltas.get(entry.getKey()))
78+
replaceWritePoolStats(
79+
entry.getValue(),
80+
simulatedNodeWriteLoadDeltas.get(entry.getKey()),
81+
nodesWithMovedAwayShard.contains(entry.getKey())
82+
)
7383
)
7484
);
7585
adjustedNodeUsageStatsForThreadPools.put(entry.getKey(), adjustedValue);
@@ -82,7 +92,8 @@ public Map<String, NodeUsageStatsForThreadPools> simulatedNodeUsageStatsForThrea
8292

8393
private static NodeUsageStatsForThreadPools.ThreadPoolUsageStats replaceWritePoolStats(
8494
NodeUsageStatsForThreadPools value,
85-
double writeLoadDelta
95+
double writeLoadDelta,
96+
boolean hasSeenMovedAwayShard
8697
) {
8798
final NodeUsageStatsForThreadPools.ThreadPoolUsageStats writeThreadPoolStats = value.threadPoolUsageStatsMap()
8899
.get(ThreadPool.Names.WRITE);
@@ -93,7 +104,7 @@ private static NodeUsageStatsForThreadPools.ThreadPoolUsageStats replaceWritePoo
93104
(float) writeLoadDelta,
94105
writeThreadPoolStats.totalThreadPoolThreads()
95106
),
96-
writeThreadPoolStats.maxThreadPoolQueueLatencyMillis()
107+
adjustThreadPoolQueueLatencyWithShardMovements(writeThreadPoolStats.maxThreadPoolQueueLatencyMillis(), hasSeenMovedAwayShard)
97108
);
98109
}
99110

@@ -116,4 +127,18 @@ public static float updateNodeUtilizationWithShardMovements(
116127
float newNodeUtilization = nodeUtilization + (shardWriteLoadDelta / numberOfWriteThreads);
117128
return (float) Math.max(newNodeUtilization, 0.0);
118129
}
130+
131+
/**
132+
* Adjust the max thread pool queue latency by accounting for whether shard has moved away from the node.
133+
* @param maxThreadPoolQueueLatencyMillis The current max thread pool queue latency.
134+
* @param hasSeenMovedAwayShard Whether the node has seen a shard move away from it.
135+
* @return The new adjusted max thread pool queue latency.
136+
*/
137+
public static long adjustThreadPoolQueueLatencyWithShardMovements(long maxThreadPoolQueueLatencyMillis, boolean hasSeenMovedAwayShard) {
138+
// Intentionally keep it simple by reducing queue latency to zero if we move any shard off the node.
139+
// This means the node is considered as no longer hot-spotting (with respect to queue latency) once a shard moves away.
140+
// The next ClusterInfo will come in up-to 30 seconds later, and we will see the actual impact of the
141+
// shard movement and repeat the process. We keep the queue latency unchanged if shard moves onto the node.
142+
return hasSeenMovedAwayShard ? 0L : maxThreadPoolQueueLatencyMillis;
143+
}
119144
}

server/src/test/java/org/elasticsearch/cluster/routing/ShardMovementWriteLoadSimulatorTests.java

Lines changed: 57 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -48,9 +48,9 @@ public void testNoShardMovement() {
4848
final var originalNode0ThreadPoolStats = randomThreadPoolUsageStats();
4949
final var originalNode1ThreadPoolStats = randomThreadPoolUsageStats();
5050
final var allocation = createRoutingAllocationWithRandomisedWriteLoads(
51+
Set.of(),
5152
originalNode0ThreadPoolStats,
52-
originalNode1ThreadPoolStats,
53-
Set.of()
53+
originalNode1ThreadPoolStats
5454
);
5555

5656
final var shardMovementWriteLoadSimulator = new ShardMovementWriteLoadSimulator(allocation);
@@ -66,13 +66,15 @@ public void testNoShardMovement() {
6666
);
6767
}
6868

69-
public void testMovementOfAShardWillMoveThreadPoolUtilisation() {
69+
public void testMovementOfAShardWillMoveThreadPoolStats() {
7070
final var originalNode0ThreadPoolStats = randomThreadPoolUsageStats();
7171
final var originalNode1ThreadPoolStats = randomThreadPoolUsageStats();
72+
final var originalNode2ThreadPoolStats = randomThreadPoolUsageStats();
7273
final var allocation = createRoutingAllocationWithRandomisedWriteLoads(
74+
Set.of(),
7375
originalNode0ThreadPoolStats,
7476
originalNode1ThreadPoolStats,
75-
Set.of()
77+
originalNode2ThreadPoolStats
7678
);
7779
final var shardMovementWriteLoadSimulator = new ShardMovementWriteLoadSimulator(allocation);
7880

@@ -84,7 +86,7 @@ public void testMovementOfAShardWillMoveThreadPoolUtilisation() {
8486
final ShardRouting movedAndStartedShard = allocation.routingNodes().startShard(moveShardTuple.v2(), NOOP, expectedShardSize);
8587

8688
final var calculatedNodeUsageStats = shardMovementWriteLoadSimulator.simulatedNodeUsageStatsForThreadPools();
87-
assertThat(calculatedNodeUsageStats, Matchers.aMapWithSize(2));
89+
assertThat(calculatedNodeUsageStats, Matchers.aMapWithSize(3));
8890

8991
final var shardWriteLoad = allocation.clusterInfo().getShardWriteLoads().get(randomShard.shardId());
9092
final var expectedUtilisationReductionAtSource = shardWriteLoad / originalNode0ThreadPoolStats.totalThreadPoolThreads();
@@ -109,6 +111,19 @@ public void testMovementOfAShardWillMoveThreadPoolUtilisation() {
109111
closeTo(expectedUtilisationIncreaseAtDestination, 0.001f)
110112
);
111113

114+
// Queue latency reduced for node_0 since it has a shard moved out
115+
assertThat(getMaxThreadPoolQueueLatency(shardMovementWriteLoadSimulator, "node_0"), equalTo(0L));
116+
// Queue latency stays unchanged for node_1 since it only has a shard moved in
117+
assertThat(
118+
getMaxThreadPoolQueueLatency(shardMovementWriteLoadSimulator, "node_1"),
119+
equalTo(originalNode1ThreadPoolStats.maxThreadPoolQueueLatencyMillis())
120+
);
121+
// Queue latency stays unchanged for node_2 since it has no shard movement
122+
assertThat(
123+
getMaxThreadPoolQueueLatency(shardMovementWriteLoadSimulator, "node_2"),
124+
equalTo(originalNode2ThreadPoolStats.maxThreadPoolQueueLatencyMillis())
125+
);
126+
112127
// Then move it back
113128
final var moveBackTuple = allocation.routingNodes()
114129
.relocateShard(movedAndStartedShard, "node_0", expectedShardSize, "testing", NOOP);
@@ -123,15 +138,25 @@ public void testMovementOfAShardWillMoveThreadPoolUtilisation() {
123138
getAverageWritePoolUtilization(shardMovementWriteLoadSimulator, "node_1"),
124139
equalTo(originalNode1ThreadPoolStats.averageThreadPoolUtilization())
125140
);
141+
142+
// We intentionally keep things simple so that if a shard has moved away from a node, its queue latency is reduced to zero
143+
// regardless of whether other shards have subsequently moved onto or out of the same node.
144+
assertThat(getMaxThreadPoolQueueLatency(shardMovementWriteLoadSimulator, "node_0"), equalTo(0L));
145+
assertThat(getMaxThreadPoolQueueLatency(shardMovementWriteLoadSimulator, "node_1"), equalTo(0L));
146+
// Queue latency stays unchanged for node_2 since it has no shard movement
147+
assertThat(
148+
getMaxThreadPoolQueueLatency(shardMovementWriteLoadSimulator, "node_2"),
149+
equalTo(originalNode2ThreadPoolStats.maxThreadPoolQueueLatencyMillis())
150+
);
126151
}
127152

128153
public void testMovementBetweenNodesWithNoThreadPoolAndWriteLoadStats() {
129154
final var originalNode0ThreadPoolStats = randomBoolean() ? randomThreadPoolUsageStats() : null;
130155
final var originalNode1ThreadPoolStats = randomBoolean() ? randomThreadPoolUsageStats() : null;
131156
final var allocation = createRoutingAllocationWithRandomisedWriteLoads(
157+
new HashSet<>(randomSubsetOf(Arrays.asList(INDICES))),
132158
originalNode0ThreadPoolStats,
133-
originalNode1ThreadPoolStats,
134-
new HashSet<>(randomSubsetOf(Arrays.asList(INDICES)))
159+
originalNode1ThreadPoolStats
135160
);
136161
final var shardMovementWriteLoadSimulator = new ShardMovementWriteLoadSimulator(allocation);
137162

@@ -147,12 +172,29 @@ public void testMovementBetweenNodesWithNoThreadPoolAndWriteLoadStats() {
147172
assertThat(simulated.containsKey("node_1"), equalTo(originalNode1ThreadPoolStats != null));
148173
}
149174

175+
public void testUpdateThreadPoolQueueLatencyWithShardMovements() {
176+
final long originalLatency = randomNonNegativeLong();
177+
178+
assertThat(
179+
ShardMovementWriteLoadSimulator.adjustThreadPoolQueueLatencyWithShardMovements(originalLatency, false),
180+
equalTo(originalLatency)
181+
);
182+
183+
assertThat(ShardMovementWriteLoadSimulator.adjustThreadPoolQueueLatencyWithShardMovements(originalLatency, true), equalTo(0L));
184+
}
185+
150186
private float getAverageWritePoolUtilization(ShardMovementWriteLoadSimulator shardMovementWriteLoadSimulator, String nodeId) {
151187
final var generatedNodeUsageStates = shardMovementWriteLoadSimulator.simulatedNodeUsageStatsForThreadPools();
152188
final var node0WritePoolStats = generatedNodeUsageStates.get(nodeId).threadPoolUsageStatsMap().get("write");
153189
return node0WritePoolStats.averageThreadPoolUtilization();
154190
}
155191

192+
private long getMaxThreadPoolQueueLatency(ShardMovementWriteLoadSimulator shardMovementWriteLoadSimulator, String nodeId) {
193+
final var generatedNodeUsageStates = shardMovementWriteLoadSimulator.simulatedNodeUsageStatsForThreadPools();
194+
final var writePoolStats = generatedNodeUsageStates.get(nodeId).threadPoolUsageStatsMap().get("write");
195+
return writePoolStats.maxThreadPoolQueueLatencyMillis();
196+
}
197+
156198
private NodeUsageStatsForThreadPools.ThreadPoolUsageStats randomThreadPoolUsageStats() {
157199
return new NodeUsageStatsForThreadPools.ThreadPoolUsageStats(
158200
randomIntBetween(4, 16),
@@ -162,16 +204,16 @@ private NodeUsageStatsForThreadPools.ThreadPoolUsageStats randomThreadPoolUsageS
162204
}
163205

164206
private RoutingAllocation createRoutingAllocationWithRandomisedWriteLoads(
165-
NodeUsageStatsForThreadPools.ThreadPoolUsageStats node0ThreadPoolStats,
166-
NodeUsageStatsForThreadPools.ThreadPoolUsageStats node1ThreadPoolStats,
167-
Set<String> indicesWithNoWriteLoad
207+
Set<String> indicesWithNoWriteLoad,
208+
NodeUsageStatsForThreadPools.ThreadPoolUsageStats... arrayOfNodeThreadPoolStats
168209
) {
169210
final Map<String, NodeUsageStatsForThreadPools> nodeUsageStats = new HashMap<>();
170-
if (node0ThreadPoolStats != null) {
171-
nodeUsageStats.put("node_0", new NodeUsageStatsForThreadPools("node_0", Map.of("write", node0ThreadPoolStats)));
172-
}
173-
if (node1ThreadPoolStats != null) {
174-
nodeUsageStats.put("node_1", new NodeUsageStatsForThreadPools("node_1", Map.of("write", node1ThreadPoolStats)));
211+
for (int i = 0; i < arrayOfNodeThreadPoolStats.length; i++) {
212+
final var nodeThreadPoolStats = arrayOfNodeThreadPoolStats[i];
213+
if (nodeThreadPoolStats != null) {
214+
final var nodeId = "node_" + i;
215+
nodeUsageStats.put(nodeId, new NodeUsageStatsForThreadPools(nodeId, Map.of("write", nodeThreadPoolStats)));
216+
}
175217
}
176218

177219
final ClusterState clusterState = createClusterState();

0 commit comments

Comments
 (0)