Skip to content

Commit 9ca9b4b

Browse files
committed
Use shard write loads instead of estimating
1 parent 9e36975 commit 9ca9b4b

File tree

2 files changed: 45 additions and 87 deletions

2 files changed: 45 additions and 87 deletions

server/src/main/java/org/elasticsearch/cluster/routing/WriteLoadPerShardSimulator.java

Lines changed: 11 additions & 79 deletions
Original file line number | Diff line number | Diff line change
@@ -9,40 +9,33 @@
99

1010
package org.elasticsearch.cluster.routing;
1111

12-
import com.carrotsearch.hppc.ObjectFloatHashMap;
13-
import com.carrotsearch.hppc.ObjectFloatMap;
12+
import com.carrotsearch.hppc.ObjectDoubleHashMap;
13+
import com.carrotsearch.hppc.ObjectDoubleMap;
1414

1515
import org.elasticsearch.cluster.NodeUsageStatsForThreadPools;
16-
import org.elasticsearch.cluster.metadata.IndexAbstraction;
17-
import org.elasticsearch.cluster.metadata.ProjectMetadata;
1816
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
1917
import org.elasticsearch.common.util.Maps;
20-
import org.elasticsearch.index.Index;
2118
import org.elasticsearch.index.shard.ShardId;
2219
import org.elasticsearch.threadpool.ThreadPool;
2320

24-
import java.util.HashMap;
25-
import java.util.HashSet;
2621
import java.util.Map;
27-
import java.util.Objects;
28-
import java.util.Set;
2922
import java.util.stream.Collectors;
3023

