Skip to content
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
eec2039
Limit concurrent node requests
idegtiarenko Feb 18, 2025
d4850fc
upd
idegtiarenko Feb 18, 2025
ae96763
upd
idegtiarenko Feb 18, 2025
15d897f
Merge branch 'main' into limit_concurrent_node_requests
idegtiarenko Mar 4, 2025
666f588
do not send requests if source is completed
idegtiarenko Mar 4, 2025
34badf7
Merge branch 'main' into limit_concurrent_node_requests
idegtiarenko Mar 5, 2025
8c55e8a
upd
idegtiarenko Mar 5, 2025
b2a66a2
do not erase prior shard failures on skipping node
idegtiarenko Mar 5, 2025
11bd0f0
Merge branch 'main' into limit_concurrent_node_requests
idegtiarenko Mar 5, 2025
53f7d60
upd
idegtiarenko Mar 5, 2025
3fe1766
Initial calculator with tests
ivancea Mar 11, 2025
d4fcb2e
Merge branch 'main' into esql-calculate-concurrent-requests-limit
ivancea Mar 14, 2025
a8faf76
Fix NPE
ivancea Mar 14, 2025
97cf29e
[CI] Auto commit changes from spotless
Mar 14, 2025
a5af34c
Fix test by optimizing plans
ivancea Mar 17, 2025
6e41a98
Merge branch 'main' into esql-calculate-concurrent-requests-limit
ivancea Mar 19, 2025
4fb514e
Added whitelisted nodes, removed multiple Limits assertion, and avoid…
ivancea Mar 19, 2025
5acabda
[CI] Auto commit changes from spotless
Mar 19, 2025
bc4be1d
Removed node whitelist, as limit is pushed down, and update concurren…
ivancea Mar 20, 2025
36c05ee
Remove limit from Pragma
ivancea Mar 20, 2025
7fa6f18
Merge branch 'main' into esql-calculate-concurrent-requests-limit
ivancea Mar 20, 2025
6bf6007
Add Nullable annotations, and cleanup
ivancea Mar 20, 2025
a37ca1f
Avoid limiting on high limits
ivancea Mar 20, 2025
6ea6df5
Update docs/changelog/124901.yaml
ivancea Mar 20, 2025
cdb007e
Merge branch 'main' into esql-calculate-concurrent-requests-limit
ivancea Mar 21, 2025
44e6399
Fix tests after max limit logic
ivancea Mar 21, 2025
6cc0f42
fix comments
idegtiarenko Mar 25, 2025
9f930b6
Merge branch 'main' into esql-calculate-concurrent-requests-limit
idegtiarenko Mar 25, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.esql.planner;

import org.elasticsearch.xpack.esql.core.expression.FoldContext;
import org.elasticsearch.xpack.esql.core.util.Holder;
import org.elasticsearch.xpack.esql.plan.logical.EsRelation;
import org.elasticsearch.xpack.esql.plan.logical.Filter;
import org.elasticsearch.xpack.esql.plan.logical.Limit;
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
import org.elasticsearch.xpack.esql.plan.physical.FragmentExec;
import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
import org.elasticsearch.xpack.esql.session.Configuration;

