Skip to content

Commit ffb0055

Browse files
committed
Rewrite limit in data node execution
1 parent 2e338d8 commit ffb0055

File tree

8 files changed

+172
-13
lines changed

8 files changed

+172
-13
lines changed

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeBuffer.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import java.util.Queue;
1717
import java.util.concurrent.ConcurrentLinkedQueue;
1818
import java.util.concurrent.atomic.AtomicInteger;
19+
import java.util.concurrent.atomic.AtomicLong;
1920

2021
final class ExchangeBuffer {
2122

@@ -31,6 +32,7 @@ final class ExchangeBuffer {
3132
private SubscribableListener<Void> notFullFuture = null;
3233

3334
private final SubscribableListener<Void> completionFuture = new SubscribableListener<>();
35+
private final AtomicLong addedRows = new AtomicLong();
3436

3537
private volatile boolean noMoreInputs = false;
3638

@@ -58,6 +60,8 @@ void addPage(Page page) {
5860
completionFuture.onResponse(null);
5961
}
6062
}
63+
} else {
64+
addedRows.addAndGet(page.getPositionCount());
6165
}
6266
}
6367

@@ -155,6 +159,13 @@ int size() {
155159
return queueSize.get();
156160
}
157161

162+
/**
163+
* Returns number of rows has been added to this exchange buffer
164+
*/
165+
long addedRows() {
166+
return addedRows.get();
167+
}
168+
158169
/**
159170
* Adds a listener that will be notified when this exchange buffer is finished.
160171
*/

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeSinkHandler.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -205,4 +205,11 @@ long lastUpdatedTimeInMillis() {
205205
public int bufferSize() {
206206
return buffer.size();
207207
}
208+
209+
/**
210+
* Returns the number of rows (i.e. positions) has been added to this exchange sink
211+
*/
212+
public long addedRows() {
213+
return buffer.addedRows();
214+
}
208215
}
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.esql.action;
9+
10+
import org.elasticsearch.action.index.IndexRequestBuilder;
11+
import org.elasticsearch.common.settings.Settings;
12+
import org.elasticsearch.compute.lucene.DataPartitioning;
13+
import org.elasticsearch.compute.lucene.LuceneSourceOperator;
14+
import org.elasticsearch.compute.operator.DriverProfile;
15+
import org.elasticsearch.xpack.esql.EsqlTestUtils;
16+
import org.elasticsearch.xpack.esql.plugin.QueryPragmas;
17+
18+
import java.util.ArrayList;
19+
import java.util.List;
20+
21+
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
22+
import static org.hamcrest.Matchers.equalTo;
23+
import static org.hamcrest.Matchers.hasSize;
24+
25+
public class EarlyTerminationIT extends AbstractEsqlIntegTestCase {
26+
27+
public void testAdjustLimit() {
28+
assumeTrue("require pragmas", canUseQueryPragmas());
29+
String dataNode = internalCluster().startDataOnlyNode();
30+
int numIndices = 10;
31+
int value = 0;
32+
for (int i = 0; i < numIndices; i++) {
33+
String index = "test-" + i;
34+
assertAcked(
35+
admin().indices()
36+
.prepareCreate(index)
37+
.setSettings(indexSettings(1, 0).put("index.routing.allocation.require._name", dataNode))
38+
);
39+
List<IndexRequestBuilder> indexRequests = new ArrayList<>();
40+
for (int d = 0; d < 20; d++) {
41+
indexRequests.add(new IndexRequestBuilder(client()).setIndex(index).setSource("v", Integer.toString(value++)));
42+
}
43+
indexRandom(true, indexRequests);
44+
}
45+
EsqlQueryRequest request = new EsqlQueryRequest();
46+
request.query("FROM test-* | LIMIT 95");
47+
QueryPragmas pragmas = new QueryPragmas(
48+
Settings.builder()
49+
.put(QueryPragmas.MAX_CONCURRENT_SHARDS_PER_NODE.getKey(), 2)
50+
.put(QueryPragmas.DATA_PARTITIONING.getKey(), DataPartitioning.SHARD)
51+
.put(QueryPragmas.TASK_CONCURRENCY.getKey(), between(3, 5))
52+
.build()
53+
);
54+
request.pragmas(pragmas);
55+
request.profile(true);
56+
try (EsqlQueryResponse resp = run(request)) {
57+
List<List<Object>> values = EsqlTestUtils.getValuesList(resp);
58+
assertThat(values, hasSize(95));
59+
List<DriverProfile> drivers = resp.profile().drivers();
60+
List<DriverProfile> queryProfiles = drivers.stream().filter(d -> d.taskDescription().equals("data")).toList();
61+
assertThat(queryProfiles, hasSize(6));
62+
var luceneOperators = queryProfiles.stream().map(d -> (LuceneSourceOperator.Status) d.operators().getFirst().status()).toList();
63+
assertThat(luceneOperators, hasSize(6));
64+
assertThat(luceneOperators.get(0).rowsEmitted() + luceneOperators.get(1).rowsEmitted(), equalTo(40L));
65+
assertThat(luceneOperators.get(2).rowsEmitted() + luceneOperators.get(3).rowsEmitted(), equalTo(40L));
66+
assertThat(luceneOperators.get(4).rowsEmitted() + luceneOperators.get(5).rowsEmitted(), equalTo(15L));
67+
}
68+
}
69+
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Limit.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import org.elasticsearch.common.io.stream.StreamOutput;
1212
import org.elasticsearch.xpack.esql.capabilities.TelemetryAware;
1313
import org.elasticsearch.xpack.esql.core.expression.Expression;
14+
import org.elasticsearch.xpack.esql.core.expression.Literal;
1415
import org.elasticsearch.xpack.esql.core.tree.NodeInfo;
1516
import org.elasticsearch.xpack.esql.core.tree.Source;
1617
import org.elasticsearch.xpack.esql.io.stream.PlanStreamInput;
@@ -93,6 +94,10 @@ public Limit withLimit(Expression limit) {
9394
return new Limit(source(), limit, child(), duplicated);
9495
}
9596

97+
public Limit withLimit(int newLimit) {
98+
return withLimit(new Literal(limit.source(), newLimit, limit.dataType()));
99+
}
100+
96101
public boolean duplicated() {
97102
return duplicated;
98103
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/PlannerUtils.java

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@
3333
import org.elasticsearch.xpack.esql.optimizer.LocalPhysicalPlanOptimizer;
3434
import org.elasticsearch.xpack.esql.plan.logical.EsRelation;
3535
import org.elasticsearch.xpack.esql.plan.logical.Filter;
36+
import org.elasticsearch.xpack.esql.plan.logical.Limit;
37+
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
3638
import org.elasticsearch.xpack.esql.plan.logical.join.Join;
3739
import org.elasticsearch.xpack.esql.plan.physical.AggregateExec;
3840
import org.elasticsearch.xpack.esql.plan.physical.EsSourceExec;
@@ -56,6 +58,7 @@
5658
import java.util.Set;
5759
import java.util.function.Consumer;
5860
import java.util.function.Function;
61+
import java.util.function.LongSupplier;
5962

6063
import static java.util.Arrays.asList;
6164
import static org.elasticsearch.index.mapper.MappedFieldType.FieldExtractPreference.DOC_VALUES;
@@ -102,6 +105,45 @@ public static PhysicalPlan reductionPlan(PhysicalPlan plan) {
102105
return EstimatesRowSize.estimateRowSize(fragment.estimatedRowSize(), reducePlan);
103106
}
104107

108+
/**
109+
* Tries to adjust the limit of the data-node plan based on the number of already-collected rows.
110+
* For example, `FROM x | LIMIT 100` can be safely rewritten to `FROM x | LIMIT 40` if 60 rows have already been collected.
111+
*
112+
* @param plan the data-node plan
113+
* @param collectedRows the supplier to get the number of rows already collected
114+
* @return null if the limit has reached, otherwise a new plan with the adjusted limit
115+
*/
116+
public static PhysicalPlan maybeRewriteDataPlanWithNewLimit(PhysicalPlan plan, LongSupplier collectedRows) {
117+
assert plan instanceof ExchangeSinkExec : "not a data node plan " + plan;
118+
final var fragments = plan.collectFirstChildren(p -> p instanceof FragmentExec);
119+
if (fragments.isEmpty()) {
120+
return plan;
121+
}
122+
final FragmentExec fragment = (FragmentExec) fragments.getFirst();
123+
final var pipelineBreakers = fragment.fragment().collectFirstChildren(Mapper::isPipelineBreaker);
124+
if (pipelineBreakers.isEmpty()) {
125+
return plan;
126+
}
127+
if (pipelineBreakers.getFirst() instanceof Limit firstLimit) {
128+
final int originalLimit = (int) firstLimit.limit().fold(FoldContext.small());
129+
final long collected = collectedRows.getAsLong();
130+
if (originalLimit <= collected) {
131+
return null;
132+
}
133+
return plan.transformUp(FragmentExec.class, f -> {
134+
final LogicalPlan newFragment = f.fragment().transformUp(Limit.class, l -> {
135+
if (l == firstLimit) {
136+
return l.withLimit(Math.toIntExact(originalLimit - collected));
137+
} else {
138+
return l;
139+
}
140+
});
141+
return f.withFragment(newFragment);
142+
});
143+
}
144+
return plan;
145+
}
146+
105147
/**
106148
* Returns a set of concrete indices after resolving the original indices specified in the FROM command.
107149
*/

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeComputeHandler.java

Lines changed: 16 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -223,10 +223,10 @@ void start() {
223223
parentTask.addListener(
224224
() -> exchangeService.finishSinkHandler(request.sessionId(), new TaskCancelledException(parentTask.getReasonCancelled()))
225225
);
226-
runBatch(0);
226+
runBatch(request.plan(), 0);
227227
}
228228

229-
private void runBatch(int startBatchIndex) {
229+
private void runBatch(PhysicalPlan plan, int startBatchIndex) {
230230
final Configuration configuration = request.configuration();
231231
final String clusterAlias = request.clusterAlias();
232232
final var sessionId = request.sessionId();
@@ -278,7 +278,7 @@ public void onFailure(Exception e) {
278278
null,
279279
() -> exchangeSink.createExchangeSink(pagesProduced::incrementAndGet)
280280
);
281-
computeService.runCompute(parentTask, computeContext, request.plan(), batchListener);
281+
computeService.runCompute(parentTask, computeContext, plan, batchListener);
282282
}, batchListener::onFailure));
283283
}
284284

