Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/120957.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 120957
summary: Introduce `AllocationBalancingRoundSummaryService`
area: Allocation
type: enhancement
issues: []
Original file line number Diff line number Diff line change
@@ -0,0 +1,201 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.cluster.routing.allocation.allocator;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.threadpool.Scheduler;
import org.elasticsearch.threadpool.ThreadPool;

import java.util.ArrayList;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicReference;

/**
* Manages the lifecycle of a series of {@link BalancingRoundSummary} results from allocation balancing rounds and creates reports thereof.
* Reporting balancer round summary results will provide information with which to do cost-benefit analyses of the work that shard
* allocation rebalancing executes.
*
* Any successfully added summary via {@link #addBalancerRoundSummary(BalancingRoundSummary)} will eventually be collected/drained and
* reported. This should still be done in the event of the node stepping down from master, on the assumption that all summaries are only
* added while master and should be drained for reporting. There is no need to start/stop this service with master election/stepdown because
* balancer rounds will no longer be supplied when not master. It will simply drain the last summaries and then have nothing more to do.
* This does have the tradeoff that non-master nodes will run a task to check for summaries to report every
* {@link #BALANCER_ROUND_SUMMARIES_LOG_INTERVAL_SETTING} seconds.
*/
public class AllocationBalancingRoundSummaryService {

/** Turns on or off balancing round summary reporting. */
public static final Setting<Boolean> ENABLE_BALANCER_ROUND_SUMMARIES_SETTING = Setting.boolSetting(
"cluster.routing.allocation.desired_balance.enable_balancer_round_summaries",
false,
Setting.Property.NodeScope,
Setting.Property.Dynamic
);

/** Controls how frequently in time balancer round summaries are logged. If less than zero, effectively disables reporting. */
public static final Setting<TimeValue> BALANCER_ROUND_SUMMARIES_LOG_INTERVAL_SETTING = Setting.timeSetting(
"cluster.routing.allocation.desired_balance.balanace_round_summaries_interval",
TimeValue.timeValueSeconds(10),
TimeValue.ZERO,
Copy link
Contributor

@nicktindall nicktindall Jan 28, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It says above

If less than zero, effectively disables reporting.

but the minimum is zero (I think?)

Either comment or code is out of date I think

Setting.Property.NodeScope,
Setting.Property.Dynamic
);

private static final Logger logger = LogManager.getLogger(AllocationBalancingRoundSummaryService.class);
private final ThreadPool threadPool;
private volatile boolean enableBalancerRoundSummaries;
private volatile TimeValue summaryReportInterval;

/**
* A concurrency-safe list of balancing round summaries. Balancer rounds are run and added here serially, so the queue will naturally
* progress from newer to older results.
*/
private ConcurrentLinkedQueue<BalancingRoundSummary> summaries = new ConcurrentLinkedQueue<>();

/** This reference is set when reporting is scheduled. If it is null, then reporting is inactive. */
private AtomicReference<Scheduler.Cancellable> scheduledReportFuture = new AtomicReference<>();

public AllocationBalancingRoundSummaryService(ThreadPool threadPool, ClusterSettings clusterSettings) {
this.threadPool = threadPool;
// Initialize the local setting values to avoid a null access when ClusterSettings#initializeAndWatch is called on each setting:
// updating enableBalancerRoundSummaries accesses summaryReportInterval.
this.enableBalancerRoundSummaries = clusterSettings.get(ENABLE_BALANCER_ROUND_SUMMARIES_SETTING);
this.summaryReportInterval = clusterSettings.get(BALANCER_ROUND_SUMMARIES_LOG_INTERVAL_SETTING);

clusterSettings.initializeAndWatch(ENABLE_BALANCER_ROUND_SUMMARIES_SETTING, value -> {
this.enableBalancerRoundSummaries = value;
updateBalancingRoundSummaryReporting();
});
clusterSettings.initializeAndWatch(BALANCER_ROUND_SUMMARIES_LOG_INTERVAL_SETTING, value -> {
// The new value will get picked up the next time that the summary report task reschedules itself on the thread pool.
this.summaryReportInterval = value;
});
}

/**
* Adds the summary of a balancing round. If summaries are enabled, this will eventually be reported (logging, etc.). If balancer round
* summaries are not enabled in the cluster, then the summary is immediately discarded (so as not to fill up a data structure that will
* never be drained).
*/
public void addBalancerRoundSummary(BalancingRoundSummary summary) {
if (enableBalancerRoundSummaries == false) {
return;
}

summaries.add(summary);
}

/**
* Reports on all the balancer round summaries added since the last call to this method, if there are any. Then reschedules itself per
* the {@link #ENABLE_BALANCER_ROUND_SUMMARIES_SETTING} and {@link #BALANCER_ROUND_SUMMARIES_LOG_INTERVAL_SETTING} settings.
*/
private void reportSummariesAndThenReschedule() {
drainAndReportSummaries();
rescheduleReporting();
}

/**
* Drains all the waiting balancer round summaries (if there are any) and reports them.
*/
private void drainAndReportSummaries() {
var combinedSummaries = drainSummaries();
if (combinedSummaries == CombinedBalancingRoundSummary.EMPTY_RESULTS) {
return;
}

logger.info("Balancing round summaries: " + combinedSummaries);
}

/**
* Returns a combined summary of all unreported allocation round summaries: may summarize a single balancer round, multiple, or none.
*
* @return {@link CombinedBalancingRoundSummary#EMPTY_RESULTS} if there are no balancing round summaries waiting to be reported.
*/
private CombinedBalancingRoundSummary drainSummaries() {
ArrayList<BalancingRoundSummary> batchOfSummaries = new ArrayList<>();
while (summaries.isEmpty() == false) {
batchOfSummaries.add(summaries.poll());
}
return CombinedBalancingRoundSummary.combine(batchOfSummaries);
}

/**
* Returns whether reporting is turned on with the given point-in-time summary setting values.
*/
private boolean shouldReschedule(boolean enableValue) {
return enableValue;
}

/**
* Schedules a periodic task to drain and report the latest balancer round summaries, or cancels the already running task, if the latest
* setting values dictate a change to enable or disable reporting. A change to {@link #BALANCER_ROUND_SUMMARIES_LOG_INTERVAL_SETTING}
* will only take effect when the periodic task completes and reschedules itself.
*/
private void updateBalancingRoundSummaryReporting() {
if (this.enableBalancerRoundSummaries) {
startReporting(this.summaryReportInterval);
} else {
cancelReporting();
// Clear the data structure so that we don't retain unnecessary memory.
drainSummaries();
}
}

/**
* Schedules a reporting task, if one is not already scheduled. The reporting task will reschedule itself going forward.
*/
private void startReporting(TimeValue intervalValue) {
if (scheduledReportFuture.get() == null) {
scheduleReporting(intervalValue);
}
}

/**
* Cancels the future reporting task and resets {@link #scheduledReportFuture} to null.
*
* Note that this is best-effort: cancellation can race with {@link #rescheduleReporting}. But that is okay because the subsequent
* {@link #rescheduleReporting} will use the latest settings and choose to cancel reporting if appropriate.
*/
private void cancelReporting() {
var future = scheduledReportFuture.getAndSet(null);
if (future != null) {
future.cancel();
}
}

private void scheduleReporting(TimeValue intervalValue) {
scheduledReportFuture.set(
threadPool.schedule(this::reportSummariesAndThenReschedule, intervalValue, threadPool.executor(ThreadPool.Names.GENERIC))
);
}

/**
* Looks at the given setting values and decides whether to schedule another reporting task or cancel reporting now.
*/
private void rescheduleReporting() {
if (this.enableBalancerRoundSummaries) {
// It's possible that this races with a concurrent call to cancel reporting, but that's okay. The next rescheduleReporting call
// will check the latest settings and cancel.
scheduleReporting(this.summaryReportInterval);
} else {
cancelReporting();
}
}

// @VisibleForTesting
protected void verifyNumberOfSummaries(int numberOfSummaries) {
assert numberOfSummaries == summaries.size();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.cluster.routing.allocation.allocator;

/**
* Summarizes the impact to the cluster as a result of a rebalancing round.
*
* @param numberOfShardsToMove The number of shard moves required to move from the previous desired balance to the new one.
*/
public record BalancingRoundSummary(long numberOfShardsToMove) {

@Override
public String toString() {
return "BalancingRoundSummary{" + "numberOfShardsToMove=" + numberOfShardsToMove + '}';
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.cluster.routing.allocation.allocator;

import java.util.List;

/**
* Holds combined {@link BalancingRoundSummary} results. Essentially holds a list of the balancing events and the summed up changes
* across all those events: what allocation work was done across some period of time.
* TODO: WIP ES-10341
*
* Note that each balancing round summary is the difference between, at the time, latest desired balance and the previous desired balance.
* Each summary represents a step towards the next desired balance, which is based on presuming the previous desired balance is reached. So
* combining them is roughly the difference between the first summary's previous desired balance and the last summary's latest desired
* balance.
*
* @param numberOfBalancingRounds How many balancing round summaries are combined in this report.
* @param numberOfShardMoves The sum of shard moves for each balancing round being combined into a single summary.
*/
public record CombinedBalancingRoundSummary(int numberOfBalancingRounds, long numberOfShardMoves) {

public static final CombinedBalancingRoundSummary EMPTY_RESULTS = new CombinedBalancingRoundSummary(0, 0);

public static CombinedBalancingRoundSummary combine(List<BalancingRoundSummary> summaries) {
if (summaries.isEmpty()) {
return EMPTY_RESULTS;
}

int numSummaries = 0;
long numberOfShardMoves = 0;
for (BalancingRoundSummary summary : summaries) {
++numSummaries;
numberOfShardMoves += summary.numberOfShardsToMove();
}
return new CombinedBalancingRoundSummary(numSummaries, numberOfShardMoves);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,7 @@
* strictly increasing sequence number. A new master term restarts the index values from zero. The balancer,
* which runs async to reroute, uses the latest request's data to compute the desired balance.
* @param assignments a set of the (persistent) node IDs to which each {@link ShardId} should be allocated
* @param weightsPerNode The node weights calculated based on
* {@link org.elasticsearch.cluster.routing.allocation.allocator.WeightFunction#calculateNodeWeight}
* @param weightsPerNode The node weights calculated based on {@link WeightFunction#calculateNodeWeight}
*/
public record DesiredBalance(
long lastConvergedIndex,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,10 @@ public class DesiredBalanceShardsAllocator implements ShardsAllocator {
private volatile boolean resetCurrentDesiredBalance = false;
private final Set<String> processedNodeShutdowns = new HashSet<>();
private final DesiredBalanceMetrics desiredBalanceMetrics;
/**
* Manages balancer round results in order to report on the balancer activity in a configurable manner.
*/
private final AllocationBalancingRoundSummaryService balancerRoundSummaryService;

// stats
protected final CounterMetric computationsSubmitted = new CounterMetric();
Expand Down Expand Up @@ -132,6 +136,7 @@ public DesiredBalanceShardsAllocator(
NodeAllocationStatsAndWeightsCalculator nodeAllocationStatsAndWeightsCalculator
) {
this.desiredBalanceMetrics = new DesiredBalanceMetrics(telemetryProvider.getMeterRegistry());
this.balancerRoundSummaryService = new AllocationBalancingRoundSummaryService(threadPool, clusterService.getClusterSettings());
this.delegateAllocator = delegateAllocator;
this.threadPool = threadPool;
this.reconciler = reconciler;
Expand Down Expand Up @@ -320,6 +325,7 @@ private void setCurrentDesiredBalance(DesiredBalance newDesiredBalance) {
}

if (currentDesiredBalanceRef.compareAndSet(oldDesiredBalance, newDesiredBalance)) {
balancerRoundSummaryService.addBalancerRoundSummary(calculateBalancingRoundSummary(oldDesiredBalance, newDesiredBalance));
if (logger.isTraceEnabled()) {
var diff = DesiredBalance.hasChanges(oldDesiredBalance, newDesiredBalance)
? "Diff: " + DesiredBalance.humanReadableDiff(oldDesiredBalance, newDesiredBalance)
Expand All @@ -334,6 +340,13 @@ private void setCurrentDesiredBalance(DesiredBalance newDesiredBalance) {
}
}

/**
* Summarizes the work required to move from an old to new desired balance shard allocation.
*/
private BalancingRoundSummary calculateBalancingRoundSummary(DesiredBalance oldDesiredBalance, DesiredBalance newDesiredBalance) {
return new BalancingRoundSummary(DesiredBalance.shardMovements(oldDesiredBalance, newDesiredBalance));
}

protected void submitReconcileTask(DesiredBalance desiredBalance) {
masterServiceTaskQueue.submitTask("reconcile-desired-balance", new ReconcileDesiredBalanceTask(desiredBalance), null);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.elasticsearch.cluster.routing.OperationRouting;
import org.elasticsearch.cluster.routing.allocation.DataTier;
import org.elasticsearch.cluster.routing.allocation.DiskThresholdSettings;
import org.elasticsearch.cluster.routing.allocation.allocator.AllocationBalancingRoundSummaryService;
import org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator;
import org.elasticsearch.cluster.routing.allocation.allocator.DesiredBalanceComputer;
import org.elasticsearch.cluster.routing.allocation.allocator.DesiredBalanceReconciler;
Expand Down Expand Up @@ -212,6 +213,8 @@ public void apply(Settings value, Settings current, Settings previous) {
}

public static final Set<Setting<?>> BUILT_IN_CLUSTER_SETTINGS = Stream.of(
AllocationBalancingRoundSummaryService.ENABLE_BALANCER_ROUND_SUMMARIES_SETTING,
AllocationBalancingRoundSummaryService.BALANCER_ROUND_SUMMARIES_LOG_INTERVAL_SETTING,
AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTE_SETTING,
AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_FORCE_GROUP_SETTING,
BalancedShardsAllocator.INDEX_BALANCE_FACTOR_SETTING,
Expand Down
Loading