Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
51 commits
Select commit Hold shift + click to select a range
affdae5
Log when no progress is made towards the desired balance for some time
nicktindall Oct 23, 2025
5249be9
Track immovable shards individually
nicktindall Oct 24, 2025
14772ab
Naming, initial delay
nicktindall Oct 24, 2025
d8186d9
Remove dead code
nicktindall Oct 24, 2025
ad5c028
Tidy
nicktindall Oct 24, 2025
7a98137
Javadoc
nicktindall Oct 24, 2025
e84efa6
Clear immovable shard in moveShards
nicktindall Oct 24, 2025
c901665
Merge branch 'main' into log_on_no_balancing_progress
nicktindall Oct 24, 2025
4194739
Try tracking undesired state in shard routing
nicktindall Oct 27, 2025
f050316
Fix setting names
nicktindall Oct 27, 2025
c7b14fa
Naming
nicktindall Oct 27, 2025
5cf44fc
Naming
nicktindall Oct 27, 2025
1472ec2
ShardRouting#equals/hashCode
nicktindall Oct 27, 2025
7652369
Javadoc
nicktindall Oct 27, 2025
7996801
Fix logic to handle Long.MAX_VALUE
nicktindall Oct 27, 2025
2056c4e
javadoc
nicktindall Oct 27, 2025
6e37d04
javadoc
nicktindall Oct 27, 2025
9590ce3
Merge remote-tracking branch 'origin/main' into log_on_no_balancing_p…
nicktindall Oct 27, 2025
904d597
Naming, RoutingNodes updates
nicktindall Oct 27, 2025
be13adb
Clear becameUndesiredTime when a shard goes unassigned
nicktindall Oct 27, 2025
5013031
Naming
nicktindall Oct 27, 2025
f285936
Merge remote-tracking branch 'origin/main' into log_on_no_balancing_p…
nicktindall Oct 29, 2025
047b411
Mock time provision correctly
nicktindall Oct 29, 2025
9bd1e24
Serialise became_undesired_time
nicktindall Oct 29, 2025
a8fba52
Add unit test
nicktindall Oct 29, 2025
fc9c7d0
Track undesired allocations locally
nicktindall Nov 5, 2025
713a0e1
Merge remote-tracking branch 'origin/main' into log_on_no_balancing_p…
nicktindall Nov 5, 2025
5f57c0b
Minimise change
nicktindall Nov 5, 2025
5d1d4c0
Minimise change
nicktindall Nov 5, 2025
54533f7
Tidy
nicktindall Nov 5, 2025
2d3f10f
Tidy
nicktindall Nov 5, 2025
ba678a9
Reduce default limit
nicktindall Nov 5, 2025
6330ba9
clear -> removeTracking
nicktindall Nov 5, 2025
f797014
Merge branch 'main' into log_on_no_balancing_progress
nicktindall Nov 5, 2025
207dd0d
Update server/src/main/java/org/elasticsearch/cluster/routing/allocat…
nicktindall Nov 5, 2025
6d5d2fa
Add separate setting for logging interval, reduce default to 5 minutes.
nicktindall Nov 5, 2025
0af8050
Check for capacity before we check for existing record
nicktindall Nov 5, 2025
6db1bc3
Discard excess tracking if the limit is reduced
nicktindall Nov 6, 2025
5fd0770
Fix test after log message changed
nicktindall Nov 6, 2025
7dcb690
Default max to track to zero
nicktindall Nov 6, 2025
ef8f45b
Add more unit tests
nicktindall Nov 6, 2025
19b7475
Make resilient to metadata changes
nicktindall Nov 6, 2025
ed7caa4
Explicitly configure max tracking in test
nicktindall Nov 6, 2025
53f95c0
de-dupe default time value
nicktindall Nov 6, 2025
be8a38b
Remove conditional and leave assertion
nicktindall Nov 6, 2025
061e2ad
Reduce maximum max-to-track
nicktindall Nov 6, 2025
17fd729
Merge branch 'main' into log_on_no_balancing_progress
nicktindall Nov 6, 2025
566f5d5
Give duration threshold minimum of one minute
nicktindall Nov 6, 2025
655e352
Use ordered map to quickly determine earliest entry
nicktindall Nov 6, 2025
feaca10
Add test cases where there is no desired balance
nicktindall Nov 6, 2025
d532523
Merge branch 'main' into log_on_no_balancing_progress
nicktindall Nov 6, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,16 +30,18 @@
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.time.TimeProvider;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.gateway.PriorityComparator;
import org.elasticsearch.index.IndexVersions;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.threadpool.ThreadPool;

