Merged

Changes from 8 commits

Commits
affdae5
Log when no progress is made towards the desired balance for some time
nicktindall Oct 23, 2025
5249be9
Track immovable shards individually
nicktindall Oct 24, 2025
14772ab
Naming, initial delay
nicktindall Oct 24, 2025
d8186d9
Remove dead code
nicktindall Oct 24, 2025
ad5c028
Tidy
nicktindall Oct 24, 2025
7a98137
Javadoc
nicktindall Oct 24, 2025
e84efa6
Clear immovable shard in moveShards
nicktindall Oct 24, 2025
c901665
Merge branch 'main' into log_on_no_balancing_progress
nicktindall Oct 24, 2025
4194739
Try tracking undesired state in shard routing
nicktindall Oct 27, 2025
f050316
Fix setting names
nicktindall Oct 27, 2025
c7b14fa
Naming
nicktindall Oct 27, 2025
5cf44fc
Naming
nicktindall Oct 27, 2025
1472ec2
ShardRouting#equals/hashCode
nicktindall Oct 27, 2025
7652369
Javadoc
nicktindall Oct 27, 2025
7996801
Fix logic to handle Long.MAX_VALUE
nicktindall Oct 27, 2025
2056c4e
javadoc
nicktindall Oct 27, 2025
6e37d04
javadoc
nicktindall Oct 27, 2025
9590ce3
Merge remote-tracking branch 'origin/main' into log_on_no_balancing_p…
nicktindall Oct 27, 2025
904d597
Naming, RoutingNodes updates
nicktindall Oct 27, 2025
be13adb
Clear becameUndesiredTime when a shard goes unassigned
nicktindall Oct 27, 2025
5013031
Naming
nicktindall Oct 27, 2025
f285936
Merge remote-tracking branch 'origin/main' into log_on_no_balancing_p…
nicktindall Oct 29, 2025
047b411
Mock time provision correctly
nicktindall Oct 29, 2025
9bd1e24
Serialise became_undesired_time
nicktindall Oct 29, 2025
a8fba52
Add unit test
nicktindall Oct 29, 2025
fc9c7d0
Track undesired allocations locally
nicktindall Nov 5, 2025
713a0e1
Merge remote-tracking branch 'origin/main' into log_on_no_balancing_p…
nicktindall Nov 5, 2025
5f57c0b
Minimise change
nicktindall Nov 5, 2025
5d1d4c0
Minimise change
nicktindall Nov 5, 2025
54533f7
Tidy
nicktindall Nov 5, 2025
2d3f10f
Tidy
nicktindall Nov 5, 2025
ba678a9
Reduce default limit
nicktindall Nov 5, 2025
6330ba9
clear -> removeTracking
nicktindall Nov 5, 2025
f797014
Merge branch 'main' into log_on_no_balancing_progress
nicktindall Nov 5, 2025
207dd0d
Update server/src/main/java/org/elasticsearch/cluster/routing/allocat…
nicktindall Nov 5, 2025
6d5d2fa
Add separate setting for logging interval, reduce default to 5 minutes.
nicktindall Nov 5, 2025
0af8050
Check for capacity before we check for existing record
nicktindall Nov 5, 2025
6db1bc3
Discard excess tracking if the limit is reduced
nicktindall Nov 6, 2025
5fd0770
Fix test after log message changed
nicktindall Nov 6, 2025
7dcb690
Default max to track to zero
nicktindall Nov 6, 2025
ef8f45b
Add more unit tests
nicktindall Nov 6, 2025
19b7475
Make resilient to metadata changes
nicktindall Nov 6, 2025
ed7caa4
Explicitly configure max tracking in test
nicktindall Nov 6, 2025
53f95c0
de-dupe default time value
nicktindall Nov 6, 2025
be8a38b
Remove conditional and leave assertion
nicktindall Nov 6, 2025
061e2ad
Reduce maximum max-to-track
nicktindall Nov 6, 2025
17fd729
Merge branch 'main' into log_on_no_balancing_progress
nicktindall Nov 6, 2025
566f5d5
Give duration threshold minimum of one minute
nicktindall Nov 6, 2025
655e352
Use ordered map to quickly determine earliest entry
nicktindall Nov 6, 2025
feaca10
Add test cases where there is no desired balance
nicktindall Nov 6, 2025
d532523
Merge branch 'main' into log_on_no_balancing_progress
nicktindall Nov 6, 2025
@@ -11,6 +11,8 @@

import com.carrotsearch.hppc.ObjectLongHashMap;
import com.carrotsearch.hppc.ObjectLongMap;
import com.carrotsearch.hppc.procedures.LongProcedure;
import com.carrotsearch.hppc.procedures.ObjectLongProcedure;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -30,12 +32,13 @@
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.time.TimeProvider;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.gateway.PriorityComparator;
import org.elasticsearch.index.IndexVersions;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.threadpool.ThreadPool;

import java.util.Comparator;
import java.util.Iterator;
@@ -79,21 +82,38 @@ public class DesiredBalanceReconciler {
Setting.Property.NodeScope
);

/**
* Warning logs will be written periodically if we see a shard that has been immovable for at least this long
*/
public static final Setting<TimeValue> IMMOVABLE_SHARD_LOG_THRESHOLD_SETTING = Setting.timeSetting(
"cluster.routing.allocation.desired_balance.immovable_shard_logging.threshold",
TimeValue.timeValueMinutes(5),
Setting.Property.Dynamic,
Setting.Property.NodeScope
);

private final TimeProvider timeProvider;
private final FrequencyCappedAction undesiredAllocationLogInterval;
private final FrequencyCappedAction immovableShardsLogInterval;
private double undesiredAllocationsLogThreshold;
private final NodeAllocationOrdering allocationOrdering = new NodeAllocationOrdering();
private final NodeAllocationOrdering moveOrdering = new NodeAllocationOrdering();

public DesiredBalanceReconciler(ClusterSettings clusterSettings, ThreadPool threadPool) {
this.undesiredAllocationLogInterval = new FrequencyCappedAction(
threadPool.relativeTimeInMillisSupplier(),
TimeValue.timeValueMinutes(5)
);
clusterSettings.initializeAndWatch(UNDESIRED_ALLOCATIONS_LOG_INTERVAL_SETTING, this.undesiredAllocationLogInterval::setMinInterval);
private volatile TimeValue immovableShardThreshold;
private final ObjectLongHashMap<ShardRouting> immovableShards = new ObjectLongHashMap<>();

public DesiredBalanceReconciler(ClusterSettings clusterSettings, TimeProvider timeProvider) {
this.timeProvider = timeProvider;
this.undesiredAllocationLogInterval = new FrequencyCappedAction(timeProvider::relativeTimeInMillis, TimeValue.timeValueMinutes(5));
this.immovableShardsLogInterval = new FrequencyCappedAction(timeProvider::relativeTimeInMillis, TimeValue.ZERO);
clusterSettings.initializeAndWatch(UNDESIRED_ALLOCATIONS_LOG_INTERVAL_SETTING, value -> {
this.undesiredAllocationLogInterval.setMinInterval(value);
this.immovableShardsLogInterval.setMinInterval(value);
});
clusterSettings.initializeAndWatch(
UNDESIRED_ALLOCATIONS_LOG_THRESHOLD_SETTING,
value -> this.undesiredAllocationsLogThreshold = value
);
clusterSettings.initializeAndWatch(IMMOVABLE_SHARD_LOG_THRESHOLD_SETTING, value -> this.immovableShardThreshold = value);
}

/**
@@ -114,6 +134,7 @@ public DesiredBalanceMetrics.AllocationStats reconcile(DesiredBalance desiredBal
public void clear() {
allocationOrdering.clear();
moveOrdering.clear();
immovableShards.clear();
}

/**
@@ -513,6 +534,7 @@ private void moveShards() {
final var moveTarget = findRelocationTarget(shardRouting, assignment.nodeIds());
if (moveTarget != null) {
logger.debug("Moving shard {} from {} to {}", shardRouting.shardId(), shardRouting.currentNodeId(), moveTarget.getId());
immovableShards.remove(shardRouting);
routingNodes.relocateShard(
shardRouting,
moveTarget.getId(),
@@ -579,8 +601,10 @@ private DesiredBalanceMetrics.AllocationStats balance() {
continue;
}

final var rebalanceTarget = findRelocationTarget(shardRouting, assignment.nodeIds(), this::decideCanAllocate);
final var rebalanceDecision = findRelocationTarget(shardRouting, assignment.nodeIds(), this::decideCanAllocate);
final var rebalanceTarget = rebalanceDecision.chosenNode();
if (rebalanceTarget != null) {
immovableShards.remove(shardRouting);
logger.debug(
"Rebalancing shard {} from {} to {}",
shardRouting.shardId(),
Expand All @@ -597,9 +621,17 @@ private DesiredBalanceMetrics.AllocationStats balance() {
);
iterator.dePrioritizeNode(shardRouting.currentNodeId());
moveOrdering.recordAllocation(shardRouting.currentNodeId());
} else {
// Start tracking this shard as immovable if we are not already; we're not interested in shards that are THROTTLED
if (rebalanceDecision.bestDecision() == null || rebalanceDecision.bestDecision() == Decision.NO) {
if (immovableShards.containsKey(shardRouting) == false) {
immovableShards.put(shardRouting, timeProvider.relativeTimeInMillis());
}
}
}
}

maybeLogImmovableShardsWarning();
maybeLogUndesiredAllocationsWarning(totalAllocations, undesiredAllocationsExcludingShuttingDownNodes, routingNodes.size());
return new DesiredBalanceMetrics.AllocationStats(
unassignedShards,
@@ -616,6 +648,51 @@ private DesiredBalanceMetrics.AllocationStats balance() {
);
}

/**
* If any shards have been immovable for longer than the configured threshold, log a warning
*/
private void maybeLogImmovableShardsWarning() {
final long currentTimeMillis = timeProvider.relativeTimeInMillis();
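// if nothing is tracked, oldestImmovableShardTimestamp() returns Long.MAX_VALUE and this check comes out false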
if (currentTimeMillis - oldestImmovableShardTimestamp() > immovableShardThreshold.millis()) {
immovableShardsLogInterval.maybeExecute(this::logDecisionsForImmovableShardsOverThreshold);
}
}

private void logDecisionsForImmovableShardsOverThreshold() {
final long currentTimeMillis = timeProvider.relativeTimeInMillis();
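// the explicit cast gives the lambda a target type for hppc's generic forEach overload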
immovableShards.forEach((ObjectLongProcedure<? super ShardRouting>) (shardRouting, immovableSinceMillis) -> {
final long immovableDurationMs = currentTimeMillis - immovableSinceMillis;
if (immovableDurationMs > immovableShardThreshold.millis()) {
logImmovableShardDetails(shardRouting, TimeValue.timeValueMillis(immovableDurationMs));
}
});
}

private void logImmovableShardDetails(ShardRouting shardRouting, TimeValue immovableDuration) {
final RoutingAllocation.DebugMode originalDebugMode = allocation.getDebugMode();
allocation.setDebugMode(RoutingAllocation.DebugMode.EXCLUDE_YES_DECISIONS);
try {
final var assignment = desiredBalance.getAssignment(shardRouting.shardId());
logger.warn("Shard {} has been immovable for {}", shardRouting.shardId(), immovableDuration);
for (final var nodeId : assignment.nodeIds()) {
final var decision = allocation.deciders().canAllocate(shardRouting, routingNodes.node(nodeId), allocation);
logger.warn("Shard [{}] cannot be allocated on node [{}]: {}", shardRouting.shardId(), nodeId, decision);
}
} finally {
allocation.setDebugMode(originalDebugMode);
}
}

private long oldestImmovableShardTimestamp() {
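// single-element array as a mutable holder, since the lambda below cannot reassign a captured local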
long[] oldestTimestamp = { Long.MAX_VALUE };
immovableShards.values().forEach((LongProcedure) lc -> {
if (lc < oldestTimestamp[0]) {
oldestTimestamp[0] = lc;
}
});
return oldestTimestamp[0];
}

private void maybeLogUndesiredAllocationsWarning(int totalAllocations, int undesiredAllocations, int nodeCount) {
// more shards than cluster can relocate with one reroute
final boolean nonEmptyRelocationBacklog = undesiredAllocations > 2L * nodeCount;
@@ -634,24 +711,25 @@ private void maybeLogUndesiredAllocationsWarning(int totalAllocations, int undes
}

private DiscoveryNode findRelocationTarget(final ShardRouting shardRouting, Set<String> desiredNodeIds) {
final var moveDecision = findRelocationTarget(shardRouting, desiredNodeIds, this::decideCanAllocate);
final var moveDecision = findRelocationTarget(shardRouting, desiredNodeIds, this::decideCanAllocate).chosenNode();
if (moveDecision != null) {
return moveDecision;
}

final var shardsOnReplacedNode = allocation.metadata().nodeShutdowns().contains(shardRouting.currentNodeId(), REPLACE);
if (shardsOnReplacedNode) {
return findRelocationTarget(shardRouting, desiredNodeIds, this::decideCanForceAllocateForVacate);
return findRelocationTarget(shardRouting, desiredNodeIds, this::decideCanForceAllocateForVacate).chosenNode();
}
return null;
}

private DiscoveryNode findRelocationTarget(
private DecisionAndResult findRelocationTarget(
ShardRouting shardRouting,
Set<String> desiredNodeIds,
BiFunction<ShardRouting, RoutingNode, Decision> canAllocateDecider
) {
DiscoveryNode chosenNode = null;
Decision bestDecision = null;
for (final var nodeId : desiredNodeIds) {
// TODO consider ignored nodes here too?
if (nodeId.equals(shardRouting.currentNodeId())) {
@@ -669,6 +747,7 @@ private DiscoveryNode findRelocationTarget(
// better to offload shards first.
if (decision.type() == Decision.Type.YES) {
chosenNode = node.node();
bestDecision = decision;
// As soon as we get any YES, we return it.
break;
} else if (decision.type() == Decision.Type.NOT_PREFERRED && chosenNode == null) {
@@ -677,12 +756,21 @@ private DiscoveryNode findRelocationTarget(
// choose and the shard cannot remain where it is, we accept not-preferred. NOT_PREFERRED is essentially a YES for
// reconciliation.
chosenNode = node.node();
bestDecision = decision;
}
}

return chosenNode;
return new DecisionAndResult(bestDecision, chosenNode);
}

/**
* An allocation decision result
*
* @param bestDecision The best decision we saw from the nodes attempted (can be null if no nodes were attempted)
* @param chosenNode The node to allocate the shard to (can be null if no suitable nodes were found)
*/
private record DecisionAndResult(@Nullable Decision bestDecision, @Nullable DiscoveryNode chosenNode) {}

private Decision decideCanAllocate(ShardRouting shardRouting, RoutingNode target) {
assert target != null : "Target node is not found";
return allocation.deciders().canAllocate(shardRouting, target, allocation);
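Taken together, the changes above follow a simple pattern: record the time at which each shard is first found to be immovable, drop the record as soon as the shard moves (or goes unassigned), and emit a rate-limited warning once the oldest record exceeds a configurable threshold. Below is a minimal, self-contained sketch of that pattern using plain JDK collections; the class and member names are hypothetical, and the inline rate limiter stands in for the FrequencyCappedAction, TimeProvider, and hppc ObjectLongHashMap used in the PR.

import java.util.HashMap;
import java.util.Map;
import java.util.function.LongSupplier;

// Hypothetical sketch of the tracking pattern in this PR; not the actual implementation.
final class StuckShardTracker<K> {

    private final Map<K, Long> firstStuckMillis = new HashMap<>();
    private final LongSupplier clock;            // e.g. timeProvider::relativeTimeInMillis
    private final long thresholdMillis;          // cf. immovable_shard_logging.threshold
    private final long minLogIntervalMillis;     // cf. FrequencyCappedAction's min interval
    private long nextAllowedLogMillis = Long.MIN_VALUE;

    StuckShardTracker(LongSupplier clock, long thresholdMillis, long minLogIntervalMillis) {
        this.clock = clock;
        this.thresholdMillis = thresholdMillis;
        this.minLogIntervalMillis = minLogIntervalMillis;
    }

    void markStuck(K key) {
        // only the first sighting counts, so the duration measures "stuck since"
        firstStuckMillis.putIfAbsent(key, clock.getAsLong());
    }

    void markMoved(K key) {
        // the shard moved after all; stop tracking it
        firstStuckMillis.remove(key);
    }

    void maybeWarn() {
        final long now = clock.getAsLong();
        // Long.MAX_VALUE sentinel: with nothing tracked, the difference is negative and nothing fires
        final long oldest = firstStuckMillis.values().stream().mapToLong(Long::longValue).min().orElse(Long.MAX_VALUE);
        if (now - oldest > thresholdMillis && now >= nextAllowedLogMillis) {
            nextAllowedLogMillis = now + minLogIntervalMillis;
            firstStuckMillis.forEach((key, since) -> {
                if (now - since > thresholdMillis) {
                    System.err.printf("%s has been immovable for %d ms%n", key, now - since);
                }
            });
        }
    }
}

The real code goes one step further when it logs: it re-runs the allocation deciders in EXCLUDE_YES_DECISIONS debug mode, so the warning also explains why each desired node rejected the shard.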
@@ -235,6 +235,7 @@ public void apply(Settings value, Settings current, Settings previous) {
DesiredBalanceComputer.MAX_BALANCE_COMPUTATION_TIME_DURING_INDEX_CREATION_SETTING,
DesiredBalanceReconciler.UNDESIRED_ALLOCATIONS_LOG_INTERVAL_SETTING,
DesiredBalanceReconciler.UNDESIRED_ALLOCATIONS_LOG_THRESHOLD_SETTING,
DesiredBalanceReconciler.IMMOVABLE_SHARD_LOG_THRESHOLD_SETTING,
BreakerSettings.CIRCUIT_BREAKER_LIMIT_SETTING,
BreakerSettings.CIRCUIT_BREAKER_OVERHEAD_SETTING,
BreakerSettings.CIRCUIT_BREAKER_TYPE,
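Registering the new setting in the built-in cluster settings list is what makes it recognized and dynamically updatable. As a rough illustration (a sketch mirroring the PR's own timeSetting definition, not code from the PR), the setting parses an operator-supplied value like this:

import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.TimeValue;

// Hypothetical snippet: a time setting declared like the PR's
// IMMOVABLE_SHARD_LOG_THRESHOLD_SETTING parses values such as "10m"
// and falls back to its 5-minute default when unset.
class ImmovableShardThresholdExample {
    public static void main(String[] args) {
        Setting<TimeValue> threshold = Setting.timeSetting(
            "cluster.routing.allocation.desired_balance.immovable_shard_logging.threshold",
            TimeValue.timeValueMinutes(5),
            Setting.Property.Dynamic,
            Setting.Property.NodeScope
        );
        Settings settings = Settings.builder().put(threshold.getKey(), "10m").build();
        assert threshold.get(settings).equals(TimeValue.timeValueMinutes(10));
        assert threshold.get(Settings.EMPTY).equals(TimeValue.timeValueMinutes(5)); // default
    }
}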