-
Notifications
You must be signed in to change notification settings - Fork 25.6k
Introduce AllocationBalancingRoundSummaryService #120957
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
DiannaHohensee
merged 5 commits into
elastic:main
from
DiannaHohensee:2025/01/17/ES-10341-computation-metrics-pr
Feb 3, 2025
Merged
Changes from all commits
Commits
Show all changes
5 commits
Select commit
Hold shift + click to select a range
42103ca
Introduce AllocationBalancingRoundSummaryService
DiannaHohensee 6f8471c
Update docs/changelog/120957.yaml
DiannaHohensee 0020cfe
review improvements
DiannaHohensee d770386
Merge branch 'main' into 2025/01/17/ES-10341-computation-metrics-pr
DiannaHohensee 206e7e1
remove unused method
DiannaHohensee File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,5 @@ | ||
| pr: 120957 | ||
| summary: Introduce `AllocationBalancingRoundSummaryService` | ||
| area: Allocation | ||
| type: enhancement | ||
| issues: [] |
194 changes: 194 additions & 0 deletions
194
...icsearch/cluster/routing/allocation/allocator/AllocationBalancingRoundSummaryService.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,194 @@ | ||
| /* | ||
| * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
| * or more contributor license agreements. Licensed under the "Elastic License | ||
| * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side | ||
| * Public License v 1"; you may not use this file except in compliance with, at | ||
| * your election, the "Elastic License 2.0", the "GNU Affero General Public | ||
| * License v3.0 only", or the "Server Side Public License, v 1". | ||
| */ | ||
|
|
||
| package org.elasticsearch.cluster.routing.allocation.allocator; | ||
|
|
||
| import org.apache.logging.log4j.LogManager; | ||
| import org.apache.logging.log4j.Logger; | ||
| import org.elasticsearch.common.settings.ClusterSettings; | ||
| import org.elasticsearch.common.settings.Setting; | ||
| import org.elasticsearch.core.TimeValue; | ||
| import org.elasticsearch.threadpool.Scheduler; | ||
| import org.elasticsearch.threadpool.ThreadPool; | ||
|
|
||
| import java.util.ArrayList; | ||
| import java.util.concurrent.ConcurrentLinkedQueue; | ||
| import java.util.concurrent.atomic.AtomicReference; | ||
|
|
||
| /** | ||
| * Manages the lifecycle of a series of {@link BalancingRoundSummary} results from allocation balancing rounds and creates reports thereof. | ||
| * Reporting balancer round summary results will provide information with which to do cost-benefit analyses of the work that shard | ||
| * allocation rebalancing executes. | ||
| * | ||
| * Any successfully added summary via {@link #addBalancerRoundSummary(BalancingRoundSummary)} will eventually be collected/drained and | ||
| * reported. This should still be done in the event of the node stepping down from master, on the assumption that all summaries are only | ||
| * added while master and should be drained for reporting. There is no need to start/stop this service with master election/stepdown because | ||
| * balancer rounds will no longer be supplied when not master. It will simply drain the last summaries and then have nothing more to do. | ||
| * This does have the tradeoff that non-master nodes will run a task to check for summaries to report every | ||
| * {@link #BALANCER_ROUND_SUMMARIES_LOG_INTERVAL_SETTING} seconds. | ||
| */ | ||
| public class AllocationBalancingRoundSummaryService { | ||
|
|
||
| /** Turns on or off balancing round summary reporting. */ | ||
| public static final Setting<Boolean> ENABLE_BALANCER_ROUND_SUMMARIES_SETTING = Setting.boolSetting( | ||
| "cluster.routing.allocation.desired_balance.enable_balancer_round_summaries", | ||
| false, | ||
| Setting.Property.NodeScope, | ||
| Setting.Property.Dynamic | ||
| ); | ||
|
|
||
| /** Controls how frequently in time balancer round summaries are logged. */ | ||
| public static final Setting<TimeValue> BALANCER_ROUND_SUMMARIES_LOG_INTERVAL_SETTING = Setting.timeSetting( | ||
| "cluster.routing.allocation.desired_balance.balanace_round_summaries_interval", | ||
| TimeValue.timeValueSeconds(10), | ||
| TimeValue.ZERO, | ||
| Setting.Property.NodeScope, | ||
| Setting.Property.Dynamic | ||
| ); | ||
|
|
||
| private static final Logger logger = LogManager.getLogger(AllocationBalancingRoundSummaryService.class); | ||
| private final ThreadPool threadPool; | ||
| private volatile boolean enableBalancerRoundSummaries; | ||
| private volatile TimeValue summaryReportInterval; | ||
|
|
||
| /** | ||
| * A concurrency-safe list of balancing round summaries. Balancer rounds are run and added here serially, so the queue will naturally | ||
| * progress from newer to older results. | ||
| */ | ||
| private final ConcurrentLinkedQueue<BalancingRoundSummary> summaries = new ConcurrentLinkedQueue<>(); | ||
|
|
||
| /** This reference is set when reporting is scheduled. If it is null, then reporting is inactive. */ | ||
| private final AtomicReference<Scheduler.Cancellable> scheduledReportFuture = new AtomicReference<>(); | ||
|
|
||
| public AllocationBalancingRoundSummaryService(ThreadPool threadPool, ClusterSettings clusterSettings) { | ||
| this.threadPool = threadPool; | ||
| // Initialize the local setting values to avoid a null access when ClusterSettings#initializeAndWatch is called on each setting: | ||
| // updating enableBalancerRoundSummaries accesses summaryReportInterval. | ||
| this.enableBalancerRoundSummaries = clusterSettings.get(ENABLE_BALANCER_ROUND_SUMMARIES_SETTING); | ||
| this.summaryReportInterval = clusterSettings.get(BALANCER_ROUND_SUMMARIES_LOG_INTERVAL_SETTING); | ||
|
|
||
| clusterSettings.initializeAndWatch(ENABLE_BALANCER_ROUND_SUMMARIES_SETTING, value -> { | ||
| this.enableBalancerRoundSummaries = value; | ||
| updateBalancingRoundSummaryReporting(); | ||
| }); | ||
| clusterSettings.initializeAndWatch(BALANCER_ROUND_SUMMARIES_LOG_INTERVAL_SETTING, value -> { | ||
| // The new value will get picked up the next time that the summary report task reschedules itself on the thread pool. | ||
| this.summaryReportInterval = value; | ||
| }); | ||
| } | ||
|
|
||
| /** | ||
| * Adds the summary of a balancing round. If summaries are enabled, this will eventually be reported (logging, etc.). If balancer round | ||
| * summaries are not enabled in the cluster, then the summary is immediately discarded (so as not to fill up a data structure that will | ||
| * never be drained). | ||
| */ | ||
| public void addBalancerRoundSummary(BalancingRoundSummary summary) { | ||
| if (enableBalancerRoundSummaries == false) { | ||
| return; | ||
| } | ||
|
|
||
| summaries.add(summary); | ||
| } | ||
|
|
||
| /** | ||
| * Reports on all the balancer round summaries added since the last call to this method, if there are any. Then reschedules itself per | ||
| * the {@link #ENABLE_BALANCER_ROUND_SUMMARIES_SETTING} and {@link #BALANCER_ROUND_SUMMARIES_LOG_INTERVAL_SETTING} settings. | ||
| */ | ||
| private void reportSummariesAndThenReschedule() { | ||
| drainAndReportSummaries(); | ||
| rescheduleReporting(); | ||
| } | ||
|
|
||
| /** | ||
| * Drains all the waiting balancer round summaries (if there are any) and reports them. | ||
| */ | ||
| private void drainAndReportSummaries() { | ||
| var combinedSummaries = drainSummaries(); | ||
| if (combinedSummaries == CombinedBalancingRoundSummary.EMPTY_RESULTS) { | ||
| return; | ||
| } | ||
|
|
||
| logger.info("Balancing round summaries: " + combinedSummaries); | ||
| } | ||
|
|
||
| /** | ||
| * Returns a combined summary of all unreported allocation round summaries: may summarize a single balancer round, multiple, or none. | ||
| * | ||
| * @return {@link CombinedBalancingRoundSummary#EMPTY_RESULTS} if there are no balancing round summaries waiting to be reported. | ||
| */ | ||
| private CombinedBalancingRoundSummary drainSummaries() { | ||
| ArrayList<BalancingRoundSummary> batchOfSummaries = new ArrayList<>(); | ||
| while (summaries.isEmpty() == false) { | ||
| batchOfSummaries.add(summaries.poll()); | ||
| } | ||
| return CombinedBalancingRoundSummary.combine(batchOfSummaries); | ||
| } | ||
|
|
||
| /** | ||
| * Schedules a periodic task to drain and report the latest balancer round summaries, or cancels the already running task, if the latest | ||
| * setting values dictate a change to enable or disable reporting. A change to {@link #BALANCER_ROUND_SUMMARIES_LOG_INTERVAL_SETTING} | ||
| * will only take effect when the periodic task completes and reschedules itself. | ||
| */ | ||
| private void updateBalancingRoundSummaryReporting() { | ||
| if (this.enableBalancerRoundSummaries) { | ||
| startReporting(this.summaryReportInterval); | ||
| } else { | ||
| cancelReporting(); | ||
| // Clear the data structure so that we don't retain unnecessary memory. | ||
| drainSummaries(); | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Schedules a reporting task, if one is not already scheduled. The reporting task will reschedule itself going forward. | ||
| */ | ||
| private void startReporting(TimeValue intervalValue) { | ||
| if (scheduledReportFuture.get() == null) { | ||
| scheduleReporting(intervalValue); | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Cancels the future reporting task and resets {@link #scheduledReportFuture} to null. | ||
| * | ||
| * Note that this is best-effort: cancellation can race with {@link #rescheduleReporting}. But that is okay because the subsequent | ||
| * {@link #rescheduleReporting} will use the latest settings and choose to cancel reporting if appropriate. | ||
| */ | ||
| private void cancelReporting() { | ||
| var future = scheduledReportFuture.getAndSet(null); | ||
| if (future != null) { | ||
| future.cancel(); | ||
| } | ||
| } | ||
|
|
||
| private void scheduleReporting(TimeValue intervalValue) { | ||
| scheduledReportFuture.set( | ||
| threadPool.schedule(this::reportSummariesAndThenReschedule, intervalValue, threadPool.executor(ThreadPool.Names.GENERIC)) | ||
| ); | ||
| } | ||
|
|
||
| /** | ||
| * Looks at the given setting values and decides whether to schedule another reporting task or cancel reporting now. | ||
| */ | ||
| private void rescheduleReporting() { | ||
| if (this.enableBalancerRoundSummaries) { | ||
| // It's possible that this races with a concurrent call to cancel reporting, but that's okay. The next rescheduleReporting call | ||
| // will check the latest settings and cancel. | ||
| scheduleReporting(this.summaryReportInterval); | ||
| } else { | ||
| cancelReporting(); | ||
| } | ||
| } | ||
|
|
||
| // @VisibleForTesting | ||
| protected void verifyNumberOfSummaries(int numberOfSummaries) { | ||
| assert numberOfSummaries == summaries.size(); | ||
| } | ||
|
|
||
| } | ||
24 changes: 24 additions & 0 deletions
24
...in/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancingRoundSummary.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,24 @@ | ||
| /* | ||
| * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
| * or more contributor license agreements. Licensed under the "Elastic License | ||
| * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side | ||
| * Public License v 1"; you may not use this file except in compliance with, at | ||
| * your election, the "Elastic License 2.0", the "GNU Affero General Public | ||
| * License v3.0 only", or the "Server Side Public License, v 1". | ||
| */ | ||
|
|
||
| package org.elasticsearch.cluster.routing.allocation.allocator; | ||
|
|
||
| /** | ||
| * Summarizes the impact to the cluster as a result of a rebalancing round. | ||
| * | ||
| * @param numberOfShardsToMove The number of shard moves required to move from the previous desired balance to the new one. | ||
| */ | ||
| public record BalancingRoundSummary(long numberOfShardsToMove) { | ||
|
|
||
| @Override | ||
| public String toString() { | ||
| return "BalancingRoundSummary{" + "numberOfShardsToMove=" + numberOfShardsToMove + '}'; | ||
| } | ||
|
|
||
| } |
45 changes: 45 additions & 0 deletions
45
...org/elasticsearch/cluster/routing/allocation/allocator/CombinedBalancingRoundSummary.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,45 @@ | ||
| /* | ||
| * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
| * or more contributor license agreements. Licensed under the "Elastic License | ||
| * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side | ||
| * Public License v 1"; you may not use this file except in compliance with, at | ||
| * your election, the "Elastic License 2.0", the "GNU Affero General Public | ||
| * License v3.0 only", or the "Server Side Public License, v 1". | ||
| */ | ||
|
|
||
| package org.elasticsearch.cluster.routing.allocation.allocator; | ||
|
|
||
| import java.util.List; | ||
|
|
||
| /** | ||
| * Holds combined {@link BalancingRoundSummary} results. Essentially holds a list of the balancing events and the summed up changes | ||
| * across all those events: what allocation work was done across some period of time. | ||
| * TODO: WIP ES-10341 | ||
| * | ||
| * Note that each balancing round summary is the difference between, at the time, latest desired balance and the previous desired balance. | ||
| * Each summary represents a step towards the next desired balance, which is based on presuming the previous desired balance is reached. So | ||
| * combining them is roughly the difference between the first summary's previous desired balance and the last summary's latest desired | ||
| * balance. | ||
| * | ||
| * @param numberOfBalancingRounds How many balancing round summaries are combined in this report. | ||
| * @param numberOfShardMoves The sum of shard moves for each balancing round being combined into a single summary. | ||
| */ | ||
| public record CombinedBalancingRoundSummary(int numberOfBalancingRounds, long numberOfShardMoves) { | ||
|
|
||
| public static final CombinedBalancingRoundSummary EMPTY_RESULTS = new CombinedBalancingRoundSummary(0, 0); | ||
|
|
||
| public static CombinedBalancingRoundSummary combine(List<BalancingRoundSummary> summaries) { | ||
| if (summaries.isEmpty()) { | ||
| return EMPTY_RESULTS; | ||
| } | ||
|
|
||
| int numSummaries = 0; | ||
| long numberOfShardMoves = 0; | ||
| for (BalancingRoundSummary summary : summaries) { | ||
| ++numSummaries; | ||
| numberOfShardMoves += summary.numberOfShardsToMove(); | ||
| } | ||
| return new CombinedBalancingRoundSummary(numSummaries, numberOfShardMoves); | ||
| } | ||
|
|
||
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It says above
but the minimum is zero (I think?)
Either comment or code is out of date I think