- 
                Notifications
    
You must be signed in to change notification settings  - Fork 25.6k
 
ESQL: Calculate concurrent node limit #124901
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 15 commits
eec2039
              d4850fc
              ae96763
              15d897f
              666f588
              34badf7
              8c55e8a
              b2a66a2
              11bd0f0
              53f7d60
              3fe1766
              d4fcb2e
              a8faf76
              97cf29e
              a5af34c
              6e41a98
              4fb514e
              5acabda
              bc4be1d
              36c05ee
              7fa6f18
              6bf6007
              a37ca1f
              6ea6df5
              cdb007e
              44e6399
              6cc0f42
              9f930b6
              File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | 
|---|---|---|
| @@ -0,0 +1,105 @@ | ||
| /* | ||
| * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
| * or more contributor license agreements. Licensed under the Elastic License | ||
| * 2.0; you may not use this file except in compliance with the Elastic License | ||
| * 2.0. | ||
| */ | ||
| 
     | 
||
| package org.elasticsearch.xpack.esql.planner; | ||
| 
     | 
||
| import org.elasticsearch.xpack.esql.core.expression.FoldContext; | ||
| import org.elasticsearch.xpack.esql.core.util.Holder; | ||
| import org.elasticsearch.xpack.esql.plan.logical.EsRelation; | ||
| import org.elasticsearch.xpack.esql.plan.logical.Filter; | ||
| import org.elasticsearch.xpack.esql.plan.logical.Limit; | ||
| import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan; | ||
| import org.elasticsearch.xpack.esql.plan.physical.FragmentExec; | ||
| import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan; | ||
| import org.elasticsearch.xpack.esql.session.Configuration; | ||
| 
     | 