public class PlanConcurrencyCalculator {
public static final PlanConcurrencyCalculator INSTANCE = new PlanConcurrencyCalculator();

private PlanConcurrencyCalculator() {}

/**
* Calculates the maximum number of nodes that should be executed concurrently for the given data node plan.
* <p>
* Used to avoid overloading the cluster with concurrent requests that may not be needed.
* </p>
*
* @return Null if there should be no limit, otherwise, the maximum number of nodes that should be executed concurrently.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This could be -1 for no limit, to work like the pragma

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd probably move this to the class javadoc.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or put a little bit there.

*/
public Integer calculateNodesConcurrency(PhysicalPlan dataNodePlan, Configuration configuration) {
// TODO: Request FoldContext or a context containing it
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is needed for the limit.limit().fold(...). But we can probably assert that it's a Literal, and avoid folding


// If available, pragma overrides any calculation
if (configuration.pragmas().maxConcurrentNodesPerCluster() > 0) {
return configuration.pragmas().maxConcurrentNodesPerCluster();
}

// TODO: Should this class take into account the node roles/tiers? What about the index modes like LOOKUP?
// TODO: Interactions with functions like SAMPLE()?

// TODO: ---
// # Positive cases
// - FROM | LIMIT | _anything_: Fragment[EsRelation, Limit]
// -

// # Negative cases
// - FROM | STATS: Fragment[EsRelation, Aggregate]
// - SORT: Fragment[EsRelation, TopN]
// - WHERE: Fragment[EsRelation, Filter]
Copy link
Contributor Author

@ivancea ivancea Mar 14, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When getting the LIMIT value:

  • The WHERE is already taken into account explicitly.
  • The STATS can't have a LIMIT in the datanode side, so it's fine.
  • The SORT shouldn't happen, as we look for a Limit after the EsRelation, and the Limit would be a TopN otherwise.

Those are mostly assumptions; there's still a lot of testing to do with different commands that could break them


Integer dataNodeLimit = getDataNodeLimit(dataNodePlan);

if (dataNodeLimit != null) {
return limitToConcurrency(dataNodeLimit);
}

return null;
}

private int limitToConcurrency(int limit) {
// TODO: Do some conversion here
return limit;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The logic we choose here may be quite arbitrary without some real statistics of the nodes/shard

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would probably limit to 2 for everything up to 10. Or may be something like Math.max(2, log(limit))

}

private Integer getDataNodeLimit(PhysicalPlan dataNodePlan) {
LogicalPlan logicalPlan = getFragmentPlan(dataNodePlan);

// State machine to find:
// A relation
Holder<Boolean> relationFound = new Holder<>(false);
// ...followed by NO filters
Holder<Boolean> filterFound = new Holder<>(false);
// ...and finally, a limit
Holder<Integer> limitValue = new Holder<>(null);

logicalPlan.forEachUp(node -> {
if (node instanceof EsRelation) {
relationFound.set(true);
} else if (node instanceof Filter) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure this blacklisting is safe in the long term.
I'd prefer to have a whitelist approach, ie. a set of plan types that can be present after EsRelation and that we know are safe to ignore before a LIMIT.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Initially changed it to a whitelist, but after adding test for every command, Limit is effectively pushed down always. So now it's just an "If not a relation or limit -> 💀"

filterFound.set(true);
} else if (relationFound.get() && filterFound.get() == false) {
// We only care about the limit if there's a relation before it, and no filter in between
if (node instanceof Limit limit) {
assert limitValue.get() == null : "Multiple limits found in the same data node plan";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This could still happen, eg. with MV_EXPAND | LIMIT, that becomes LIMIT | MV_EXPAND | LIMIT

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed that assertion, to just use the first limit it finds, which is what makes sense in any case

limitValue.set((Integer) limit.limit().fold(FoldContext.small()));
}
}
});

return limitValue.get();
}

