Skip to content

Commit 623a6af

Browse files
Introduce AllocationBalancingRoundSummaryService (#120957)
This service is added to the desired balance allocator to track and report on balancer round activity. It is a WIP and currently only tracks the number of shard moves caused by a balancing round. Reporting balancer round summary results will provide information with which to do cost-benefit analyses of the work that shard allocation rebalancing executes. It is disabled by default. Relates ES-10341
1 parent aa28d84 commit 623a6af

File tree

8 files changed

+541
-2
lines changed

8 files changed

+541
-2
lines changed

docs/changelog/120957.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 120957
2+
summary: Introduce `AllocationBalancingRoundSummaryService`
3+
area: Allocation
4+
type: enhancement
5+
issues: []
Lines changed: 194 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,194 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.cluster.routing.allocation.allocator;
11+
12+
import org.apache.logging.log4j.LogManager;
13+
import org.apache.logging.log4j.Logger;
14+
import org.elasticsearch.common.settings.ClusterSettings;
15+
import org.elasticsearch.common.settings.Setting;
16+
import org.elasticsearch.core.TimeValue;
17+
import org.elasticsearch.threadpool.Scheduler;
18+
import org.elasticsearch.threadpool.ThreadPool;
19+
20+
import java.util.ArrayList;
21+
import java.util.concurrent.ConcurrentLinkedQueue;
22+
import java.util.concurrent.atomic.AtomicReference;
23+
24+
/**
25+
* Manages the lifecycle of a series of {@link BalancingRoundSummary} results from allocation balancing rounds and creates reports thereof.
26+
* Reporting balancer round summary results will provide information with which to do cost-benefit analyses of the work that shard
27+
* allocation rebalancing executes.
28+
*
29+
* Any successfully added summary via {@link #addBalancerRoundSummary(BalancingRoundSummary)} will eventually be collected/drained and
30+
* reported. This should still be done in the event of the node stepping down from master, on the assumption that all summaries are only
31+
* added while master and should be drained for reporting. There is no need to start/stop this service with master election/stepdown because
32+
* balancer rounds will no longer be supplied when not master. It will simply drain the last summaries and then have nothing more to do.
33+
* This does have the tradeoff that non-master nodes will run a task to check for summaries to report every
34+
* {@link #BALANCER_ROUND_SUMMARIES_LOG_INTERVAL_SETTING} seconds.
35+
*/
36+
public class AllocationBalancingRoundSummaryService {
37+
38+
/** Turns on or off balancing round summary reporting. */
39+
public static final Setting<Boolean> ENABLE_BALANCER_ROUND_SUMMARIES_SETTING = Setting.boolSetting(
40+
"cluster.routing.allocation.desired_balance.enable_balancer_round_summaries",
41+
false,
42+
Setting.Property.NodeScope,
43+
Setting.Property.Dynamic
44+
);
45+
46+
/** Controls how frequently in time balancer round summaries are logged. */
47+
public static final Setting<TimeValue> BALANCER_ROUND_SUMMARIES_LOG_INTERVAL_SETTING = Setting.timeSetting(
48+
"cluster.routing.allocation.desired_balance.balanace_round_summaries_interval",
49+
TimeValue.timeValueSeconds(10),
50+
TimeValue.ZERO,
51+
Setting.Property.NodeScope,
52+
Setting.Property.Dynamic
53+
);
54+
55+
private static final Logger logger = LogManager.getLogger(AllocationBalancingRoundSummaryService.class);
56+
private final ThreadPool threadPool;
57+
private volatile boolean enableBalancerRoundSummaries;
58+
private volatile TimeValue summaryReportInterval;
59+
60+
/**
61+
* A concurrency-safe list of balancing round summaries. Balancer rounds are run and added here serially, so the queue will naturally
62+
* progress from newer to older results.
63+
*/
64+
private final ConcurrentLinkedQueue<BalancingRoundSummary> summaries = new ConcurrentLinkedQueue<>();
65+
66+
/** This reference is set when reporting is scheduled. If it is null, then reporting is inactive. */
67+
private final AtomicReference<Scheduler.Cancellable> scheduledReportFuture = new AtomicReference<>();
68+
69+
public AllocationBalancingRoundSummaryService(ThreadPool threadPool, ClusterSettings clusterSettings) {
70+
this.threadPool = threadPool;
71+
// Initialize the local setting values to avoid a null access when ClusterSettings#initializeAndWatch is called on each setting:
72+
// updating enableBalancerRoundSummaries accesses summaryReportInterval.
73+
this.enableBalancerRoundSummaries = clusterSettings.get(ENABLE_BALANCER_ROUND_SUMMARIES_SETTING);
74+
this.summaryReportInterval = clusterSettings.get(BALANCER_ROUND_SUMMARIES_LOG_INTERVAL_SETTING);
75+
76+
clusterSettings.initializeAndWatch(ENABLE_BALANCER_ROUND_SUMMARIES_SETTING, value -> {
77+
this.enableBalancerRoundSummaries = value;
78+
updateBalancingRoundSummaryReporting();
79+
});
80+
clusterSettings.initializeAndWatch(BALANCER_ROUND_SUMMARIES_LOG_INTERVAL_SETTING, value -> {
81+
// The new value will get picked up the next time that the summary report task reschedules itself on the thread pool.
82+
this.summaryReportInterval = value;
83+
});
84+
}
85+
86+
/**
87+
* Adds the summary of a balancing round. If summaries are enabled, this will eventually be reported (logging, etc.). If balancer round
88+
* summaries are not enabled in the cluster, then the summary is immediately discarded (so as not to fill up a data structure that will
89+
* never be drained).
90+
*/
91+
public void addBalancerRoundSummary(BalancingRoundSummary summary) {
92+
if (enableBalancerRoundSummaries == false) {
93+
return;
94+
}
95+
96+
summaries.add(summary);
97+
}
98+
99+
/**
100+
* Reports on all the balancer round summaries added since the last call to this method, if there are any. Then reschedules itself per
101+
* the {@link #ENABLE_BALANCER_ROUND_SUMMARIES_SETTING} and {@link #BALANCER_ROUND_SUMMARIES_LOG_INTERVAL_SETTING} settings.
102+
*/
103+
private void reportSummariesAndThenReschedule() {
104+
drainAndReportSummaries();
105+
rescheduleReporting();
106+
}
107+
108+
/**
109+
* Drains all the waiting balancer round summaries (if there are any) and reports them.
110+
*/
111+
private void drainAndReportSummaries() {
112+
var combinedSummaries = drainSummaries();
113+
if (combinedSummaries == CombinedBalancingRoundSummary.EMPTY_RESULTS) {
114+
return;
115+
}
116+
117+
logger.info("Balancing round summaries: " + combinedSummaries);
118+
}
119+
120+
/**
121+
* Returns a combined summary of all unreported allocation round summaries: may summarize a single balancer round, multiple, or none.
122+
*
123+
* @return {@link CombinedBalancingRoundSummary#EMPTY_RESULTS} if there are no balancing round summaries waiting to be reported.
124+
*/
125+
private CombinedBalancingRoundSummary drainSummaries() {
126+
ArrayList<BalancingRoundSummary> batchOfSummaries = new ArrayList<>();
127+
while (summaries.isEmpty() == false) {
128+
batchOfSummaries.add(summaries.poll());
129+
}
130+
return CombinedBalancingRoundSummary.combine(batchOfSummaries);
131+
}
132+
133+
/**
134+
* Schedules a periodic task to drain and report the latest balancer round summaries, or cancels the already running task, if the latest
135+
* setting values dictate a change to enable or disable reporting. A change to {@link #BALANCER_ROUND_SUMMARIES_LOG_INTERVAL_SETTING}
136+
* will only take effect when the periodic task completes and reschedules itself.
137+
*/
138+
private void updateBalancingRoundSummaryReporting() {
139+
if (this.enableBalancerRoundSummaries) {
140+
startReporting(this.summaryReportInterval);
141+
} else {
142+
cancelReporting();
143+
// Clear the data structure so that we don't retain unnecessary memory.
144+
drainSummaries();
145+
}
146+
}
147+
148+
/**
149+
* Schedules a reporting task, if one is not already scheduled. The reporting task will reschedule itself going forward.
150+
*/
151+
private void startReporting(TimeValue intervalValue) {
152+
if (scheduledReportFuture.get() == null) {
153+
scheduleReporting(intervalValue);
154+
}
155+
}
156+
157+
/**
158+
* Cancels the future reporting task and resets {@link #scheduledReportFuture} to null.
159+
*
160+
* Note that this is best-effort: cancellation can race with {@link #rescheduleReporting}. But that is okay because the subsequent
161+
* {@link #rescheduleReporting} will use the latest settings and choose to cancel reporting if appropriate.
162+
*/
163+
private void cancelReporting() {
164+
var future = scheduledReportFuture.getAndSet(null);
165+
if (future != null) {
166+
future.cancel();
167+
}
168+
}
169+
170+
private void scheduleReporting(TimeValue intervalValue) {
171+
scheduledReportFuture.set(
172+
threadPool.schedule(this::reportSummariesAndThenReschedule, intervalValue, threadPool.executor(ThreadPool.Names.GENERIC))
173+
);
174+
}
175+
176+
/**
177+
* Looks at the given setting values and decides whether to schedule another reporting task or cancel reporting now.
178+
*/
179+
private void rescheduleReporting() {
180+
if (this.enableBalancerRoundSummaries) {
181+
// It's possible that this races with a concurrent call to cancel reporting, but that's okay. The next rescheduleReporting call
182+
// will check the latest settings and cancel.
183+
scheduleReporting(this.summaryReportInterval);
184+
} else {
185+
cancelReporting();
186+
}
187+
}
188+
189+
// @VisibleForTesting
190+
protected void verifyNumberOfSummaries(int numberOfSummaries) {
191+
assert numberOfSummaries == summaries.size();
192+
}
193+
194+
}
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.cluster.routing.allocation.allocator;
11+
12+
/**
 * Captures the effect on the cluster of a single rebalancing round.
 *
 * @param numberOfShardsToMove The number of shard moves required to move from the previous desired balance to the new one.
 */
public record BalancingRoundSummary(long numberOfShardsToMove) {

    /**
     * Renders this summary as the record name followed by its single field in braces.
     */
    @Override
    public String toString() {
        final StringBuilder text = new StringBuilder("BalancingRoundSummary{");
        text.append("numberOfShardsToMove=").append(numberOfShardsToMove);
        text.append('}');
        return text.toString();
    }

}
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.cluster.routing.allocation.allocator;
11+
12+
import java.util.List;
13+
14+
/**
15+
* Holds combined {@link BalancingRoundSummary} results. Essentially holds a list of the balancing events and the summed up changes
16+
* across all those events: what allocation work was done across some period of time.
17+
* TODO: WIP ES-10341
18+
*
19+
* Note that each balancing round summary is the difference between, at the time, latest desired balance and the previous desired balance.
20+
* Each summary represents a step towards the next desired balance, which is based on presuming the previous desired balance is reached. So
21+
* combining them is roughly the difference between the first summary's previous desired balance and the last summary's latest desired
22+
* balance.
23+
*
24+
* @param numberOfBalancingRounds How many balancing round summaries are combined in this report.
25+
* @param numberOfShardMoves The sum of shard moves for each balancing round being combined into a single summary.
26+
*/
27+
public record CombinedBalancingRoundSummary(int numberOfBalancingRounds, long numberOfShardMoves) {
28+
29+
public static final CombinedBalancingRoundSummary EMPTY_RESULTS = new CombinedBalancingRoundSummary(0, 0);
30+
31+
public static CombinedBalancingRoundSummary combine(List<BalancingRoundSummary> summaries) {
32+
if (summaries.isEmpty()) {
33+
return EMPTY_RESULTS;
34+
}
35+
36+
int numSummaries = 0;
37+
long numberOfShardMoves = 0;
38+
for (BalancingRoundSummary summary : summaries) {
39+
++numSummaries;
40+
numberOfShardMoves += summary.numberOfShardsToMove();
41+
}
42+
return new CombinedBalancingRoundSummary(numSummaries, numberOfShardMoves);
43+
}
44+
45+
}

server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalance.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,7 @@
2424
* strictly increasing sequence number. A new master term restarts the index values from zero. The balancer,
2525
* which runs async to reroute, uses the latest request's data to compute the desired balance.
2626
* @param assignments a set of the (persistent) node IDs to which each {@link ShardId} should be allocated
27-
* @param weightsPerNode The node weights calculated based on
28-
* {@link org.elasticsearch.cluster.routing.allocation.allocator.WeightFunction#calculateNodeWeight}
27+
* @param weightsPerNode The node weights calculated based on {@link WeightFunction#calculateNodeWeight}
2928
*/
3029
public record DesiredBalance(
3130
long lastConvergedIndex,

server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceShardsAllocator.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,10 @@ public class DesiredBalanceShardsAllocator implements ShardsAllocator {
8888
private volatile boolean resetCurrentDesiredBalance = false;
8989
private final Set<String> processedNodeShutdowns = new HashSet<>();
9090
private final DesiredBalanceMetrics desiredBalanceMetrics;
91+
/**
92+
* Manages balancer round results in order to report on the balancer activity in a configurable manner.
93+
*/
94+
private final AllocationBalancingRoundSummaryService balancerRoundSummaryService;
9195

9296
// stats
9397
protected final CounterMetric computationsSubmitted = new CounterMetric();
@@ -132,6 +136,7 @@ public DesiredBalanceShardsAllocator(
132136
NodeAllocationStatsAndWeightsCalculator nodeAllocationStatsAndWeightsCalculator
133137
) {
134138
this.desiredBalanceMetrics = new DesiredBalanceMetrics(telemetryProvider.getMeterRegistry());
139+
this.balancerRoundSummaryService = new AllocationBalancingRoundSummaryService(threadPool, clusterService.getClusterSettings());
135140
this.delegateAllocator = delegateAllocator;
136141
this.threadPool = threadPool;
137142
this.reconciler = reconciler;
@@ -320,6 +325,7 @@ private void setCurrentDesiredBalance(DesiredBalance newDesiredBalance) {
320325
}
321326

322327
if (currentDesiredBalanceRef.compareAndSet(oldDesiredBalance, newDesiredBalance)) {
328+
balancerRoundSummaryService.addBalancerRoundSummary(calculateBalancingRoundSummary(oldDesiredBalance, newDesiredBalance));
323329
if (logger.isTraceEnabled()) {
324330
var diff = DesiredBalance.hasChanges(oldDesiredBalance, newDesiredBalance)
325331
? "Diff: " + DesiredBalance.humanReadableDiff(oldDesiredBalance, newDesiredBalance)
@@ -334,6 +340,13 @@ private void setCurrentDesiredBalance(DesiredBalance newDesiredBalance) {
334340
}
335341
}
336342

343+
/**
344+
* Summarizes the work required to move from an old to new desired balance shard allocation.
345+
*/
346+
private BalancingRoundSummary calculateBalancingRoundSummary(DesiredBalance oldDesiredBalance, DesiredBalance newDesiredBalance) {
347+
return new BalancingRoundSummary(DesiredBalance.shardMovements(oldDesiredBalance, newDesiredBalance));
348+
}
349+
337350
protected void submitReconcileTask(DesiredBalance desiredBalance) {
338351
masterServiceTaskQueue.submitTask("reconcile-desired-balance", new ReconcileDesiredBalanceTask(desiredBalance), null);
339352
}

server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
import org.elasticsearch.cluster.routing.OperationRouting;
4747
import org.elasticsearch.cluster.routing.allocation.DataTier;
4848
import org.elasticsearch.cluster.routing.allocation.DiskThresholdSettings;
49+
import org.elasticsearch.cluster.routing.allocation.allocator.AllocationBalancingRoundSummaryService;
4950
import org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator;
5051
import org.elasticsearch.cluster.routing.allocation.allocator.DesiredBalanceComputer;
5152
import org.elasticsearch.cluster.routing.allocation.allocator.DesiredBalanceReconciler;
@@ -212,6 +213,8 @@ public void apply(Settings value, Settings current, Settings previous) {
212213
}
213214

214215
public static final Set<Setting<?>> BUILT_IN_CLUSTER_SETTINGS = Stream.of(
216+
AllocationBalancingRoundSummaryService.ENABLE_BALANCER_ROUND_SUMMARIES_SETTING,
217+
AllocationBalancingRoundSummaryService.BALANCER_ROUND_SUMMARIES_LOG_INTERVAL_SETTING,
215218
AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTE_SETTING,
216219
AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_FORCE_GROUP_SETTING,
217220
BalancedShardsAllocator.INDEX_BALANCE_FACTOR_SETTING,

0 commit comments

Comments
 (0)