@@ -351,16 +351,20 @@ private void acquireSearchContexts(
351351
}
352352

353353
private void onBatchCompleted(int lastBatchIndex) {
354-
if (lastBatchIndex < request.shardIds().size() && exchangeSink.isFinished() == false) {
355-
runBatch(lastBatchIndex);
356-
} else {
357-
// don't return until all pages are fetched
358-
var completionListener = computeListener.acquireAvoid();
359-
exchangeSink.addCompletionListener(
360-
ActionListener.runAfter(completionListener, () -> exchangeService.finishSinkHandler(request.sessionId(), null))
361-
);
362-
blockingSink.finish();
354+
// check if we need to re-plan again
355+
if (lastBatchIndex < request.shardIds().size()) {
356+
final PhysicalPlan plan = PlannerUtils.maybeRewriteDataPlanWithNewLimit(request.plan(), exchangeSink::addedRows);
357+
if (plan != null) {
358+
runBatch(plan, lastBatchIndex);
359+
return;
360+
}
363361
}
362+
// don't return until all pages are fetched
363+
var completionListener = computeListener.acquireAvoid();
364+
exchangeSink.addCompletionListener(
365+
ActionListener.runAfter(completionListener, () -> exchangeService.finishSinkHandler(request.sessionId(), null))
366+
);
367+
blockingSink.finish();
364368
}
365369

366370
private boolean addShardLevelFailure(ShardId shardId, Exception e) {

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/QueryPragmas.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ public final class QueryPragmas implements Writeable {
3232
public static final Setting<Integer> EXCHANGE_CONCURRENT_CLIENTS = Setting.intSetting("exchange_concurrent_clients", 3);
3333
public static final Setting<Integer> ENRICH_MAX_WORKERS = Setting.intSetting("enrich_max_workers", 1);
3434

35-
private static final Setting<Integer> TASK_CONCURRENCY = Setting.intSetting(
35+
public static final Setting<Integer> TASK_CONCURRENCY = Setting.intSetting(
3636
"task_concurrency",
3737
ThreadPool.searchOrGetThreadPoolSize(EsExecutors.allocatedProcessors(Settings.EMPTY))
3838
);

x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/PhysicalPlanOptimizerTests.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7713,6 +7713,27 @@ public void testReductionPlanForAggs() {
77137713
assertThat(reductionAggs.estimatedRowSize(), equalTo(58)); // double and keyword
77147714
}
77157715

7716+
public void testAdjustLimit() {
7717+
var plan = physicalPlan("""
7718+
FROM test
7719+
| LIMIT 100
7720+
""");
7721+
Tuple<PhysicalPlan, PhysicalPlan> plans = PlannerUtils.breakPlanBetweenCoordinatorAndDataNode(plan, config);
7722+
PhysicalPlan dataNode = plans.v2();
7723+
PhysicalPlan p30 = PlannerUtils.maybeRewriteDataPlanWithNewLimit(dataNode, () -> 30);
7724+
assertNotNull(p30);
7725+
LimitExec limit70 = as(PlannerUtils.reductionPlan(p30), LimitExec.class);
7726+
assertThat(limit70.limit().fold(FoldContext.small()), equalTo(70));
7727+
7728+
PhysicalPlan p60 = PlannerUtils.maybeRewriteDataPlanWithNewLimit(dataNode, () -> 60);
7729+
assertNotNull(p60);
7730+
LimitExec limit40 = as(PlannerUtils.reductionPlan(p60), LimitExec.class);
7731+
assertThat(limit40.limit().fold(FoldContext.small()), equalTo(40));
7732+
7733+
PhysicalPlan p100 = PlannerUtils.maybeRewriteDataPlanWithNewLimit(dataNode, () -> between(100, 120));
7734+
assertNull(p100);
7735+
}
7736+
77167737
@SuppressWarnings("SameParameterValue")
77177738
private static void assertFilterCondition(
77187739
Filter filter,

0 commit comments

Comments
 (0)