Sending slice requests to different nodes in BulkByScrollParallelizationHelper to improve performance #125238
Conversation
Hi @masseyke, I've created a changelog YAML for you.
Pinging @elastic/es-distributed-indexing (Team:Distributed Indexing)
I am slightly unsure about the need for this; I hope you can provide more data on it.
I would also like an integration test demonstrating that the slices are indeed handled on different nodes.
And we may need to do a more exhaustive search for local-node expectations.
```java
DiscoveryNode ingestNode = ingestNodes[Math.floorMod(ingestNodeOffsetGenerator.incrementAndGet(), ingestNodes.length)];
logger.debug("Sending request for slice to {}", ingestNode.getName());
transportService.sendRequest(
    ingestNode,
```
I think there are expectations of this running on the local node, for instance here
I didn't realize that we had a rethrottle API, and that it worked on the assumption that all subtasks were local. That would be a much bigger change to handle (I assume we'd have to put information about where each child task is running into the LeaderBulkByScrollTaskState?). So I think I'll close this for now, and maybe revisit it if we see evidence of this causing performance problems in the wild.
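To make the incompatibility concrete, here is a purely hypothetical sketch of the kind of bookkeeping this would imply; the class and field names below are made up for illustration and are not part of the actual LeaderBulkByScrollTaskState. The leader would have to remember which node each sliced child task was sent to so that a rethrottle of the parent task could be forwarded to those nodes.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical illustration only; not actual Elasticsearch code.
class SliceLocationTracker {
    // slice id -> id of the node the slice request was sent to
    private final Map<Integer, String> sliceIdToNodeId = new ConcurrentHashMap<>();

    void recordSliceLocation(int sliceId, String nodeId) {
        sliceIdToNodeId.put(sliceId, nodeId);
    }

    // A rethrottle of the parent task would have to be fanned out to each of these nodes,
    // rather than assuming every child task is registered with the local task manager.
    String nodeForSlice(int sliceId) {
        return sliceIdToNodeId.get(sliceId);
    }
}
```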
```java
client.execute(action, requestForSlice, sliceListener);
} else {
    /*
     * Indexing will potentially run a pipeline for each document. If we run all slices on the same node (locally), that
```
Pipelines that hog the CPU that much during reindex sound problematic; I wonder if that is worth looking into instead? Perhaps you have more detail to share (privately is good too).
It doesn't even have to be a really CPU-heavy pipeline. The mere existence of a trivial set processor, for example, means that the data has to be deserialized and reserialized. That serialization alone slows things down a good bit, since without pipelines the data is never deserialized before being sent to the correct node for indexing. There might be something smarter we can do on the pipeline side, but this seemed like an easy workaround to spread out that work (although taking the comment below about rethrottling into account, it is no longer an easy workaround).
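As a rough, standalone illustration of that cost (plain Java, not Elasticsearch's ingest code; the helper methods are stand-ins for real source parsing and serialization), even a trivial set step forces every document through a parse-mutate-serialize cycle instead of being forwarded as raw bytes:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Sketch only: a trivial "set" processor still means every document is parsed into a
// mutable structure, modified, and re-serialized before it can be routed for indexing.
public class TrivialSetProcessorSketch {

    // Stand-in for deserializing a document's source (real code parses JSON/SMILE bytes).
    static Map<String, Object> deserialize(String rawSource) {
        Map<String, Object> doc = new LinkedHashMap<>();
        for (String pair : rawSource.split(",")) {
            String[] kv = pair.split("=", 2);
            doc.put(kv[0], kv[1]);
        }
        return doc;
    }

    // Stand-in for re-serializing the mutated document.
    static String serialize(Map<String, Object> doc) {
        StringBuilder sb = new StringBuilder();
        doc.forEach((k, v) -> sb.append(sb.length() == 0 ? "" : ",").append(k).append('=').append(v));
        return sb.toString();
    }

    public static void main(String[] args) {
        String rawSource = "user=kim,action=login";        // document as it arrives
        Map<String, Object> doc = deserialize(rawSource);  // cost 1: parse
        doc.put("pipeline_ran", "true");                   // the trivial "set" step
        System.out.println(serialize(doc));                // cost 2: re-serialize
        // With no pipeline, rawSource could be forwarded untouched to the target shard's node.
    }
}
```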
```java
/*
 * The following is incremented in order to keep track of the current round-robin position for ingest nodes that we send sliced requests
 * to. We randomize where it starts so that all nodes don't begin by sending data to the same node.
 */
private static final AtomicInteger ingestNodeOffsetGenerator = new AtomicInteger(Randomness.get().nextInt(2048));
```
It would be good to get rid of the static here; perhaps this can be kept on the action instead and passed in?
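A minimal sketch of that suggestion, assuming the helper accepts the counter as a parameter; TransportReindexLikeAction, startSlices, and ParallelizationHelperSketch are placeholder names, not the actual Elasticsearch classes or signatures:

```java
import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;

// Sketch only: the counter lives on the transport action instance instead of in a static
// field on the helper, and is handed to the helper when slices are started.
class TransportReindexLikeAction {

    // One counter per action instance, seeded randomly so nodes don't all start
    // their round-robin at the same ingest node.
    private final AtomicInteger ingestNodeOffsetGenerator = new AtomicInteger(new Random().nextInt(2048));

    void startSlices(String[] ingestNodeNames) {
        ParallelizationHelperSketch.startSlicedAction(ingestNodeNames, ingestNodeOffsetGenerator);
    }
}

class ParallelizationHelperSketch {
    // The helper no longer owns any static state; it just consumes the counter it is given.
    static void startSlicedAction(String[] ingestNodeNames, AtomicInteger offsetGenerator) {
        String target = ingestNodeNames[Math.floorMod(offsetGenerator.incrementAndGet(), ingestNodeNames.length)];
        System.out.println("would send slice request to " + target);
    }
}
```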
This came out of some testing that @parkertimmins and I did. We noticed that if we ran reindex with a very simple pipeline (setting a single field) on an index big enough to be split into many slices, on a 10-node cluster, one node would be pegged at 100% CPU running the pipeline while the rest sat at a much lower percentage (maybe ~10-15%) doing indexing.

Closing because this solution is not compatible with the rethrottle action.
We noticed that when running reindex on a large index on a large cluster, a single node (the one running TransportReindexAction) would sit at 100% CPU while the other nodes were all at ~15%. It turns out that we execute all slices for the reindex on the same node, so if there is any pipeline, all processors for all documents run on that single node before the shard-specific indexing requests are sent out to the nodes where the shards live.
This change makes BulkByScrollParallelizationHelper round-robin its work through all of the ingest nodes so that the work is spread more evenly. In practice, we have seen significant performance improvements with large indices and pipelines on large (10-node) clusters.
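For reference, a small standalone sketch of the round-robin selection arithmetic used in the diff above. Math.floorMod keeps the index non-negative even after the AtomicInteger overflows and incrementAndGet() starts returning negative values, which the plain % operator would not:

```java
import java.util.concurrent.atomic.AtomicInteger;

// Standalone demo: floorMod always yields a valid array index, even across integer overflow.
public class RoundRobinDemo {
    public static void main(String[] args) {
        String[] ingestNodes = { "node-0", "node-1", "node-2" };
        AtomicInteger offset = new AtomicInteger(Integer.MAX_VALUE - 1); // force an overflow soon

        for (int i = 0; i < 5; i++) {
            int counter = offset.incrementAndGet();
            int index = Math.floorMod(counter, ingestNodes.length);
            System.out.println(counter + " -> " + ingestNodes[index]
                + "   (plain %: " + (counter % ingestNodes.length) + ")");
        }
    }
}
```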
Relates #125171