Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 14 additions & 3 deletions server/src/main/java/org/elasticsearch/cluster/ClusterModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@
import org.elasticsearch.cluster.routing.allocation.AllocationStatsService;
import org.elasticsearch.cluster.routing.allocation.ExistingShardsAllocator;
import org.elasticsearch.cluster.routing.allocation.NodeAllocationStatsAndWeightsCalculator;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.routing.allocation.ShardAllocationDecision;
import org.elasticsearch.cluster.routing.allocation.WriteLoadForecaster;
import org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator;
import org.elasticsearch.cluster.routing.allocation.allocator.BalancerSettings;
Expand Down Expand Up @@ -107,6 +109,8 @@
import java.util.Objects;
import java.util.function.Supplier;

import static org.elasticsearch.cluster.routing.allocation.allocator.DesiredBalanceShardsAllocator.ShardAllocationExplainer;

/**
* Configures classes and services that affect the entire cluster.
*/
Expand Down Expand Up @@ -171,7 +175,8 @@ public ClusterModule(
this::reconcile,
writeLoadForecaster,
telemetryProvider,
nodeAllocationStatsAndWeightsCalculator
nodeAllocationStatsAndWeightsCalculator,
this::explainShardAllocation
);
this.clusterService = clusterService;
this.indexNameExpressionResolver = new IndexNameExpressionResolver(threadPool.getThreadContext(), systemIndices, projectResolver);
Expand Down Expand Up @@ -237,6 +242,10 @@ private ClusterState reconcile(ClusterState clusterState, RerouteStrategy rerout
return allocationService.executeWithRoutingAllocation(clusterState, "reconcile-desired-balance", rerouteStrategy);
}

/**
 * Explains the allocation of a shard by delegating to {@code allocationService}.
 *
 * @param shardRouting the shard whose allocation is to be explained
 * @param allocation the routing allocation handed through to the allocation service
 * @return the decision returned by {@code allocationService.explainShardAllocation}
 */
private ShardAllocationDecision explainShardAllocation(ShardRouting shardRouting, RoutingAllocation allocation) {
    final ShardAllocationDecision decision = allocationService.explainShardAllocation(shardRouting, allocation);
    return decision;
}

