diff --git a/docs/changelog/124901.yaml b/docs/changelog/124901.yaml new file mode 100644 index 0000000000000..d67bfb4defdba --- /dev/null +++ b/docs/changelog/124901.yaml @@ -0,0 +1,5 @@ +pr: 124901 +summary: Calculate concurrent node limit +area: ES|QL +type: feature +issues: [] diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/PlanConcurrencyCalculator.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/PlanConcurrencyCalculator.java new file mode 100644 index 0000000000000..5e40761a4a9d1 --- /dev/null +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/PlanConcurrencyCalculator.java @@ -0,0 +1,106 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.esql.planner; + +import org.elasticsearch.core.Nullable; +import org.elasticsearch.xpack.esql.core.expression.Literal; +import org.elasticsearch.xpack.esql.core.util.Holder; +import org.elasticsearch.xpack.esql.plan.logical.EsRelation; +import org.elasticsearch.xpack.esql.plan.logical.Limit; +import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan; +import org.elasticsearch.xpack.esql.plan.physical.FragmentExec; +import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan; +import org.elasticsearch.xpack.esql.session.Configuration; + +/** + * Calculates the maximum number of nodes that should be queried concurrently for the given data node plan. + *

+ * Used to avoid overloading the cluster with concurrent requests that may not be needed. + *

+ */ +public class PlanConcurrencyCalculator { + public static final PlanConcurrencyCalculator INSTANCE = new PlanConcurrencyCalculator(); + + private PlanConcurrencyCalculator() {} + + /** + * @return {@code null} if there should be no limit, otherwise, the maximum number of nodes that should be queried concurrently. + */ + @Nullable + public Integer calculateNodesConcurrency(PhysicalPlan dataNodePlan, Configuration configuration) { + // If available, pragma overrides any calculation + if (configuration.pragmas().maxConcurrentNodesPerCluster() > 0) { + return configuration.pragmas().maxConcurrentNodesPerCluster(); + } + if (dataNodePlan == null) { + return null; + } + + Integer dataNodeLimit = getDataNodeLimit(dataNodePlan); + + if (dataNodeLimit != null) { + return limitToConcurrency(dataNodeLimit); + } + + return null; + } + + private Integer limitToConcurrency(int limit) { + // For high limits, don't limit the concurrency + if (limit > 1000) { + return null; + } + + // At least 2 nodes, otherwise log2(limit). E.g. + // Limit | Concurrency + // 1 | 2 + // 10 | 3 + // 1000 | 9 + return Math.max(2, (int) (Math.log(limit) / Math.log(2))); + } + + @Nullable + private Integer getDataNodeLimit(PhysicalPlan dataNodePlan) { + LogicalPlan logicalPlan = getFragmentPlan(dataNodePlan); + + // State machine to find: + // A relation + Holder relationFound = new Holder<>(false); + // ...followed by no other node that could break the calculation + Holder forbiddenNodeFound = new Holder<>(false); + // ...and finally, a limit + Holder limitValue = new Holder<>(null); + + logicalPlan.forEachUp(node -> { + // If a limit or a forbidden command was already found, ignore the rest + if (limitValue.get() == null && forbiddenNodeFound.get() == false) { + if (node instanceof EsRelation) { + relationFound.set(true); + } else if (relationFound.get()) { + if (node instanceof Limit limit && limit.limit() instanceof Literal literalLimit) { + limitValue.set((Integer) literalLimit.value()); + } else { + forbiddenNodeFound.set(true); + } + } + } + }); + + return limitValue.get(); + } + + private LogicalPlan getFragmentPlan(PhysicalPlan plan) { + Holder foundPlan = new Holder<>(); + plan.forEachDown(node -> { + if (node instanceof FragmentExec fragment) { + foundPlan.set(fragment.fragment()); + } + }); + return foundPlan.get(); + } +} diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeComputeHandler.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeComputeHandler.java index c4329d0b2002b..bea9c7b7a5db9 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeComputeHandler.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeComputeHandler.java @@ -42,6 +42,7 @@ import org.elasticsearch.xpack.esql.core.expression.FoldContext; import org.elasticsearch.xpack.esql.plan.physical.ExchangeSinkExec; import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan; +import org.elasticsearch.xpack.esql.planner.PlanConcurrencyCalculator; import org.elasticsearch.xpack.esql.planner.PlannerUtils; import org.elasticsearch.xpack.esql.session.Configuration; @@ -98,13 +99,15 @@ void startComputeOnDataNodes( Runnable runOnTaskFailure, ActionListener outListener ) { + Integer maxConcurrentNodesPerCluster = PlanConcurrencyCalculator.INSTANCE.calculateNodesConcurrency(dataNodePlan, configuration); + new DataNodeRequestSender( transportService, esqlExecutor, clusterAlias, parentTask, configuration.allowPartialResults(), - configuration.pragmas().maxConcurrentNodesPerCluster() + maxConcurrentNodesPerCluster == null ? -1 : maxConcurrentNodesPerCluster ) { @Override protected void sendRequest( diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/QueryPragmas.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/QueryPragmas.java index 3db10264d8aff..bc1ba595c5481 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/QueryPragmas.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/QueryPragmas.java @@ -57,7 +57,7 @@ public final class QueryPragmas implements Writeable { public static final Setting STATUS_INTERVAL = Setting.timeSetting("status_interval", Driver.DEFAULT_STATUS_INTERVAL); public static final Setting MAX_CONCURRENT_NODES_PER_CLUSTER = // - Setting.intSetting("max_concurrent_nodes_per_cluster", -1, -1, 100); + Setting.intSetting("max_concurrent_nodes_per_cluster", -1, -1); public static final Setting MAX_CONCURRENT_SHARDS_PER_NODE = // Setting.intSetting("max_concurrent_shards_per_node", 10, 1, 100); diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/PlanConcurrencyCalculatorTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/PlanConcurrencyCalculatorTests.java new file mode 100644 index 0000000000000..da36b42d1241b --- /dev/null +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/PlanConcurrencyCalculatorTests.java @@ -0,0 +1,258 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.esql.planner; + +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.xpack.esql.analysis.Analyzer; +import org.elasticsearch.xpack.esql.analysis.AnalyzerTestUtils; +import org.elasticsearch.xpack.esql.core.expression.FoldContext; +import org.elasticsearch.xpack.esql.optimizer.LogicalOptimizerContext; +import org.elasticsearch.xpack.esql.optimizer.LogicalPlanOptimizer; +import org.elasticsearch.xpack.esql.optimizer.PhysicalOptimizerContext; +import org.elasticsearch.xpack.esql.optimizer.PhysicalPlanOptimizer; +import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan; +import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan; +import org.elasticsearch.xpack.esql.planner.mapper.Mapper; +import org.elasticsearch.xpack.esql.plugin.QueryPragmas; +import org.elasticsearch.xpack.esql.session.Configuration; + +import java.util.List; + +import static org.elasticsearch.xpack.esql.EsqlTestUtils.TEST_VERIFIER; +import static org.elasticsearch.xpack.esql.EsqlTestUtils.configuration; +import static org.elasticsearch.xpack.esql.EsqlTestUtils.withDefaultLimitWarning; +import static org.elasticsearch.xpack.esql.analysis.AnalyzerTestUtils.analyzer; +import static org.elasticsearch.xpack.esql.analysis.AnalyzerTestUtils.analyzerDefaultMapping; +import static org.hamcrest.Matchers.equalTo; + +public class PlanConcurrencyCalculatorTests extends ESTestCase { + public void testSimpleLimit() { + assertConcurrency(""" + FROM x + | LIMIT 512 + """, 9); + } + + public void testLimitZero() { + assertConcurrency("FROM x | LIMIT 0", null); + } + + public void testBiggestPragmaOverride() { + assertConcurrency(""" + FROM x + | LIMIT 512 + """, Integer.MAX_VALUE, Integer.MAX_VALUE); + } + + public void testSmallestPragmaOverride() { + assertConcurrency(""" + FROM x + | LIMIT 512 + """, 1, 1); + } + + public void testPragmaOverrideWithUnsupportedCommands() { + assertConcurrency(""" + FROM x + | WHERE salary * 2 > 5 + | LIMIT 512 + """, 1, 1); + } + + public void testImplicitLimit() { + assertConcurrency(""" + FROM x + """, 9); + } + + public void testStats() { + assertConcurrency(""" + FROM x + | STATS COUNT(salary) + """, null); + } + + public void testStatsWithLimit() { + assertConcurrency(""" + FROM x + | LIMIT 512 + | STATS COUNT(salary) + """, 9); + } + + public void testSortBeforeLimit() { + assertConcurrency(""" + FROM x + | SORT salary + """, null); + } + + public void testSortAfterLimit() { + assertConcurrency(""" + FROM x + | LIMIT 512 + | SORT salary + """, 9); + } + + public void testStatsWithSortBeforeLimit() { + assertConcurrency(""" + FROM x + | SORT salary + | LIMIT 512 + | STATS COUNT(salary) + """, null); + } + + public void testStatsWithSortAfterLimit() { + assertConcurrency(""" + FROM x + | SORT salary + | LIMIT 512 + | STATS COUNT(salary) + """, null); + } + + public void testWhereBeforeLimit() { + assertConcurrency(""" + FROM x + | WHERE salary * 2 > 5 + | LIMIT 512 + """, null); + } + + public void testWhereAfterLimit() { + assertConcurrency(""" + FROM x + | LIMIT 512 + | WHERE salary * 2 > 5 + """, 9); + } + + public void testWherePushedToLuceneQueryBeforeLimit() { + assertConcurrency(""" + FROM x + | WHERE first_name LIKE "A%" + | LIMIT 512 + """, null); + } + + public void testWherePushedToLuceneQueryAfterLimit() { + assertConcurrency(""" + FROM x + | LIMIT 512 + | WHERE first_name LIKE "A%" + """, 9); + } + + public void testExpand() { + assertConcurrency(""" + FROM x + | LIMIT 2048 + | MV_EXPAND salary + | LIMIT 512 + """, 9); + } + + public void testEval() { + assertConcurrency(""" + FROM x + | EVAL x=salary*2 + | LIMIT 512 + """, 9); + } + + public void testRename() { + assertConcurrency(""" + FROM x + | RENAME salary as x + | LIMIT 512 + """, 9); + } + + public void testKeep() { + assertConcurrency(""" + FROM x + | KEEP salary + | LIMIT 512 + """, 9); + } + + public void testDrop() { + assertConcurrency(""" + FROM x + | DROP salary + | LIMIT 512 + """, 9); + } + + public void testDissect() { + assertConcurrency(""" + FROM x + | DISSECT first_name "%{a} %{b}" + | LIMIT 512 + """, 9); + } + + public void testGrok() { + assertConcurrency(""" + FROM x + | GROK first_name "%{EMAILADDRESS:email}" + | LIMIT 512 + """, 9); + } + + public void testEnrich() { + assertConcurrency(""" + FROM x + | ENRICH languages ON first_name + | LIMIT 512 + """, 9); + } + + public void testLookup() { + assertConcurrency(""" + FROM x + | RENAME salary as language_code + | LOOKUP JOIN languages_lookup on language_code + | LIMIT 512 + """, 9); + } + + private void assertConcurrency(String query, Integer expectedConcurrency) { + assertConcurrency(query, null, expectedConcurrency); + } + + private void assertConcurrency(String query, Integer concurrencyPragmaValue, Integer expectedConcurrency) { + Configuration configuration = concurrencyPragmaValue == null + ? configuration(query) + : configuration( + new QueryPragmas(Settings.builder().put("max_concurrent_nodes_per_cluster", concurrencyPragmaValue).build()), + query + ); + + Analyzer analyzer = analyzer(analyzerDefaultMapping(), TEST_VERIFIER, configuration); + LogicalPlan logicalPlan = AnalyzerTestUtils.analyze(query, analyzer); + logicalPlan = new LogicalPlanOptimizer(new LogicalOptimizerContext(configuration, FoldContext.small())).optimize(logicalPlan); + + PhysicalPlan physicalPlan = new Mapper().map(logicalPlan); + physicalPlan = new PhysicalPlanOptimizer(new PhysicalOptimizerContext(configuration)).optimize(physicalPlan); + + PhysicalPlan dataNodePlan = PlannerUtils.breakPlanBetweenCoordinatorAndDataNode(physicalPlan, configuration).v2(); + + Integer actualConcurrency = PlanConcurrencyCalculator.INSTANCE.calculateNodesConcurrency(dataNodePlan, configuration); + + assertThat(actualConcurrency, equalTo(expectedConcurrency)); + } + + @Override + protected List filteredWarnings() { + return withDefaultLimitWarning(super.filteredWarnings()); + } +}