Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
71 commits
Select commit Hold shift + click to select a range
053edfa
New new approach
nicktindall Sep 19, 2025
e38191a
Add logging when moveDecision is stale
nicktindall Sep 19, 2025
909fd6d
assert we're not abusing the comparator
nicktindall Sep 19, 2025
2696fd9
Change comparator ranking, test
nicktindall Sep 22, 2025
10c042d
[CI] Update transport version definitions
Sep 22, 2025
f03e4b8
Use constant for missing write load
nicktindall Sep 22, 2025
be6253c
Merge remote-tracking branch 'origin/main' into ES-12739_select_hot_s…
nicktindall Sep 22, 2025
9112b1d
Distinguish between move and move-not-preferred movements
nicktindall Sep 22, 2025
49d11e3
Update javadoc
nicktindall Sep 22, 2025
7e23a15
Tweak test params
nicktindall Sep 22, 2025
fcde6c6
Simplify logic
nicktindall Sep 22, 2025
ceff1e3
Simplify logic
nicktindall Sep 22, 2025
6dcb6f0
Simplify logic
nicktindall Sep 22, 2025
c03bcde
Simplify logic
nicktindall Sep 22, 2025
fdca404
Tidy
nicktindall Sep 22, 2025
36cff36
Simplify logic
nicktindall Sep 22, 2025
98b5b80
Tidy
nicktindall Sep 22, 2025
df79e6f
Pedantry
nicktindall Sep 22, 2025
b2d67d7
Add test for NOT_PREFERRED movement
nicktindall Sep 22, 2025
3c5f592
Pedantry
nicktindall Sep 22, 2025
ae504cd
Merge branch 'main' into ES-12739_select_hot_shard_to_move_off_data_n…
nicktindall Sep 22, 2025
f204577
Expand test to cover prioritisation
nicktindall Sep 22, 2025
3c65dc8
Pedantry
nicktindall Sep 22, 2025
3fd27b5
Pedantry
nicktindall Sep 22, 2025
063995c
Reduce logging
nicktindall Sep 22, 2025
3815293
Naming
nicktindall Sep 22, 2025
34a88df
Constant
nicktindall Sep 22, 2025
cb8ac9a
Fix javadoc
nicktindall Sep 22, 2025
fa2b936
Merge branch 'main' into ES-12739_select_hot_shard_to_move_off_data_n…
nicktindall Sep 22, 2025
1c4dfaf
Used cached MoveDecision if no other moves have been made
nicktindall Sep 23, 2025
738875d
Abstract best move tracking and comparison out of decideMove
nicktindall Sep 29, 2025
f1061e2
Fix comment
nicktindall Sep 29, 2025
7b42376
Move MostDesirableMovementsTracker and ShardMovementPriorityComparato…
nicktindall Sep 29, 2025
baf802d
Put explicit check in for rhsMissing and lhsMissing
nicktindall Sep 29, 2025
104be72
Move shardMoved update below early exit
nicktindall Sep 29, 2025
1302b46
[CI] Update transport version definitions
Sep 29, 2025
0055dc7
Clarify natural ordering in comparator
nicktindall Sep 29, 2025
a031158
Escape > and <
nicktindall Sep 29, 2025
b3d85fc
Merge branch 'main' into ES-12739_select_hot_shard_to_move_off_data_n…
nicktindall Sep 29, 2025
ede9e44
Move MoveNotPreferredDecision local to where it's used
nicktindall Sep 29, 2025
2c87b39
Minimise change after merge
nicktindall Sep 29, 2025
7ef836d
Fix HTML, reference relevant fields
nicktindall Sep 30, 2025
4c6ebdb
Document THRESHOLD_RATIO
nicktindall Sep 30, 2025
a53787c
More specific naming on comparator, fix javadoc paragraph break
nicktindall Sep 30, 2025
7d7d23c
Make comparator put most-preferred first
nicktindall Oct 1, 2025
2199491
Use LinkedHashMap for predictable iteration order
nicktindall Oct 1, 2025
ee95443
Move tracker initialization above loop comment
nicktindall Oct 1, 2025
3ee55e7
Merge branch 'main' into ES-12739_select_hot_shard_to_move_off_data_n…
nicktindall Oct 2, 2025
8673351
Make it clearer what the predicate is when looking for the best non-p…
nicktindall Oct 2, 2025
1b87daa
Update server/src/main/java/org/elasticsearch/cluster/routing/allocat…
nicktindall Oct 2, 2025
57979dc
Improve naming
nicktindall Oct 2, 2025
3fef076
Improve naming (again)
nicktindall Oct 2, 2025
2ea2ea3
Remove decideCanAllocatePreferredOnly
nicktindall Oct 2, 2025
105d7bf
Remove redundant check
nicktindall Oct 2, 2025
50d266f
Merge remote-tracking branch 'origin/main' into ES-12739_select_hot_s…
nicktindall Oct 2, 2025
7837211
Improve javadoc
nicktindall Oct 2, 2025
cc147c9
Improve javadoc
nicktindall Oct 5, 2025
35dd5d4
Improve javadoc (again)
nicktindall Oct 5, 2025
c32d6e9
Improve javadoc
nicktindall Oct 6, 2025
291416a
Use constant for threshold
nicktindall Oct 6, 2025
0bfcf6b
Document test utility
nicktindall Oct 6, 2025
7668101
Explain single shard indices
nicktindall Oct 6, 2025
f3588e2
Improve naming
nicktindall Oct 6, 2025
7643dd0
Improve naming
nicktindall Oct 6, 2025
e514692
Reference comparator in javadoc
nicktindall Oct 6, 2025
ddc7739
Document test decider
nicktindall Oct 6, 2025
7753a44
Merge branch 'main' into ES-12739_select_hot_shard_to_move_off_data_n…
nicktindall Oct 6, 2025
4170af3
Test movement prioritization in IT
nicktindall Oct 6, 2025
43a2dd3
Update server/src/internalClusterTest/java/org/elasticsearch/cluster/…
nicktindall Oct 6, 2025
29a79e1
Fix Javadoc
nicktindall Oct 6, 2025
e64343f
Merge branch 'main' into ES-12739_select_hot_shard_to_move_off_data_n…
nicktindall Oct 6, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.cluster.routing.allocation.WriteLoadConstraintSettings;
import org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator;
import org.elasticsearch.cluster.routing.allocation.allocator.DesiredBalanceMetrics;
import org.elasticsearch.cluster.routing.allocation.allocator.DesiredBalanceShardsAllocator;
import org.elasticsearch.cluster.service.ClusterService;
Expand All @@ -54,14 +55,18 @@
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.stream.StreamSupport;

