diff --git a/docs/changelog/124901.yaml b/docs/changelog/124901.yaml
new file mode 100644
index 0000000000000..d67bfb4defdba
--- /dev/null
+++ b/docs/changelog/124901.yaml
@@ -0,0 +1,5 @@
+pr: 124901
+summary: Calculate concurrent node limit
+area: ES|QL
+type: feature
+issues: []
diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/PlanConcurrencyCalculator.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/PlanConcurrencyCalculator.java
new file mode 100644
index 0000000000000..5e40761a4a9d1
--- /dev/null
+++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/PlanConcurrencyCalculator.java
@@ -0,0 +1,106 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.esql.planner;
+
+import org.elasticsearch.core.Nullable;
+import org.elasticsearch.xpack.esql.core.expression.Literal;
+import org.elasticsearch.xpack.esql.core.util.Holder;
+import org.elasticsearch.xpack.esql.plan.logical.EsRelation;
+import org.elasticsearch.xpack.esql.plan.logical.Limit;
+import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
+import org.elasticsearch.xpack.esql.plan.physical.FragmentExec;
+import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
+import org.elasticsearch.xpack.esql.session.Configuration;
+
+/**
+ * Calculates the maximum number of nodes that should be queried concurrently for the given data node plan.
+ *
+ * Used to avoid overloading the cluster with concurrent requests that may not be needed.
+ *
+ */
+public class PlanConcurrencyCalculator {
+ public static final PlanConcurrencyCalculator INSTANCE = new PlanConcurrencyCalculator();
+
+ private PlanConcurrencyCalculator() {}
+
+ /**
+ * @return {@code null} if there should be no limit, otherwise, the maximum number of nodes that should be queried concurrently.
+ */
+ @Nullable
+ public Integer calculateNodesConcurrency(PhysicalPlan dataNodePlan, Configuration configuration) {
+ // If available, pragma overrides any calculation
+ if (configuration.pragmas().maxConcurrentNodesPerCluster() > 0) {
+ return configuration.pragmas().maxConcurrentNodesPerCluster();
+ }
+ if (dataNodePlan == null) {
+ return null;
+ }
+
+ Integer dataNodeLimit = getDataNodeLimit(dataNodePlan);
+
+ if (dataNodeLimit != null) {
+ return limitToConcurrency(dataNodeLimit);
+ }
+
+ return null;
+ }
+
+ private Integer limitToConcurrency(int limit) {
+ // For high limits, don't limit the concurrency
+ if (limit > 1000) {
+ return null;
+ }
+
+ // At least 2 nodes, otherwise log2(limit). E.g.
+ // Limit | Concurrency
+ // 1 | 2
+ // 10 | 3
+ // 1000 | 9
+ return Math.max(2, (int) (Math.log(limit) / Math.log(2)));
+ }
+
+ @Nullable
+ private Integer getDataNodeLimit(PhysicalPlan dataNodePlan) {
+ LogicalPlan logicalPlan = getFragmentPlan(dataNodePlan);
+
+ // State machine to find:
+ // A relation
+ Holder relationFound = new Holder<>(false);
+ // ...followed by no other node that could break the calculation
+ Holder forbiddenNodeFound = new Holder<>(false);
+ // ...and finally, a limit
+ Holder limitValue = new Holder<>(null);
+
+ logicalPlan.forEachUp(node -> {
+ // If a limit or a forbidden command was already found, ignore the rest
+ if (limitValue.get() == null && forbiddenNodeFound.get() == false) {
+ if (node instanceof EsRelation) {
+ relationFound.set(true);
+ } else if (relationFound.get()) {
+ if (node instanceof Limit limit && limit.limit() instanceof Literal literalLimit) {
+ limitValue.set((Integer) literalLimit.value());
+ } else {
+ forbiddenNodeFound.set(true);
+ }
+ }
+ }
+ });
+
+ return limitValue.get();
+ }
+
+ private LogicalPlan getFragmentPlan(PhysicalPlan plan) {
+ Holder foundPlan = new Holder<>();
+ plan.forEachDown(node -> {
+ if (node instanceof FragmentExec fragment) {
+ foundPlan.set(fragment.fragment());
+ }
+ });
+ return foundPlan.get();
+ }
+}
diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeComputeHandler.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeComputeHandler.java
index 65aa47cd3db40..78bca3e81ef9b 100644
--- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeComputeHandler.java
+++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeComputeHandler.java
@@ -42,6 +42,7 @@
import org.elasticsearch.xpack.esql.core.expression.FoldContext;
import org.elasticsearch.xpack.esql.plan.physical.ExchangeSinkExec;
import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
+import org.elasticsearch.xpack.esql.planner.PlanConcurrencyCalculator;
import org.elasticsearch.xpack.esql.planner.PlannerUtils;
import org.elasticsearch.xpack.esql.session.Configuration;
@@ -98,13 +99,15 @@ void startComputeOnDataNodes(
Runnable runOnTaskFailure,
ActionListener outListener
) {
+ Integer maxConcurrentNodesPerCluster = PlanConcurrencyCalculator.INSTANCE.calculateNodesConcurrency(dataNodePlan, configuration);
+
new DataNodeRequestSender(
transportService,
esqlExecutor,
clusterAlias,
parentTask,
configuration.allowPartialResults(),
- configuration.pragmas().maxConcurrentNodesPerCluster()
+ maxConcurrentNodesPerCluster == null ? -1 : maxConcurrentNodesPerCluster
) {
@Override
protected void sendRequest(
diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/QueryPragmas.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/QueryPragmas.java
index 579e467853776..414f8bdb38527 100644
--- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/QueryPragmas.java
+++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/QueryPragmas.java
@@ -56,7 +56,7 @@ public final class QueryPragmas implements Writeable {
public static final Setting STATUS_INTERVAL = Setting.timeSetting("status_interval", Driver.DEFAULT_STATUS_INTERVAL);
public static final Setting MAX_CONCURRENT_NODES_PER_CLUSTER = //
- Setting.intSetting("max_concurrent_nodes_per_cluster", -1, -1, 100);
+ Setting.intSetting("max_concurrent_nodes_per_cluster", -1, -1);
public static final Setting MAX_CONCURRENT_SHARDS_PER_NODE = //
Setting.intSetting("max_concurrent_shards_per_node", 10, 1, 100);
diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/PlanConcurrencyCalculatorTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/PlanConcurrencyCalculatorTests.java
new file mode 100644
index 0000000000000..da36b42d1241b
--- /dev/null
+++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/PlanConcurrencyCalculatorTests.java
@@ -0,0 +1,258 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.esql.planner;
+
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.xpack.esql.analysis.Analyzer;
+import org.elasticsearch.xpack.esql.analysis.AnalyzerTestUtils;
+import org.elasticsearch.xpack.esql.core.expression.FoldContext;
+import org.elasticsearch.xpack.esql.optimizer.LogicalOptimizerContext;
+import org.elasticsearch.xpack.esql.optimizer.LogicalPlanOptimizer;
+import org.elasticsearch.xpack.esql.optimizer.PhysicalOptimizerContext;
+import org.elasticsearch.xpack.esql.optimizer.PhysicalPlanOptimizer;
+import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
+import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
+import org.elasticsearch.xpack.esql.planner.mapper.Mapper;
+import org.elasticsearch.xpack.esql.plugin.QueryPragmas;
+import org.elasticsearch.xpack.esql.session.Configuration;
+
+import java.util.List;
+
+import static org.elasticsearch.xpack.esql.EsqlTestUtils.TEST_VERIFIER;
+import static org.elasticsearch.xpack.esql.EsqlTestUtils.configuration;
+import static org.elasticsearch.xpack.esql.EsqlTestUtils.withDefaultLimitWarning;
+import static org.elasticsearch.xpack.esql.analysis.AnalyzerTestUtils.analyzer;
+import static org.elasticsearch.xpack.esql.analysis.AnalyzerTestUtils.analyzerDefaultMapping;
+import static org.hamcrest.Matchers.equalTo;
+
+public class PlanConcurrencyCalculatorTests extends ESTestCase {
+ public void testSimpleLimit() {
+ assertConcurrency("""
+ FROM x
+ | LIMIT 512
+ """, 9);
+ }
+
+ public void testLimitZero() {
+ assertConcurrency("FROM x | LIMIT 0", null);
+ }
+
+ public void testBiggestPragmaOverride() {
+ assertConcurrency("""
+ FROM x
+ | LIMIT 512
+ """, Integer.MAX_VALUE, Integer.MAX_VALUE);
+ }
+
+ public void testSmallestPragmaOverride() {
+ assertConcurrency("""
+ FROM x
+ | LIMIT 512
+ """, 1, 1);
+ }
+
+ public void testPragmaOverrideWithUnsupportedCommands() {
+ assertConcurrency("""
+ FROM x
+ | WHERE salary * 2 > 5
+ | LIMIT 512
+ """, 1, 1);
+ }
+
+ public void testImplicitLimit() {
+ assertConcurrency("""
+ FROM x
+ """, 9);
+ }
+
+ public void testStats() {
+ assertConcurrency("""
+ FROM x
+ | STATS COUNT(salary)
+ """, null);
+ }
+
+ public void testStatsWithLimit() {
+ assertConcurrency("""
+ FROM x
+ | LIMIT 512
+ | STATS COUNT(salary)
+ """, 9);
+ }
+
+ public void testSortBeforeLimit() {
+ assertConcurrency("""
+ FROM x
+ | SORT salary
+ """, null);
+ }
+
+ public void testSortAfterLimit() {
+ assertConcurrency("""
+ FROM x
+ | LIMIT 512
+ | SORT salary
+ """, 9);
+ }
+
+ public void testStatsWithSortBeforeLimit() {
+ assertConcurrency("""
+ FROM x
+ | SORT salary
+ | LIMIT 512
+ | STATS COUNT(salary)
+ """, null);
+ }
+
+ public void testStatsWithSortAfterLimit() {
+ assertConcurrency("""
+ FROM x
+ | SORT salary
+ | LIMIT 512
+ | STATS COUNT(salary)
+ """, null);
+ }
+
+ public void testWhereBeforeLimit() {
+ assertConcurrency("""
+ FROM x
+ | WHERE salary * 2 > 5
+ | LIMIT 512
+ """, null);
+ }
+
+ public void testWhereAfterLimit() {
+ assertConcurrency("""
+ FROM x
+ | LIMIT 512
+ | WHERE salary * 2 > 5
+ """, 9);
+ }
+
+ public void testWherePushedToLuceneQueryBeforeLimit() {
+ assertConcurrency("""
+ FROM x
+ | WHERE first_name LIKE "A%"
+ | LIMIT 512
+ """, null);
+ }
+
+ public void testWherePushedToLuceneQueryAfterLimit() {
+ assertConcurrency("""
+ FROM x
+ | LIMIT 512
+ | WHERE first_name LIKE "A%"
+ """, 9);
+ }
+
+ public void testExpand() {
+ assertConcurrency("""
+ FROM x
+ | LIMIT 2048
+ | MV_EXPAND salary
+ | LIMIT 512
+ """, 9);
+ }
+
+ public void testEval() {
+ assertConcurrency("""
+ FROM x
+ | EVAL x=salary*2
+ | LIMIT 512
+ """, 9);
+ }
+
+ public void testRename() {
+ assertConcurrency("""
+ FROM x
+ | RENAME salary as x
+ | LIMIT 512
+ """, 9);
+ }
+
+ public void testKeep() {
+ assertConcurrency("""
+ FROM x
+ | KEEP salary
+ | LIMIT 512
+ """, 9);
+ }
+
+ public void testDrop() {
+ assertConcurrency("""
+ FROM x
+ | DROP salary
+ | LIMIT 512
+ """, 9);
+ }
+
+ public void testDissect() {
+ assertConcurrency("""
+ FROM x
+ | DISSECT first_name "%{a} %{b}"
+ | LIMIT 512
+ """, 9);
+ }
+
+ public void testGrok() {
+ assertConcurrency("""
+ FROM x
+ | GROK first_name "%{EMAILADDRESS:email}"
+ | LIMIT 512
+ """, 9);
+ }
+
+ public void testEnrich() {
+ assertConcurrency("""
+ FROM x
+ | ENRICH languages ON first_name
+ | LIMIT 512
+ """, 9);
+ }
+
+ public void testLookup() {
+ assertConcurrency("""
+ FROM x
+ | RENAME salary as language_code
+ | LOOKUP JOIN languages_lookup on language_code
+ | LIMIT 512
+ """, 9);
+ }
+
+ private void assertConcurrency(String query, Integer expectedConcurrency) {
+ assertConcurrency(query, null, expectedConcurrency);
+ }
+
+ private void assertConcurrency(String query, Integer concurrencyPragmaValue, Integer expectedConcurrency) {
+ Configuration configuration = concurrencyPragmaValue == null
+ ? configuration(query)
+ : configuration(
+ new QueryPragmas(Settings.builder().put("max_concurrent_nodes_per_cluster", concurrencyPragmaValue).build()),
+ query
+ );
+
+ Analyzer analyzer = analyzer(analyzerDefaultMapping(), TEST_VERIFIER, configuration);
+ LogicalPlan logicalPlan = AnalyzerTestUtils.analyze(query, analyzer);
+ logicalPlan = new LogicalPlanOptimizer(new LogicalOptimizerContext(configuration, FoldContext.small())).optimize(logicalPlan);
+
+ PhysicalPlan physicalPlan = new Mapper().map(logicalPlan);
+ physicalPlan = new PhysicalPlanOptimizer(new PhysicalOptimizerContext(configuration)).optimize(physicalPlan);
+
+ PhysicalPlan dataNodePlan = PlannerUtils.breakPlanBetweenCoordinatorAndDataNode(physicalPlan, configuration).v2();
+
+ Integer actualConcurrency = PlanConcurrencyCalculator.INSTANCE.calculateNodesConcurrency(dataNodePlan, configuration);
+
+ assertThat(actualConcurrency, equalTo(expectedConcurrency));
+ }
+
+ @Override
+ protected List filteredWarnings() {
+ return withDefaultLimitWarning(super.filteredWarnings());
+ }
+}