Skip to content

Commit 0ebd75d

Browse files
committed
Estimate impact of shard movement using node-level write load
1 parent dc48b4b commit 0ebd75d

File tree

3 files changed

+347
-3
lines changed

3 files changed

+347
-3
lines changed

server/src/main/java/org/elasticsearch/cluster/ClusterInfoSimulator.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111

1212
import org.elasticsearch.cluster.ClusterInfo.NodeAndShard;
1313
import org.elasticsearch.cluster.routing.ShardRouting;
14+
import org.elasticsearch.cluster.routing.WriteLoadPerShardSimulator;
1415
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
1516
import org.elasticsearch.common.util.CopyOnFirstWriteMap;
1617
import org.elasticsearch.index.shard.ShardId;
@@ -34,7 +35,7 @@ public class ClusterInfoSimulator {
3435
private final Map<ShardId, Long> shardDataSetSizes;
3536
private final Map<NodeAndShard, String> dataPath;
3637
private final Map<String, EstimatedHeapUsage> estimatedHeapUsages;
37-
private final Map<String, NodeUsageStatsForThreadPools> nodeThreadPoolUsageStats;
38+
private final WriteLoadPerShardSimulator writeLoadPerShardSimulator;
3839

3940
public ClusterInfoSimulator(RoutingAllocation allocation) {
4041
this.allocation = allocation;
@@ -44,7 +45,7 @@ public ClusterInfoSimulator(RoutingAllocation allocation) {
4445
this.shardDataSetSizes = Map.copyOf(allocation.clusterInfo().shardDataSetSizes);
4546
this.dataPath = Map.copyOf(allocation.clusterInfo().dataPath);
4647
this.estimatedHeapUsages = allocation.clusterInfo().getEstimatedHeapUsages();
47-
this.nodeThreadPoolUsageStats = allocation.clusterInfo().getNodeUsageStatsForThreadPools();
48+
this.writeLoadPerShardSimulator = new WriteLoadPerShardSimulator(allocation);
4849
}
4950

5051
/**
@@ -115,6 +116,7 @@ public void simulateShardStarted(ShardRouting shard) {
115116
shardSizes.put(shardIdentifierFromRouting(shard), project.getIndexSafe(shard.index()).ignoreDiskWatermarks() ? 0 : size);
116117
}
117118
}
119+
writeLoadPerShardSimulator.simulateShardStarted(shard);
118120
}
119121

120122
private void modifyDiskUsage(String nodeId, long freeDelta) {
@@ -159,7 +161,7 @@ public ClusterInfo getClusterInfo() {
159161
dataPath,
160162
Map.of(),
161163
estimatedHeapUsages,
162-
nodeThreadPoolUsageStats
164+
writeLoadPerShardSimulator.nodeUsageStatsForThreadPools()
163165
);
164166
}
165167
}
server/src/main/java/org/elasticsearch/cluster/routing/WriteLoadPerShardSimulator.java

Lines changed: 151 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,151 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.cluster.routing;
11+
12+
import com.carrotsearch.hppc.ObjectFloatHashMap;
13+
import com.carrotsearch.hppc.ObjectFloatMap;
14+
15+
import org.elasticsearch.cluster.NodeUsageStatsForThreadPools;
16+
import org.elasticsearch.cluster.metadata.IndexAbstraction;
17+
import org.elasticsearch.cluster.metadata.ProjectMetadata;
18+
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
19+
import org.elasticsearch.common.util.Maps;
20+
import org.elasticsearch.index.Index;
21+
import org.elasticsearch.index.shard.ShardId;
22+
import org.elasticsearch.threadpool.ThreadPool;
23+
24+
import java.util.HashMap;
25+
import java.util.HashSet;
26+
import java.util.Map;
27+
import java.util.Objects;
28+
import java.util.Set;
29+
import java.util.stream.Collectors;
30+
31+
public class WriteLoadPerShardSimulator {
32+
33+
private final ObjectFloatMap<String> writeLoadDeltas;
34+
private final RoutingAllocation routingAllocation;
35+
private final ObjectFloatMap<ShardId> writeLoadsPerShard;
36+
37+
public WriteLoadPerShardSimulator(RoutingAllocation routingAllocation) {
38+
this.routingAllocation = routingAllocation;
39+
this.writeLoadDeltas = new ObjectFloatHashMap<>();
40+
writeLoadsPerShard = estimateWriteLoadsPerShard(routingAllocation);
41+
}
42+
43+
public void simulateShardStarted(ShardRouting shardRouting) {
44+
float writeLoadForShard = writeLoadsPerShard.get(shardRouting.shardId());
45+
if (writeLoadForShard > 0.0) {
46+
if (shardRouting.relocatingNodeId() != null) {
47+
// relocating
48+
writeLoadDeltas.addTo(shardRouting.relocatingNodeId(), -1 * writeLoadForShard);
49+
writeLoadDeltas.addTo(shardRouting.currentNodeId(), writeLoadForShard);
50+
} else {
51+
// not sure how this would come about, perhaps when allocating a replica after a delay?
52+
writeLoadDeltas.addTo(shardRouting.currentNodeId(), writeLoadForShard);
53+
}
54+
}
55+
}
56+
57+
public Map<String, NodeUsageStatsForThreadPools> nodeUsageStatsForThreadPools() {
58+
return routingAllocation.clusterInfo()
59+
.getNodeUsageStatsForThreadPools()
60+
.entrySet()
61+
.stream()
62+
.collect(Collectors.toUnmodifiableMap(Map.Entry::getKey, e -> {
63+
if (writeLoadDeltas.containsKey(e.getKey())) {
64+
return new NodeUsageStatsForThreadPools(
65+
e.getKey(),
66+
Maps.copyMapWithAddedOrReplacedEntry(
67+
e.getValue().threadPoolUsageStatsMap(),
68+
"write",
69+
replaceWritePoolStats(e.getValue(), writeLoadDeltas.get(e.getKey()))
70+
)
71+
);
72+
}
73+
return e.getValue();
74+
}));
75+
}
76+
77+
private NodeUsageStatsForThreadPools.ThreadPoolUsageStats replaceWritePoolStats(
78+
NodeUsageStatsForThreadPools value,
79+
float writeLoadDelta
80+
) {
81+
final NodeUsageStatsForThreadPools.ThreadPoolUsageStats writeThreadPoolStats = value.threadPoolUsageStatsMap()
82+
.get(ThreadPool.Names.WRITE);
83+
return new NodeUsageStatsForThreadPools.ThreadPoolUsageStats(
84+
writeThreadPoolStats.totalThreadPoolThreads(),
85+
writeThreadPoolStats.averageThreadPoolUtilization() + (writeLoadDelta / writeThreadPoolStats.totalThreadPoolThreads()),
86+
writeThreadPoolStats.averageThreadPoolQueueLatencyMillis()
87+
);
88+
}
89+
90+
// Everything below this line can probably go once we are publishing shard-write-load estimates to the master
91+
92+
private static ObjectFloatMap<ShardId> estimateWriteLoadsPerShard(RoutingAllocation allocation) {
93+
final Map<ShardId, Average> writeLoadPerShard = new HashMap<>();
94+
final Set<String> writeIndexNames = getWriteIndexNames(allocation);
95+
final Map<String, NodeUsageStatsForThreadPools> nodeUsageStatsForThreadPools = allocation.clusterInfo()
96+
.getNodeUsageStatsForThreadPools();
97+
for (final Map.Entry<String, NodeUsageStatsForThreadPools> usageStatsForThreadPoolsEntry : nodeUsageStatsForThreadPools
98+
.entrySet()) {
99+
final NodeUsageStatsForThreadPools value = usageStatsForThreadPoolsEntry.getValue();
100+
final NodeUsageStatsForThreadPools.ThreadPoolUsageStats writeThreadPoolStats = value.threadPoolUsageStatsMap()
101+
.get(ThreadPool.Names.WRITE);
102+
if (writeThreadPoolStats == null) {
103+
// No stats from this node yet
104+
continue;
105+
}
106+
float writeUtilisation = writeThreadPoolStats.averageThreadPoolUtilization() * writeThreadPoolStats.totalThreadPoolThreads();
107+
108+
final String nodeId = usageStatsForThreadPoolsEntry.getKey();
109+
final RoutingNode node = allocation.routingNodes().node(nodeId);
110+
final Set<ShardId> writeShardsOnNode = new HashSet<>();
111+
for (final ShardRouting shardRouting : node) {
112+
if (shardRouting.role() != ShardRouting.Role.SEARCH_ONLY && writeIndexNames.contains(shardRouting.index().getName())) {
113+
writeShardsOnNode.add(shardRouting.shardId());
114+
}
115+
}
116+
writeShardsOnNode.forEach(
117+
shardId -> writeLoadPerShard.computeIfAbsent(shardId, k -> new Average()).add(writeUtilisation / writeShardsOnNode.size())
118+
);
119+
}
120+
final ObjectFloatMap<ShardId> writeLoads = new ObjectFloatHashMap<>(writeLoadPerShard.size());
121+
writeLoadPerShard.forEach((shardId, average) -> writeLoads.put(shardId, average.get()));
122+
return writeLoads;
123+
}
124+
125+
private static Set<String> getWriteIndexNames(RoutingAllocation allocation) {
126+
return allocation.metadata()
127+
.projects()
128+
.values()
129+
.stream()
130+
.map(ProjectMetadata::getIndicesLookup)
131+
.flatMap(il -> il.values().stream())
132+
.map(IndexAbstraction::getWriteIndex)
133+
.filter(Objects::nonNull)
134+
.map(Index::getName)
135+
.collect(Collectors.toUnmodifiableSet());
136+
}
137+
138+
private static final class Average {
139+
int count;
140+
float sum;
141+
142+
public void add(float value) {
143+
count++;
144+
sum += value;
145+
}
146+
147+
public float get() {
148+
return sum / count;
149+
}
150+
}
151+
}
server/src/test/java/org/elasticsearch/cluster/routing/WriteLoadPerShardSimulatorTests.java

Lines changed: 191 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,191 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.cluster.routing;
11+
12+
import org.elasticsearch.action.support.replication.ClusterStateCreationUtils;
13+
import org.elasticsearch.cluster.ClusterInfo;
14+
import org.elasticsearch.cluster.ClusterState;
15+
import org.elasticsearch.cluster.NodeUsageStatsForThreadPools;
16+
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
17+
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders;
18+
import org.elasticsearch.snapshots.SnapshotShardSizeInfo;
19+
import org.elasticsearch.test.ESTestCase;
20+
import org.hamcrest.Matchers;
21+
22+
import java.util.HashMap;
23+
import java.util.List;
24+
import java.util.Map;
25+
import java.util.stream.StreamSupport;
26+
27+
import static org.hamcrest.Matchers.equalTo;
28+
import static org.hamcrest.Matchers.greaterThan;
29+
import static org.hamcrest.Matchers.lessThan;
30+
import static org.hamcrest.Matchers.sameInstance;
31+
32+
public class WriteLoadPerShardSimulatorTests extends ESTestCase {
33+
34+
private static final RoutingChangesObserver NOOP = new RoutingChangesObserver() {
35+
};
36+
37+
/**
38+
* We should not adjust the values if there's no movement
39+
*/
40+
public void testNoShardMovement() {
41+
final var originalNode0WriteLoadStats = new NodeUsageStatsForThreadPools.ThreadPoolUsageStats(
42+
randomIntBetween(4, 32),
43+
randomFloatBetween(0f, 1f, true),
44+
randomLongBetween(0L, 7_000L)
45+
);
46+
final var originalNode1WriteLoadStats = new NodeUsageStatsForThreadPools.ThreadPoolUsageStats(
47+
randomIntBetween(4, 32),
48+
randomFloatBetween(0f, 1f, true),
49+
randomLongBetween(0L, 7_000L)
50+
);
51+
final var allocation = createRoutingAllocation(originalNode0WriteLoadStats, originalNode1WriteLoadStats);
52+
53+
final var writeLoadPerShardSimulator = new WriteLoadPerShardSimulator(allocation);
54+
final var calculatedNodeUsageStates = writeLoadPerShardSimulator.nodeUsageStatsForThreadPools();
55+
assertThat(calculatedNodeUsageStates, Matchers.aMapWithSize(2));
56+
assertThat(
57+
calculatedNodeUsageStates.get("node_0").threadPoolUsageStatsMap().get("write"),
58+
sameInstance(originalNode0WriteLoadStats)
59+
);
60+
assertThat(
61+
calculatedNodeUsageStates.get("node_1").threadPoolUsageStatsMap().get("write"),
62+
sameInstance(originalNode1WriteLoadStats)
63+
);
64+
}
65+
66+
public void testMovementOfAShardWillReduceThreadPoolUtilisation() {
67+
final var originalNode0WriteLoadStats = new NodeUsageStatsForThreadPools.ThreadPoolUsageStats(
68+
randomIntBetween(4, 16),
69+
randomFloatBetween(0.2f, 1.0f, true),
70+
0
71+
);
72+
final var originalNode1WriteLoadStats = new NodeUsageStatsForThreadPools.ThreadPoolUsageStats(
73+
randomIntBetween(4, 16),
74+
randomFloatBetween(0.1f, 0.5f, true),
75+
0
76+
);
77+
final var allocation = createRoutingAllocation(originalNode0WriteLoadStats, originalNode1WriteLoadStats);
78+
79+
final var writeLoadPerShardSimulator = new WriteLoadPerShardSimulator(allocation);
80+
81+
// Relocate a random shard from node_0 to node_1
82+
final var randomShard = randomFrom(StreamSupport.stream(allocation.routingNodes().node("node_0").spliterator(), false).toList());
83+
final var moveShardTuple = allocation.routingNodes().relocateShard(randomShard, "node_1", randomNonNegativeLong(), "testing", NOOP);
84+
writeLoadPerShardSimulator.simulateShardStarted(moveShardTuple.v2());
85+
86+
final var calculatedNodeUsageStates = writeLoadPerShardSimulator.nodeUsageStatsForThreadPools();
87+
assertThat(calculatedNodeUsageStates, Matchers.aMapWithSize(2));
88+
89+
// Some node_0 utilization should have been moved to node_1
90+
assertThat(
91+
getAverageWritePoolUtilization(writeLoadPerShardSimulator, "node_0"),
92+
lessThan(originalNode0WriteLoadStats.averageThreadPoolUtilization())
93+
);
94+
assertThat(
95+
getAverageWritePoolUtilization(writeLoadPerShardSimulator, "node_1"),
96+
greaterThan(originalNode1WriteLoadStats.averageThreadPoolUtilization())
97+
);
98+
}
99+
100+
public void testMovementFollowedByMovementBackWillNotChangeAnything() {
101+
final var originalNode0WriteLoadStats = randomUsageStats();
102+
final var originalNode1WriteLoadStats = randomUsageStats();
103+
final var allocation = createRoutingAllocation(originalNode0WriteLoadStats, originalNode1WriteLoadStats);
104+
final var writeLoadPerShardSimulator = new WriteLoadPerShardSimulator(allocation);
105+
106+
// Relocate a random shard from node_0 to node_1
107+
final long expectedShardSize = randomNonNegativeLong();
108+
final var randomShard = randomFrom(StreamSupport.stream(allocation.routingNodes().node("node_0").spliterator(), false).toList());
109+
final var moveShardTuple = allocation.routingNodes().relocateShard(randomShard, "node_1", expectedShardSize, "testing", NOOP);
110+
writeLoadPerShardSimulator.simulateShardStarted(moveShardTuple.v2());
111+
final ShardRouting movedAndStartedShard = allocation.routingNodes().startShard(moveShardTuple.v2(), NOOP, expectedShardSize);
112+
113+
// Some node_0 utilization should have been moved to node_1
114+
assertThat(
115+
getAverageWritePoolUtilization(writeLoadPerShardSimulator, "node_0"),
116+
lessThan(originalNode0WriteLoadStats.averageThreadPoolUtilization())
117+
);
118+
assertThat(
119+
getAverageWritePoolUtilization(writeLoadPerShardSimulator, "node_1"),
120+
greaterThan(originalNode1WriteLoadStats.averageThreadPoolUtilization())
121+
);
122+
123+
// Then move it back
124+
final var moveBackTuple = allocation.routingNodes()
125+
.relocateShard(movedAndStartedShard, "node_0", expectedShardSize, "testing", NOOP);
126+
writeLoadPerShardSimulator.simulateShardStarted(moveBackTuple.v2());
127+
128+
// The utilization numbers should be back to their original values
129+
assertThat(
130+
getAverageWritePoolUtilization(writeLoadPerShardSimulator, "node_0"),
131+
equalTo(originalNode0WriteLoadStats.averageThreadPoolUtilization())
132+
);
133+
assertThat(
134+
getAverageWritePoolUtilization(writeLoadPerShardSimulator, "node_1"),
135+
equalTo(originalNode1WriteLoadStats.averageThreadPoolUtilization())
136+
);
137+
}
138+
139+
public void testMovementBetweenNodesWithNoThreadPoolStats() {
140+
final var originalNode0WriteLoadStats = randomBoolean() ? randomUsageStats() : null;
141+
final var originalNode1WriteLoadStats = randomBoolean() ? randomUsageStats() : null;
142+
final var allocation = createRoutingAllocation(originalNode0WriteLoadStats, originalNode1WriteLoadStats);
143+
final var writeLoadPerShardSimulator = new WriteLoadPerShardSimulator(allocation);
144+
145+
// Relocate a random shard from node_0 to node_1
146+
final long expectedShardSize = randomNonNegativeLong();
147+
final var randomShard = randomFrom(StreamSupport.stream(allocation.routingNodes().node("node_0").spliterator(), false).toList());
148+
final var moveShardTuple = allocation.routingNodes().relocateShard(randomShard, "node_1", expectedShardSize, "testing", NOOP);
149+
writeLoadPerShardSimulator.simulateShardStarted(moveShardTuple.v2());
150+
allocation.routingNodes().startShard(moveShardTuple.v2(), NOOP, expectedShardSize);
151+
152+
final var generated = writeLoadPerShardSimulator.nodeUsageStatsForThreadPools();
153+
assertThat(generated.containsKey("node_0"), equalTo(originalNode0WriteLoadStats != null));
154+
assertThat(generated.containsKey("node_1"), equalTo(originalNode1WriteLoadStats != null));
155+
}
156+
157+
private float getAverageWritePoolUtilization(WriteLoadPerShardSimulator writeLoadPerShardSimulator, String nodeId) {
158+
final var generatedNodeUsageStates = writeLoadPerShardSimulator.nodeUsageStatsForThreadPools();
159+
final var node0WritePoolStats = generatedNodeUsageStates.get(nodeId).threadPoolUsageStatsMap().get("write");
160+
return node0WritePoolStats.averageThreadPoolUtilization();
161+
}
162+
163+
private NodeUsageStatsForThreadPools.ThreadPoolUsageStats randomUsageStats() {
164+
return new NodeUsageStatsForThreadPools.ThreadPoolUsageStats(randomIntBetween(4, 16), randomFloatBetween(0.0f, 1.0f, false), 0);
165+
}
166+
167+
private RoutingAllocation createRoutingAllocation(
168+
NodeUsageStatsForThreadPools.ThreadPoolUsageStats node0WriteLoadStats,
169+
NodeUsageStatsForThreadPools.ThreadPoolUsageStats node1WriteLoadStats
170+
) {
171+
final Map<String, NodeUsageStatsForThreadPools> nodeUsageStats = new HashMap<>();
172+
if (node0WriteLoadStats != null) {
173+
nodeUsageStats.put("node_0", new NodeUsageStatsForThreadPools("node_0", Map.of("write", node0WriteLoadStats)));
174+
}
175+
if (node1WriteLoadStats != null) {
176+
nodeUsageStats.put("node_1", new NodeUsageStatsForThreadPools("node_1", Map.of("write", node1WriteLoadStats)));
177+
}
178+
179+
return new RoutingAllocation(
180+
new AllocationDeciders(List.of()),
181+
createClusterState(),
182+
ClusterInfo.builder().nodeUsageStatsForThreadPools(nodeUsageStats).build(),
183+
SnapshotShardSizeInfo.EMPTY,
184+
System.nanoTime()
185+
).mutableCloneForSimulation();
186+
}
187+
188+
private ClusterState createClusterState() {
189+
return ClusterStateCreationUtils.stateWithAssignedPrimariesAndReplicas(new String[] { "indexOne", "indexTwo", "indexThree" }, 3, 0);
190+
}
191+
}

0 commit comments

Comments
 (0)