public static List<Entry> getNamedWriteables() {
List<Entry> entries = new ArrayList<>();
// Cluster State
Expand Down Expand Up @@ -489,7 +498,8 @@ private static ShardsAllocator createShardsAllocator(
DesiredBalanceReconcilerAction reconciler,
WriteLoadForecaster writeLoadForecaster,
TelemetryProvider telemetryProvider,
NodeAllocationStatsAndWeightsCalculator nodeAllocationStatsAndWeightsCalculator
NodeAllocationStatsAndWeightsCalculator nodeAllocationStatsAndWeightsCalculator,
ShardAllocationExplainer shardAllocationExplainer
) {
Map<String, Supplier<ShardsAllocator>> allocators = new HashMap<>();
allocators.put(
Expand All @@ -505,7 +515,8 @@ private static ShardsAllocator createShardsAllocator(
clusterService,
reconciler,
telemetryProvider,
nodeAllocationStatsAndWeightsCalculator
nodeAllocationStatsAndWeightsCalculator,
shardAllocationExplainer
)
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,17 @@
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.routing.allocation.ShardAllocationDecision;
import org.elasticsearch.cluster.routing.allocation.allocator.DesiredBalanceShardsAllocator.ShardAllocationExplainer;
import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationCommand;
import org.elasticsearch.cluster.routing.allocation.decider.Decision;
import org.elasticsearch.common.FrequencyCappedAction;
import org.elasticsearch.common.metrics.MeanMetric;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.time.TimeProvider;
import org.elasticsearch.common.util.Maps;
import org.elasticsearch.common.xcontent.ChunkedToXContentHelper;
import org.elasticsearch.core.Strings;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.shard.ShardId;
Expand All @@ -39,6 +43,7 @@
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.function.Predicate;
import java.util.stream.Stream;

import static java.util.stream.Collectors.toUnmodifiableSet;

Expand All @@ -48,9 +53,13 @@
public class DesiredBalanceComputer {

private static final Logger logger = LogManager.getLogger(DesiredBalanceComputer.class);
// Separate logger, named after this class plus an ".allocation_explain" suffix, used only for the
// debug-level allocation explanation that is emitted for an unassigned shard.
private static final Logger allocationExplainLogger = LogManager.getLogger(
DesiredBalanceComputer.class.getCanonicalName() + ".allocation_explain"
);

private final ShardsAllocator delegateAllocator;
private final TimeProvider timeProvider;
private final ShardAllocationExplainer shardAllocationExplainer;

// stats
protected final MeanMetric iterations = new MeanMetric();
Expand All @@ -77,15 +86,27 @@ public class DesiredBalanceComputer {
private long lastConvergedTimeMillis;
private long lastNotConvergedLogMessageTimeMillis;
private Level convergenceLogMsgLevel;
private final FrequencyCappedAction logAllocationExplainForUnassigned;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rather than a simple frequency cap, what do you think about keeping track of the shard we logged and not logging anything while that same shard remains unassigned? That is, we would reset the state only when the unassigned shard becomes assigned or is deleted. Otherwise, if multiple shards are unassigned, we may get reports about a different shard each time, which will be very hard to interpret.

Might also be nice to record the reset in the logs, that way we can see how long a shard might be blocked from assignment.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds like a good idea. I pushed 9a4e07f for it.


public DesiredBalanceComputer(ClusterSettings clusterSettings, TimeProvider timeProvider, ShardsAllocator delegateAllocator) {
public DesiredBalanceComputer(
ClusterSettings clusterSettings,
TimeProvider timeProvider,
ShardsAllocator delegateAllocator,
ShardAllocationExplainer shardAllocationExplainer
) {
this.delegateAllocator = delegateAllocator;
this.timeProvider = timeProvider;
this.shardAllocationExplainer = shardAllocationExplainer;
this.numComputeCallsSinceLastConverged = 0;
this.numIterationsSinceLastConverged = 0;
this.logAllocationExplainForUnassigned = new FrequencyCappedAction(timeProvider::relativeTimeInMillis, TimeValue.ZERO);
this.lastConvergedTimeMillis = timeProvider.relativeTimeInMillis();
this.lastNotConvergedLogMessageTimeMillis = lastConvergedTimeMillis;
this.convergenceLogMsgLevel = Level.DEBUG;
clusterSettings.initializeAndWatch(
DesiredBalanceShardsAllocator.ALLOCATION_EXPLAIN_LOGGING_INTERVAL,
logAllocationExplainForUnassigned::setMinInterval
);
clusterSettings.initializeAndWatch(PROGRESS_LOG_INTERVAL_SETTING, value -> this.progressLogInterval = value);
clusterSettings.initializeAndWatch(
MAX_BALANCE_COMPUTATION_TIME_DURING_INDEX_CREATION_SETTING,
Expand Down Expand Up @@ -462,10 +483,45 @@ public DesiredBalance compute(
);
}

maybeLogAllocationExplainForUnassigned(finishReason, routingNodes, routingAllocation);

long lastConvergedIndex = hasChanges ? previousDesiredBalance.lastConvergedIndex() : desiredBalanceInput.index();
return new DesiredBalance(lastConvergedIndex, assignments, routingNodes.getBalanceWeightStatsPerNode(), finishReason);
}

/**
 * Logs, at debug level, the allocation explanation for one unassigned shard.
 * <p>
 * Nothing happens unless all of the following hold: debug logging is enabled on
 * {@code allocationExplainLogger}, the computation finished with
 * {@link DesiredBalance.ComputationFinishReason#CONVERGED}, and {@code routingNodes} reports unassigned shards.
 * When they do, the actual work is handed to {@code logAllocationExplainForUnassigned}, which decides whether
 * it runs this time.
 *
 * @param finishReason the reason the balance computation finished
 * @param routingNodes the routing nodes whose unassigned (and ignored) shards are inspected
 * @param routingAllocation the allocation passed to the explainer; its debug mode is changed temporarily and restored
 */
private void maybeLogAllocationExplainForUnassigned(
    DesiredBalance.ComputationFinishReason finishReason,
    RoutingNodes routingNodes,
    RoutingAllocation routingAllocation
) {
    if (allocationExplainLogger.isDebugEnabled() == false
        || finishReason != DesiredBalance.ComputationFinishReason.CONVERGED
        || routingNodes.hasUnassignedShards() == false) {
        return;
    }
    logAllocationExplainForUnassigned.maybeExecute(() -> {
        final var unassigned = routingNodes.unassigned();
        // Prefer the first primary among the unassigned and ignored shards. The fallback (the first shard of
        // any kind) is supplied lazily so the second stream is only built when no primary exists.
        final var shardRouting = Stream.concat(unassigned.stream(), unassigned.ignored().stream())
            .filter(ShardRouting::primary)
            .findFirst()
            .orElseGet(() -> Stream.concat(unassigned.stream(), unassigned.ignored().stream()).iterator().next());
        // Switch the debug mode to EXCLUDE_YES_DECISIONS for the explain call only, then put the original back.
        final var originalDebugMode = routingAllocation.getDebugMode();
        routingAllocation.setDebugMode(RoutingAllocation.DebugMode.EXCLUDE_YES_DECISIONS);
        final ShardAllocationDecision shardAllocationDecision;
        try {
            shardAllocationDecision = shardAllocationExplainer.explain(shardRouting, routingAllocation);
        } finally {
            routingAllocation.setDebugMode(originalDebugMode);
        }
        allocationExplainLogger.debug(
            "unassigned shard [{}] with allocation decision {}",
            shardRouting,
            org.elasticsearch.common.Strings.toString(
                p -> ChunkedToXContentHelper.object("node_allocation_decision", shardAllocationDecision.toXContentChunked(p))
            )
        );
    });
}

// visible for testing
boolean hasEnoughIterations(int currentIteration) {
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@
import org.elasticsearch.common.metrics.CounterMetric;
import org.elasticsearch.common.metrics.MeanMetric;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.telemetry.TelemetryProvider;
import org.elasticsearch.threadpool.ThreadPool;

Expand All @@ -57,6 +59,13 @@ public class DesiredBalanceShardsAllocator implements ShardsAllocator {

private static final Logger logger = LogManager.getLogger(DesiredBalanceShardsAllocator.class);

/**
 * Time setting registered under {@code cluster.routing.allocation.desired_balance.allocation_explain_log_interval},
 * with a default of one minute, declared dynamic and node-scoped. {@code DesiredBalanceComputer} watches it and
 * applies the value as the minimum interval of its frequency-capped allocation-explain logging for unassigned shards.
 */
public static final Setting<TimeValue> ALLOCATION_EXPLAIN_LOGGING_INTERVAL = Setting.timeSetting(
"cluster.routing.allocation.desired_balance.allocation_explain_log_interval",
TimeValue.timeValueMinutes(1),
Setting.Property.Dynamic,
Setting.Property.NodeScope
);

private final ShardsAllocator delegateAllocator;
private final ThreadPool threadPool;
/**
Expand Down Expand Up @@ -109,20 +118,26 @@ public interface DesiredBalanceReconcilerAction {
ClusterState apply(ClusterState clusterState, RerouteStrategy rerouteStrategy);
}

/**
 * Callback that produces a {@link ShardAllocationDecision} for a shard in the context of a
 * {@link RoutingAllocation}. It is passed into this allocator's constructor and forwarded to the
 * {@code DesiredBalanceComputer} it creates.
 */
@FunctionalInterface
public interface ShardAllocationExplainer {
/**
 * @param shard the shard to explain
 * @param allocation the routing allocation to evaluate the shard against
 * @return the allocation decision for {@code shard}
 */
ShardAllocationDecision explain(ShardRouting shard, RoutingAllocation allocation);
}

public DesiredBalanceShardsAllocator(
ClusterSettings clusterSettings,
ShardsAllocator delegateAllocator,
ThreadPool threadPool,
ClusterService clusterService,
DesiredBalanceReconcilerAction reconciler,
TelemetryProvider telemetryProvider,
NodeAllocationStatsAndWeightsCalculator nodeAllocationStatsAndWeightsCalculator
NodeAllocationStatsAndWeightsCalculator nodeAllocationStatsAndWeightsCalculator,
ShardAllocationExplainer shardAllocationExplainer
) {
this(
delegateAllocator,
threadPool,
clusterService,
new DesiredBalanceComputer(clusterSettings, threadPool, delegateAllocator),
new DesiredBalanceComputer(clusterSettings, threadPool, delegateAllocator, shardAllocationExplainer),
reconciler,
telemetryProvider,
nodeAllocationStatsAndWeightsCalculator
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator;
import org.elasticsearch.cluster.routing.allocation.allocator.DesiredBalanceComputer;
import org.elasticsearch.cluster.routing.allocation.allocator.DesiredBalanceReconciler;
import org.elasticsearch.cluster.routing.allocation.allocator.DesiredBalanceShardsAllocator;
import org.elasticsearch.cluster.routing.allocation.decider.AwarenessAllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.ClusterRebalanceAllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.ConcurrentRebalanceAllocationDecider;
Expand Down Expand Up @@ -229,6 +230,7 @@ public void apply(Settings value, Settings current, Settings previous) {
DataStreamAutoShardingService.CLUSTER_AUTO_SHARDING_MIN_WRITE_THREADS,
DataStreamAutoShardingService.DATA_STREAMS_AUTO_SHARDING_INCREASE_SHARDS_LOAD_METRIC,
DataStreamAutoShardingService.DATA_STREAMS_AUTO_SHARDING_DECREASE_SHARDS_LOAD_METRIC,
DesiredBalanceShardsAllocator.ALLOCATION_EXPLAIN_LOGGING_INTERVAL,
DesiredBalanceComputer.PROGRESS_LOG_INTERVAL_SETTING,
DesiredBalanceComputer.MAX_BALANCE_COMPUTATION_TIME_DURING_INDEX_CREATION_SETTING,
DesiredBalanceReconciler.UNDESIRED_ALLOCATIONS_LOG_INTERVAL_SETTING,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ public void testDeleteDesiredBalance() throws Exception {
var clusterSettings = ClusterSettings.createBuiltInClusterSettings(settings);

var delegate = new BalancedShardsAllocator();
var computer = new DesiredBalanceComputer(clusterSettings, threadPool, delegate) {
var computer = new DesiredBalanceComputer(clusterSettings, threadPool, delegate, DUMMY_EXPLAINER) {

final AtomicReference<DesiredBalance> lastComputationInput = new AtomicReference<>();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,8 @@ public void testUndesiredShardCount() {
clusterService,
(innerState, strategy) -> innerState,
TelemetryProvider.NOOP,
EMPTY_NODE_ALLOCATION_STATS
EMPTY_NODE_ALLOCATION_STATS,
DUMMY_EXPLAINER
) {
@Override
public DesiredBalance getDesiredBalance() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -490,7 +490,8 @@ private Map.Entry<MockAllocationService, ShardsAllocator> createNewAllocationSer
(clusterState, routingAllocationAction) -> strategyRef.get()
.executeWithRoutingAllocation(clusterState, "reconcile-desired-balance", routingAllocationAction),
TelemetryProvider.NOOP,
EMPTY_NODE_ALLOCATION_STATS
EMPTY_NODE_ALLOCATION_STATS,
DUMMY_EXPLAINER
) {
@Override
public void allocate(RoutingAllocation allocation, ActionListener<Void> listener) {
Expand Down
Loading