
@javanna (Member) commented on Oct 25, 2024:

Since we removed the search workers thread pool with #111099, we execute many more tasks in the search thread pool, given that each shard search request parallelizes across slices or even segments (e.g. knn query rewrite). There are also rare situations where segment-level tasks may parallelize further (e.g. createWeight), which causes the creation of a very large number of tasks for a single top-level request. These are rather small tasks that previously queued up in the unbounded search workers queue. With recent improvements in Lucene, these tasks queue up in the search queue, yet they get executed by the caller thread while they are still in the queue, and remain in the queue as no-ops until they are pulled out of it. We have protection against rejections based on turning off search concurrency when there are more than maxPoolSize items in the queue, yet that is not enough if enough parallel requests see an empty queue and manage to submit enough tasks to fill the queue at once. That will cause rejections of top-level searches that should not be rejected.

This commit wraps the executor to limit the number of tasks that a single search instance can submit, preventing the situation where a single search submits far more tasks than there are available threads.
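
A minimal sketch of the idea, pieced together from the snippets reviewed below. The class and method names are hypothetical and the real change is wired into the existing search code rather than a standalone helper; the point is just the caller-runs fallback once a per-search budget of maxPoolSize tasks has been used:

import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicInteger;

final class LimitedTaskSubmitter {

    // Hypothetical helper: wrap the search thread pool so that a single search
    // never submits more segment-level tasks than there are threads in the pool.
    static Executor wrap(ThreadPoolExecutor tpe) {
        AtomicInteger segmentLevelTasks = new AtomicInteger(0);
        return command -> {
            // Once the per-search budget (max pool size) is used up,
            // run the task on the caller thread instead of enqueueing it.
            if (segmentLevelTasks.incrementAndGet() > tpe.getMaximumPoolSize()) {
                command.run();
            } else {
                tpe.execute(command);
            }
        };
    }
}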

@elasticsearchmachine (Collaborator) commented:

Pinging @elastic/es-search-foundations (Team:Search Foundations)

@elasticsearchmachine added the Team:Search Foundations label on Oct 25, 2024
@elasticsearchmachine (Collaborator) commented:

Hi @javanna, I've created a changelog YAML for you.

command.run();
return;
}
if (segmentLevelTasks.incrementAndGet() > tpe.getMaximumPoolSize()) {

@javanna (Member, Author) commented on the code above:

I am open to opinions on the threshold. It is quite conservative. For instance, for operations like knn query rewrite that parallelize on the number of segments, we end up creating far fewer tasks on small nodes. Yet it is probably a good idea not to create more tasks than the available threads, and there could be multiple shard-level requests happening at the same time on the same node, deriving from the same search or from others, so the total number of tasks is still potentially higher than the max pool size anyway.

We should probably improve this in Lucene as a follow-up, but this is some protection mostly for 8.x, which is based on Lucene 9.12.

@jpountz (Contributor) left a comment:

I like the simplicity of the workaround; I left some small comments.

    command.run();
} else {
    executor.execute(command);
}

A contributor commented on the code above:

Should we decrement the counter when the task is done executing in order to allow search parallelism again later on?

@javanna (Member, Author) replied:

Good point; I've been going back and forth on it.

The current impl removes the need for a CAS once we have used the budget. It is also much simpler to test. These are maybe minor points though. The current solution may look too conservative around the number of tasks that get created, and if they are executed fast enough we could indeed create more tasks than the number of threads in total, although not all at the same time. I wonder how likely that is as a real scenario, given that TaskExecutor submits all tasks at once, and not gradually. That is why I think that this simple solution provides the most value, assuming that all tasks are submitted at once. I guess that this impl may hurt latency a little in favor of throughput. Also, we already apply the same limit to the number of slices, statically. We would effectively just apply the same limit to knn query rewrite and to degenerate cases where we end up parallelizing from a segment-level task, which seems wrong anyway and is something we should protect against.

Additional thoughts?

A contributor replied:

I guess my main concern is that if a query parallelizes at rewrite time (or createWeight time) and then again at collection time, and the query rewrite uses all of the parallelism budget, then you won't get any parallelism at collection time.

@javanna (Member, Author) replied:

Yes, my comment above does not take into account that invokeAll may be called multiple times at different stages of a search, for instance rewrite and later collection. It is also true that when we do parallelize at rewrite, we care less about parallelizing during collection, but that is an assumption that may only hold in the short term. I do think that we should eventually adjust things in Lucene and remove this conditional in ES in the long run.

I will look further into how we can make this a little better without over-complicating things.

A contributor commented:

I was trying the same thing at one point, and it's somewhat tricky to avoid a serious penalty for tiny tasks once you start increasing the amount of ref-counting done overall like this.
The beauty of this solution is that it's pretty much a single thread doing all the counting; the CAS cost is far from trivial here. I couldn't make any scheme that added another CAS (on top of the existing CAS we do when enqueuing work in Lucene) work without a measurable regression.
My vote would be to see if we can find a more helpful API on the Lucene end to deal with the various contention/scheduling tradeoffs we have today rather than add complexity here => I'd go with Luca's solution as is, I think :)

@javanna (Member, Author) replied:

I pushed a new commit that does the decrement. Testing has become much less predictable, as I expected. I also worry about the CAS for each task. We should probably benchmark both solutions and see what the difference is: what do we lose by e.g. only parallelizing knn query rewrite and not collection when they both happen, compared to what we lose by performing the CAS at all times?
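
For illustration only, a sketch of what the decrement-on-completion variant discussed here could look like (names are hypothetical and this is not the exact committed code): the counter is released when a task finishes, on both the caller-runs path and the pooled path, so the budget becomes available again for later invokeAll calls at the cost of an extra CAS per task.

import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicInteger;

final class DecrementingTaskSubmitter {

    // Hypothetical sketch: same per-search budget as before, but the counter is
    // decremented once each task completes, freeing budget for later stages
    // (e.g. knn rewrite first, collection later).
    static Executor wrap(ThreadPoolExecutor tpe) {
        AtomicInteger segmentLevelTasks = new AtomicInteger(0);
        return command -> {
            Runnable decrementing = () -> {
                try {
                    command.run();
                } finally {
                    segmentLevelTasks.decrementAndGet();
                }
            };
            if (segmentLevelTasks.incrementAndGet() > tpe.getMaximumPoolSize()) {
                decrementing.run();
            } else {
                tpe.execute(decrementing);
            }
        };
    }
}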

@jpountz (Contributor) left a comment:

The approach makes sense to me; it would help bound the contribution of each shard to the queue size. However, it looks like we are missing a decrement on the code path where we run the command on the current thread?

AtomicInteger segmentLevelTasks = new AtomicInteger(0);
return command -> {
    if (segmentLevelTasks.incrementAndGet() > tpe.getMaximumPoolSize()) {
        command.run();

A contributor commented on the code above:

Don't you need to decrement the counter on this code path as well?

@javanna removed the v9.1.0 label on Mar 17, 2025
@javanna closed this on Mar 17, 2025
@javanna deleted the fix/limit_num_tasks_searcher branch on March 17, 2025 at 14:13