Skip to content
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
0ebd75d
Estimate impact of shard movement using node-level write load
nicktindall Jul 17, 2025
e589db4
Naming
nicktindall Jul 17, 2025
beb2611
More randomness
nicktindall Jul 17, 2025
4e0fd1d
Merge remote-tracking branch 'origin/main' into ES-12000_add_write_lo…
nicktindall Jul 17, 2025
3f90889
Pedantry
nicktindall Jul 17, 2025
0b1d4a2
Naming
nicktindall Jul 17, 2025
9527720
Merge remote-tracking branch 'origin/main' into ES-12000_add_write_lo…
nicktindall Jul 17, 2025
9e36975
Merge branch 'main' into ES-12000_add_write_load_modeling_to_balancer
nicktindall Jul 21, 2025
9ca9b4b
Use shard write loads instead of estimating
nicktindall Jul 21, 2025
988ac3c
Add javadoc to WriteLoadPerShardSimulator
nicktindall Jul 24, 2025
f5ed735
Explain simulateShardStarted better for the new shard case
nicktindall Jul 24, 2025
8501b37
Assert on scale of utilisation change
nicktindall Jul 24, 2025
8c21cc0
Improve description of relocation
nicktindall Jul 24, 2025
519d1dd
Typo
nicktindall Jul 24, 2025
58e84a2
Rename test to indicate it also tests missing write loads
nicktindall Jul 24, 2025
faccc3d
Always simulate based on original write loads and thread pool stats
nicktindall Jul 24, 2025
edc259a
Use for-loop instead of stream
nicktindall Jul 24, 2025
f60029f
Consolidate similar tests
nicktindall Jul 24, 2025
94687e3
Naming/description of nodeUsageStatsForThreadPools
nicktindall Jul 24, 2025
466a7e0
Naming of test utility methods
nicktindall Jul 24, 2025
827f637
WriteLoadPerShardSimulator -> ShardMovementWriteLoadSimulator
nicktindall Jul 24, 2025
1c876e7
Merge remote-tracking branch 'origin/main' into ES-12000_add_write_lo…
nicktindall Jul 24, 2025
a072aaf
Increase likelihood of write loads and utilizations being 0, floor ut…
nicktindall Jul 24, 2025
6ae4aa4
Pedantry
nicktindall Jul 24, 2025
1dbee7f
Merge branch 'main' into ES-12000_add_write_load_modeling_to_balancer
nicktindall Jul 24, 2025
a4d89b9
Merge branch 'main' into ES-12000_add_write_load_modeling_to_balancer
nicktindall Jul 24, 2025
89c4d28
Merge branch 'main' into ES-12000_add_write_load_modeling_to_balancer
nicktindall Jul 25, 2025
770e04d
Assert that shardStarted only happens on destination node in a reloca…
nicktindall Jul 28, 2025
2b3ebf6
Merge branch 'main' into ES-12000_add_write_load_modeling_to_balancer
nicktindall Jul 28, 2025
34472f6
Typo
nicktindall Jul 28, 2025
cf81780
Update server/src/main/java/org/elasticsearch/cluster/routing/ShardMo…
nicktindall Jul 29, 2025
27a9ba0
Update server/src/main/java/org/elasticsearch/cluster/routing/ShardMo…
nicktindall Jul 29, 2025
1e0a4a0
Naming
nicktindall Jul 29, 2025
38c79a7
Merge branch 'main' into ES-12000_add_write_load_modeling_to_balancer
nicktindall Jul 29, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

