Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,6 @@ public record AllocationStats(long unassignedShards, long totalAllocations, long

public record NodeWeightStats(long shardCount, double diskUsageInBytes, double writeLoad, double nodeWeight) {}

public static final DesiredBalanceMetrics NOOP = new DesiredBalanceMetrics(MeterRegistry.NOOP);

public static final String UNASSIGNED_SHARDS_METRIC_NAME = "es.allocator.desired_balance.shards.unassigned.current";
public static final String TOTAL_SHARDS_METRIC_NAME = "es.allocator.desired_balance.shards.current";
public static final String UNDESIRED_ALLOCATION_COUNT_METRIC_NAME = "es.allocator.desired_balance.allocations.undesired.current";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,7 @@
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.cluster.routing.UnassignedInfo.AllocationStatus;
import org.elasticsearch.cluster.routing.allocation.NodeAllocationStatsAndWeightsCalculator;
import org.elasticsearch.cluster.routing.allocation.NodeAllocationStatsAndWeightsCalculator.NodeAllocationStatsAndWeight;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.routing.allocation.allocator.DesiredBalanceMetrics.AllocationStats;
import org.elasticsearch.cluster.routing.allocation.decider.Decision;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.ClusterSettings;
Expand All @@ -36,9 +33,7 @@
import org.elasticsearch.threadpool.ThreadPool;

import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -82,16 +77,8 @@ public class DesiredBalanceReconciler {
private double undesiredAllocationsLogThreshold;
private final NodeAllocationOrdering allocationOrdering = new NodeAllocationOrdering();
private final NodeAllocationOrdering moveOrdering = new NodeAllocationOrdering();
private final DesiredBalanceMetrics desiredBalanceMetrics;
private final NodeAllocationStatsAndWeightsCalculator nodeAllocationStatsAndWeightsCalculator;

public DesiredBalanceReconciler(
ClusterSettings clusterSettings,
ThreadPool threadPool,
DesiredBalanceMetrics desiredBalanceMetrics,
NodeAllocationStatsAndWeightsCalculator nodeAllocationStatsAndWeightsCalculator
) {
this.desiredBalanceMetrics = desiredBalanceMetrics;

public DesiredBalanceReconciler(ClusterSettings clusterSettings, ThreadPool threadPool) {
this.undesiredAllocationLogInterval = new FrequencyCappedAction(
threadPool.relativeTimeInMillisSupplier(),
TimeValue.timeValueMinutes(5)
Expand All @@ -101,7 +88,6 @@ public DesiredBalanceReconciler(
UNDESIRED_ALLOCATIONS_LOG_THRESHOLD_SETTING,
value -> this.undesiredAllocationsLogThreshold = value
);
this.nodeAllocationStatsAndWeightsCalculator = nodeAllocationStatsAndWeightsCalculator;
}

/**
Expand All @@ -110,12 +96,13 @@ public DesiredBalanceReconciler(
* @param desiredBalance The new desired cluster shard allocation
* @param allocation Cluster state information with which to make decisions, contains routing table metadata that will be modified to
* reach the given desired balance.
* @return {@link DesiredBalanceMetrics.AllocationStats} for this round of reconciliation changes.
*/
public void reconcile(DesiredBalance desiredBalance, RoutingAllocation allocation) {
public DesiredBalanceMetrics.AllocationStats reconcile(DesiredBalance desiredBalance, RoutingAllocation allocation) {
var nodeIds = allocation.routingNodes().getAllNodeIds();
allocationOrdering.retainNodes(nodeIds);
moveOrdering.retainNodes(nodeIds);
new Reconciliation(desiredBalance, allocation).run();
return new Reconciliation(desiredBalance, allocation).run();
}

public void clear() {
Expand All @@ -135,7 +122,7 @@ private class Reconciliation {
this.routingNodes = allocation.routingNodes();
}

void run() {
DesiredBalanceMetrics.AllocationStats run() {
try (var ignored = allocation.withReconcilingFlag()) {

logger.debug("Reconciling desired balance for [{}]", desiredBalance.lastConvergedIndex());
Expand All @@ -144,13 +131,13 @@ void run() {
// no data nodes, so fail allocation to report red health
failAllocationOfNewPrimaries(allocation);
logger.trace("no nodes available, nothing to reconcile");
return;
return DesiredBalanceMetrics.EMPTY_ALLOCATION_STATS;
}

if (desiredBalance.assignments().isEmpty()) {
// no desired state yet but it is on its way and we'll reroute again when it is ready
logger.trace("desired balance is empty, nothing to reconcile");
return;
return DesiredBalanceMetrics.EMPTY_ALLOCATION_STATS;
}

// compute next moves towards current desired balance:
Expand All @@ -165,31 +152,11 @@ void run() {
moveShards();
// 3. move any other shards that are desired elsewhere
logger.trace("Reconciler#balance");
var allocationStats = balance();
DesiredBalanceMetrics.AllocationStats allocationStats = balance();

logger.debug("Reconciliation is complete");

updateDesireBalanceMetrics(allocationStats);
}
}

private void updateDesireBalanceMetrics(AllocationStats allocationStats) {
var nodesStatsAndWeights = nodeAllocationStatsAndWeightsCalculator.nodesAllocationStatsAndWeights(
allocation.metadata(),
allocation.routingNodes(),
allocation.clusterInfo(),
desiredBalance
);
Map<DiscoveryNode, NodeAllocationStatsAndWeight> filteredNodeAllocationStatsAndWeights = new HashMap<>(
nodesStatsAndWeights.size()
);
for (var nodeStatsAndWeight : nodesStatsAndWeights.entrySet()) {
var node = allocation.nodes().get(nodeStatsAndWeight.getKey());
if (node != null) {
filteredNodeAllocationStatsAndWeights.put(node, nodeStatsAndWeight.getValue());
}
return allocationStats;
}
desiredBalanceMetrics.updateMetrics(allocationStats, desiredBalance.weightsPerNode(), filteredNodeAllocationStatsAndWeights);
}

private boolean allocateUnassignedInvariant() {
Expand Down Expand Up @@ -512,7 +479,7 @@ private void moveShards() {
}
}

private AllocationStats balance() {
private DesiredBalanceMetrics.AllocationStats balance() {
if (allocation.deciders().canRebalance(allocation).type() != Decision.Type.YES) {
return DesiredBalanceMetrics.EMPTY_ALLOCATION_STATS;
}
Expand Down Expand Up @@ -581,8 +548,11 @@ private AllocationStats balance() {
}

maybeLogUndesiredAllocationsWarning(totalAllocations, undesiredAllocationsExcludingShuttingDownNodes, routingNodes.size());

return new AllocationStats(unassignedShards, totalAllocations, undesiredAllocationsExcludingShuttingDownNodes);
return new DesiredBalanceMetrics.AllocationStats(
unassignedShards,
totalAllocations,
undesiredAllocationsExcludingShuttingDownNodes
);
}

private void maybeLogUndesiredAllocationsWarning(int totalAllocations, int undesiredAllocations, int nodeCount) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.elasticsearch.cluster.ClusterStateTaskExecutor;
import org.elasticsearch.cluster.ClusterStateTaskListener;
import org.elasticsearch.cluster.metadata.SingleNodeShutdownMetadata;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.routing.allocation.AllocationService.RerouteStrategy;
Expand All @@ -39,6 +40,7 @@

import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -87,6 +89,7 @@ public class DesiredBalanceShardsAllocator implements ShardsAllocator {
private final AtomicReference<DesiredBalance> currentDesiredBalanceRef = new AtomicReference<>(DesiredBalance.NOT_MASTER);
private volatile boolean resetCurrentDesiredBalance = false;
private final Set<String> processedNodeShutdowns = new HashSet<>();
private final NodeAllocationStatsAndWeightsCalculator nodeAllocationStatsAndWeightsCalculator;
private final DesiredBalanceMetrics desiredBalanceMetrics;
/**
* Manages balancer round results in order to report on the balancer activity in a configurable manner.
Expand Down Expand Up @@ -136,17 +139,13 @@ public DesiredBalanceShardsAllocator(
NodeAllocationStatsAndWeightsCalculator nodeAllocationStatsAndWeightsCalculator
) {
this.desiredBalanceMetrics = new DesiredBalanceMetrics(telemetryProvider.getMeterRegistry());
this.nodeAllocationStatsAndWeightsCalculator = nodeAllocationStatsAndWeightsCalculator;
this.balancerRoundSummaryService = new AllocationBalancingRoundSummaryService(threadPool, clusterService.getClusterSettings());
this.delegateAllocator = delegateAllocator;
this.threadPool = threadPool;
this.reconciler = reconciler;
this.desiredBalanceComputer = desiredBalanceComputer;
this.desiredBalanceReconciler = new DesiredBalanceReconciler(
clusterService.getClusterSettings(),
threadPool,
desiredBalanceMetrics,
nodeAllocationStatsAndWeightsCalculator
);
this.desiredBalanceReconciler = new DesiredBalanceReconciler(clusterService.getClusterSettings(), threadPool);
this.desiredBalanceComputation = new ContinuousComputation<>(threadPool.generic()) {

@Override
Expand Down Expand Up @@ -347,6 +346,10 @@ private BalancingRoundSummary calculateBalancingRoundSummary(DesiredBalance oldD
return new BalancingRoundSummary(DesiredBalance.shardMovements(oldDesiredBalance, newDesiredBalance));
}

/**
* Submits the desired balance to be reconciled (applies the desired changes to the routing table) and creates and publishes a new
* cluster state. The data nodes will receive and apply the new cluster state to start/move/remove shards.
*/
protected void submitReconcileTask(DesiredBalance desiredBalance) {
masterServiceTaskQueue.submitTask("reconcile-desired-balance", new ReconcileDesiredBalanceTask(desiredBalance), null);
}
Expand All @@ -357,7 +360,11 @@ protected void reconcile(DesiredBalance desiredBalance, RoutingAllocation alloca
} else {
logger.debug("Reconciling desired balance for [{}]", desiredBalance.lastConvergedIndex());
}
recordTime(cumulativeReconciliationTime, () -> desiredBalanceReconciler.reconcile(desiredBalance, allocation));
recordTime(cumulativeReconciliationTime, () -> {
DesiredBalanceMetrics.AllocationStats allocationStats = desiredBalanceReconciler.reconcile(desiredBalance, allocation);
updateDesireBalanceMetrics(desiredBalance, allocation, allocationStats);
});

if (logger.isTraceEnabled()) {
logger.trace("Reconciled desired balance: {}", desiredBalance);
} else {
Expand Down Expand Up @@ -391,6 +398,28 @@ public void resetDesiredBalance() {
resetCurrentDesiredBalance = true;
}

private void updateDesireBalanceMetrics(
DesiredBalance desiredBalance,
RoutingAllocation routingAllocation,
DesiredBalanceMetrics.AllocationStats allocationStats
) {
var nodesStatsAndWeights = nodeAllocationStatsAndWeightsCalculator.nodesAllocationStatsAndWeights(
routingAllocation.metadata(),
routingAllocation.routingNodes(),
routingAllocation.clusterInfo(),
desiredBalance
);
Map<DiscoveryNode, NodeAllocationStatsAndWeightsCalculator.NodeAllocationStatsAndWeight> filteredNodeAllocationStatsAndWeights =
new HashMap<>(nodesStatsAndWeights.size());
for (var nodeStatsAndWeight : nodesStatsAndWeights.entrySet()) {
var node = routingAllocation.nodes().get(nodeStatsAndWeight.getKey());
if (node != null) {
filteredNodeAllocationStatsAndWeights.put(node, nodeStatsAndWeight.getValue());
}
}
desiredBalanceMetrics.updateMetrics(allocationStats, desiredBalance.weightsPerNode(), filteredNodeAllocationStatsAndWeights);
}

public DesiredBalanceStats getStats() {
return new DesiredBalanceStats(
Math.max(currentDesiredBalanceRef.get().lastConvergedIndex(), 0L),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1212,12 +1212,7 @@ public void testRebalanceDoesNotCauseHotSpots() {
new ConcurrentRebalanceAllocationDecider(clusterSettings),
new ThrottlingAllocationDecider(clusterSettings) };

var reconciler = new DesiredBalanceReconciler(
clusterSettings,
new DeterministicTaskQueue().getThreadPool(),
DesiredBalanceMetrics.NOOP,
EMPTY_NODE_ALLOCATION_STATS
);
var reconciler = new DesiredBalanceReconciler(clusterSettings, new DeterministicTaskQueue().getThreadPool());

var totalOutgoingMoves = new HashMap<String, AtomicInteger>();
for (int i = 0; i < numberOfNodes; i++) {
Expand Down Expand Up @@ -1299,12 +1294,7 @@ public void testShouldLogOnTooManyUndesiredAllocations() {
final var timeInMillisSupplier = new AtomicLong();
when(threadPool.relativeTimeInMillisSupplier()).thenReturn(timeInMillisSupplier::incrementAndGet);

var reconciler = new DesiredBalanceReconciler(
createBuiltInClusterSettings(),
threadPool,
DesiredBalanceMetrics.NOOP,
EMPTY_NODE_ALLOCATION_STATS
);
var reconciler = new DesiredBalanceReconciler(createBuiltInClusterSettings(), threadPool);
final long initialDelayInMillis = TimeValue.timeValueMinutes(5).getMillis();
timeInMillisSupplier.addAndGet(randomLongBetween(initialDelayInMillis, 2 * initialDelayInMillis));

Expand Down Expand Up @@ -1356,8 +1346,7 @@ public void testShouldLogOnTooManyUndesiredAllocations() {
private static void reconcile(RoutingAllocation routingAllocation, DesiredBalance desiredBalance) {
final var threadPool = mock(ThreadPool.class);
when(threadPool.relativeTimeInMillisSupplier()).thenReturn(new AtomicLong()::incrementAndGet);
new DesiredBalanceReconciler(createBuiltInClusterSettings(), threadPool, DesiredBalanceMetrics.NOOP, EMPTY_NODE_ALLOCATION_STATS)
.reconcile(desiredBalance, routingAllocation);
new DesiredBalanceReconciler(createBuiltInClusterSettings(), threadPool).reconcile(desiredBalance, routingAllocation);
}

private static boolean isReconciled(RoutingNode node, DesiredBalance balance) {
Expand Down