Skip to content
Merged
Show file tree
Hide file tree
Changes from 25 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
eec2039
Limit concurrent node requests
idegtiarenko Feb 18, 2025
d4850fc
upd
idegtiarenko Feb 18, 2025
ae96763
upd
idegtiarenko Feb 18, 2025
15d897f
Merge branch 'main' into limit_concurrent_node_requests
idegtiarenko Mar 4, 2025
666f588
do not send requests if source is completed
idegtiarenko Mar 4, 2025
34badf7
Merge branch 'main' into limit_concurrent_node_requests
idegtiarenko Mar 5, 2025
8c55e8a
upd
idegtiarenko Mar 5, 2025
b2a66a2
do not erase prior shard failures on skipping node
idegtiarenko Mar 5, 2025
11bd0f0
Merge branch 'main' into limit_concurrent_node_requests
idegtiarenko Mar 5, 2025
53f7d60
upd
idegtiarenko Mar 5, 2025
3fe1766
Initial calculator with tests
ivancea Mar 11, 2025
d4fcb2e
Merge branch 'main' into esql-calculate-concurrent-requests-limit
ivancea Mar 14, 2025
a8faf76
Fix NPE
ivancea Mar 14, 2025
97cf29e
[CI] Auto commit changes from spotless
Mar 14, 2025
a5af34c
Fix test by optimizing plans
ivancea Mar 17, 2025
6e41a98
Merge branch 'main' into esql-calculate-concurrent-requests-limit
ivancea Mar 19, 2025
4fb514e
Added whitelisted nodes, removed multiple Limits assertion, and avoid…
ivancea Mar 19, 2025
5acabda
[CI] Auto commit changes from spotless
Mar 19, 2025
bc4be1d
Removed node whitelist, as limit is pushed down, and update concurren…
ivancea Mar 20, 2025
36c05ee
Remove limit from Pragma
ivancea Mar 20, 2025
7fa6f18
Merge branch 'main' into esql-calculate-concurrent-requests-limit
ivancea Mar 20, 2025
6bf6007
Add Nullable annotations, and cleanup
ivancea Mar 20, 2025
a37ca1f
Avoid limiting on high limits
ivancea Mar 20, 2025
6ea6df5
Update docs/changelog/124901.yaml
ivancea Mar 20, 2025
cdb007e
Merge branch 'main' into esql-calculate-concurrent-requests-limit
ivancea Mar 21, 2025
44e6399
Fix tests after max limit logic
ivancea Mar 21, 2025
6cc0f42
fix comments
idegtiarenko Mar 25, 2025
9f930b6
Merge branch 'main' into esql-calculate-concurrent-requests-limit
idegtiarenko Mar 25, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/124901.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 124901
summary: Calculate concurrent node limit
area: ES|QL
type: feature
issues: []
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.esql.planner;

import org.elasticsearch.core.Nullable;
import org.elasticsearch.xpack.esql.core.expression.Literal;
import org.elasticsearch.xpack.esql.core.util.Holder;
import org.elasticsearch.xpack.esql.plan.logical.EsRelation;
import org.elasticsearch.xpack.esql.plan.logical.Limit;
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
import org.elasticsearch.xpack.esql.plan.physical.FragmentExec;
import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
import org.elasticsearch.xpack.esql.session.Configuration;