import java.util.Comparator;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
Expand All @@ -54,6 +56,7 @@
public class DesiredBalanceReconciler {

private static final Logger logger = LogManager.getLogger(DesiredBalanceReconciler.class);
private static final int IMMOVABLE_SHARDS_SAMPLE_SIZE = 3;

/**
* The minimum interval that log messages will be written if the number of undesired shard allocations reaches the percentage of total
Expand All @@ -79,21 +82,45 @@ public class DesiredBalanceReconciler {
Setting.Property.NodeScope
);

/**
 * How long the reconciler may go without recording progress towards the desired balance, while undesired allocations
 * remain, before warning logs are written. Defaults to 5 minutes; the setting is dynamic and node-scoped.
 * <p>
 * This only controls when the warning becomes eligible. How often it is repeated is capped separately, using the
 * interval configured by {@link #UNDESIRED_ALLOCATIONS_LOG_INTERVAL_SETTING}.
 */
public static final Setting<TimeValue> NO_PROGRESS_TOWARDS_BALANCE_LOG_THRESHOLD_SETTING = Setting.timeSetting(
"cluster.routing.allocation.desired_balance.no_progress_towards_balance_logging.threshold",
TimeValue.timeValueMinutes(5),
Setting.Property.Dynamic,
Setting.Property.NodeScope
);

// Source of relative time for the progress timestamp and for both frequency-capped log actions.
private final TimeProvider timeProvider;
// Relative time (millis) at which progress towards balance was last recorded. It is refreshed on construction, when a
// rebalancing relocation is started, and whenever a reconciliation finds no undesired allocations.
private final AtomicLong lastProgressTowardsBalanceTimestampMillis = new AtomicLong(0L);
// Frequency cap for the undesired-allocations warning; its minimum interval follows UNDESIRED_ALLOCATIONS_LOG_INTERVAL_SETTING.
private final FrequencyCappedAction undesiredAllocationLogInterval;
// Frequency cap for the no-progress warning; it shares the same interval setting as the cap above.
private final FrequencyCappedAction noProgressTowardsBalanceLogInterval;
// Kept in sync with UNDESIRED_ALLOCATIONS_LOG_THRESHOLD_SETTING.
// NOTE(review): unlike noProgressTowardsBalanceThreshold this field is not volatile although it is also written from a
// settings-update callback - confirm whether that is intentional.
private double undesiredAllocationsLogThreshold;
private final NodeAllocationOrdering allocationOrdering = new NodeAllocationOrdering();
// Records the source node each time a rebalancing relocation is started.
private final NodeAllocationOrdering moveOrdering = new NodeAllocationOrdering();
// Kept in sync with NO_PROGRESS_TOWARDS_BALANCE_LOG_THRESHOLD_SETTING; volatile because it is written by the settings callback.
private volatile TimeValue noProgressTowardsBalanceThreshold;

public DesiredBalanceReconciler(ClusterSettings clusterSettings, ThreadPool threadPool) {
this.undesiredAllocationLogInterval = new FrequencyCappedAction(
threadPool.relativeTimeInMillisSupplier(),
public DesiredBalanceReconciler(ClusterSettings clusterSettings, TimeProvider timeProvider) {
this.timeProvider = timeProvider;
this.undesiredAllocationLogInterval = new FrequencyCappedAction(timeProvider::relativeTimeInMillis, TimeValue.timeValueMinutes(5));
this.noProgressTowardsBalanceLogInterval = new FrequencyCappedAction(
timeProvider::relativeTimeInMillis,
TimeValue.timeValueMinutes(5)
);
clusterSettings.initializeAndWatch(UNDESIRED_ALLOCATIONS_LOG_INTERVAL_SETTING, this.undesiredAllocationLogInterval::setMinInterval);
this.lastProgressTowardsBalanceTimestampMillis.set(timeProvider.relativeTimeInMillis());
clusterSettings.initializeAndWatch(UNDESIRED_ALLOCATIONS_LOG_INTERVAL_SETTING, value -> {
this.undesiredAllocationLogInterval.setMinInterval(value);
this.noProgressTowardsBalanceLogInterval.setMinInterval(value);
});
clusterSettings.initializeAndWatch(
UNDESIRED_ALLOCATIONS_LOG_THRESHOLD_SETTING,
value -> this.undesiredAllocationsLogThreshold = value
);
clusterSettings.initializeAndWatch(
NO_PROGRESS_TOWARDS_BALANCE_LOG_THRESHOLD_SETTING,
value -> this.noProgressTowardsBalanceThreshold = value
);
}

/**
Expand Down Expand Up @@ -532,6 +559,7 @@ private DesiredBalanceMetrics.AllocationStats balance() {
int undesiredAllocationsExcludingShuttingDownNodes = 0;
final ObjectLongMap<ShardRouting.Role> totalAllocationsByRole = new ObjectLongHashMap<>();
final ObjectLongMap<ShardRouting.Role> undesiredAllocationsExcludingShuttingDownNodesByRole = new ObjectLongHashMap<>();
final Set<ShardRouting> undesiredShardSample = Sets.newHashSetWithExpectedSize(IMMOVABLE_SHARDS_SAMPLE_SIZE);

// Iterate over all started shards and try to move any which are on undesired nodes. In the presence of throttling shard
// movements, the goal of this iteration order is to achieve a fairer movement of shards from the nodes that are offloading the
Expand Down Expand Up @@ -587,6 +615,7 @@ private DesiredBalanceMetrics.AllocationStats balance() {
shardRouting.currentNodeId(),
rebalanceTarget.getId()
);
lastProgressTowardsBalanceTimestampMillis.set(timeProvider.relativeTimeInMillis());

routingNodes.relocateShard(
shardRouting,
Expand All @@ -597,9 +626,14 @@ private DesiredBalanceMetrics.AllocationStats balance() {
);
iterator.dePrioritizeNode(shardRouting.currentNodeId());
moveOrdering.recordAllocation(shardRouting.currentNodeId());
} else {
if (undesiredShardSample.size() < IMMOVABLE_SHARDS_SAMPLE_SIZE) {
undesiredShardSample.add(shardRouting);
}
}
}

maybeLogProgressTowardsDesiredWarning(undesiredAllocationsExcludingShuttingDownNodes, undesiredShardSample);
maybeLogUndesiredAllocationsWarning(totalAllocations, undesiredAllocationsExcludingShuttingDownNodes, routingNodes.size());
return new DesiredBalanceMetrics.AllocationStats(
unassignedShards,
Expand All @@ -616,6 +650,49 @@ private DesiredBalanceMetrics.AllocationStats balance() {
);
}

/**
 * If there are undesired allocations, and we have made no recent progress towards the desired balance, log a warning.
 * <p>
 * When no undesired allocations remain, the last-progress timestamp is reset to the current time and nothing is logged.
 * Otherwise, once the time since the last recorded progress exceeds the configured threshold, a (frequency-capped)
 * warning is written, followed by the allocation decision for every desired node of each sampled shard. While those
 * decisions are computed the allocation's debug mode is temporarily switched to
 * {@link RoutingAllocation.DebugMode#EXCLUDE_YES_DECISIONS} and restored afterwards.
 * <p>
 * This method is diagnostic only, so it must never fail the reconciliation: sampled shards without a desired
 * assignment and desired nodes that are not present in the cluster are handled explicitly instead of being passed
 * to the deciders.
 *
 * @param undesiredAllocationsExcludingShuttingDownNodes The number of undesired allocations (excluding shutting down nodes)
 * @param immovableShardsSample A sample of the shards we found to be immovable
 */
private void maybeLogProgressTowardsDesiredWarning(
    int undesiredAllocationsExcludingShuttingDownNodes,
    Set<ShardRouting> immovableShardsSample
) {
    final long nowMillis = timeProvider.relativeTimeInMillis();

    // There are no undesired allocations, reset the last-progress timestamp
    if (undesiredAllocationsExcludingShuttingDownNodes == 0) {
        lastProgressTowardsBalanceTimestampMillis.set(nowMillis);
        return;
    }

    // Snapshot the volatile threshold so the value we compare against is the value we report
    final TimeValue threshold = noProgressTowardsBalanceThreshold;
    final long millisecondsSinceLastProgress = nowMillis - lastProgressTowardsBalanceTimestampMillis.get();
    if (millisecondsSinceLastProgress <= threshold.millis()) {
        return;
    }

    noProgressTowardsBalanceLogInterval.maybeExecute(() -> {
        TimeValue timeSinceProgress = TimeValue.timeValueMillis(millisecondsSinceLastProgress);
        logger.warn(
            "No progress has been made towards desired balance for [{}], this exceeds the warn threshold of [{}]",
            timeSinceProgress,
            threshold
        );
        final RoutingAllocation.DebugMode originalDebugMode = allocation.getDebugMode();
        allocation.setDebugMode(RoutingAllocation.DebugMode.EXCLUDE_YES_DECISIONS);
        try {
            for (final var shardRouting : immovableShardsSample) {
                final var assignment = desiredBalance.getAssignment(shardRouting.shardId());
                if (assignment == null) {
                    // No desired assignment is known for this shard, so there are no target nodes to explain
                    continue;
                }
                for (final var nodeId : assignment.nodeIds()) {
                    final var routingNode = routingNodes.node(nodeId);
                    if (routingNode == null) {
                        // The desired node is not part of the cluster; the deciders cannot be consulted for it
                        logger.warn(
                            "Shard [{}] cannot be allocated on node [{}]: node is not present in the cluster",
                            shardRouting.shardId(),
                            nodeId
                        );
                        continue;
                    }
                    final var decision = allocation.deciders().canAllocate(shardRouting, routingNode, allocation);
                    logger.warn("Shard [{}] cannot be allocated on node [{}]: {}", shardRouting.shardId(), nodeId, decision);
                }
            }
        } finally {
            allocation.setDebugMode(originalDebugMode);
        }
    });
}

private void maybeLogUndesiredAllocationsWarning(int totalAllocations, int undesiredAllocations, int nodeCount) {
// more shards than cluster can relocate with one reroute
final boolean nonEmptyRelocationBacklog = undesiredAllocations > 2L * nodeCount;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,7 @@ public void apply(Settings value, Settings current, Settings previous) {
DesiredBalanceComputer.MAX_BALANCE_COMPUTATION_TIME_DURING_INDEX_CREATION_SETTING,
DesiredBalanceReconciler.UNDESIRED_ALLOCATIONS_LOG_INTERVAL_SETTING,
DesiredBalanceReconciler.UNDESIRED_ALLOCATIONS_LOG_THRESHOLD_SETTING,
DesiredBalanceReconciler.NO_PROGRESS_TOWARDS_BALANCE_LOG_THRESHOLD_SETTING,
BreakerSettings.CIRCUIT_BREAKER_LIMIT_SETTING,
BreakerSettings.CIRCUIT_BREAKER_OVERHEAD_SETTING,
BreakerSettings.CIRCUIT_BREAKER_TYPE,
Expand Down