-
Notifications
You must be signed in to change notification settings - Fork 25.7k
Implement WriteLoadConstraintDecider#canAllocate #132041
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 23 commits
e508206
72471c9
4fdc73f
121d4de
3dcb94d
9102349
6d85655
64d6344
606cadc
68be3df
6244ef8
da441c3
7ce1e59
a6783a2
74b38e1
8b955e6
fc934dc
a0e61e0
3014d88
6197bcb
b896291
0824b59
2e6824f
85f4448
1e53258
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,124 @@ | ||
| /* | ||
| * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
| * or more contributor license agreements. Licensed under the "Elastic License | ||
| * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side | ||
| * Public License v 1"; you may not use this file except in compliance with, at | ||
| * your election, the "Elastic License 2.0", the "GNU Affero General Public | ||
| * License v3.0 only", or the "Server Side Public License, v 1". | ||
| */ | ||
|
|
||
| package org.elasticsearch.cluster.routing.allocation.decider; | ||
|
|
||
| import org.apache.logging.log4j.LogManager; | ||
| import org.apache.logging.log4j.Logger; | ||
| import org.elasticsearch.cluster.NodeUsageStatsForThreadPools.ThreadPoolUsageStats; | ||
| import org.elasticsearch.cluster.metadata.IndexMetadata; | ||
| import org.elasticsearch.cluster.routing.RoutingNode; | ||
| import org.elasticsearch.cluster.routing.ShardMovementWriteLoadSimulator; | ||
| import org.elasticsearch.cluster.routing.ShardRouting; | ||
| import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; | ||
| import org.elasticsearch.cluster.routing.allocation.WriteLoadConstraintSettings; | ||
| import org.elasticsearch.common.settings.ClusterSettings; | ||
| import org.elasticsearch.core.Strings; | ||
| import org.elasticsearch.threadpool.ThreadPool; | ||
|
|
||
| /** | ||
| * Decides whether shards can be allocated to cluster nodes, or can remain on cluster nodes, based on the target node's current write thread | ||
| * pool usage stats and any candidate shard's write load estimate. | ||
| */ | ||
| public class WriteLoadConstraintDecider extends AllocationDecider { | ||
| private static final Logger logger = LogManager.getLogger(WriteLoadConstraintDecider.class); | ||
|
|
||
| public static final String NAME = "write_load"; | ||
|
|
||
| private final WriteLoadConstraintSettings writeLoadConstraintSettings; | ||
|
|
||
| public WriteLoadConstraintDecider(ClusterSettings clusterSettings) { | ||
| this.writeLoadConstraintSettings = new WriteLoadConstraintSettings(clusterSettings); | ||
| } | ||
|
|
||
| @Override | ||
| public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) { | ||
| if (writeLoadConstraintSettings.getWriteLoadConstraintEnabled().disabled()) { | ||
| return Decision.single(Decision.Type.YES, NAME, "Decider is disabled"); | ||
| } | ||
DiannaHohensee marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
| // Check whether the shard being relocated has any write load estimate. If it does not, then this decider has no opinion. | ||
| var allShardWriteLoads = allocation.clusterInfo().getShardWriteLoads(); | ||
| var shardWriteLoad = allShardWriteLoads.get(shardRouting.shardId()); | ||
nicktindall marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| if (shardWriteLoad == null || shardWriteLoad == 0) { | ||
mhl-b marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| return Decision.single(Decision.Type.YES, NAME, "Shard has no estimated write load. Decider takes no action."); | ||
| } | ||
|
|
||
| var allNodeUsageStats = allocation.clusterInfo().getNodeUsageStatsForThreadPools(); | ||
| var nodeUsageStatsForThreadPools = allNodeUsageStats.get(node.nodeId()); | ||
| if (nodeUsageStatsForThreadPools == null) { | ||
mhl-b marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| // No node-level thread pool usage stats were reported for this node. Let's assume this is OK and that the simulator will handle | ||
| // setting a node-level write load for this node after this shard is assigned. | ||
| return Decision.single(Decision.Type.YES, NAME, "The node has no write load estimate. Decider takes no action."); | ||
| } | ||
|
|
||
| assert nodeUsageStatsForThreadPools.threadPoolUsageStatsMap().isEmpty() == false; | ||
| assert nodeUsageStatsForThreadPools.threadPoolUsageStatsMap().get(ThreadPool.Names.WRITE) != null; | ||
| var nodeWriteThreadPoolStats = nodeUsageStatsForThreadPools.threadPoolUsageStatsMap().get(ThreadPool.Names.WRITE); | ||
| var nodeWriteThreadPoolLoadThreshold = writeLoadConstraintSettings.getWriteThreadPoolHighUtilizationThresholdSetting(); | ||
| if (nodeWriteThreadPoolStats.averageThreadPoolUtilization() >= nodeWriteThreadPoolLoadThreshold) { | ||
mhl-b marked this conversation as resolved.
Show resolved
Hide resolved
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, the next check would also catch this. It's a matter of explanation message for the NO decision, really, at this point. They could be combined; separately the messages can be clearer for the user to understand, I think.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm not sure what you're suggesting. Are you proposing to add logic to I like the clarity of the separate explain messages. Do you feel strongly about merging the two if statements?
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't feel strongly |
||
| // The node's write thread pool usage stats already show high utilization above the threshold for accepting new shards. | ||
| String explain = Strings.format( | ||
| "Node [%s] with write thread pool utilization [%.2f] already exceeds the high utilization threshold of [%f]. Cannot " | ||
| + "allocate shard [%s] to node without risking increased write latencies.", | ||
| node.nodeId(), | ||
| nodeWriteThreadPoolStats.averageThreadPoolUtilization(), | ||
| nodeWriteThreadPoolLoadThreshold, | ||
| shardRouting.shardId() | ||
| ); | ||
| logger.debug(explain); | ||
| return Decision.single(Decision.Type.NO, NAME, explain); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think this should respond "not-preferred" instead?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There was disagreement on implementing |
||
| } | ||
|
|
||
| if (calculateShardMovementChange(nodeWriteThreadPoolStats, shardWriteLoad) >= nodeWriteThreadPoolLoadThreshold) { | ||
| // The node's write thread pool usage would be raised above the high utilization threshold with assignment of the new shard. | ||
| // This could lead to a hot spot on this node and is undesirable. | ||
| String explain = Strings.format( | ||
| "The high utilization threshold of [%f] would be exceeded on node [%s] with utilization [%.2f] if shard [%s] with " | ||
| + "estimated additional utilisation [%.5f] (write load [%.5f] / threads [%d]) were assigned to it. Cannot allocate " | ||
| + "shard to node without risking increased write latencies.", | ||
| nodeWriteThreadPoolLoadThreshold, | ||
| node.nodeId(), | ||
| nodeWriteThreadPoolStats.averageThreadPoolUtilization(), | ||
nicktindall marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| shardRouting.shardId(), | ||
| shardWriteLoad / nodeWriteThreadPoolStats.totalThreadPoolThreads(), | ||
| shardWriteLoad, | ||
| nodeWriteThreadPoolStats.totalThreadPoolThreads() | ||
| ); | ||
| logger.debug(explain); | ||
| return Decision.single(Decision.Type.NO, NAME, explain); | ||
| } | ||
|
|
||
| return Decision.YES; | ||
| } | ||
|
|
||
| @Override | ||
| public Decision canRemain(IndexMetadata indexMetadata, ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) { | ||
| if (writeLoadConstraintSettings.getWriteLoadConstraintEnabled().notFullyEnabled()) { | ||
| return Decision.YES; | ||
| } | ||
|
|
||
| // TODO: implement | ||
|
|
||
| return Decision.YES; | ||
| } | ||
|
|
||
| /** | ||
| * Calculates the change to the node's write thread pool utilization percentage if the shard is added to the node. | ||
| * Returns the percent thread pool utilization change. | ||
| */ | ||
| private float calculateShardMovementChange(ThreadPoolUsageStats nodeWriteThreadPoolStats, double shardWriteLoad) { | ||
| assert shardWriteLoad > 0; | ||
| return ShardMovementWriteLoadSimulator.updateNodeUtilizationWithShardMovements( | ||
| nodeWriteThreadPoolStats.averageThreadPoolUtilization(), | ||
| (float) shardWriteLoad, | ||
| nodeWriteThreadPoolStats.totalThreadPoolThreads() | ||
| ); | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: alternatively we could have a method
WriteLoadDeciderStatus#requiresShardLevelWriteLoads()(andrequiresNodeLevelWriteLoads()) which would returntrueforLOW_THRESHOLD_ONLYandENABLEDbutfalseforDISABLED.It would read nicer if the
writeLoadConstraintEnabledfield was calledwriteLoadDeciderStatusif we went that way.Don't feel strongly about this naming thing though.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
requiresShardLevelWriteLoadsandrequiresNodeLevelWriteLoadsdoesn't seem like the right split, as I understand it. I was imagining LOW as the best-effort hot-spot prevention (canAllocate) without hot-spot correction (canRemain), and fully enabled as including hot-spot correction. Both node and shard level stats are needed for prevention, to compare the shard move write load change with the node's overall write load.I'll leave this as is until some follow up.