Skip to content

Commit 14c9fb0

Browse files
authored
Ensure shard movement is always recorded for hotspot mitigation (elastic#138142)
We should record the movement even if the shard has no write load. This ensures hot-spot mitigation happens only once every ClusterInfo polling cycle as we agreed. Resolves: elastic#138137
1 parent 770c334 commit 14c9fb0

File tree

4 files changed

+118
-20
lines changed

4 files changed

+118
-20
lines changed

muted-tests.yml

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -477,9 +477,6 @@ tests:
477477
- class: org.elasticsearch.xpack.downsample.DataStreamLifecycleDownsampleIT
478478
method: testUpdateDownsampleSamplingMode
479479
issue: https://github.com/elastic/elasticsearch/issues/138135
480-
- class: org.elasticsearch.indices.mapping.UpdateMappingIntegrationIT
481-
method: testUpdateMappingWithConflicts
482-
issue: https://github.com/elastic/elasticsearch/issues/138137
483480

484481
# Examples:
485482
#

server/src/main/java/org/elasticsearch/cluster/routing/ShardMovementWriteLoadSimulator.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -51,14 +51,17 @@ public void simulateShardStarted(ShardRouting shardRouting) {
5151
// This is a shard being relocated
5252
simulatedNodeWriteLoadDeltas.addTo(shardRouting.relocatingNodeId(), -1 * writeLoadForShard);
5353
simulatedNodeWriteLoadDeltas.addTo(shardRouting.currentNodeId(), writeLoadForShard);
54-
nodesWithMovedAwayShard.add(shardRouting.relocatingNodeId());
5554
} else {
5655
// This is a new shard starting, it's unlikely we'll have a write-load value for a new
5756
// shard, but we may be able to estimate if the new shard is created as part of a datastream
5857
// rollover. See https://elasticco.atlassian.net/browse/ES-12469
5958
simulatedNodeWriteLoadDeltas.addTo(shardRouting.currentNodeId(), writeLoadForShard);
6059
}
6160
}
61+
// Always record the moving shard regardless of whether its write load is adjusted
62+
if (shardRouting.relocatingNodeId() != null) {
63+
nodesWithMovedAwayShard.add(shardRouting.relocatingNodeId());
64+
}
6265
}
6366