private LogicalPlan getFragmentPlan(PhysicalPlan plan) {
Holder<LogicalPlan> foundPlan = new Holder<>();
plan.forEachDown(node -> {
if (node instanceof FragmentExec fragment) {
foundPlan.set(fragment.fragment());
}
});
return foundPlan.get();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.elasticsearch.xpack.esql.core.expression.FoldContext;
import org.elasticsearch.xpack.esql.plan.physical.ExchangeSinkExec;
import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
import org.elasticsearch.xpack.esql.planner.PlanConcurrencyCalculator;
import org.elasticsearch.xpack.esql.planner.PlannerUtils;
import org.elasticsearch.xpack.esql.session.Configuration;

Expand Down Expand Up @@ -98,12 +99,17 @@ void startComputeOnDataNodes(
Runnable runOnTaskFailure,
ActionListener<ComputeResponse> outListener
) {
// TODO: CCS?
// TODO: Where to calculate/store the stats?

var maxConcurrentNodesPerCluster = PlanConcurrencyCalculator.INSTANCE.calculateNodesConcurrency(dataNodePlan, configuration);

new DataNodeRequestSender(
transportService,
esqlExecutor,
parentTask,
configuration.allowPartialResults(),
configuration.pragmas().maxConcurrentNodesPerCluster()
maxConcurrentNodesPerCluster == null ? -1 : maxConcurrentNodesPerCluster
) {
@Override
protected void sendRequest(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.esql.planner;

import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.esql.analysis.Analyzer;
import org.elasticsearch.xpack.esql.analysis.AnalyzerTestUtils;
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
import org.elasticsearch.xpack.esql.planner.mapper.Mapper;
import org.elasticsearch.xpack.esql.plugin.QueryPragmas;
import org.elasticsearch.xpack.esql.session.Configuration;

import java.util.List;

import static org.elasticsearch.xpack.esql.EsqlTestUtils.TEST_VERIFIER;
import static org.elasticsearch.xpack.esql.EsqlTestUtils.configuration;
import static org.elasticsearch.xpack.esql.EsqlTestUtils.withDefaultLimitWarning;
import static org.elasticsearch.xpack.esql.analysis.AnalyzerTestUtils.analyzer;
import static org.elasticsearch.xpack.esql.analysis.AnalyzerTestUtils.loadMapping;
import static org.hamcrest.Matchers.equalTo;

public class PlanConcurrencyCalculatorTests extends ESTestCase {

public void testSimpleLimit() {
assertConcurrency("""
FROM x
| LIMIT 123
""",
123
);
}

public void testImplicitLimit() {
assertConcurrency("""
FROM x
""",
1000
);
}

public void testStats() {
assertConcurrency("""
FROM x
| STATS COUNT(salary)
""",
null
);
}

public void testStatsWithLimit() {
assertConcurrency("""
FROM x
| LIMIT 123
| STATS COUNT(salary)
""",
123
);
}

public void testStatsWithSortBeforeLimit() {
assertConcurrency("""
FROM x
| SORT salary
| LIMIT 123
| STATS COUNT(salary)
""",
null
);
}

public void testStatsWithSortAfterLimit() {
assertConcurrency("""
FROM x
| SORT salary
| LIMIT 123
| STATS COUNT(salary)
""",
null
);
}

public void testSort() {
assertConcurrency("""
FROM x
| SORT salary
""",
null
);
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was looking for that when doing it, but we only have class-level parameterized tests...
I was also checking assertAll(), but, of course, junit5 too 💀

Luckily there aren't that many cases now, we can refactor them if we add more and they're similar


private void assertConcurrency(
String query,
Integer expectedConcurrency
) {
assertConcurrency(query, null, expectedConcurrency);
}

private void assertConcurrency(
String query,
Integer concurrencyPragmaValue,
Integer expectedConcurrency
) {
Configuration configuration = concurrencyPragmaValue == null
? configuration(query)
: configuration(
new QueryPragmas(
Settings.builder()
.put("max_concurrent_nodes_per_cluster", concurrencyPragmaValue)
.build()
),
query
);
Analyzer analyzer = analyzer(
loadMapping("mapping-basic.json", "test"), TEST_VERIFIER, configuration
);
LogicalPlan logicalPlan = AnalyzerTestUtils.analyze(query, analyzer);
PhysicalPlan physicalPlan = new Mapper().map(logicalPlan);

PhysicalPlan dataNodePlan = PlannerUtils.breakPlanBetweenCoordinatorAndDataNode(
physicalPlan,
configuration
).v2();

Integer actualConcurrency = PlanConcurrencyCalculator.INSTANCE.calculateNodesConcurrency(
dataNodePlan,
configuration
);

assertThat(actualConcurrency, equalTo(expectedConcurrency));
}

@Override
protected List<String> filteredWarnings() {
return withDefaultLimitWarning(super.filteredWarnings());
}
}