Merged

51 commits
affdae5 Log when no progress is made towards the desired balance for some time (nicktindall, Oct 23, 2025)
5249be9 Track immovable shards individually (nicktindall, Oct 24, 2025)
14772ab Naming, initial delay (nicktindall, Oct 24, 2025)
d8186d9 Remove dead code (nicktindall, Oct 24, 2025)
ad5c028 Tidy (nicktindall, Oct 24, 2025)
7a98137 Javadoc (nicktindall, Oct 24, 2025)
e84efa6 Clear immovable shard in moveShards (nicktindall, Oct 24, 2025)
c901665 Merge branch 'main' into log_on_no_balancing_progress (nicktindall, Oct 24, 2025)
4194739 Try tracking undesired state in shard routing (nicktindall, Oct 27, 2025)
f050316 Fix setting names (nicktindall, Oct 27, 2025)
c7b14fa Naming (nicktindall, Oct 27, 2025)
5cf44fc Naming (nicktindall, Oct 27, 2025)
1472ec2 ShardRouting#equals/hashCode (nicktindall, Oct 27, 2025)
7652369 Javadoc (nicktindall, Oct 27, 2025)
7996801 Fix logic to handle Long.MAX_VALUE (nicktindall, Oct 27, 2025)
2056c4e javadoc (nicktindall, Oct 27, 2025)
6e37d04 javadoc (nicktindall, Oct 27, 2025)
9590ce3 Merge remote-tracking branch 'origin/main' into log_on_no_balancing_p… (nicktindall, Oct 27, 2025)
904d597 Naming, RoutingNodes updates (nicktindall, Oct 27, 2025)
be13adb Clear becameUndesiredTime when a shard goes unassigned (nicktindall, Oct 27, 2025)
5013031 Naming (nicktindall, Oct 27, 2025)
f285936 Merge remote-tracking branch 'origin/main' into log_on_no_balancing_p… (nicktindall, Oct 29, 2025)
047b411 Mock time provision correctly (nicktindall, Oct 29, 2025)
9bd1e24 Serialise became_undesired_time (nicktindall, Oct 29, 2025)
a8fba52 Add unit test (nicktindall, Oct 29, 2025)
fc9c7d0 Track undesired allocations locally (nicktindall, Nov 5, 2025)
713a0e1 Merge remote-tracking branch 'origin/main' into log_on_no_balancing_p… (nicktindall, Nov 5, 2025)
5f57c0b Minimise change (nicktindall, Nov 5, 2025)
5d1d4c0 Minimise change (nicktindall, Nov 5, 2025)
54533f7 Tidy (nicktindall, Nov 5, 2025)
2d3f10f Tidy (nicktindall, Nov 5, 2025)
ba678a9 Reduce default limit (nicktindall, Nov 5, 2025)
6330ba9 clear -> removeTracking (nicktindall, Nov 5, 2025)
f797014 Merge branch 'main' into log_on_no_balancing_progress (nicktindall, Nov 5, 2025)
207dd0d Update server/src/main/java/org/elasticsearch/cluster/routing/allocat… (nicktindall, Nov 5, 2025)
6d5d2fa Add separate setting for logging interval, reduce default to 5 minutes. (nicktindall, Nov 5, 2025)
0af8050 Check for capacity before we check for existing record (nicktindall, Nov 5, 2025)
6db1bc3 Discard excess tracking if the limit is reduced (nicktindall, Nov 6, 2025)
5fd0770 Fix test after log message changed (nicktindall, Nov 6, 2025)
7dcb690 Default max to track to zero (nicktindall, Nov 6, 2025)
ef8f45b Add more unit tests (nicktindall, Nov 6, 2025)
19b7475 Make resilient to metadata changes (nicktindall, Nov 6, 2025)
ed7caa4 Explicitly configure max tracking in test (nicktindall, Nov 6, 2025)
53f95c0 de-dupe default time value (nicktindall, Nov 6, 2025)
be8a38b Remove conditional and leave assertion (nicktindall, Nov 6, 2025)
061e2ad Reduce maximum max-to-track (nicktindall, Nov 6, 2025)
17fd729 Merge branch 'main' into log_on_no_balancing_progress (nicktindall, Nov 6, 2025)
566f5d5 Give duration threshold minimum of one minute (nicktindall, Nov 6, 2025)
655e352 Use ordered map to quickly determine earliest entry (nicktindall, Nov 6, 2025)
feaca10 Add test cases where there is no desired balance (nicktindall, Nov 6, 2025)
d532523 Merge branch 'main' into log_on_no_balancing_progress (nicktindall, Nov 6, 2025)
@@ -30,12 +30,12 @@
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.settings.ClusterSettings;
 import org.elasticsearch.common.settings.Setting;
+import org.elasticsearch.common.time.TimeProvider;
 import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.core.Tuple;
 import org.elasticsearch.gateway.PriorityComparator;
 import org.elasticsearch.index.IndexVersions;
 import org.elasticsearch.index.shard.ShardId;
-import org.elasticsearch.threadpool.ThreadPool;

 import java.util.Comparator;
 import java.util.Iterator;
@@ -83,17 +83,16 @@ public class DesiredBalanceReconciler {
     private double undesiredAllocationsLogThreshold;
     private final NodeAllocationOrdering allocationOrdering = new NodeAllocationOrdering();
     private final NodeAllocationOrdering moveOrdering = new NodeAllocationOrdering();
+    private final UndesiredAllocationsTracker undesiredAllocationsTracker;

-    public DesiredBalanceReconciler(ClusterSettings clusterSettings, ThreadPool threadPool) {
-        this.undesiredAllocationLogInterval = new FrequencyCappedAction(
-            threadPool.relativeTimeInMillisSupplier(),
-            TimeValue.timeValueMinutes(5)
-        );
+    public DesiredBalanceReconciler(ClusterSettings clusterSettings, TimeProvider timeProvider) {
+        this.undesiredAllocationLogInterval = new FrequencyCappedAction(timeProvider::relativeTimeInMillis, TimeValue.timeValueMinutes(5));
         clusterSettings.initializeAndWatch(UNDESIRED_ALLOCATIONS_LOG_INTERVAL_SETTING, this.undesiredAllocationLogInterval::setMinInterval);
         clusterSettings.initializeAndWatch(
             UNDESIRED_ALLOCATIONS_LOG_THRESHOLD_SETTING,
             value -> this.undesiredAllocationsLogThreshold = value
         );
+        this.undesiredAllocationsTracker = new UndesiredAllocationsTracker(clusterSettings, timeProvider);
     }

     /**
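The constructor change above swaps ThreadPool for TimeProvider, so the reconciler depends only on a clock; judging by the "Mock time provision correctly" commit, this was done to make interval behaviour deterministic in tests. Below is a minimal sketch of the frequency-capping idea under that assumption; the class and method names are illustrative stand-ins, not Elasticsearch's actual FrequencyCappedAction API.

```java
import java.util.function.LongSupplier;

// Illustrative stand-in for a frequency-capped action: runs at most once per
// configured interval, measured against an injected relative-time clock.
final class RateLimitedRunner {
    private final LongSupplier relativeTimeMillis; // injected clock, trivially mockable
    private long minIntervalMillis;
    private long lastRunMillis = Long.MIN_VALUE;

    RateLimitedRunner(LongSupplier relativeTimeMillis, long minIntervalMillis) {
        this.relativeTimeMillis = relativeTimeMillis;
        this.minIntervalMillis = minIntervalMillis;
    }

    void setMinInterval(long millis) { // dynamically updatable, like the cluster setting
        this.minIntervalMillis = millis;
    }

    // Runs the action only if the minimum interval has elapsed since the last run.
    boolean maybeRun(Runnable action) {
        final long now = relativeTimeMillis.getAsLong();
        if (lastRunMillis == Long.MIN_VALUE || now - lastRunMillis >= minIntervalMillis) {
            lastRunMillis = now;
            action.run();
            return true;
        }
        return false;
    }
}
```

In a test, relativeTimeMillis can be backed by a mutable counter instead of the wall clock, which is what injecting a clock buys over reaching into a ThreadPool.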
@@ -114,6 +113,7 @@ public DesiredBalanceMetrics.AllocationStats reconcile(DesiredBalance desiredBal
     public void clear() {
         allocationOrdering.clear();
         moveOrdering.clear();
+        undesiredAllocationsTracker.clear();
     }

@@ -134,6 +134,7 @@ private class Reconciliation {
         }

         DesiredBalanceMetrics.AllocationStats run() {
+            undesiredAllocationsTracker.cleanup(routingNodes);
             try (var ignored = allocation.withReconcilingFlag()) {

                 logger.debug("Reconciling desired balance for [{}]", desiredBalance.lastConvergedIndex());
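The new cleanup call runs before any reconciliation work. Judging by the "Make resilient to metadata changes" commit, it presumably drops tracking entries for shards that no longer exist in the routing table, so stale entries cannot age into spurious warnings. A hypothetical sketch of that kind of sweep; the names and signature here are assumptions, not the actual method:

```java
import java.util.Map;
import java.util.Set;

// Hypothetical sweep: forget tracked shards that are no longer assigned,
// so deleted indices or vanished shards cannot trigger "stuck" warnings.
final class TrackerCleanupSketch {
    static void cleanup(Map<String, Long> becameUndesiredMillis, Set<String> assignedShardIds) {
        becameUndesiredMillis.keySet().removeIf(shardId -> assignedShardIds.contains(shardId) == false);
    }
}
```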
@@ -493,35 +494,51 @@ private void moveShards() {

         if (assignment.nodeIds().contains(shardRouting.currentNodeId())) {
             // shard is already on a desired node
+            undesiredAllocationsTracker.removeTracking(shardRouting);
             continue;
         }

-        if (allocation.deciders().canAllocate(shardRouting, allocation).type() != Decision.Type.YES) {
-            // cannot allocate anywhere, no point in looking for a target node
-            continue;
-        }
+        boolean movedUndesiredShard = false;
+        try {
+            if (allocation.deciders().canAllocate(shardRouting, allocation).type() != Decision.Type.YES) {
+                // cannot allocate anywhere, no point in looking for a target node
+                continue;
+            }

-        final var routingNode = routingNodes.node(shardRouting.currentNodeId());
-        final var canRemainDecision = allocation.deciders().canRemain(shardRouting, routingNode, allocation);
-        if (canRemainDecision.type() != Decision.Type.NO && canRemainDecision.type() != Decision.Type.NOT_PREFERRED) {
-            // If movement is throttled, a future reconciliation round will see a resolution. For now, leave it alone.
-            // Reconciliation treats canRemain NOT_PREFERRED answers as YES because the DesiredBalance computation already decided
-            // how to handle the situation.
-            continue;
-        }
+            final var routingNode = routingNodes.node(shardRouting.currentNodeId());
+            final var canRemainDecision = allocation.deciders().canRemain(shardRouting, routingNode, allocation);
+            if (canRemainDecision.type() != Decision.Type.NO && canRemainDecision.type() != Decision.Type.NOT_PREFERRED) {
+                // If movement is throttled, a future reconciliation round will see a resolution. For now, leave it alone.
+                // Reconciliation treats canRemain NOT_PREFERRED answers as YES because the DesiredBalance computation already
+                // decided how to handle the situation.
+                continue;
+            }

-        final var moveTarget = findRelocationTarget(shardRouting, assignment.nodeIds());
-        if (moveTarget != null) {
-            logger.debug("Moving shard {} from {} to {}", shardRouting.shardId(), shardRouting.currentNodeId(), moveTarget.getId());
-            routingNodes.relocateShard(
-                shardRouting,
-                moveTarget.getId(),
-                allocation.clusterInfo().getShardSize(shardRouting, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE),
-                "move",
-                allocation.changes()
-            );
-            iterator.dePrioritizeNode(shardRouting.currentNodeId());
-            moveOrdering.recordAllocation(shardRouting.currentNodeId());
+            final var moveTarget = findRelocationTarget(shardRouting, assignment.nodeIds());
+            if (moveTarget != null) {
+                logger.debug(
+                    "Moving shard {} from {} to {}",
+                    shardRouting.shardId(),
+                    shardRouting.currentNodeId(),
+                    moveTarget.getId()
+                );
+                routingNodes.relocateShard(
+                    shardRouting,
+                    moveTarget.getId(),
+                    allocation.clusterInfo().getShardSize(shardRouting, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE),
+                    "move",
+                    allocation.changes()
+                );
+                iterator.dePrioritizeNode(shardRouting.currentNodeId());
+                moveOrdering.recordAllocation(shardRouting.currentNodeId());
+                movedUndesiredShard = true;
+            }
+        } finally {
+            if (movedUndesiredShard) {
+                undesiredAllocationsTracker.removeTracking(shardRouting);
+            } else {
+                undesiredAllocationsTracker.trackUndesiredAllocation(shardRouting);
+            }
+        }
     }
 }
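The restructuring above wraps the whole decision chain in try/finally, so every undesired shard leaves the loop body in exactly one of two states: untracked if it relocated, tracked otherwise, regardless of which continue fires. A reduced sketch of that control-flow pattern, with hypothetical stand-in types:

```java
// Stand-in for UndesiredAllocationsTracker; ShardRouting is reduced to an id.
interface Tracker {
    void trackUndesiredAllocation(String shardId);
    void removeTracking(String shardId);
}

final class TrackOrUntrackSketch {
    // Mirrors one iteration of the moveShards loop: 'return' plays the role
    // of 'continue', and the finally block still classifies the shard.
    static void processShard(String shardId, boolean canAllocate, boolean mustMove,
                             boolean targetFound, Tracker tracker) {
        boolean movedUndesiredShard = false;
        try {
            if (canAllocate == false) {
                return; // cannot allocate anywhere; finally still tracks the shard
            }
            if (mustMove == false) {
                return; // may remain where it is, for now
            }
            if (targetFound) {
                // ... relocate the shard here ...
                movedUndesiredShard = true;
            }
        } finally {
            if (movedUndesiredShard) {
                tracker.removeTracking(shardId);           // heading to a desired node
            } else {
                tracker.trackUndesiredAllocation(shardId); // still undesired: start or keep the timer
            }
        }
    }
}
```

Without the finally, each early exit would need its own tracking call, and a newly added exit path could silently skip tracking.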
@@ -555,51 +572,63 @@ private DesiredBalanceMetrics.AllocationStats balance() {

         if (assignment.nodeIds().contains(shardRouting.currentNodeId())) {
             // shard is already on a desired node
+            undesiredAllocationsTracker.removeTracking(shardRouting);
             continue;
         }

-        if (allocation.metadata().nodeShutdowns().contains(shardRouting.currentNodeId()) == false) {
-            // shard is not on a shutting down node, nor is it on a desired node per the previous check.
-            undesiredAllocationsExcludingShuttingDownNodes++;
-            undesiredAllocationsExcludingShuttingDownNodesByRole.addTo(shardRouting.role(), 1);
-        }
-
-        if (allocation.deciders().canRebalance(allocation).type() != Decision.Type.YES) {
-            // Rebalancing is disabled, we're just here to collect the AllocationStats to return.
-            continue;
-        }
+        boolean movedUndesiredShard = false;
+        try {
+            if (allocation.metadata().nodeShutdowns().contains(shardRouting.currentNodeId()) == false) {
+                // shard is not on a shutting down node, nor is it on a desired node per the previous check.
+                undesiredAllocationsExcludingShuttingDownNodes++;
+                undesiredAllocationsExcludingShuttingDownNodesByRole.addTo(shardRouting.role(), 1);
+            }

-        if (allocation.deciders().canRebalance(shardRouting, allocation).type() != Decision.Type.YES) {
-            // rebalancing disabled for this shard
-            continue;
-        }
+            if (allocation.deciders().canRebalance(allocation).type() != Decision.Type.YES) {
+                // Rebalancing is disabled, we're just here to collect the AllocationStats to return.
+                continue;
+            }

-        if (allocation.deciders().canAllocate(shardRouting, allocation).type() != Decision.Type.YES) {
-            // cannot allocate anywhere, no point in looking for a target node
-            continue;
-        }
+            if (allocation.deciders().canRebalance(shardRouting, allocation).type() != Decision.Type.YES) {
+                // rebalancing disabled for this shard
+                continue;
+            }

-        final var rebalanceTarget = findRelocationTarget(shardRouting, assignment.nodeIds(), this::decideCanAllocate);
-        if (rebalanceTarget != null) {
-            logger.debug(
-                "Rebalancing shard {} from {} to {}",
-                shardRouting.shardId(),
-                shardRouting.currentNodeId(),
-                rebalanceTarget.getId()
-            );
+            if (allocation.deciders().canAllocate(shardRouting, allocation).type() != Decision.Type.YES) {
+                // cannot allocate anywhere, no point in looking for a target node
+                continue;
+            }

-            routingNodes.relocateShard(
-                shardRouting,
-                rebalanceTarget.getId(),
-                allocation.clusterInfo().getShardSize(shardRouting, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE),
-                "rebalance",
-                allocation.changes()
-            );
-            iterator.dePrioritizeNode(shardRouting.currentNodeId());
-            moveOrdering.recordAllocation(shardRouting.currentNodeId());
+            final var rebalanceTarget = findRelocationTarget(shardRouting, assignment.nodeIds(), this::decideCanAllocate);
+            if (rebalanceTarget != null) {
+                logger.debug(
+                    "Rebalancing shard {} from {} to {}",
+                    shardRouting.shardId(),
+                    shardRouting.currentNodeId(),
+                    rebalanceTarget.getId()
+                );
+
+                routingNodes.relocateShard(
+                    shardRouting,
+                    rebalanceTarget.getId(),
+                    allocation.clusterInfo().getShardSize(shardRouting, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE),
+                    "rebalance",
+                    allocation.changes()
+                );
+                iterator.dePrioritizeNode(shardRouting.currentNodeId());
+                moveOrdering.recordAllocation(shardRouting.currentNodeId());
+                movedUndesiredShard = true;
+            }
+        } finally {
+            if (movedUndesiredShard) {
+                undesiredAllocationsTracker.removeTracking(shardRouting);
+            } else {
+                undesiredAllocationsTracker.trackUndesiredAllocation(shardRouting);
+            }
+        }
     }

+    undesiredAllocationsTracker.maybeLogUndesiredShardsWarning(routingNodes, allocation, desiredBalance);
     maybeLogUndesiredAllocationsWarning(totalAllocations, undesiredAllocationsExcludingShuttingDownNodes, routingNodes.size());
     return new DesiredBalanceMetrics.AllocationStats(
         unassignedShards,
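The diff does not include UndesiredAllocationsTracker itself, but the commit messages outline its shape: a bounded record of when each shard became undesired, kept in an insertion-ordered map so the earliest entry is cheap to find, with excess entries discarded when the configured limit shrinks. A hypothetical sketch assembling those pieces; all names and the threshold logic are assumptions, not the actual class:

```java
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.LongSupplier;

// Hypothetical reconstruction of the tracker described by the commit messages.
// LinkedHashMap preserves insertion order, so the first entry is always the
// shard that has been undesired the longest.
final class BoundedUndesiredTracker {
    private final LinkedHashMap<String, Long> becameUndesiredMillis = new LinkedHashMap<>();
    private final LongSupplier clock;          // injected, mockable clock
    private final long durationThresholdMillis; // how long counts as "stuck"
    private int maxToTrack;                    // bounds the memory footprint

    BoundedUndesiredTracker(LongSupplier clock, long durationThresholdMillis, int maxToTrack) {
        this.clock = clock;
        this.durationThresholdMillis = durationThresholdMillis;
        this.maxToTrack = maxToTrack;
    }

    void trackUndesiredAllocation(String shardId) {
        // Capacity check first (mirrors the "check for capacity before we check
        // for existing record" commit); already-tracked shards are unaffected.
        if (becameUndesiredMillis.size() >= maxToTrack) {
            return;
        }
        becameUndesiredMillis.putIfAbsent(shardId, clock.getAsLong()); // keep original timestamp
    }

    void removeTracking(String shardId) {
        becameUndesiredMillis.remove(shardId);
    }

    // "Discard excess tracking if the limit is reduced": drop oldest entries
    // until the map fits under the new limit.
    void setMaxToTrack(int newLimit) {
        maxToTrack = newLimit;
        Iterator<Map.Entry<String, Long>> it = becameUndesiredMillis.entrySet().iterator();
        while (becameUndesiredMillis.size() > maxToTrack && it.hasNext()) {
            it.next();
            it.remove();
        }
    }

    // True when the longest-tracked shard has been undesired past the threshold,
    // i.e. when a "no balancing progress" warning may be warranted.
    boolean anyStuckPastThreshold() {
        Iterator<Long> it = becameUndesiredMillis.values().iterator();
        return it.hasNext() && clock.getAsLong() - it.next() >= durationThresholdMillis;
    }
}
```

With the default limit of zero (per the "Default max to track to zero" commit) such a tracker would record nothing until an operator raises the limit, and the "Give duration threshold minimum of one minute" commit suggests the threshold setting is floored so the warning cannot fire on transient imbalance.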