- 
                Notifications
    You must be signed in to change notification settings 
- Fork 25.6k
Model movements to nodes with no existing node stats #133901
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | 
|---|---|---|
|  | @@ -11,7 +11,10 @@ | |
|  | ||
| import com.carrotsearch.hppc.ObjectDoubleHashMap; | ||
| import com.carrotsearch.hppc.ObjectDoubleMap; | ||
| import com.carrotsearch.hppc.procedures.ObjectDoubleProcedure; | ||
|  | ||
| import org.apache.logging.log4j.LogManager; | ||
| import org.apache.logging.log4j.Logger; | ||
| import org.elasticsearch.cluster.NodeUsageStatsForThreadPools; | ||
| import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; | ||
| import org.elasticsearch.common.util.Maps; | ||
|  | @@ -21,6 +24,7 @@ | |
| import java.util.Collections; | ||
| import java.util.HashSet; | ||
| import java.util.Map; | ||
| import java.util.OptionalInt; | ||
| import java.util.Set; | ||
|  | ||
| /** | ||
|  | @@ -29,6 +33,7 @@ | |
| */ | ||
| public class ShardMovementWriteLoadSimulator { | ||
|  | ||
| private static final Logger logger = LogManager.getLogger(ShardMovementWriteLoadSimulator.class); | ||
| private final Map<String, NodeUsageStatsForThreadPools> originalNodeUsageStatsForThreadPools; | ||
| private final ObjectDoubleMap<String> simulatedNodeWriteLoadDeltas; | ||
| private final Map<ShardId, Double> writeLoadsPerShard; | ||
|  | @@ -87,9 +92,46 @@ public Map<String, NodeUsageStatsForThreadPools> simulatedNodeUsageStatsForThrea | |
| adjustedNodeUsageStatsForThreadPools.put(entry.getKey(), entry.getValue()); | ||
| } | ||
| } | ||
|  | ||
| // Add `NodeUsageStatsForThreadPools` for any nodes not present in the original `NodeUsageStatsForThreadPools` map. | ||
| addUsageStatsForAnyNodesNotPresentInOriginalNodeUsageStatsForThreadPools(adjustedNodeUsageStatsForThreadPools); | ||
|  | ||
| return Collections.unmodifiableMap(adjustedNodeUsageStatsForThreadPools); | ||
| } | ||
|  | ||
| private void addUsageStatsForAnyNodesNotPresentInOriginalNodeUsageStatsForThreadPools( | ||
| Map<String, NodeUsageStatsForThreadPools> nodeUsageStatsForThreadPools | ||
| ) { | ||
| // Assume the new node has the same size thread pool as the largest existing node | ||
| final OptionalInt largestWriteThreadPool = originalNodeUsageStatsForThreadPools.values() | ||
| There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Right now we have to build the new node basis every time we calculated an updated ClusterInfo. Could we instead immediately add a new 0 utilization node to  | ||
| .stream() | ||
| .map(NodeUsageStatsForThreadPools::threadPoolUsageStatsMap) | ||
| .map(m -> m.get(ThreadPool.Names.WRITE)) | ||
| .mapToInt(NodeUsageStatsForThreadPools.ThreadPoolUsageStats::totalThreadPoolThreads) | ||
| .max(); | ||
| There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Assuming the max thread pool size is probably "optimistic", we could also be pessimistic and assume the minimum pool size There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Since we don't really know what would be best, can we pick the first node in the map? Then we avoid any performance slow downs with streams or iteration, since this path is going to be hit a lot. | ||
|  | ||
| if (largestWriteThreadPool.isPresent()) { | ||
| simulatedNodeWriteLoadDeltas.forEach((ObjectDoubleProcedure<? super String>) (nodeId, writeLoadDelta) -> { | ||
| nodeUsageStatsForThreadPools.computeIfAbsent( | ||
| nodeId, | ||
| missingNodeId -> new NodeUsageStatsForThreadPools( | ||
| missingNodeId, | ||
| Map.of( | ||
| ThreadPool.Names.WRITE, | ||
| new NodeUsageStatsForThreadPools.ThreadPoolUsageStats( | ||
| largestWriteThreadPool.getAsInt(), | ||
| updateNodeUtilizationWithShardMovements(0.0f, (float) writeLoadDelta, largestWriteThreadPool.getAsInt()), | ||
| 0 | ||
| ) | ||
| There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This will fudge a  There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We can also probably do better in the event a node with no stats has a shard moved off of it. In that case it doesn't make sense to assume the node was empty before the move, because clearly it was not, but then it seems very unlikely we'd find ourselves in that situation so I don't know how much effort we want to put into improving that estimate. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 
 Just FYI, the Decider does take some action based on the ClusterInfo it receives. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 
 I agree, not worth the effort. Not sure the scenario can even happen, actually. A new node joins, we'd have to assign shards to the node, first, which means simulation begins. The simulation might be a little off because of the thread pool thread count guess, but otherwise fine. | ||
| ) | ||
| ) | ||
| ); | ||
| }); | ||
| } else { | ||
| logger.debug("No nodes found to estimate write thread pool size, skipping"); | ||
| There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If we get here, there were no other nodes in the  | ||
| } | ||
| } | ||
|  | ||
| private static NodeUsageStatsForThreadPools.ThreadPoolUsageStats replaceWritePoolStats( | ||
| NodeUsageStatsForThreadPools value, | ||
| double writeLoadDelta, | ||
|  | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
instead of adding a second non-trivial for-loop in
addUsageStatsForAnyNodesNotPresentInOriginalNodeUsageStatsForThreadPools, could we replace the above original for-loop with iteration of thesimulatedNodeWriteLoadDeltasdata structure to begin with?Then use something like
originalNodeUsageStatsForThreadPools.forEach(adjustedNodeUsageStatsForThreadPools::putIfAbsent)to straight copy the remainder.This would be in combination with my other suggestion to add the new node to
originalNodeUsageStatsForThreadPoolsbeforehand for simplicity.