public class PlanConcurrencyCalculator {
public static final PlanConcurrencyCalculator INSTANCE = new PlanConcurrencyCalculator();

private PlanConcurrencyCalculator() {}

/**
* Calculates the maximum number of nodes that should be executed concurrently for the given data node plan.
* <p>
* Used to avoid overloading the cluster with concurrent requests that may not be needed.
* </p>
*
* @return Null if there should be no limit, otherwise, the maximum number of nodes that should be executed concurrently.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This could be -1 for no limit, to work like the pragma

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd probably move this to the class javadoc.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or put a little bit there.

*/
@Nullable
public Integer calculateNodesConcurrency(PhysicalPlan dataNodePlan, Configuration configuration) {
// If available, pragma overrides any calculation
if (configuration.pragmas().maxConcurrentNodesPerCluster() > 0) {
return configuration.pragmas().maxConcurrentNodesPerCluster();
}

Integer dataNodeLimit = getDataNodeLimit(dataNodePlan);

if (dataNodeLimit != null) {
return limitToConcurrency(dataNodeLimit);
}

return null;
}

private Integer limitToConcurrency(int limit) {
// For high limits, don't limit the concurrency
if (limit > 1000) {
return null;
}

// At least 2 nodes, otherwise log2(limit). E.g.
// Limit | Concurrency
// 1 | 2
// 10 | 3
// 1000 | 9
// 100000 | 16
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This example would violate the above.

return Math.max(2, (int) (Math.log(limit) / Math.log(2)));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Lets not limit queries with limits higher than 1000 for now.
It might become slower when querying a lot of shards with small number of shards.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

        // Limit | Concurrency
        // 1 | 2
        // 10 | 3
        // 1000 | 9

Above makes sense, but I would like to confirm with @costin about it

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm fine with this heuristic. You can always override it.

Do we get here with | LIMIT 0? Could you make sure we have tests for that?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Super-duper driveby: maybe it's simpler to take 31-Integer.numberOfLeadingZeros(limit) to compute the log2.

}

@Nullable
private Integer getDataNodeLimit(PhysicalPlan dataNodePlan) {
LogicalPlan logicalPlan = getFragmentPlan(dataNodePlan);

// State machine to find:
// A relation
Holder<Boolean> relationFound = new Holder<>(false);
// ...followed by no other node that could break the calculation
Holder<Boolean> forbiddenNodeFound = new Holder<>(false);
// ...and finally, a limit
Holder<Integer> limitValue = new Holder<>(null);

logicalPlan.forEachUp(node -> {
// If a limit or a forbidden command was already found, ignore the rest
if (limitValue.get() == null && forbiddenNodeFound.get() == false) {
if (node instanceof EsRelation) {
relationFound.set(true);
} else if (relationFound.get()) {
if (node instanceof Limit limit && limit.limit() instanceof Literal literalLimit) {
limitValue.set((Integer) literalLimit.value());
} else {
forbiddenNodeFound.set(true);
}
}
}
});

return limitValue.get();
}

private LogicalPlan getFragmentPlan(PhysicalPlan plan) {
Holder<LogicalPlan> foundPlan = new Holder<>();
plan.forEachDown(node -> {
if (node instanceof FragmentExec fragment) {
foundPlan.set(fragment.fragment());
}
});
return foundPlan.get();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.elasticsearch.xpack.esql.core.expression.FoldContext;
import org.elasticsearch.xpack.esql.plan.physical.ExchangeSinkExec;
import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
import org.elasticsearch.xpack.esql.planner.PlanConcurrencyCalculator;
import org.elasticsearch.xpack.esql.planner.PlannerUtils;
import org.elasticsearch.xpack.esql.session.Configuration;

Expand Down Expand Up @@ -98,13 +99,15 @@ void startComputeOnDataNodes(
Runnable runOnTaskFailure,
ActionListener<ComputeResponse> outListener
) {
Integer maxConcurrentNodesPerCluster = PlanConcurrencyCalculator.INSTANCE.calculateNodesConcurrency(dataNodePlan, configuration);

new DataNodeRequestSender(
transportService,
esqlExecutor,
clusterAlias,
parentTask,
configuration.allowPartialResults(),
configuration.pragmas().maxConcurrentNodesPerCluster()
maxConcurrentNodesPerCluster == null ? -1 : maxConcurrentNodesPerCluster
) {
@Override
protected void sendRequest(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public final class QueryPragmas implements Writeable {
public static final Setting<TimeValue> STATUS_INTERVAL = Setting.timeSetting("status_interval", Driver.DEFAULT_STATUS_INTERVAL);

public static final Setting<Integer> MAX_CONCURRENT_NODES_PER_CLUSTER = //
Setting.intSetting("max_concurrent_nodes_per_cluster", -1, -1, 100);
Setting.intSetting("max_concurrent_nodes_per_cluster", -1, -1);
public static final Setting<Integer> MAX_CONCURRENT_SHARDS_PER_NODE = //
Setting.intSetting("max_concurrent_shards_per_node", 10, 1, 100);

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,254 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.esql.planner;

import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.esql.analysis.Analyzer;
import org.elasticsearch.xpack.esql.analysis.AnalyzerTestUtils;
import org.elasticsearch.xpack.esql.core.expression.FoldContext;
import org.elasticsearch.xpack.esql.optimizer.LogicalOptimizerContext;
import org.elasticsearch.xpack.esql.optimizer.LogicalPlanOptimizer;
import org.elasticsearch.xpack.esql.optimizer.PhysicalOptimizerContext;
import org.elasticsearch.xpack.esql.optimizer.PhysicalPlanOptimizer;
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
import org.elasticsearch.xpack.esql.planner.mapper.Mapper;
import org.elasticsearch.xpack.esql.plugin.QueryPragmas;
import org.elasticsearch.xpack.esql.session.Configuration;

import java.util.List;

import static org.elasticsearch.xpack.esql.EsqlTestUtils.TEST_VERIFIER;
import static org.elasticsearch.xpack.esql.EsqlTestUtils.configuration;
import static org.elasticsearch.xpack.esql.EsqlTestUtils.withDefaultLimitWarning;
import static org.elasticsearch.xpack.esql.analysis.AnalyzerTestUtils.analyzer;
import static org.elasticsearch.xpack.esql.analysis.AnalyzerTestUtils.analyzerDefaultMapping;
import static org.hamcrest.Matchers.equalTo;

public class PlanConcurrencyCalculatorTests extends ESTestCase {
public void testSimpleLimit() {
assertConcurrency("""
FROM x
| LIMIT 1024
""", 10);
}

public void testBiggestPragmaOverride() {
assertConcurrency("""
FROM x
| LIMIT 1024
""", Integer.MAX_VALUE, Integer.MAX_VALUE);
}

public void testSmallestPragmaOverride() {
assertConcurrency("""
FROM x
| LIMIT 1024
""", 1, 1);
}

public void testPragmaOverrideWithUnsupportedCommands() {
assertConcurrency("""
FROM x
| WHERE salary * 2 > 5
| LIMIT 1024
""", 1, 1);
}

public void testImplicitLimit() {
assertConcurrency("""
FROM x
""", 9);
}

public void testStats() {
assertConcurrency("""
FROM x
| STATS COUNT(salary)
""", null);
}

public void testStatsWithLimit() {
assertConcurrency("""
FROM x
| LIMIT 1024
| STATS COUNT(salary)
""", 10);
}

public void testSortBeforeLimit() {
assertConcurrency("""
FROM x
| SORT salary
""", null);
}

public void testSortAfterLimit() {
assertConcurrency("""
FROM x
| LIMIT 1024
| SORT salary
""", 10);
}

public void testStatsWithSortBeforeLimit() {
assertConcurrency("""
FROM x
| SORT salary
| LIMIT 1024
| STATS COUNT(salary)
""", null);
}

public void testStatsWithSortAfterLimit() {
assertConcurrency("""
FROM x
| SORT salary
| LIMIT 1024
| STATS COUNT(salary)
""", null);
}

public void testWhereBeforeLimit() {
assertConcurrency("""
FROM x
| WHERE salary * 2 > 5
| LIMIT 1024
""", null);
}

public void testWhereAfterLimit() {
assertConcurrency("""
FROM x
| LIMIT 1024
| WHERE salary * 2 > 5
""", 10);
}

public void testWherePushedToLuceneQueryBeforeLimit() {
assertConcurrency("""
FROM x
| WHERE first_name LIKE "A%"
| LIMIT 1024
""", null);
}

public void testWherePushedToLuceneQueryAfterLimit() {
assertConcurrency("""
FROM x
| LIMIT 1024
| WHERE first_name LIKE "A%"
""", 10);
}

public void testExpand() {
assertConcurrency("""
FROM x
| LIMIT 2048
| MV_EXPAND salary
| LIMIT 1024
""", 10);
}

public void testEval() {
assertConcurrency("""
FROM x
| EVAL x=salary*2
| LIMIT 1024
""", 10);
}

public void testRename() {
assertConcurrency("""
FROM x
| RENAME salary as x
| LIMIT 1024
""", 10);
}

public void testKeep() {
assertConcurrency("""
FROM x
| KEEP salary
| LIMIT 1024
""", 10);
}

public void testDrop() {
assertConcurrency("""
FROM x
| DROP salary
| LIMIT 1024
""", 10);
}

public void testDissect() {
assertConcurrency("""
FROM x
| DISSECT first_name "%{a} %{b}"
| LIMIT 1024
""", 10);
}

public void testGrok() {
assertConcurrency("""
FROM x
| GROK first_name "%{EMAILADDRESS:email}"
| LIMIT 1024
""", 10);
}

public void testEnrich() {
assertConcurrency("""
FROM x
| ENRICH languages ON first_name
| LIMIT 1024
""", 10);
}

public void testLookup() {
assertConcurrency("""
FROM x
| RENAME salary as language_code
| LOOKUP JOIN languages_lookup on language_code
| LIMIT 1024
""", 10);
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was looking for that when doing it, but we only have class-level parameterized tests...
I was also checking assertAll(), but, of course, junit5 too 💀

Luckily there aren't that many cases now, we can refactor them if we add more and they're similar


private void assertConcurrency(String query, Integer expectedConcurrency) {
assertConcurrency(query, null, expectedConcurrency);
}

private void assertConcurrency(String query, Integer concurrencyPragmaValue, Integer expectedConcurrency) {
Configuration configuration = concurrencyPragmaValue == null
? configuration(query)
: configuration(
new QueryPragmas(Settings.builder().put("max_concurrent_nodes_per_cluster", concurrencyPragmaValue).build()),
query
);

Analyzer analyzer = analyzer(analyzerDefaultMapping(), TEST_VERIFIER, configuration);
LogicalPlan logicalPlan = AnalyzerTestUtils.analyze(query, analyzer);
logicalPlan = new LogicalPlanOptimizer(new LogicalOptimizerContext(configuration, FoldContext.small())).optimize(logicalPlan);

PhysicalPlan physicalPlan = new Mapper().map(logicalPlan);
physicalPlan = new PhysicalPlanOptimizer(new PhysicalOptimizerContext(configuration)).optimize(physicalPlan);

PhysicalPlan dataNodePlan = PlannerUtils.breakPlanBetweenCoordinatorAndDataNode(physicalPlan, configuration).v2();

Integer actualConcurrency = PlanConcurrencyCalculator.INSTANCE.calculateNodesConcurrency(dataNodePlan, configuration);

assertThat(actualConcurrency, equalTo(expectedConcurrency));
}

@Override
protected List<String> filteredWarnings() {
return withDefaultLimitWarning(super.filteredWarnings());
}
}
Loading