Skip to content

Conversation

@ivancea
Copy link
Contributor

@ivancea ivancea commented Mar 14, 2025

Calculate the maximum concurrent nodes for a query, based on whether the datanode plan has a limit or not (And no other conditions/nodes before).

The concurrency limit is calculated as the log2(limit).

Also, changed the query pragma to not have an upper limit, allowing users to effectively override any calculation with a bigger limit.

idegtiarenko and others added 13 commits February 18, 2025 14:18
# Conflicts:
#	x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeComputeHandler.java
# Conflicts:
#	x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/ManyShardsIT.java
#	x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeComputeHandler.java
#	x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSender.java
#	x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/QueryPragmas.java
#	x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSenderTests.java
* Used to avoid overloading the cluster with concurrent requests that may not be needed.
* </p>
*
* @return Null if there should be no limit, otherwise, the maximum number of nodes that should be executed concurrently.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This could be -1 for no limit, to work like the pragma

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd probably move this to the class javadoc.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or put a little bit there.

* @return Null if there should be no limit, otherwise, the maximum number of nodes that should be executed concurrently.
*/
public Integer calculateNodesConcurrency(PhysicalPlan dataNodePlan, Configuration configuration) {
// TODO: Request FoldContext or a context containing it
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is needed for the limit.limit().fold(...). But we can probably assert that it's a Literal, and avoid folding

Comment on lines 64 to 65
// TODO: Do some conversion here
return limit;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The logic we choose here may be quite arbitrary without some real statistics of the nodes/shard

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would probably limit to 2 for everything up to 10. Or may be something like Math.max(2, log(limit))

Comment on lines 49 to 52
// # Negative cases
// - FROM | STATS: Fragment[EsRelation, Aggregate]
// - SORT: Fragment[EsRelation, TopN]
// - WHERE: Fragment[EsRelation, Filter]
Copy link
Contributor Author

@ivancea ivancea Mar 14, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When getting the LIMIT value:

  • The WHERE is already taken into account explicitly.
  • The STATS can't have a LIMIT in the datanode side, so it's fine.
  • The SORT shouldn't happen, as we look for a Limit after the EsRelation, and the Limit would be a TopN otherwise.

Those are mostly assumptions; there's still a lot of testing to do with different commands that could break them

Copy link
Contributor

@luigidellaquila luigidellaquila left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had a first quick look and left a couple of comments.

In general, for now I think it's acceptable to have this component at this level as it's simple enough, but on the other hand this could benefit from some additional information (eg. LocalPhysicalOptimizerContext and SearchStats) that is available at physical planning time.

More in abstract, this should be part of a cost based execution planning process, but it's way too complicated as a topic for now.

} else if (relationFound.get() && filterFound.get() == false) {
// We only care about the limit if there's a relation before it, and no filter in between
if (node instanceof Limit limit) {
assert limitValue.get() == null : "Multiple limits found in the same data node plan";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This could still happen, eg. with MV_EXPAND | LIMIT, that becomes LIMIT | MV_EXPAND | LIMIT

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed that assertion, to just use the first limit it finds, which is what makes sense in any case

logicalPlan.forEachUp(node -> {
if (node instanceof EsRelation) {
relationFound.set(true);
} else if (node instanceof Filter) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure this blacklisting is safe in the long term.
I'd prefer to have a whitelist approach, ie. a set of plan types that can be present after EsRelation and that we know are safe to ignore before a LIMIT.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Initially changed it to a whitelist, but after adding test for every command, Limit is effectively pushed down always. So now it's just an "If not a relation or limit -> 💀"

@ivancea ivancea marked this pull request as ready for review March 20, 2025 13:17
@elasticsearchmachine elasticsearchmachine added the needs:triage Requires assignment of a team area label label Mar 20, 2025
// 10 | 3
// 1000 | 9
// 100000 | 16
return Math.max(2, (int) (Math.log(limit) / Math.log(2)));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Lets not limit queries with limits higher than 1000 for now.
It might become slower when querying a lot of shards with small number of shards.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

        // Limit | Concurrency
        // 1 | 2
        // 10 | 3
        // 1000 | 9

Above makes sense, but I would like to confirm with @costin about it

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm fine with this heuristic. You can always override it.

Do we get here with | LIMIT 0? Could you make sure we have tests for that?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added

| LOOKUP JOIN languages_lookup on language_code
| LIMIT 1024
""", 10);
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was looking for that when doing it, but we only have class-level parameterized tests...
I was also checking assertAll(), but, of course, junit5 too 💀

Luckily there aren't that many cases now, we can refactor them if we add more and they're similar

Copy link
Contributor

@idegtiarenko idegtiarenko left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

@ivancea ivancea added >feature Team:Analytics Meta label for analytical engine team (ESQL/Aggs/Geo) :Analytics/ES|QL AKA ESQL and removed needs:triage Requires assignment of a team area label labels Mar 20, 2025
@elasticsearchmachine
Copy link
Collaborator

Pinging @elastic/es-analytical-engine (Team:Analytics)

@elasticsearchmachine
Copy link
Collaborator

Hi @ivancea, I've created a changelog YAML for you.

Copy link
Contributor

@luigidellaquila luigidellaquila left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

// 10 | 3
// 1000 | 9
// 100000 | 16
return Math.max(2, (int) (Math.log(limit) / Math.log(2)));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Super-duper driveby: maybe it's simpler to take 31-Integer.numberOfLeadingZeros(limit) to compute the log2.

Copy link
Member

@nik9000 nik9000 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM so long as we have tests for |LIMIT 0 and we're sure this doesn't break them.

It'd be cool to see this on tests for bigger clusters. I bet it'll be compelling. I'm really curious to see about follow up that let us apply this for things like FROM | SORT | LIMIT - that's trickier but it'll be lovely one day!

* Used to avoid overloading the cluster with concurrent requests that may not be needed.
* </p>
*
* @return Null if there should be no limit, otherwise, the maximum number of nodes that should be executed concurrently.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd probably move this to the class javadoc.

* Used to avoid overloading the cluster with concurrent requests that may not be needed.
* </p>
*
* @return Null if there should be no limit, otherwise, the maximum number of nodes that should be executed concurrently.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or put a little bit there.

// 1 | 2
// 10 | 3
// 1000 | 9
// 100000 | 16
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This example would violate the above.

// 10 | 3
// 1000 | 9
// 100000 | 16
return Math.max(2, (int) (Math.log(limit) / Math.log(2)));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm fine with this heuristic. You can always override it.

Do we get here with | LIMIT 0? Could you make sure we have tests for that?

@idegtiarenko idegtiarenko merged commit bd04d1f into elastic:main Mar 25, 2025
17 checks passed
omricohenn pushed a commit to omricohenn/elasticsearch that referenced this pull request Mar 28, 2025
@ivancea ivancea deleted the esql-calculate-concurrent-requests-limit branch April 7, 2025 09:16
ivancea added a commit to ivancea/elasticsearch that referenced this pull request May 6, 2025
ivancea added a commit that referenced this pull request May 6, 2025
@nik9000 nik9000 added the v8.19.0 label May 6, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

:Analytics/ES|QL AKA ESQL >feature Team:Analytics Meta label for analytical engine team (ESQL/Aggs/Geo) v8.19.0 v9.1.0

Projects

None yet

Development

Successfully merging this pull request may close these issues.

6 participants