Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -158,9 +158,7 @@ public ClusterState applyStartedShards(ClusterState clusterState, List<ShardRout
}

private static ClusterState buildResultAndLogHealthChange(ClusterState oldState, RoutingAllocation allocation, String reason) {
final RoutingTable oldRoutingTable = oldState.routingTable();
final RoutingNodes newRoutingNodes = allocation.routingNodes();
final RoutingTable newRoutingTable = RoutingTable.of(newRoutingNodes);
final RoutingTable newRoutingTable = RoutingTable.of(allocation.routingNodes());
final Metadata newMetadata = allocation.updateMetadataWithRoutingChanges(newRoutingTable);
assert newRoutingTable.validate(newMetadata); // validates the routing table is coherent with the cluster state metadata

Expand Down Expand Up @@ -271,8 +269,7 @@ public ClusterState applyFailedShards(
}

/**
* unassigned an shards that are associated with nodes that are no longer part of the cluster, potentially promoting replicas
* if needed.
* Unassign any shards that are associated with nodes that are no longer part of the cluster, potentially promoting replicas if needed.
*/
public ClusterState disassociateDeadNodes(ClusterState clusterState, boolean reroute, String reason) {
RoutingAllocation allocation = createRoutingAllocation(clusterState, currentNanoTime());
Expand All @@ -284,7 +281,7 @@ public ClusterState disassociateDeadNodes(ClusterState clusterState, boolean rer
clusterState = buildResultAndLogHealthChange(clusterState, allocation, reason);
}
if (reroute) {
return reroute(clusterState, reason, rerouteCompletionIsNotRequired());// this is not triggered by a user request
return reroute(clusterState, reason, rerouteCompletionIsNotRequired() /* this is not triggered by a user request */);
} else {
return clusterState;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.cluster.routing.allocation.allocator;

import java.util.HashMap;
import java.util.LinkedList;
import java.util.concurrent.ConcurrentLinkedQueue;

/**
* Manages the lifecycle of {@link BalancingSummary} data structures tracking allocation balancing round results. There are many balancing
* rounds and this class manages their reporting.
*
* Summarizing balancer rounds and reporting the results will provide information with which to do a cost-benefit analysis of the work that
* the allocation rebalancing performs.
*
* TODO (Dianna): how to handle master step down. Probably refuse to take further add*() calls, but return any previous results that have
* not yet been drained for reporting
*/
public class AllocationBalancingRoundSummaryService {

public static final AllocationBalancingRoundSummaryService NOOP = new AllocationBalancingRoundSummaryService();

/** Value to return if no balancing rounds have occurred in the requested time period. */
public static final BalancingSummary.CombinedClusterBalancingRoundSummary EMPTY_RESULTS =
new BalancingSummary.CombinedClusterBalancingRoundSummary(
new LinkedList<>(),
new BalancingSummary.ClusterShardAssignments(0L, 0L, 0L, 0L, 0L, 0L),
new HashMap<>()
);

/**
* A concurrency-safe list of balancing round summaries. Balancer rounds are run and added here serially, so the queue will naturally
* progress from newer to older results.
*/
private ConcurrentLinkedQueue<BalancingSummary.BalancingRoundSummary> summaries = new ConcurrentLinkedQueue<>();

/**
* Returns a combined summary of all unreported allocation round summaries: may summarize a single balancer round, multiple, or none.
*
* @return returns {@link #EMPTY_RESULTS} if there are no unreported balancing rounds.
*/
public BalancingSummary.CombinedClusterBalancingRoundSummary combineSummaries() {
// TODO: implement
return EMPTY_RESULTS;
}

public void addBalancerRoundSummary(BalancingSummary.BalancingRoundSummary summary) {
summaries.add(summary);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.cluster.routing.allocation.allocator;

/**
* Data structure to pass allocation statistics between the desired balance classes.
*/
public record BalancingRoundStats(
long startTime,
long endTime,
long newlyAssignedShards,
long unassignedShards,
long totalAllocations,
long undesiredAllocationsExcludingShuttingDownNodes,
boolean executedReconciliation
) {

public static final BalancingRoundStats EMPTY_BALANCING_ROUND_STATS = new BalancingRoundStats(-1, -1, -1, -1, -1, -1, false);

public static class Builder {
private long startTime = 0;
private long endTime = 0;
private long newlyAssignedShards = 0;
private long unassignedShards = 0;
long totalAllocations = 0;
long undesiredAllocationsExcludingShuttingDownNodes = 0;
boolean executedReconciliation = false;

public BalancingRoundStats build() {
return new BalancingRoundStats(
startTime,
endTime,
newlyAssignedShards,
unassignedShards,
totalAllocations,
undesiredAllocationsExcludingShuttingDownNodes,

executedReconciliation
);
}

public void setStartTime(long startTime) {
this.startTime = startTime;
}

public void setEndTime(long endTime) {
this.endTime = endTime;
}

public void incNewlyAssignedShards() {
++this.newlyAssignedShards;
}

public void setUnassignedShards(long numUnassignedShards) {
this.unassignedShards = numUnassignedShards;
}

public void incTotalAllocations() {
++this.totalAllocations;
}

public void incUndesiredAllocationsExcludingShuttingDownNodes() {
++this.undesiredAllocationsExcludingShuttingDownNodes;
}

public long getStartTime() {
return this.startTime;
}

public long getEndTime() {
return this.endTime;
}

public long getTotalAllocations() {
return this.totalAllocations;
}

public long getUndesiredAllocationsExcludingShuttingDownNodes() {
return this.undesiredAllocationsExcludingShuttingDownNodes;
}

public void setExecutedReconciliation() {
this.executedReconciliation = executedReconciliation;
}

};

};
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.cluster.routing.allocation.allocator;

import org.elasticsearch.core.Tuple;

import java.util.List;
import java.util.Map;

/**
* Data structures defining the results of allocation balancing rounds.
*/
public class BalancingSummary {

/**
* Holds combined {@link BalancingRoundSummary} results. Essentially holds a list of the balancing events and the summed up changes
* across all those events: what allocation work was done across some period of time.
*
* @param events A list of all the cluster events that started the balancing rounds and time duration for computation + reconciliation
* of each event.
* @param shardAssignments The sum of all shard movements across all combined balancing rounds.
* @param nodeChanges The total change stats per node in the cluster from the earliest balancing round to the latest one.
*/
record CombinedClusterBalancingRoundSummary(
List<Tuple<Long, ClusterRebalancingEvent>> events,
ClusterShardAssignments shardAssignments,
Map<String, IndividualNodeRebalancingChangeStats> nodeChanges
) {};

/**
* Summarizes the impact to the cluster as a result of a rebalancing round.
*
* @param eventStartTime Time at which the desired balance calculation began due to a cluster event.
* @param eventEndTime Time at which the new desired balance calculation was finished.
* @param event Reports what provoked the rebalancing round. The rebalancer only runs when requested, not on a periodic basis.
* @param computationFinishReason Whether the balancing round converged to a final allocation, or exiting early for some reason.
* @param shardMovements Lists the total number of shard moves, and breaks down the total into number shards moved by category,
* like node shutdown
* @param nodeChanges A Map of node name to {@link IndividualNodeRebalancingChangeStats} to describe what each node gained and how much
* work each node performed for the balancing round.
*/
record BalancingRoundSummary(
long eventStartTime,
long eventEndTime,
ClusterRebalancingEvent event,
DesiredBalance.ComputationFinishReason computationFinishReason,
ClusterShardAssignments shardMovements,
Map<String, IndividualNodeRebalancingChangeStats> nodeChanges
) {
@Override
public String toString() {
return "BalancingRoundSummary{"
+ "ClusterRebalancingEvent="
+ event
+ ", ClusterShardMovements="
+ shardMovements
+ ", NodeChangeStats={"
+ nodeChanges
+ "}"
+ '}';
}
};

/**
* General cost-benefit information on the node-level. Describes how each node was improved by a balancing round, and how much work that
* node did to achieve the shard rebalancing.
*
* @param nodeWeightBeforeRebalancing
* @param nodeWeightAfterRebalancing
* @param dataMovedToNodeInMB
* @param dataMovedAwayFromNodeInMB
*/
record IndividualNodeRebalancingChangeStats(
float nodeWeightBeforeRebalancing,
float nodeWeightAfterRebalancing,
long dataMovedToNodeInMB,
long dataMovedAwayFromNodeInMB
) {
@Override
public String toString() {
return "IndividualNodeRebalancingChangeStats{"
+ "nodeWeightBeforeRebalancing="
+ nodeWeightBeforeRebalancing
+ ", nodeWeightAfterRebalancing="
+ nodeWeightAfterRebalancing
+ ", dataMovedToNodeInMB="
+ dataMovedToNodeInMB
+ ", dataMovedAwayFromNodeInMB="
+ dataMovedAwayFromNodeInMB
+ '}';
}
};

/**
* Tracks and summarizes the more granular reasons why shards are moved between nodes.
*
* @param numShardsMoved total number of shard moves between nodes
* @param numAllocationDeciderForcedShardMoves total number of shards that must be moved because they violate an AllocationDecider rule
* @param numRebalancingShardMoves total number of shards moved to improve cluster balance and are not otherwise required to move
* @param numShutdownForcedShardMoves total number of shards that must move off of a node because it is shutting down
* @param numNewlyAssignedShardsNotMoved
* @param numStuckShards total number of shards violating an AllocationDecider on their current node and on every other cluster node
*/
public record ClusterShardAssignments(
long numShardsMoved,
long numAllocationDeciderForcedShardMoves,
long numRebalancingShardMoves,
long numShutdownForcedShardMoves,
long numNewlyAssignedShardsNotMoved,
long numStuckShards
) {
@Override
public String toString() {
return "ClusterShardMovements{"
+ "numShardsMoved="
+ numShardsMoved
+ ", numAllocationDeciderForcedShardMoves="
+ numAllocationDeciderForcedShardMoves
+ ", numRebalancingShardMoves="
+ numRebalancingShardMoves
+ ", numShutdownForcedShardMoves="
+ numShutdownForcedShardMoves
+ ", numStuckShards="
+ numStuckShards
+ '}';
}
};

/**
* The cluster event that initiated a rebalancing round. This will tell us what initiated the balancer doing some amount of rebalancing
* work.
*/
enum ClusterRebalancingEvent {
// TODO (Dianna): go through the reroute methods and identify the causes -- many reroute methods accept a 'reason' string -- and
// replace them with this enum to be saved later in a balancing summary.
RerouteCommand,
IndexCreation,
IndexDeletion,
NodeShutdownAndRemoval,
NewNodeAdded
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@
/**
* The desired balance of the cluster, indicating which nodes should hold a copy of each shard.
*
* @param lastConvergedIndex Identifies what input data the balancer computation round used to produce this {@link DesiredBalance}. See
* {@link DesiredBalanceInput#index()} for details. Each reroute request gets assigned a strictly increasing
* sequence number, and the balancer, which runs async to reroute, uses the latest request's data to compute the
* desired balance.
* @param assignments a set of the (persistent) node IDs to which each {@link ShardId} should be allocated
* @param weightsPerNode The node weights calculated based on
* {@link org.elasticsearch.cluster.routing.allocation.allocator.WeightFunction#calculateNodeWeight}
Expand All @@ -31,8 +35,11 @@ public record DesiredBalance(
) {

enum ComputationFinishReason {
/** Computation ran to completion */
CONVERGED,
/** Computation exited and published early because a new cluster event occurred that affects computation */
YIELD_TO_NEW_INPUT,
/** Computation stopped and published early to avoid delaying new shard assignment */
STOP_EARLY
}

Expand All @@ -44,6 +51,7 @@ public DesiredBalance(long lastConvergedIndex, Map<ShardId, ShardAssignment> ass
* The placeholder value for {@link DesiredBalance} when the node stands down as master.
*/
public static final DesiredBalance NOT_MASTER = new DesiredBalance(-2, Map.of());

/**
* The starting value for {@link DesiredBalance} when the node becomes the master.
*/
Expand All @@ -57,6 +65,10 @@ public static boolean hasChanges(DesiredBalance a, DesiredBalance b) {
return Objects.equals(a.assignments, b.assignments) == false;
}

/**
* Returns the sum of shard movements needed to reach the new desired balance. Doesn't count new shard copies as a move, nor removal or
* unassignment of a shard copy.
*/
public static int shardMovements(DesiredBalance old, DesiredBalance updated) {
var intersection = Sets.intersection(old.assignments().keySet(), updated.assignments().keySet());
int movements = 0;
Expand All @@ -70,8 +82,15 @@ public static int shardMovements(DesiredBalance old, DesiredBalance updated) {
return movements;
}

/**
* Returns the number of shard movements needed to reach the new shard assignment. Doesn't count new shard copies as a move, nor removal
* or unassignment of a shard copy.
*/
private static int shardMovements(ShardAssignment old, ShardAssignment updated) {
var movements = Math.min(0, old.assigned() - updated.assigned());// compensate newly started shards
// A shard move should retain the same number of assigned nodes, just swap out one node for another. We will compensate for newly
// started shards -- adding a shard copy is not a move -- by initializing the count with a negative value so that incrementing later
// for a new node zeros out.
var movements = Math.min(0, old.assigned() - updated.assigned());
for (String nodeId : updated.nodeIds()) {
if (old.nodeIds().contains(nodeId) == false) {
movements++;
Expand Down
Loading