||
| public class PlanConcurrencyCalculator { | ||
| public static final PlanConcurrencyCalculator INSTANCE = new PlanConcurrencyCalculator(); | ||
| 
     | 
||
| private PlanConcurrencyCalculator() {} | ||
| 
     | 
||
| /** | ||
| * Calculates the maximum number of nodes that should be executed concurrently for the given data node plan. | ||
| * <p> | ||
| * Used to avoid overloading the cluster with concurrent requests that may not be needed. | ||
| * </p> | ||
| * | ||
| * @return Null if there should be no limit, otherwise, the maximum number of nodes that should be executed concurrently. | ||
| */ | ||
| public Integer calculateNodesConcurrency(PhysicalPlan dataNodePlan, Configuration configuration) { | ||
| // TODO: Request FoldContext or a context containing it | ||
                
       | 
||
| 
     | 
||
| // If available, pragma overrides any calculation | ||
| if (configuration.pragmas().maxConcurrentNodesPerCluster() > 0) { | ||
| return configuration.pragmas().maxConcurrentNodesPerCluster(); | ||
| } | ||
| 
     | 
||
| // TODO: Should this class take into account the node roles/tiers? What about the index modes like LOOKUP? | ||
| // TODO: Interactions with functions like SAMPLE()? | ||
| 
     | 
||
| // TODO: --- | ||
| // # Positive cases | ||
| // - FROM | LIMIT | _anything_: Fragment[EsRelation, Limit] | ||
| // - | ||
| 
     | 
||
| // # Negative cases | ||
| // - FROM | STATS: Fragment[EsRelation, Aggregate] | ||
| // - SORT: Fragment[EsRelation, TopN] | ||
| // - WHERE: Fragment[EsRelation, Filter] | ||
                
       | 
||
| 
     | 
||
| Integer dataNodeLimit = getDataNodeLimit(dataNodePlan); | ||
| 
     | 
||
| if (dataNodeLimit != null) { | ||
| return limitToConcurrency(dataNodeLimit); | ||
| } | ||
| 
     | 
||
| return null; | ||
| } | ||
| 
     | 
||
| private int limitToConcurrency(int limit) { | ||
| // TODO: Do some conversion here | ||
| return limit; | ||
                
       | 
||
| } | ||
| 
     | 
||
| private Integer getDataNodeLimit(PhysicalPlan dataNodePlan) { | ||
| LogicalPlan logicalPlan = getFragmentPlan(dataNodePlan); | ||
| 
     | 
||
| // State machine to find: | ||
| // A relation | ||
| Holder<Boolean> relationFound = new Holder<>(false); | ||
| // ...followed by NO filters | ||
| Holder<Boolean> filterFound = new Holder<>(false); | ||
| // ...and finally, a limit | ||
| Holder<Integer> limitValue = new Holder<>(null); | ||
| 
     | 
||
| logicalPlan.forEachUp(node -> { | ||
| if (node instanceof EsRelation) { | ||
| relationFound.set(true); | ||
| } else if (node instanceof Filter) { | ||
                
       | 
||
| filterFound.set(true); | ||
| } else if (relationFound.get() && filterFound.get() == false) { | ||
| // We only care about the limit if there's a relation before it, and no filter in between | ||
| if (node instanceof Limit limit) { | ||
| assert limitValue.get() == null : "Multiple limits found in the same data node plan"; | ||
                
       | 
||
| limitValue.set((Integer) limit.limit().fold(FoldContext.small())); | ||
| } | ||
| } | ||
| }); | ||
| 
     | 
||
| return limitValue.get(); | ||
| } | ||
| 
     | 
||
| private LogicalPlan getFragmentPlan(PhysicalPlan plan) { | ||
| Holder<LogicalPlan> foundPlan = new Holder<>(); | ||
| plan.forEachDown(node -> { | ||
| if (node instanceof FragmentExec fragment) { | ||
| foundPlan.set(fragment.fragment()); | ||
| } | ||
| }); | ||
| return foundPlan.get(); | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change | 
|---|---|---|
| @@ -0,0 +1,119 @@ | ||
| /* | ||
| * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
| * or more contributor license agreements. Licensed under the Elastic License | ||
| * 2.0; you may not use this file except in compliance with the Elastic License | ||
| * 2.0. | ||
| */ | ||
| 
     | 
||
| package org.elasticsearch.xpack.esql.planner; | ||
| 
     | 
||
| import org.elasticsearch.common.settings.Settings; | ||
| import org.elasticsearch.test.ESTestCase; | ||
| import org.elasticsearch.xpack.esql.analysis.Analyzer; | ||
| import org.elasticsearch.xpack.esql.analysis.AnalyzerTestUtils; | ||
| import org.elasticsearch.xpack.esql.core.expression.FoldContext; | ||
| import org.elasticsearch.xpack.esql.optimizer.LogicalOptimizerContext; | ||
| import org.elasticsearch.xpack.esql.optimizer.LogicalPlanOptimizer; | ||
| import org.elasticsearch.xpack.esql.optimizer.PhysicalOptimizerContext; | ||
| import org.elasticsearch.xpack.esql.optimizer.PhysicalPlanOptimizer; | ||
| import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan; | ||
| import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan; | ||
| import org.elasticsearch.xpack.esql.planner.mapper.Mapper; | ||
| import org.elasticsearch.xpack.esql.plugin.QueryPragmas; | ||
| import org.elasticsearch.xpack.esql.session.Configuration; | ||
| 
     | 
||
| import java.util.List; | ||
| 
     | 
||
| import static org.elasticsearch.xpack.esql.EsqlTestUtils.TEST_VERIFIER; | ||
| import static org.elasticsearch.xpack.esql.EsqlTestUtils.configuration; | ||
| import static org.elasticsearch.xpack.esql.EsqlTestUtils.withDefaultLimitWarning; | ||
| import static org.elasticsearch.xpack.esql.analysis.AnalyzerTestUtils.analyzer; | ||
| import static org.elasticsearch.xpack.esql.analysis.AnalyzerTestUtils.loadMapping; | ||
| import static org.hamcrest.Matchers.equalTo; | ||
| 
     | 
||
| /** | ||
|  * Tests for {@link PlanConcurrencyCalculator}: each case plans a query, splits off the data node plan, | ||
|  * and checks the concurrency the calculator reports for it. | ||
|  */ | ||
| public class PlanConcurrencyCalculatorTests extends ESTestCase { | ||
| | ||
|     public void testSimpleLimit() { | ||
|         assertConcurrency(""" | ||
|             FROM x | ||
|             | LIMIT 123 | ||
|             """, 123); | ||
|     } | ||
| | ||
|     public void testImplicitLimit() { | ||
|         assertConcurrency(""" | ||
|             FROM x | ||
|             """, 1000); | ||
|     } | ||
| | ||
|     public void testStats() { | ||
|         assertConcurrency(""" | ||
|             FROM x | ||
|             | STATS COUNT(salary) | ||
|             """, null); | ||
|     } | ||
| | ||
|     public void testStatsWithLimit() { | ||
|         assertConcurrency(""" | ||
|             FROM x | ||
|             | LIMIT 123 | ||
|             | STATS COUNT(salary) | ||
|             """, 123); | ||
|     } | ||
| | ||
|     public void testStatsWithSortBeforeLimit() { | ||
|         assertConcurrency(""" | ||
|             FROM x | ||
|             | SORT salary | ||
|             | LIMIT 123 | ||
|             | STATS COUNT(salary) | ||
|             """, null); | ||
|     } | ||
| | ||
|     // NOTE(review): this query is identical to testStatsWithSortBeforeLimit (SORT still precedes LIMIT). | ||
|     // Presumably LIMIT was meant to come before SORT here - confirm the intended query and its expected value. | ||
|     public void testStatsWithSortAfterLimit() { | ||
|         assertConcurrency(""" | ||
|             FROM x | ||
|             | SORT salary | ||
|             | LIMIT 123 | ||
|             | STATS COUNT(salary) | ||
|             """, null); | ||
|     } | ||
| | ||
|     public void testSort() { | ||
|         assertConcurrency(""" | ||
|             FROM x | ||
|             | SORT salary | ||
|             """, null); | ||
|     } | ||
| | ||
|     /** A positive pragma value replaces the concurrency that would otherwise be derived from the limit. */ | ||
|     public void testPragmaOverridesLimit() { | ||
|         assertConcurrency(""" | ||
|             FROM x | ||
|             | LIMIT 123 | ||
|             """, 5, 5); | ||
|     } | ||
| | ||
|     /** A positive pragma value is returned even for plans that would otherwise report no limit. */ | ||
|     public void testPragmaOverridesMissingLimit() { | ||
|         assertConcurrency(""" | ||
|             FROM x | ||
|             | STATS COUNT(salary) | ||
|             """, 5, 5); | ||
|     } | ||
| | ||
|     /** Asserts the calculated concurrency for the query when no concurrency pragma is set. */ | ||
|     private void assertConcurrency(String query, Integer expectedConcurrency) { | ||
|         assertConcurrency(query, null, expectedConcurrency); | ||
|     } | ||
| | ||
|     /** | ||
|      * Plans the query down to its data node plan and asserts the concurrency calculated for it. | ||
|      * | ||
|      * @param query                  the query to plan | ||
|      * @param concurrencyPragmaValue value for the {@code max_concurrent_nodes_per_cluster} pragma, or null to leave it unset | ||
|      * @param expectedConcurrency    the expected result, where null means no limit | ||
|      */ | ||
|     private void assertConcurrency(String query, Integer concurrencyPragmaValue, Integer expectedConcurrency) { | ||
|         Configuration configuration = concurrencyPragmaValue == null | ||
|             ? configuration(query) | ||
|             : configuration( | ||
|                 new QueryPragmas(Settings.builder().put("max_concurrent_nodes_per_cluster", concurrencyPragmaValue).build()), | ||
|                 query | ||
|             ); | ||
| | ||
|         Analyzer analyzer = analyzer(loadMapping("mapping-basic.json", "test"), TEST_VERIFIER, configuration); | ||
|         LogicalPlan logicalPlan = AnalyzerTestUtils.analyze(query, analyzer); | ||
|         logicalPlan = new LogicalPlanOptimizer(new LogicalOptimizerContext(configuration, FoldContext.small())).optimize(logicalPlan); | ||
| | ||
|         PhysicalPlan physicalPlan = new Mapper().map(logicalPlan); | ||
|         physicalPlan = new PhysicalPlanOptimizer(new PhysicalOptimizerContext(configuration)).optimize(physicalPlan); | ||
| | ||
|         // Only the data node half of the split plan is handed to the calculator | ||
|         PhysicalPlan dataNodePlan = PlannerUtils.breakPlanBetweenCoordinatorAndDataNode(physicalPlan, configuration).v2(); | ||
| | ||
|         Integer actualConcurrency = PlanConcurrencyCalculator.INSTANCE.calculateNodesConcurrency(dataNodePlan, configuration); | ||
| | ||
|         assertThat(actualConcurrency, equalTo(expectedConcurrency)); | ||
|     } | ||
| | ||
|     @Override | ||
|     protected List<String> filteredWarnings() { | ||
|         return withDefaultLimitWarning(super.filteredWarnings()); | ||
|     } | ||
| } | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This could be -1 for no limit, to work like the pragma
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd probably move this to the class javadoc.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Or put a little bit there.