Skip to content

Commit 3fe1766

Browse files
committed
Initial calculator with tests
1 parent 53f7d60 commit 3fe1766

File tree

3 files changed

+255
-1
lines changed

3 files changed

+255
-1
lines changed
Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.esql.planner;
9+
10+
import org.elasticsearch.xpack.esql.core.expression.FoldContext;
11+
import org.elasticsearch.xpack.esql.core.util.Holder;
12+
import org.elasticsearch.xpack.esql.plan.logical.EsRelation;
13+
import org.elasticsearch.xpack.esql.plan.logical.Filter;
14+
import org.elasticsearch.xpack.esql.plan.logical.Limit;
15+
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
16+
import org.elasticsearch.xpack.esql.plan.physical.FragmentExec;
17+
import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
18+
import org.elasticsearch.xpack.esql.session.Configuration;
19+
20+
public class PlanConcurrencyCalculator {
21+
public static final PlanConcurrencyCalculator INSTANCE = new PlanConcurrencyCalculator();
22+
23+
private PlanConcurrencyCalculator() {}
24+
25+
/**
26+
* Calculates the maximum number of nodes that should be executed concurrently for the given data node plan.
27+
* <p>
28+
* Used to avoid overloading the cluster with concurrent requests that may not be needed.
29+
* </p>
30+
*
31+
* @return Null if there should be no limit, otherwise, the maximum number of nodes that should be executed concurrently.
32+
*/
33+
public Integer calculateNodesConcurrency(PhysicalPlan dataNodePlan, Configuration configuration) {
34+
// TODO: Request FoldContext or a context containing it
35+
36+
// If available, pragma overrides any calculation
37+
if (configuration.pragmas().maxConcurrentNodePerCluster() > 0) {
38+
return configuration.pragmas().maxConcurrentNodePerCluster();
39+
}
40+
41+
// TODO: Should this class take into account the node roles/tiers? What about the index modes like LOOKUP?
42+
// TODO: Interactions with functions like SAMPLE()?
43+
44+
// TODO: ---
45+
// # Positive cases
46+
// - FROM | LIMIT | _anything_: Fragment[EsRelation, Limit]
47+
// -
48+
49+
// # Negative cases
50+
// - FROM | STATS: Fragment[EsRelation, Aggregate]
51+
// - SORT: Fragment[EsRelation, TopN]
52+
// - WHERE: Fragment[EsRelation, Filter]
53+
54+
Integer dataNodeLimit = getDataNodeLimit(dataNodePlan);
55+
56+
if (dataNodeLimit != null) {
57+
return limitToConcurrency(dataNodeLimit);
58+
}
59+
60+
return null;
61+
}
62+
63+
private int limitToConcurrency(int limit) {
64+
// TODO: Do some conversion here
65+
return limit;
66+
}
67+
68+
private Integer getDataNodeLimit(PhysicalPlan dataNodePlan) {
69+
LogicalPlan logicalPlan = getFragmentPlan(dataNodePlan);
70+
71+
// State machine to find:
72+
// A relation
73+
Holder<Boolean> relationFound = new Holder<>(false);
74+
// ...followed by NO filters
75+
Holder<Boolean> filterFound = new Holder<>(false);
76+
// ...and finally, a limit
77+
Holder<Integer> limitValue = new Holder<>(null);
78+
79+
logicalPlan.forEachUp(node -> {
80+
if (node instanceof EsRelation) {
81+
relationFound.set(true);
82+
} else if (node instanceof Filter) {
83+
filterFound.set(true);
84+
} else if (relationFound.get() && filterFound.get() == false) {
85+
// We only care about the limit if there's a relation before it, and no filter in between
86+
if (node instanceof Limit limit) {
87+
assert limitValue.get() == null : "Multiple limits found in the same data node plan";
88+
limitValue.set((Integer) limit.limit().fold(FoldContext.small()));
89+
}
90+
}
91+
});
92+
93+
return limitValue.get();
94+
}
95+
96+
private LogicalPlan getFragmentPlan(PhysicalPlan plan) {
97+
Holder<LogicalPlan> foundPlan = new Holder<>();
98+
plan.forEachDown(node -> {
99+
if (node instanceof FragmentExec fragment) {
100+
foundPlan.set(fragment.fragment());
101+
}
102+
});
103+
return foundPlan.get();
104+
}
105+
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeComputeHandler.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
import org.elasticsearch.xpack.esql.core.expression.FoldContext;
4343
import org.elasticsearch.xpack.esql.plan.physical.ExchangeSinkExec;
4444
import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
45+
import org.elasticsearch.xpack.esql.planner.PlanConcurrencyCalculator;
4546
import org.elasticsearch.xpack.esql.planner.PlannerUtils;
4647
import org.elasticsearch.xpack.esql.session.Configuration;
4748

@@ -98,12 +99,17 @@ void startComputeOnDataNodes(
9899
Runnable runOnTaskFailure,
99100
ActionListener<ComputeResponse> outListener
100101
) {
102+
// TODO: CCS?
103+
// TODO: Where to calculate/store the stats?
104+
105+
var maxConcurrentNodesPerCluster = PlanConcurrencyCalculator.INSTANCE.calculateNodesConcurrency(dataNodePlan, configuration);
106+
101107
new DataNodeRequestSender(
102108
transportService,
103109
esqlExecutor,
104110
parentTask,
105111
configuration.allowPartialResults(),
106-
configuration.pragmas().maxConcurrentNodePerCluster()
112+
maxConcurrentNodesPerCluster
107113
) {
108114
@Override
109115
protected void sendRequest(
Lines changed: 143 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,143 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.esql.planner;
9+
10+
import org.elasticsearch.common.settings.Settings;
11+
import org.elasticsearch.test.ESTestCase;
12+
import org.elasticsearch.xpack.esql.analysis.Analyzer;
13+
import org.elasticsearch.xpack.esql.analysis.AnalyzerTestUtils;
14+
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
15+
import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
16+
import org.elasticsearch.xpack.esql.planner.mapper.Mapper;
17+
import org.elasticsearch.xpack.esql.plugin.QueryPragmas;
18+
import org.elasticsearch.xpack.esql.session.Configuration;
19+
20+
import java.util.List;
21+
22+
import static org.elasticsearch.xpack.esql.EsqlTestUtils.TEST_VERIFIER;
23+
import static org.elasticsearch.xpack.esql.EsqlTestUtils.configuration;
24+
import static org.elasticsearch.xpack.esql.EsqlTestUtils.withDefaultLimitWarning;
25+
import static org.elasticsearch.xpack.esql.analysis.AnalyzerTestUtils.analyzer;
26+
import static org.elasticsearch.xpack.esql.analysis.AnalyzerTestUtils.loadMapping;
27+
import static org.hamcrest.Matchers.equalTo;
28+
29+
public class PlanConcurrencyCalculatorTests extends ESTestCase {
30+
31+
public void testSimpleLimit() {
32+
assertConcurrency("""
33+
FROM x
34+
| LIMIT 123
35+
""",
36+
123
37+
);
38+
}
39+
40+
public void testImplicitLimit() {
41+
assertConcurrency("""
42+
FROM x
43+
""",
44+
1000
45+
);
46+
}
47+
48+
public void testStats() {
49+
assertConcurrency("""
50+
FROM x
51+
| STATS COUNT(salary)
52+
""",
53+
null
54+
);
55+
}
56+
57+
public void testStatsWithLimit() {
58+
assertConcurrency("""
59+
FROM x
60+
| LIMIT 123
61+
| STATS COUNT(salary)
62+
""",
63+
123
64+
);
65+
}
66+
67+
public void testStatsWithSortBeforeLimit() {
68+
assertConcurrency("""
69+
FROM x
70+
| SORT salary
71+
| LIMIT 123
72+
| STATS COUNT(salary)
73+
""",
74+
null
75+
);
76+
}
77+
78+
public void testStatsWithSortAfterLimit() {
79+
assertConcurrency("""
80+
FROM x
81+
| SORT salary
82+
| LIMIT 123
83+
| STATS COUNT(salary)
84+
""",
85+
null
86+
);
87+
}
88+
89+
public void testSort() {
90+
assertConcurrency("""
91+
FROM x
92+
| SORT salary
93+
""",
94+
null
95+
);
96+
}
97+
98+
private void assertConcurrency(
99+
String query,
100+
Integer expectedConcurrency
101+
) {
102+
assertConcurrency(query, null, expectedConcurrency);
103+
}
104+
105+
private void assertConcurrency(
106+
String query,
107+
Integer concurrencyPragmaValue,
108+
Integer expectedConcurrency
109+
) {
110+
Configuration configuration = concurrencyPragmaValue == null
111+
? configuration(query)
112+
: configuration(
113+
new QueryPragmas(
114+
Settings.builder()
115+
.put("max_concurrent_nodes_per_cluster", concurrencyPragmaValue)
116+
.build()
117+
),
118+
query
119+
);
120+
Analyzer analyzer = analyzer(
121+
loadMapping("mapping-basic.json", "test"), TEST_VERIFIER, configuration
122+
);
123+
LogicalPlan logicalPlan = AnalyzerTestUtils.analyze(query, analyzer);
124+
PhysicalPlan physicalPlan = new Mapper().map(logicalPlan);
125+
126+
PhysicalPlan dataNodePlan = PlannerUtils.breakPlanBetweenCoordinatorAndDataNode(
127+
physicalPlan,
128+
configuration
129+
).v2();
130+
131+
Integer actualConcurrency = PlanConcurrencyCalculator.INSTANCE.calculateNodesConcurrency(
132+
dataNodePlan,
133+
configuration
134+
);
135+
136+
assertThat(actualConcurrency, equalTo(expectedConcurrency));
137+
}
138+
139+
@Override
140+
protected List<String> filteredWarnings() {
141+
return withDefaultLimitWarning(super.filteredWarnings());
142+
}
143+
}

0 commit comments

Comments
 (0)