6467
/**
@@ -69,15 +72,15 @@ public Map<String, NodeUsageStatsForThreadPools> simulatedNodeUsageStatsForThrea
6972
originalNodeUsageStatsForThreadPools.size()
7073
);
7174
for (Map.Entry<String, NodeUsageStatsForThreadPools> entry : originalNodeUsageStatsForThreadPools.entrySet()) {
72-
if (simulatedNodeWriteLoadDeltas.containsKey(entry.getKey())) {
75+
if (simulatedNodeWriteLoadDeltas.containsKey(entry.getKey()) || nodesWithMovedAwayShard.contains(entry.getKey())) {
7376
var adjustedValue = new NodeUsageStatsForThreadPools(
7477
entry.getKey(),
7578
Maps.copyMapWithAddedOrReplacedEntry(
7679
entry.getValue().threadPoolUsageStatsMap(),
7780
ThreadPool.Names.WRITE,
7881
replaceWritePoolStats(
7982
entry.getValue(),
80-
simulatedNodeWriteLoadDeltas.get(entry.getKey()),
83+
simulatedNodeWriteLoadDeltas.getOrDefault(entry.getKey(), 0.0),
8184
nodesWithMovedAwayShard.contains(entry.getKey())
8285
)
8386
)

server/src/test/java/org/elasticsearch/cluster/routing/ShardMovementWriteLoadSimulatorTests.java

Lines changed: 58 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -172,6 +172,39 @@ public void testMovementBetweenNodesWithNoThreadPoolAndWriteLoadStats() {
172172
assertThat(simulated.containsKey("node_1"), equalTo(originalNode1ThreadPoolStats != null));
173173
}
174174

175+
public void testShardWithNoWriteLoadStillResetsQueueLatency() {
176+
final ClusterState clusterState = createClusterState();
177+
final var allocation = createRoutingAllocationWithShardWriteLoads(
178+
clusterState,
179+
Set.of(INDICES),
180+
Map.of(),
181+
randomThreadPoolUsageStats(),
182+
randomThreadPoolUsageStats()
183+
);
184+
final var shardMovementWriteLoadSimulator = new ShardMovementWriteLoadSimulator(allocation);
185+
186+
// Relocate a random shard from node_0 to node_1
187+
final var randomShard = randomFrom(StreamSupport.stream(allocation.routingNodes().node("node_0").spliterator(), false).toList());
188+
final var moveShardTuple = allocation.routingNodes().relocateShard(randomShard, "node_1", 0, "testing", NOOP);
189+
shardMovementWriteLoadSimulator.simulateShardStarted(moveShardTuple.v2());
190+
191+
final var simulated = shardMovementWriteLoadSimulator.simulatedNodeUsageStatsForThreadPools();
192+
final var threadPoolUsageStats = simulated.get("node_0").threadPoolUsageStatsMap().get("write");
193+
assertThat(threadPoolUsageStats.maxThreadPoolQueueLatencyMillis(), equalTo(0L)); // queue latency is reset
194+
// No change to write load since shard has no write load
195+
assertThat(
196+
threadPoolUsageStats.averageThreadPoolUtilization(),
197+
equalTo(
198+
allocation.clusterInfo()
199+
.getNodeUsageStatsForThreadPools()
200+
.get("node_0")
201+
.threadPoolUsageStatsMap()
202+
.get("write")
203+
.averageThreadPoolUtilization()
204+
)
205+
);
206+
}
207+
175208
public void testUpdateThreadPoolQueueLatencyWithShardMovements() {
176209
final long originalLatency = randomNonNegativeLong();
177210

@@ -206,6 +239,30 @@ private NodeUsageStatsForThreadPools.ThreadPoolUsageStats randomThreadPoolUsageS
206239
private RoutingAllocation createRoutingAllocationWithRandomisedWriteLoads(
207240
Set<String> indicesWithNoWriteLoad,
208241
NodeUsageStatsForThreadPools.ThreadPoolUsageStats... arrayOfNodeThreadPoolStats
242+
) {
243+
final ClusterState clusterState = createClusterState();
244+
final Map<ShardId, Double> shardWriteLoads = clusterState.metadata()
245+
.getProject(ProjectId.DEFAULT)
246+
.stream()
247+
.filter(index -> indicesWithNoWriteLoad.contains(index.getIndex().getName()) == false)
248+
.flatMap(index -> IntStream.range(0, 3).mapToObj(shardNum -> new ShardId(index.getIndex(), shardNum)))
249+
.collect(
250+
Collectors.toUnmodifiableMap(shardId -> shardId, shardId -> randomBoolean() ? 0.0f : randomDoubleBetween(0.1, 5.0, true))
251+
);
252+
253+
return createRoutingAllocationWithShardWriteLoads(
254+
clusterState,
255+
indicesWithNoWriteLoad,
256+
shardWriteLoads,
257+
arrayOfNodeThreadPoolStats
258+
);
259+
}
260+
261+
private RoutingAllocation createRoutingAllocationWithShardWriteLoads(
262+
ClusterState clusterState,
263+
Set<String> indicesWithNoWriteLoad,
264+
Map<ShardId, Double> shardWriteLoads,
265+
NodeUsageStatsForThreadPools.ThreadPoolUsageStats... arrayOfNodeThreadPoolStats
209266
) {
210267
final Map<String, NodeUsageStatsForThreadPools> nodeUsageStats = new HashMap<>();
211268
for (int i = 0; i < arrayOfNodeThreadPoolStats.length; i++) {
@@ -216,22 +273,9 @@ private RoutingAllocation createRoutingAllocationWithRandomisedWriteLoads(
216273
}
217274
}
218275

219-
final ClusterState clusterState = createClusterState();
220276
final ClusterInfo clusterInfo = ClusterInfo.builder()
221277
.nodeUsageStatsForThreadPools(nodeUsageStats)
222-
.shardWriteLoads(
223-
clusterState.metadata()
224-
.getProject(ProjectId.DEFAULT)
225-
.stream()
226-
.filter(index -> indicesWithNoWriteLoad.contains(index.getIndex().getName()) == false)
227-
.flatMap(index -> IntStream.range(0, 3).mapToObj(shardNum -> new ShardId(index.getIndex(), shardNum)))
228-
.collect(
229-
Collectors.toUnmodifiableMap(
230-
shardId -> shardId,
231-
shardId -> randomBoolean() ? 0.0f : randomDoubleBetween(0.1, 5.0, true)
232-
)
233-
)
234-
)
278+
.shardWriteLoads(shardWriteLoads)
235279
.build();
236280

237281
return new RoutingAllocation(

server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceComputerTests.java

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import org.elasticsearch.cluster.ClusterState;
2020
import org.elasticsearch.cluster.DiskUsage;
2121
import org.elasticsearch.cluster.ESAllocationTestCase;
22+
import org.elasticsearch.cluster.NodeUsageStatsForThreadPools;
2223
import org.elasticsearch.cluster.RestoreInProgress;
2324
import org.elasticsearch.cluster.TestShardRoutingRoleStrategies;
2425
import org.elasticsearch.cluster.metadata.IndexMetadata;
@@ -99,6 +100,7 @@
99100
import static org.elasticsearch.cluster.routing.ShardRoutingState.UNASSIGNED;
100101
import static org.elasticsearch.cluster.routing.TestShardRouting.newShardRouting;
101102
import static org.elasticsearch.cluster.routing.TestShardRouting.shardRoutingBuilder;
103+
import static org.elasticsearch.cluster.routing.allocation.WriteLoadConstraintSettings.WRITE_LOAD_DECIDER_ENABLED_SETTING;
102104
import static org.elasticsearch.common.settings.ClusterSettings.createBuiltInClusterSettings;
103105
import static org.elasticsearch.test.MockLog.assertThatLogger;
104106
import static org.hamcrest.Matchers.aMapWithSize;
@@ -173,6 +175,58 @@ public void testStopsComputingWhenStale() {
173175
);
174176
}
175177

178+
public void testNoInfiniteLoopBetweenHotspotMitigationAndBalancing() {
179+
// This test demonstrates that the computer does not get stuck in an infinite loop when moveShards and the balancer move against
180+
// each other. This is done by configuring two shards each on its own node.
181+
// - Shard 0 with no write load on node-0 with node load 0.92 and queue latency 15s
182+
// - Shard 1 with some write load on node-1 with no node load nor queue latency
183+
// 1. MoveShard will want to move shard 0 off node-0 to node-1 for hot-spot mitigation.
184+
// 2. Balance will want to move shard 0 back to node-0 to spread the index. Balancer always picks shard 0 because it has
185+
// no write load thus write load decider says YES
186+
// The computation should stop after one round of moveShards + balance because hot-spot is considered mitigated after
187+
// a single shard movement and breaks the loop.
188+
final var initialState = createInitialClusterState(2, 2, 0);
189+
final var index = initialState.metadata().getProject(ProjectId.DEFAULT).index(TEST_INDEX).getIndex();
190+
final RoutingNodes routingNodes = initialState.getRoutingNodes().mutableCopy();
191+
final var changes = mock(RoutingChangesObserver.class);
192+
for (var iterator = routingNodes.unassigned().iterator(); iterator.hasNext();) {
193+
var shardRouting = iterator.next();
194+
routingNodes.startShard(
195+
iterator.initialize(shardRouting.shardId().id() == 0 ? "node-0" : "node-1", null, 0L, changes),
196+
changes,
197+
0L
198+
);
199+
}
200+
final var clusterState = rebuildRoutingTable(initialState, routingNodes);
201+
202+
final var clusterInfo = ClusterInfo.builder()
203+
.shardWriteLoads(Map.of(new ShardId(index, 1), 0.004379))
204+
.nodeUsageStatsForThreadPools(
205+
Map.of(
206+
"node-0",
207+
new NodeUsageStatsForThreadPools(
208+
"node-0",
209+
Map.of("write", new NodeUsageStatsForThreadPools.ThreadPoolUsageStats(4, 0.92f, 15732))
210+
211+
),
212+
"node-1",
213+
new NodeUsageStatsForThreadPools(
214+
"node-1",
215+
Map.of("write", new NodeUsageStatsForThreadPools.ThreadPoolUsageStats(4, 0.0f, 0))
216+
)
217+
)
218+
)
219+
.build();
220+
221+
final var settings = Settings.builder().put(WRITE_LOAD_DECIDER_ENABLED_SETTING.getKey(), "enabled").build();
222+
final var routingAllocation = routingAllocationWithDecidersOf(clusterState, clusterInfo, settings);
223+
final var input = new DesiredBalanceInput(42, routingAllocation, List.of());
224+
final var computer = createDesiredBalanceComputer(new BalancedShardsAllocator(settings));
225+
var balance = computer.compute(DesiredBalance.BECOME_MASTER_INITIAL, input, queue(), ignored -> true);
226+
assertThat(balance.getAssignment(new ShardId(index, 0)).nodeIds(), equalTo(Set.of("node-0")));
227+
assertThat(balance.getAssignment(new ShardId(index, 1)).nodeIds(), equalTo(Set.of("node-1")));
228+
}
229+
176230
public void testIgnoresOutOfScopePrimaries() {
177231
var desiredBalanceComputer = createDesiredBalanceComputer();
178232
var clusterState = mutateAllocationStatuses(createInitialClusterState(3));

0 commit comments

Comments
 (0)