import org.elasticsearch.cluster.ClusterInfo.NodeAndShard;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.WriteLoadPerShardSimulator;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.common.util.CopyOnFirstWriteMap;
import org.elasticsearch.index.shard.ShardId;
Expand All @@ -34,7 +35,7 @@ public class ClusterInfoSimulator {
private final Map<ShardId, Long> shardDataSetSizes;
private final Map<NodeAndShard, String> dataPath;
private final Map<String, EstimatedHeapUsage> estimatedHeapUsages;
private final Map<String, NodeUsageStatsForThreadPools> nodeThreadPoolUsageStats;
private final WriteLoadPerShardSimulator writeLoadPerShardSimulator;

public ClusterInfoSimulator(RoutingAllocation allocation) {
this.allocation = allocation;
Expand All @@ -44,7 +45,7 @@ public ClusterInfoSimulator(RoutingAllocation allocation) {
this.shardDataSetSizes = Map.copyOf(allocation.clusterInfo().shardDataSetSizes);
this.dataPath = Map.copyOf(allocation.clusterInfo().dataPath);
this.estimatedHeapUsages = allocation.clusterInfo().getEstimatedHeapUsages();
this.nodeThreadPoolUsageStats = allocation.clusterInfo().getNodeUsageStatsForThreadPools();
this.writeLoadPerShardSimulator = new WriteLoadPerShardSimulator(allocation);
}

/**
Expand Down Expand Up @@ -115,6 +116,7 @@ public void simulateShardStarted(ShardRouting shard) {
shardSizes.put(shardIdentifierFromRouting(shard), project.getIndexSafe(shard.index()).ignoreDiskWatermarks() ? 0 : size);
}
}
writeLoadPerShardSimulator.simulateShardStarted(shard);
}

private void modifyDiskUsage(String nodeId, long freeDelta) {
Expand Down Expand Up @@ -159,7 +161,7 @@ public ClusterInfo getClusterInfo() {
dataPath,
Map.of(),
estimatedHeapUsages,
nodeThreadPoolUsageStats,
writeLoadPerShardSimulator.nodeUsageStatsForThreadPools(),
allocation.clusterInfo().getShardWriteLoads()
);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.cluster.routing;

import com.carrotsearch.hppc.ObjectDoubleHashMap;
import com.carrotsearch.hppc.ObjectDoubleMap;

import org.elasticsearch.cluster.NodeUsageStatsForThreadPools;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.common.util.Maps;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.threadpool.ThreadPool;

import java.util.Map;
import java.util.stream.Collectors;

/**
 * Simulates the effect of shard movements on each node's write thread pool usage stats,
 * based on the per-shard write loads reported in the cluster info.
 */
public class WriteLoadPerShardSimulator {

// Net simulated write-load change per node id. A primitive-double map (hppc) is used
// to avoid allocating boxed Double values for every accumulation.
private final ObjectDoubleMap<String> simulatedWriteLoadDeltas;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a performance gain over Map<String, Double>? I'm wondering why use it, essentially.

Copy link
Contributor

@mhl-b mhl-b Jul 23, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because Double is a boxed value, a pointer to heap allocated double. https://docs.oracle.com/javase/tutorial/java/data/autoboxing.html. Unfortunately. For this reason you can see different classes working with generic(boxed) and primitive collections, for performance reasons. Boxed values are tracked by GC, take more space (pointer and value), and require dereference. And can be null :(

IntStream s; -> stream of int rather than Integer
LongStream s; -> stream of long
Arrays.binarySearch(); -> 17 method overrides for each primitive and generic(boxed)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

https://openjdk.org/jeps/402 - somewhere in future, when Brian Goetz celebrates his 80th birthday I guess :D

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If they can somehow evolve their way to "valhalla" without breaking backward compatibility and without making the language horribly inconsistent that'll be a marvel of modern engineering.

// The allocation whose cluster info supplies the baseline node stats and shard write loads
private final RoutingAllocation routingAllocation;
// Shard write loads reported in the cluster info, keyed by shard id (may be missing entries)
private final Map<ShardId, Double> writeLoadsPerShard;

/**
 * Creates a simulator seeded from the given allocation's cluster info.
 *
 * @param routingAllocation supplies the baseline shard write loads and node usage stats
 */
public WriteLoadPerShardSimulator(RoutingAllocation routingAllocation) {
    this.routingAllocation = routingAllocation;
    this.writeLoadsPerShard = routingAllocation.clusterInfo().getShardWriteLoads();
    this.simulatedWriteLoadDeltas = new ObjectDoubleHashMap<>();
}

public void simulateShardStarted(ShardRouting shardRouting) {
final Double writeLoadForShard = writeLoadsPerShard.get(shardRouting.shardId());
if (writeLoadForShard != null) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you test the case where a shard write load is 0/null? Like would be reported for a non-data stream index shard.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will be covered by an existing test, I renamed it to make that clearer in 58e84a2.

That test randomly nullifies the thread pool stats or the write load stats for the objects in the test

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I also increased the likelihood of the utilisation numbers and write loads of being zeros in a072aaf

We will not return a simulated utilisation of < 0.0 as well because that's nonsense

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, nice change to max(X, 0.0) to avoid the negative numbers

Seems possible for the node-level and shard-level write loads not to line up exactly. E.g. the latest node-level write load reported for nodeA is 0, and it holds a shardA with peak shard write load is >0. If we then move shardA away from nodeA, we'd go negative. Shards can be moved for reasons other than write load balancing, so maybe it could happen 🧐

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah I think that's something we'll have to live with, I don't think it should matter too much for the purpose of the simulation.

if (shardRouting.relocatingNodeId() != null) {
// relocating
simulatedWriteLoadDeltas.addTo(shardRouting.relocatingNodeId(), -1 * writeLoadForShard);
simulatedWriteLoadDeltas.addTo(shardRouting.currentNodeId(), writeLoadForShard);
} else {
// not sure how this would come about, perhaps when allocating a replica after a delay?
simulatedWriteLoadDeltas.addTo(shardRouting.currentNodeId(), writeLoadForShard);
}
}
}

/**
 * Returns the per-node thread pool usage stats with all simulated shard movements applied.
 * Nodes without a simulated delta keep their original {@link NodeUsageStatsForThreadPools} instance.
 */
public Map<String, NodeUsageStatsForThreadPools> nodeUsageStatsForThreadPools() {
    final Map<String, NodeUsageStatsForThreadPools> originalStats = routingAllocation.clusterInfo().getNodeUsageStatsForThreadPools();
    return originalStats.entrySet()
        .stream()
        .collect(Collectors.toUnmodifiableMap(Map.Entry::getKey, entry -> applySimulatedDelta(entry.getKey(), entry.getValue())));
}

/**
 * Folds the node's simulated write-load delta into its "write" pool stats,
 * or returns the original stats unchanged when no delta was recorded.
 */
private NodeUsageStatsForThreadPools applySimulatedDelta(String nodeId, NodeUsageStatsForThreadPools originalNodeStats) {
    if (simulatedWriteLoadDeltas.containsKey(nodeId) == false) {
        return originalNodeStats;
    }
    return new NodeUsageStatsForThreadPools(
        nodeId,
        Maps.copyMapWithAddedOrReplacedEntry(
            originalNodeStats.threadPoolUsageStatsMap(),
            "write",
            replaceWritePoolStats(originalNodeStats, simulatedWriteLoadDeltas.get(nodeId))
        )
    );
}

/**
 * Recomputes a node's "write" thread pool stats by applying the simulated write-load
 * delta, spread evenly across the pool's threads.
 *
 * @param value          the node's original thread pool usage stats
 * @param writeLoadDelta the net write load added to (or removed from) the node by simulated shard movements
 * @return the adjusted write pool stats. The utilization is floored at 0.0: the
 *         node-level and shard-level write loads are sampled independently and may not
 *         line up exactly, so removing a shard's load could otherwise produce a
 *         nonsensical negative utilization (see review discussion).
 */
private NodeUsageStatsForThreadPools.ThreadPoolUsageStats replaceWritePoolStats(
    NodeUsageStatsForThreadPools value,
    double writeLoadDelta
) {
    final NodeUsageStatsForThreadPools.ThreadPoolUsageStats writeThreadPoolStats = value.threadPoolUsageStatsMap()
        .get(ThreadPool.Names.WRITE);
    final double adjustedUtilization = writeThreadPoolStats.averageThreadPoolUtilization()
        + (writeLoadDelta / writeThreadPoolStats.totalThreadPoolThreads());
    return new NodeUsageStatsForThreadPools.ThreadPoolUsageStats(
        writeThreadPoolStats.totalThreadPoolThreads(),
        // Floor at 0.0 so an over-estimated shard write load cannot drive the node negative
        (float) Math.max(0.0, adjustedUtilization),
        writeThreadPoolStats.averageThreadPoolQueueLatencyMillis()
    );
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,204 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.cluster.routing;

import org.elasticsearch.action.support.replication.ClusterStateCreationUtils;
import org.elasticsearch.cluster.ClusterInfo;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.NodeUsageStatsForThreadPools;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.snapshots.SnapshotShardSizeInfo;
import org.elasticsearch.test.ESTestCase;
import org.hamcrest.Matchers;

import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.StreamSupport;

import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.lessThan;
import static org.hamcrest.Matchers.sameInstance;

public class WriteLoadPerShardSimulatorTests extends ESTestCase {

    // Observer that ignores all routing changes; used where tests don't care about notifications
    private static final RoutingChangesObserver NOOP = new RoutingChangesObserver() {
    };
    // NOTE(review): a public mutable array constant can be modified by callers; consider private + List.of
    public static final String[] INDICES = { "indexOne", "indexTwo", "indexThree" };

/**
 * With no simulated shard movement, the original per-node stats instances must be returned untouched.
 */
public void testNoShardMovement() {
    final var node0Stats = randomUsageStats();
    final var node1Stats = randomUsageStats();
    final var allocation = createRoutingAllocation(node0Stats, node1Stats, Set.of());

    final var simulator = new WriteLoadPerShardSimulator(allocation);
    final var simulatedStats = simulator.nodeUsageStatsForThreadPools();
    assertThat(simulatedStats, Matchers.aMapWithSize(2));
    // No deltas recorded, so the exact same stats objects should come back
    assertThat(simulatedStats.get("node_0").threadPoolUsageStatsMap().get("write"), sameInstance(node0Stats));
    assertThat(simulatedStats.get("node_1").threadPoolUsageStatsMap().get("write"), sameInstance(node1Stats));
}

public void testMovementOfAShardWillReduceThreadPoolUtilisation() {
final var originalNode0WriteLoadStats = randomUsageStats();
final var originalNode1WriteLoadStats = randomUsageStats();
final var allocation = createRoutingAllocation(originalNode0WriteLoadStats, originalNode1WriteLoadStats, Set.of());
final var writeLoadPerShardSimulator = new WriteLoadPerShardSimulator(allocation);

// Relocate a random shard from node_0 to node_1
final var randomShard = randomFrom(StreamSupport.stream(allocation.routingNodes().node("node_0").spliterator(), false).toList());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

log randomShard? For debug purposes, then we can match it with the ClusterInfo info I suggest logging in createRoutingAllocation.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Again given that this is a unit test with no concurrency any failure should be reliably reproducible. Going to not log here assuming someone investigating a failure can log the things they're interested in.

final var moveShardTuple = allocation.routingNodes().relocateShard(randomShard, "node_1", randomNonNegativeLong(), "testing", NOOP);
writeLoadPerShardSimulator.simulateShardStarted(moveShardTuple.v2());

final var calculatedNodeUsageStates = writeLoadPerShardSimulator.nodeUsageStatsForThreadPools();
assertThat(calculatedNodeUsageStates, Matchers.aMapWithSize(2));

// Some node_0 utilization should have been moved to node_1
assertThat(
getAverageWritePoolUtilization(writeLoadPerShardSimulator, "node_0"),
lessThan(originalNode0WriteLoadStats.averageThreadPoolUtilization())
);
assertThat(
getAverageWritePoolUtilization(writeLoadPerShardSimulator, "node_1"),
greaterThan(originalNode1WriteLoadStats.averageThreadPoolUtilization())
);
}

/**
 * Relocating a shard and then relocating it back should cancel out exactly,
 * restoring the original utilisation numbers.
 */
public void testMovementFollowedByMovementBackWillNotChangeAnything() {
    final var node0Stats = randomUsageStats();
    final var node1Stats = randomUsageStats();
    final var allocation = createRoutingAllocation(node0Stats, node1Stats, Set.of());
    final var simulator = new WriteLoadPerShardSimulator(allocation);

    // Relocate a random shard from node_0 to node_1 and start it there
    final long expectedShardSize = randomNonNegativeLong();
    final var shardToMove = randomFrom(StreamSupport.stream(allocation.routingNodes().node("node_0").spliterator(), false).toList());
    final var relocation = allocation.routingNodes().relocateShard(shardToMove, "node_1", expectedShardSize, "testing", NOOP);
    simulator.simulateShardStarted(relocation.v2());
    final ShardRouting startedShard = allocation.routingNodes().startShard(relocation.v2(), NOOP, expectedShardSize);

    // Some of node_0's utilisation should have moved to node_1
    assertThat(getAverageWritePoolUtilization(simulator, "node_0"), lessThan(node0Stats.averageThreadPoolUtilization()));
    assertThat(getAverageWritePoolUtilization(simulator, "node_1"), greaterThan(node1Stats.averageThreadPoolUtilization()));

    // Then move it back again
    final var reverseRelocation = allocation.routingNodes().relocateShard(startedShard, "node_0", expectedShardSize, "testing", NOOP);
    simulator.simulateShardStarted(reverseRelocation.v2());

    // The utilisation numbers should be back to their original values
    assertThat(getAverageWritePoolUtilization(simulator, "node_0"), equalTo(node0Stats.averageThreadPoolUtilization()));
    assertThat(getAverageWritePoolUtilization(simulator, "node_1"), equalTo(node1Stats.averageThreadPoolUtilization()));
}

/**
 * Nodes that reported no thread pool stats must simply be absent from the simulated
 * output, and shards with no reported write load must not cause failures.
 */
public void testMovementBetweenNodesWithNoThreadPoolStats() {
    final var node0Stats = randomBoolean() ? randomUsageStats() : null;
    final var node1Stats = randomBoolean() ? randomUsageStats() : null;
    final var allocation = createRoutingAllocation(node0Stats, node1Stats, new HashSet<>(randomSubsetOf(Arrays.asList(INDICES))));
    final var simulator = new WriteLoadPerShardSimulator(allocation);

    // Relocate a random shard from node_0 to node_1 and start it there
    final long expectedShardSize = randomNonNegativeLong();
    final var shardToMove = randomFrom(StreamSupport.stream(allocation.routingNodes().node("node_0").spliterator(), false).toList());
    final var relocation = allocation.routingNodes().relocateShard(shardToMove, "node_1", expectedShardSize, "testing", NOOP);
    simulator.simulateShardStarted(relocation.v2());
    allocation.routingNodes().startShard(relocation.v2(), NOOP, expectedShardSize);

    // Only nodes that reported usage stats should appear in the simulated stats
    final var simulatedStats = simulator.nodeUsageStatsForThreadPools();
    assertThat(simulatedStats.containsKey("node_0"), equalTo(node0Stats != null));
    assertThat(simulatedStats.containsKey("node_1"), equalTo(node1Stats != null));
}

/** Reads the simulated average "write" pool utilisation for the given node. */
private float getAverageWritePoolUtilization(WriteLoadPerShardSimulator writeLoadPerShardSimulator, String nodeId) {
    final var writePoolStats = writeLoadPerShardSimulator.nodeUsageStatsForThreadPools().get(nodeId).threadPoolUsageStatsMap().get("write");
    return writePoolStats.averageThreadPoolUtilization();
}

/**
 * Creates random but plausible write pool usage stats: 4-16 threads, non-zero
 * utilisation, and up to a minute of queue latency.
 */
private NodeUsageStatsForThreadPools.ThreadPoolUsageStats randomUsageStats() {
    final int totalThreads = randomIntBetween(4, 16);
    final float averageUtilization = randomFloatBetween(0.1f, 1.0f, true);
    final long averageQueueLatencyMillis = randomLongBetween(0, 60_000);
    return new NodeUsageStatsForThreadPools.ThreadPoolUsageStats(totalThreads, averageUtilization, averageQueueLatencyMillis);
}

private RoutingAllocation createRoutingAllocation(
NodeUsageStatsForThreadPools.ThreadPoolUsageStats node0WriteLoadStats,
NodeUsageStatsForThreadPools.ThreadPoolUsageStats node1WriteLoadStats,
Set<String> indicesWithNoWriteLoad
) {
final Map<String, NodeUsageStatsForThreadPools> nodeUsageStats = new HashMap<>();
if (node0WriteLoadStats != null) {
nodeUsageStats.put("node_0", new NodeUsageStatsForThreadPools("node_0", Map.of("write", node0WriteLoadStats)));
}
if (node1WriteLoadStats != null) {
nodeUsageStats.put("node_1", new NodeUsageStatsForThreadPools("node_1", Map.of("write", node1WriteLoadStats)));
}

final ClusterState clusterState = createClusterState();
final ClusterInfo clusterInfo = ClusterInfo.builder()
.nodeUsageStatsForThreadPools(nodeUsageStats)
.shardWriteLoads(
clusterState.metadata()
.getProject(ProjectId.DEFAULT)
.stream()
.filter(index -> indicesWithNoWriteLoad.contains(index.getIndex().getName()) == false)
.flatMap(index -> IntStream.range(0, 3).mapToObj(shardNum -> new ShardId(index.getIndex(), shardNum)))
.collect(Collectors.toUnmodifiableMap(shardId -> shardId, shardId -> randomDoubleBetween(0.1, 5.0, true)))
)
.build();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you log the ClusterInfo to a string? There isn't any debug information to look at if any of the tests fail (I think?), and some logging of the values might help.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think that will add value here, there's a lot of numbers that go into the calculations and all the values are randomised, and it's a unit test with no concurrency so failures should be reliably reproducible with the seed. I would like to leave the logging to whoever's troubleshooting the failure.


return new RoutingAllocation(
new AllocationDeciders(List.of()),
clusterState,
clusterInfo,
SnapshotShardSizeInfo.EMPTY,
System.nanoTime()
).mutableCloneForSimulation();
}

// Cluster state with the three test indices, each with 3 assigned primary shards and no replicas
private ClusterState createClusterState() {
return ClusterStateCreationUtils.stateWithAssignedPrimariesAndReplicas(INDICES, 3, 0);
}
}