3124
public class WriteLoadPerShardSimulator {
3225

33-
private final ObjectFloatMap<String> simulatedWriteLoadDeltas;
26+
private final ObjectDoubleMap<String> simulatedWriteLoadDeltas;
3427
private final RoutingAllocation routingAllocation;
35-
private final ObjectFloatMap<ShardId> writeLoadsPerShard;
28+
private final Map<ShardId, Double> writeLoadsPerShard;
3629

3730
public WriteLoadPerShardSimulator(RoutingAllocation routingAllocation) {
3831
this.routingAllocation = routingAllocation;
39-
this.simulatedWriteLoadDeltas = new ObjectFloatHashMap<>();
40-
writeLoadsPerShard = estimateWriteLoadsPerShard(routingAllocation);
32+
this.simulatedWriteLoadDeltas = new ObjectDoubleHashMap<>();
33+
writeLoadsPerShard = routingAllocation.clusterInfo().getShardWriteLoads();
4134
}
4235

4336
public void simulateShardStarted(ShardRouting shardRouting) {
44-
final float writeLoadForShard = writeLoadsPerShard.get(shardRouting.shardId());
45-
if (writeLoadForShard > 0.0) {
37+
final Double writeLoadForShard = writeLoadsPerShard.get(shardRouting.shardId());
38+
if (writeLoadForShard != null) {
4639
if (shardRouting.relocatingNodeId() != null) {
4740
// relocating
4841
simulatedWriteLoadDeltas.addTo(shardRouting.relocatingNodeId(), -1 * writeLoadForShard);
@@ -76,76 +69,15 @@ public Map<String, NodeUsageStatsForThreadPools> nodeUsageStatsForThreadPools()
7669

7770
private NodeUsageStatsForThreadPools.ThreadPoolUsageStats replaceWritePoolStats(
7871
NodeUsageStatsForThreadPools value,
79-
float writeLoadDelta
72+
double writeLoadDelta
8073
) {
8174
final NodeUsageStatsForThreadPools.ThreadPoolUsageStats writeThreadPoolStats = value.threadPoolUsageStatsMap()
8275
.get(ThreadPool.Names.WRITE);
8376
return new NodeUsageStatsForThreadPools.ThreadPoolUsageStats(
8477
writeThreadPoolStats.totalThreadPoolThreads(),
85-
writeThreadPoolStats.averageThreadPoolUtilization() + (writeLoadDelta / writeThreadPoolStats.totalThreadPoolThreads()),
78+
(float) (writeThreadPoolStats.averageThreadPoolUtilization() + (writeLoadDelta / writeThreadPoolStats
79+
.totalThreadPoolThreads())),
8680
writeThreadPoolStats.averageThreadPoolQueueLatencyMillis()
8781
);
8882
}
89-
90-
// Everything below this line can probably go once we are publishing shard-write-load estimates to the master
91-
92-
private static ObjectFloatMap<ShardId> estimateWriteLoadsPerShard(RoutingAllocation allocation) {
93-
final Map<ShardId, Average> writeLoadPerShard = new HashMap<>();
94-
final Set<String> writeIndexNames = getWriteIndexNames(allocation);
95-
final Map<String, NodeUsageStatsForThreadPools> nodeUsageStatsForThreadPools = allocation.clusterInfo()
96-
.getNodeUsageStatsForThreadPools();
97-
for (final Map.Entry<String, NodeUsageStatsForThreadPools> usageStatsForThreadPoolsEntry : nodeUsageStatsForThreadPools
98-
.entrySet()) {
99-
final NodeUsageStatsForThreadPools value = usageStatsForThreadPoolsEntry.getValue();
100-
final NodeUsageStatsForThreadPools.ThreadPoolUsageStats writeThreadPoolStats = value.threadPoolUsageStatsMap()
101-
.get(ThreadPool.Names.WRITE);
102-
if (writeThreadPoolStats == null) {
103-
// No stats from this node yet
104-
continue;
105-
}
106-
float writeUtilisation = writeThreadPoolStats.averageThreadPoolUtilization() * writeThreadPoolStats.totalThreadPoolThreads();
107-
108-
final String nodeId = usageStatsForThreadPoolsEntry.getKey();
109-
final RoutingNode node = allocation.routingNodes().node(nodeId);
110-
final Set<ShardId> writeShardsOnNode = new HashSet<>();
111-
for (final ShardRouting shardRouting : node) {
112-
if (shardRouting.role() != ShardRouting.Role.SEARCH_ONLY && writeIndexNames.contains(shardRouting.index().getName())) {
113-
writeShardsOnNode.add(shardRouting.shardId());
114-
}
115-
}
116-
writeShardsOnNode.forEach(
117-
shardId -> writeLoadPerShard.computeIfAbsent(shardId, k -> new Average()).add(writeUtilisation / writeShardsOnNode.size())
118-
);
119-
}
120-
final ObjectFloatMap<ShardId> writeLoads = new ObjectFloatHashMap<>(writeLoadPerShard.size());
121-
writeLoadPerShard.forEach((shardId, average) -> writeLoads.put(shardId, average.get()));
122-
return writeLoads;
123-
}
124-
125-
private static Set<String> getWriteIndexNames(RoutingAllocation allocation) {
126-
return allocation.metadata()
127-
.projects()
128-
.values()
129-
.stream()
130-
.map(ProjectMetadata::getIndicesLookup)
131-
.flatMap(indicesLookup -> indicesLookup.values().stream())
132-
.map(IndexAbstraction::getWriteIndex)
133-
.filter(Objects::nonNull)
134-
.map(Index::getName)
135-
.collect(Collectors.toUnmodifiableSet());
136-
}
137-
138-
private static final class Average {
139-
int count;
140-
float sum;
141-
142-
public void add(float value) {
143-
count++;
144-
sum += value;
145-
}
146-
147-
public float get() {
148-
return sum / count;
149-
}
150-
}
15183
}

server/src/test/java/org/elasticsearch/cluster/routing/WriteLoadPerShardSimulatorTests.java

Lines changed: 34 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -13,15 +13,22 @@
1313
import org.elasticsearch.cluster.ClusterInfo;
1414
import org.elasticsearch.cluster.ClusterState;
1515
import org.elasticsearch.cluster.NodeUsageStatsForThreadPools;
16+
import org.elasticsearch.cluster.metadata.ProjectId;
1617
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
1718
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders;
19+
import org.elasticsearch.index.shard.ShardId;
1820
import org.elasticsearch.snapshots.SnapshotShardSizeInfo;
1921
import org.elasticsearch.test.ESTestCase;
2022
import org.hamcrest.Matchers;
2123

24+
import java.util.Arrays;
2225
import java.util.HashMap;
26+
import java.util.HashSet;
2327
import java.util.List;
2428
import java.util.Map;
29+
import java.util.Set;
30+
import java.util.stream.Collectors;
31+
import java.util.stream.IntStream;
2532
import java.util.stream.StreamSupport;
2633

2734
import static org.hamcrest.Matchers.equalTo;
@@ -33,14 +40,15 @@ public class WriteLoadPerShardSimulatorTests extends ESTestCase {
3340

3441
private static final RoutingChangesObserver NOOP = new RoutingChangesObserver() {
3542
};
43+
public static final String[] INDICES = { "indexOne", "indexTwo", "indexThree" };
3644

3745
/**
3846
* We should not adjust the values if there's no movement
3947
*/
4048
public void testNoShardMovement() {
4149
final var originalNode0WriteLoadStats = randomUsageStats();
4250
final var originalNode1WriteLoadStats = randomUsageStats();
43-
final var allocation = createRoutingAllocation(originalNode0WriteLoadStats, originalNode1WriteLoadStats);
51+
final var allocation = createRoutingAllocation(originalNode0WriteLoadStats, originalNode1WriteLoadStats, Set.of());
4452

4553
final var writeLoadPerShardSimulator = new WriteLoadPerShardSimulator(allocation);
4654
final var calculatedNodeUsageStates = writeLoadPerShardSimulator.nodeUsageStatsForThreadPools();
@@ -58,7 +66,7 @@ public void testNoShardMovement() {
5866
public void testMovementOfAShardWillReduceThreadPoolUtilisation() {
5967
final var originalNode0WriteLoadStats = randomUsageStats();
6068
final var originalNode1WriteLoadStats = randomUsageStats();
61-
final var allocation = createRoutingAllocation(originalNode0WriteLoadStats, originalNode1WriteLoadStats);
69+
final var allocation = createRoutingAllocation(originalNode0WriteLoadStats, originalNode1WriteLoadStats, Set.of());
6270
final var writeLoadPerShardSimulator = new WriteLoadPerShardSimulator(allocation);
6371

6472
// Relocate a random shard from node_0 to node_1
@@ -83,7 +91,7 @@ public void testMovementOfAShardWillReduceThreadPoolUtilisation() {
8391
public void testMovementFollowedByMovementBackWillNotChangeAnything() {
8492
final var originalNode0WriteLoadStats = randomUsageStats();
8593
final var originalNode1WriteLoadStats = randomUsageStats();
86-
final var allocation = createRoutingAllocation(originalNode0WriteLoadStats, originalNode1WriteLoadStats);
94+
final var allocation = createRoutingAllocation(originalNode0WriteLoadStats, originalNode1WriteLoadStats, Set.of());
8795
final var writeLoadPerShardSimulator = new WriteLoadPerShardSimulator(allocation);
8896

8997
// Relocate a random shard from node_0 to node_1
@@ -122,7 +130,11 @@ public void testMovementFollowedByMovementBackWillNotChangeAnything() {
122130
public void testMovementBetweenNodesWithNoThreadPoolStats() {
123131
final var originalNode0WriteLoadStats = randomBoolean() ? randomUsageStats() : null;
124132
final var originalNode1WriteLoadStats = randomBoolean() ? randomUsageStats() : null;
125-
final var allocation = createRoutingAllocation(originalNode0WriteLoadStats, originalNode1WriteLoadStats);
133+
final var allocation = createRoutingAllocation(
134+
originalNode0WriteLoadStats,
135+
originalNode1WriteLoadStats,
136+
new HashSet<>(randomSubsetOf(Arrays.asList(INDICES)))
137+
);
126138
final var writeLoadPerShardSimulator = new WriteLoadPerShardSimulator(allocation);
127139

128140
// Relocate a random shard from node_0 to node_1
@@ -153,7 +165,8 @@ private NodeUsageStatsForThreadPools.ThreadPoolUsageStats randomUsageStats() {
153165

154166
private RoutingAllocation createRoutingAllocation(
155167
NodeUsageStatsForThreadPools.ThreadPoolUsageStats node0WriteLoadStats,
156-
NodeUsageStatsForThreadPools.ThreadPoolUsageStats node1WriteLoadStats
168+
NodeUsageStatsForThreadPools.ThreadPoolUsageStats node1WriteLoadStats,
169+
Set<String> indicesWithNoWriteLoad
157170
) {
158171
final Map<String, NodeUsageStatsForThreadPools> nodeUsageStats = new HashMap<>();
159172
if (node0WriteLoadStats != null) {
@@ -163,16 +176,29 @@ private RoutingAllocation createRoutingAllocation(
163176
nodeUsageStats.put("node_1", new NodeUsageStatsForThreadPools("node_1", Map.of("write", node1WriteLoadStats)));
164177
}
165178

179+
final ClusterState clusterState = createClusterState();
180+
final ClusterInfo clusterInfo = ClusterInfo.builder()
181+
.nodeUsageStatsForThreadPools(nodeUsageStats)
182+
.shardWriteLoads(
183+
clusterState.metadata()
184+
.getProject(ProjectId.DEFAULT)
185+
.stream()
186+
.filter(index -> indicesWithNoWriteLoad.contains(index.getIndex().getName()) == false)
187+
.flatMap(index -> IntStream.range(0, 3).mapToObj(shardNum -> new ShardId(index.getIndex(), shardNum)))
188+
.collect(Collectors.toUnmodifiableMap(shardId -> shardId, shardId -> randomDoubleBetween(0.1, 5.0, true)))
189+
)
190+
.build();
191+
166192
return new RoutingAllocation(
167193
new AllocationDeciders(List.of()),
168-
createClusterState(),
169-
ClusterInfo.builder().nodeUsageStatsForThreadPools(nodeUsageStats).build(),
194+
clusterState,
195+
clusterInfo,
170196
SnapshotShardSizeInfo.EMPTY,
171197
System.nanoTime()
172198
).mutableCloneForSimulation();
173199
}
174200

175201
private ClusterState createClusterState() {
176-
return ClusterStateCreationUtils.stateWithAssignedPrimariesAndReplicas(new String[] { "indexOne", "indexTwo", "indexThree" }, 3, 0);
202+
return ClusterStateCreationUtils.stateWithAssignedPrimariesAndReplicas(INDICES, 3, 0);
177203
}
178204
}

0 commit comments

Comments (0)