import static java.util.stream.IntStream.range;
import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS;
import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.everyItem;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.hasSize;
Expand Down Expand Up @@ -130,7 +135,7 @@ public void testHighNodeWriteLoadPreventsNewShardAllocation() {
setUpMockTransportIndicesStatsResponse(
harness.firstDiscoveryNode,
indexMetadata.getNumberOfShards(),
createShardStatsResponseForIndex(indexMetadata, harness.randomShardWriteLoad, harness.firstDataNodeId)
createShardStatsResponseForIndex(indexMetadata, harness.maxShardWriteLoad, harness.firstDataNodeId)
);
setUpMockTransportIndicesStatsResponse(harness.secondDiscoveryNode, 0, List.of());
setUpMockTransportIndicesStatsResponse(harness.thirdDiscoveryNode, 0, List.of());
Expand Down Expand Up @@ -235,7 +240,7 @@ public void testShardsAreAssignedToNotPreferredWhenAlternativeIsNo() {
setUpMockTransportIndicesStatsResponse(
harness.firstDiscoveryNode,
indexMetadata.getNumberOfShards(),
createShardStatsResponseForIndex(indexMetadata, harness.randomShardWriteLoad, harness.firstDataNodeId)
createShardStatsResponseForIndex(indexMetadata, harness.maxShardWriteLoad, harness.firstDataNodeId)
);
setUpMockTransportIndicesStatsResponse(harness.secondDiscoveryNode, 0, List.of());
setUpMockTransportIndicesStatsResponse(harness.thirdDiscoveryNode, 0, List.of());
Expand Down Expand Up @@ -333,7 +338,7 @@ public void testCanRemainNotPreferredIsIgnoredWhenAllOtherNodesReturnNotPreferre
setUpMockTransportIndicesStatsResponse(
harness.firstDiscoveryNode,
indexMetadata.getNumberOfShards(),
createShardStatsResponseForIndex(indexMetadata, harness.randomShardWriteLoad, harness.firstDataNodeId)
createShardStatsResponseForIndex(indexMetadata, harness.maxShardWriteLoad, harness.firstDataNodeId)
);
setUpMockTransportIndicesStatsResponse(harness.secondDiscoveryNode, 0, List.of());
setUpMockTransportIndicesStatsResponse(harness.thirdDiscoveryNode, 0, List.of());
Expand Down Expand Up @@ -429,15 +434,12 @@ public void testCanRemainRelocatesOneShardWhenAHotSpotOccurs() {
* will show that all shards have non-empty write load stats (so that the WriteLoadDecider will evaluate assigning them to a node).
*/

IndexMetadata indexMetadata = internalCluster().getCurrentMasterNodeInstance(ClusterService.class)
.state()
.getMetadata()
.getProject()
.index(harness.indexName);
final ClusterState originalClusterState = internalCluster().getCurrentMasterNodeInstance(ClusterService.class).state();
final IndexMetadata indexMetadata = originalClusterState.getMetadata().getProject().index(harness.indexName);
setUpMockTransportIndicesStatsResponse(
harness.firstDiscoveryNode,
indexMetadata.getNumberOfShards(),
createShardStatsResponseForIndex(indexMetadata, harness.randomShardWriteLoad, harness.firstDataNodeId)
createShardStatsResponseForIndex(indexMetadata, harness.maxShardWriteLoad, harness.firstDataNodeId)
);
setUpMockTransportIndicesStatsResponse(harness.secondDiscoveryNode, 0, List.of());
setUpMockTransportIndicesStatsResponse(harness.thirdDiscoveryNode, 0, List.of());
Expand Down Expand Up @@ -483,6 +485,7 @@ public void testCanRemainRelocatesOneShardWhenAHotSpotOccurs() {
harness.randomNumberOfShards,
countShardsStillAssignedToFirstNode + 1
);
assertThatTheBestShardWasMoved(harness, originalClusterState, desiredBalanceResponse);
} catch (AssertionError error) {
ClusterState state = client().admin()
.cluster()
Expand All @@ -498,6 +501,36 @@ public void testCanRemainRelocatesOneShardWhenAHotSpotOccurs() {
}
}

/**
 * Assert that the shard relocated off the hot-spotted first node is the shard ranked most
 * preferable to move by
 * {@link org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator.Balancer.PrioritiseByShardWriteLoadComparator}.
 */
private void assertThatTheBestShardWasMoved(
    TestHarness harness,
    ClusterState originalClusterState,
    DesiredBalanceResponse desiredBalanceResponse
) {
    // The non-hot-spotted nodes a shard could have been relocated to
    final Set<String> relocationTargetNodeIds = Set.of(harness.secondDiscoveryNode.getId(), harness.thirdDiscoveryNode.getId());

    // Identify the first shard whose desired assignment includes one of the relocation targets
    final int relocatedShardId = desiredBalanceResponse.getRoutingTable()
        .get(harness.indexName)
        .entrySet()
        .stream()
        .filter(entry -> entry.getValue().desired().nodeIds().stream().anyMatch(relocationTargetNodeIds::contains))
        .map(Map.Entry::getKey)
        .findFirst()
        .orElseThrow(() -> new AssertionError("No shard was moved to a non-hot-spotting node"));

    final var hotSpottedNode = originalClusterState.getRoutingNodes().node(harness.firstDataNodeId);
    final var priorityComparator = new BalancedShardsAllocator.Balancer.PrioritiseByShardWriteLoadComparator(
        desiredBalanceResponse.getClusterInfo(),
        hotSpottedNode
    );

    // Rank every shard on the hot-spotted node from most- to least-preferable to move
    final List<ShardRouting> shardsInPriorityOrder = StreamSupport.stream(hotSpottedNode.spliterator(), false)
        .sorted(priorityComparator)
        .toList();

    // The relocated shard must be the one the comparator ranks first
    assertThat(relocatedShardId, equalTo(shardsInPriorityOrder.get(0).shardId().id()));
}

public void testMaxQueueLatencyMetricIsPublished() {
final Settings settings = Settings.builder()
.put(
Expand Down Expand Up @@ -659,16 +692,35 @@ private NodeUsageStatsForThreadPools createNodeUsageStatsForThreadPools(
}

/**
 * Helper to create a list of dummy {@link ShardStats} for the given index, each shard being randomly allocated a peak write load
* between 0 and {@code maximumShardWriteLoad}. There will always be at least one shard reporting the specified
* {@code maximumShardWriteLoad}.
*/
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Method comment needs update

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in 29a79e1

/**
 * Helper to create a list of dummy {@link ShardStats} for the given index, each shard being randomly allocated a peak write load
 * between 0 and {@code maximumShardWriteLoad}. There will always be at least one shard reporting the specified
 * {@code maximumShardWriteLoad}.
 */
private List<ShardStats> createShardStatsResponseForIndex(
    IndexMetadata indexMetadata,
    float maximumShardWriteLoad,
    String assignedShardNodeId
) {
    // Randomly distribute shards' peak write-loads so that we can check later that shard movements are prioritized correctly
    final double writeLoadThreshold = maximumShardWriteLoad
        * BalancedShardsAllocator.Balancer.PrioritiseByShardWriteLoadComparator.THRESHOLD_RATIO;
    final List<Double> shardPeakWriteLoads = new ArrayList<>();
    // Need at least one with the maximum write-load so there is an unambiguous hottest shard
    shardPeakWriteLoads.add((double) maximumShardWriteLoad);
    final int remainingShards = indexMetadata.getNumberOfShards() - 1;
    // Some over-threshold, some under
    for (int i = 0; i < remainingShards; ++i) {
        if (randomBoolean()) {
            shardPeakWriteLoads.add(randomDoubleBetween(writeLoadThreshold, maximumShardWriteLoad, true));
        } else {
            shardPeakWriteLoads.add(randomDoubleBetween(0.0, writeLoadThreshold, true));
        }
    }
    assertThat(shardPeakWriteLoads, hasSize(indexMetadata.getNumberOfShards()));
    // Shuffle so the maximum-load shard is not always shard 0
    Collections.shuffle(shardPeakWriteLoads, random());
    final List<ShardStats> shardStats = new ArrayList<>(indexMetadata.getNumberOfShards());
    for (int i = 0; i < indexMetadata.getNumberOfShards(); i++) {
        shardStats.add(createShardStats(indexMetadata, i, shardPeakWriteLoads.get(i), assignedShardNodeId));
    }
    return shardStats;
}
Expand Down Expand Up @@ -719,7 +771,7 @@ private TestHarness setUpThreeTestNodesAndAllIndexShardsOnFirstNode() {
int randomUtilizationThresholdPercent = randomIntBetween(50, 100);
int randomNumberOfWritePoolThreads = randomIntBetween(2, 20);
long randomQueueLatencyThresholdMillis = randomLongBetween(1, 20_000);
float randomShardWriteLoad = randomFloatBetween(0.0f, 0.01f, false);
float maximumShardWriteLoad = randomFloatBetween(0.0f, 0.01f, false);
Settings settings = enabledWriteLoadDeciderSettings(randomUtilizationThresholdPercent, randomQueueLatencyThresholdMillis);

internalCluster().startMasterOnlyNode(settings);
Expand Down Expand Up @@ -756,8 +808,8 @@ private TestHarness setUpThreeTestNodesAndAllIndexShardsOnFirstNode() {
+ randomUtilizationThresholdPercent
+ ", write threads: "
+ randomNumberOfWritePoolThreads
+ ", individual shard write loads: "
+ randomShardWriteLoad
+ ", maximum shard write load: "
+ maximumShardWriteLoad
);

/**
Expand All @@ -775,7 +827,7 @@ private TestHarness setUpThreeTestNodesAndAllIndexShardsOnFirstNode() {

// Calculate the maximum utilization a node can report while still being able to accept all relocating shards
int shardWriteLoadOverhead = shardLoadUtilizationOverhead(
randomShardWriteLoad * randomNumberOfShards,
maximumShardWriteLoad * randomNumberOfShards,
randomNumberOfWritePoolThreads
);
int maxUtilBelowThresholdThatAllowsAllShardsToRelocate = randomUtilizationThresholdPercent - shardWriteLoadOverhead - 1;
Expand Down Expand Up @@ -819,7 +871,7 @@ private TestHarness setUpThreeTestNodesAndAllIndexShardsOnFirstNode() {
randomUtilizationThresholdPercent,
randomNumberOfWritePoolThreads,
randomQueueLatencyThresholdMillis,
randomShardWriteLoad,
maximumShardWriteLoad,
indexName,
randomNumberOfShards,
maxUtilBelowThresholdThatAllowsAllShardsToRelocate
Expand All @@ -842,7 +894,7 @@ record TestHarness(
int randomUtilizationThresholdPercent,
int randomNumberOfWritePoolThreads,
long randomQueueLatencyThresholdMillis,
float randomShardWriteLoad,
float maxShardWriteLoad,
String indexName,
int randomNumberOfShards,
int maxUtilBelowThresholdThatAllowsAllShardsToRelocate
Expand Down
Loading