Skip to content

Commit cd80187

Browse files
committed
Rewrite limit in data node execution
1 parent 333e252 commit cd80187

File tree

7 files changed

+220
-13
lines changed

7 files changed

+220
-13
lines changed

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeBuffer.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import java.util.Queue;
1717
import java.util.concurrent.ConcurrentLinkedQueue;
1818
import java.util.concurrent.atomic.AtomicInteger;
19+
import java.util.concurrent.atomic.AtomicLong;
1920

2021
final class ExchangeBuffer {
2122

@@ -31,6 +32,7 @@ final class ExchangeBuffer {
3132
private SubscribableListener<Void> notFullFuture = null;
3233

3334
private final SubscribableListener<Void> completionFuture = new SubscribableListener<>();
35+
private final AtomicLong addedRows = new AtomicLong();
3436

3537
private volatile boolean noMoreInputs = false;
3638

@@ -58,6 +60,8 @@ void addPage(Page page) {
5860
completionFuture.onResponse(null);
5961
}
6062
}
63+
} else {
64+
addedRows.addAndGet(page.getPositionCount());
6165
}
6266
}
6367

@@ -155,6 +159,13 @@ int size() {
155159
return queueSize.get();
156160
}
157161

162+
/**
163+
* Returns the number of rows that have been added to this exchange buffer
164+
*/
165+
long addedRows() {
166+
return addedRows.get();
167+
}
168+
158169
/**
159170
* Adds a listener that will be notified when this exchange buffer is finished.
160171
*/

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeSinkHandler.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -205,4 +205,11 @@ long lastUpdatedTimeInMillis() {
205205
public int bufferSize() {
206206
return buffer.size();
207207
}
208+
209+
/**
210+
* Returns the number of rows (i.e. positions) that have been added to this exchange sink
211+
*/
212+
public long addedRows() {
213+
return buffer.addedRows();
214+
}
208215
}
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.esql.action;
9+
10+
import org.elasticsearch.action.index.IndexRequestBuilder;
11+
import org.elasticsearch.common.settings.Settings;
12+
import org.elasticsearch.compute.lucene.DataPartitioning;
13+
import org.elasticsearch.compute.lucene.LuceneSourceOperator;
14+
import org.elasticsearch.compute.operator.DriverProfile;
15+
import org.elasticsearch.xpack.esql.EsqlTestUtils;
16+
import org.elasticsearch.xpack.esql.plugin.QueryPragmas;
17+
18+
import java.util.ArrayList;
19+
import java.util.List;
20+
21+
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
22+
import static org.hamcrest.Matchers.equalTo;
23+
import static org.hamcrest.Matchers.hasSize;
24+
25+
public class EarlyTerminationIT extends AbstractEsqlIntegTestCase {
26+
27+
public void testAdjustLimit() {
28+
assumeTrue("require pragmas", canUseQueryPragmas());
29+
String dataNode = internalCluster().startDataOnlyNode();
30+
int numIndices = 10;
31+
int value = 0;
32+
for (int i = 0; i < numIndices; i++) {
33+
String index = "test-" + i;
34+
assertAcked(
35+
admin().indices()
36+
.prepareCreate(index)
37+
.setSettings(indexSettings(1, 0).put("index.routing.allocation.require._name", dataNode))
38+
);
39+
List<IndexRequestBuilder> indexRequests = new ArrayList<>();
40+
for (int d = 0; d < 20; d++) {
41+
indexRequests.add(new IndexRequestBuilder(client()).setIndex(index).setSource("v", Integer.toString(value++)));
42+
}
43+
indexRandom(true, indexRequests);
44+
}
45+
EsqlQueryRequest request = new EsqlQueryRequest();
46+
request.query("FROM test-* | LIMIT 95");
47+
QueryPragmas pragmas = new QueryPragmas(
48+
Settings.builder()
49+
.put(QueryPragmas.MAX_CONCURRENT_SHARDS_PER_NODE.getKey(), 2)
50+
.put(QueryPragmas.DATA_PARTITIONING.getKey(), DataPartitioning.SHARD)
51+
.put(QueryPragmas.TASK_CONCURRENCY.getKey(), between(3, 5))
52+
.build()
53+
);
54+
request.pragmas(pragmas);
55+
request.profile(true);
56+
try (EsqlQueryResponse resp = run(request)) {
57+
List<List<Object>> values = EsqlTestUtils.getValuesList(resp);
58+
assertThat(values, hasSize(95));
59+
List<DriverProfile> drivers = resp.profile().drivers();
60+
List<DriverProfile> queryProfiles = drivers.stream().filter(d -> d.taskDescription().equals("data")).toList();
61+
assertThat(queryProfiles, hasSize(6));
62+
var luceneOperators = queryProfiles.stream().map(d -> (LuceneSourceOperator.Status) d.operators().getFirst().status()).toList();
63+
assertThat(luceneOperators, hasSize(6));
64+
assertThat(luceneOperators.get(0).rowsEmitted() + luceneOperators.get(1).rowsEmitted(), equalTo(40L));
65+
assertThat(luceneOperators.get(2).rowsEmitted() + luceneOperators.get(3).rowsEmitted(), equalTo(40L));
66+
assertThat(luceneOperators.get(4).rowsEmitted() + luceneOperators.get(5).rowsEmitted(), equalTo(15L));
67+
}
68+
}
69+
}
Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.esql.optimizer;
9+
10+
import org.elasticsearch.core.Nullable;
11+
import org.elasticsearch.xpack.esql.core.expression.FoldContext;
12+
import org.elasticsearch.xpack.esql.core.expression.Literal;
13+
import org.elasticsearch.xpack.esql.plan.logical.Limit;
14+
import org.elasticsearch.xpack.esql.plan.physical.FragmentExec;
15+
import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
16+
import org.elasticsearch.xpack.esql.planner.mapper.Mapper;
17+
18+
import java.util.function.IntSupplier;
19+
20+
/**
21+
* Attempts to rewrite the fragment enclosed in a data-node plan based on the progress of the query. For example:
22+
* `FROM x | LIMIT 100` can be safely rewritten to `FROM x | LIMIT 40` if 60 rows have already been collected.
23+
* TODO: Rewrite TopN to filters
24+
*/
25+
public final class QueryProgressFragmentOptimizer {
26+
private final PhysicalVerifier verifier = PhysicalVerifier.INSTANCE;
27+
private final IntSupplier alreadyCollectedLimit;
28+
29+
/**
30+
* @param alreadyCollectedLimit the supplier to get the number of rows already collected; if null, plans are returned unchanged
31+
*/
32+
public QueryProgressFragmentOptimizer(@Nullable IntSupplier alreadyCollectedLimit) {
33+
this.alreadyCollectedLimit = alreadyCollectedLimit;
34+
}
35+
36+
/**
37+
* Attempts to optimize the fragment enclosed in a data-node plan based on the progress of the query.
38+
* @param plan the input plan
39+
* @return the optimized plan, or null if the query can be terminated early.
40+
*/
41+
public PhysicalPlan optimizeFragment(PhysicalPlan plan) {
42+
if (alreadyCollectedLimit == null) {
43+
return plan;
44+
}
45+
final var fragments = plan.collectFirstChildren(p -> p instanceof FragmentExec);
46+
if (fragments.size() != 1) {
47+
return plan;
48+
}
49+
final FragmentExec fragment = (FragmentExec) fragments.getFirst();
50+
final var pipelineBreakers = fragment.fragment().collectFirstChildren(Mapper::isPipelineBreaker);
51+
if (pipelineBreakers.isEmpty()) {
52+
return plan;
53+
}
54+
// Rewrite LIMIT
55+
if (pipelineBreakers.getFirst() instanceof Limit firstLimit) {
56+
final int collected = alreadyCollectedLimit.getAsInt();
57+
if (collected == 0) {
58+
return plan;
59+
}
60+
final int originalLimit = (int) firstLimit.limit().fold(FoldContext.small());
61+
if (originalLimit <= collected) {
62+
return null;
63+
}
64+
final var newFragment = fragment.fragment().transformUp(Limit.class, l -> {
65+
if (l == firstLimit) {
66+
return l.withLimit(Literal.of(firstLimit.limit(), originalLimit - collected));
67+
} else {
68+
return l;
69+
}
70+
});
71+
var newPlan = plan.transformUp(FragmentExec.class, f -> {
72+
if (f == fragment) {
73+
return new FragmentExec(newFragment);
74+
} else {
75+
return f;
76+
}
77+
});
78+
verifier.verify(newPlan);
79+
return newPlan;
80+
}
81+
82+
return plan;
83+
}
84+
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeComputeHandler.java

Lines changed: 20 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import org.elasticsearch.transport.TransportRequestOptions;
4141
import org.elasticsearch.transport.TransportService;
4242
import org.elasticsearch.xpack.esql.core.expression.FoldContext;
43+
import org.elasticsearch.xpack.esql.optimizer.QueryProgressFragmentOptimizer;
4344
import org.elasticsearch.xpack.esql.plan.physical.ExchangeSinkExec;
4445
import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
4546
import org.elasticsearch.xpack.esql.planner.PlannerUtils;
@@ -153,6 +154,7 @@ protected void sendRequest(
153154
computeListener.acquireAvoid()
154155
);
155156
final boolean sameNode = transportService.getLocalNode().getId().equals(connection.getNode().getId());
157+
// TODO: Rewrite the plan with the optimizer
156158
var dataNodeRequest = new DataNodeRequest(
157159
childSessionId,
158160
configuration,
@@ -199,6 +201,7 @@ private class DataNodeRequestExecutor {
199201
private final ExchangeSink blockingSink; // block until we have completed on all shards or the coordinator has enough data
200202
private final boolean failFastOnShardFailure;
201203
private final Map<ShardId, Exception> shardLevelFailures;
204+
private final QueryProgressFragmentOptimizer fragmentOptimizer;
202205

203206
DataNodeRequestExecutor(
204207
DataNodeRequest request,
@@ -217,16 +220,17 @@ private class DataNodeRequestExecutor {
217220
this.failFastOnShardFailure = failFastOnShardFailure;
218221
this.shardLevelFailures = shardLevelFailures;
219222
this.blockingSink = exchangeSink.createExchangeSink(() -> {});
223+
this.fragmentOptimizer = new QueryProgressFragmentOptimizer(() -> (int) Math.min(Integer.MAX_VALUE, exchangeSink.addedRows()));
220224
}
221225

222226
void start() {
223227
parentTask.addListener(
224228
() -> exchangeService.finishSinkHandler(request.sessionId(), new TaskCancelledException(parentTask.getReasonCancelled()))
225229
);
226-
runBatch(0);
230+
runBatch(request.plan(), 0);
227231
}
228232

229-
private void runBatch(int startBatchIndex) {
233+
private void runBatch(PhysicalPlan plan, int startBatchIndex) {
230234
final Configuration configuration = request.configuration();
231235
final String clusterAlias = request.clusterAlias();
232236
final var sessionId = request.sessionId();
@@ -278,7 +282,7 @@ public void onFailure(Exception e) {
278282
null,
279283
() -> exchangeSink.createExchangeSink(pagesProduced::incrementAndGet)
280284
);
281-
computeService.runCompute(parentTask, computeContext, request.plan(), batchListener);
285+
computeService.runCompute(parentTask, computeContext, plan, batchListener);
282286
}, batchListener::onFailure));
283287
}
284288

@@ -351,16 +355,20 @@ private void acquireSearchContexts(
351355
}
352356

353357
private void onBatchCompleted(int lastBatchIndex) {
354-
if (lastBatchIndex < request.shardIds().size() && exchangeSink.isFinished() == false) {
355-
runBatch(lastBatchIndex);
356-
} else {
357-
// don't return until all pages are fetched
358-
var completionListener = computeListener.acquireAvoid();
359-
exchangeSink.addCompletionListener(
360-
ActionListener.runAfter(completionListener, () -> exchangeService.finishSinkHandler(request.sessionId(), null))
361-
);
362-
blockingSink.finish();
358+
// check whether the plan needs to be rewritten before running the next batch
359+
if (lastBatchIndex < request.shardIds().size()) {
360+
final PhysicalPlan plan = fragmentOptimizer.optimizeFragment(request.plan());
361+
if (plan != null) {
362+
runBatch(plan, lastBatchIndex);
363+
return;
364+
}
363365
}
366+
// don't return until all pages are fetched
367+
var completionListener = computeListener.acquireAvoid();
368+
exchangeSink.addCompletionListener(
369+
ActionListener.runAfter(completionListener, () -> exchangeService.finishSinkHandler(request.sessionId(), null))
370+
);
371+
blockingSink.finish();
364372
}
365373

366374
private boolean addShardLevelFailure(ShardId shardId, Exception e) {

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/QueryPragmas.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ public final class QueryPragmas implements Writeable {
3232
public static final Setting<Integer> EXCHANGE_CONCURRENT_CLIENTS = Setting.intSetting("exchange_concurrent_clients", 3);
3333
public static final Setting<Integer> ENRICH_MAX_WORKERS = Setting.intSetting("enrich_max_workers", 1);
3434

35-
private static final Setting<Integer> TASK_CONCURRENCY = Setting.intSetting(
35+
public static final Setting<Integer> TASK_CONCURRENCY = Setting.intSetting(
3636
"task_concurrency",
3737
ThreadPool.searchOrGetThreadPoolSize(EsExecutors.allocatedProcessors(Settings.EMPTY))
3838
);

x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/PhysicalPlanOptimizerTests.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@
6161
import org.elasticsearch.xpack.esql.core.tree.Source;
6262
import org.elasticsearch.xpack.esql.core.type.DataType;
6363
import org.elasticsearch.xpack.esql.core.type.EsField;
64+
import org.elasticsearch.xpack.esql.core.util.Holder;
6465
import org.elasticsearch.xpack.esql.enrich.ResolvedEnrichPolicy;
6566
import org.elasticsearch.xpack.esql.expression.Order;
6667
import org.elasticsearch.xpack.esql.expression.function.EsqlFunctionRegistry;
@@ -7713,6 +7714,33 @@ public void testReductionPlanForAggs() {
77137714
assertThat(reductionAggs.estimatedRowSize(), equalTo(58)); // double and keyword
77147715
}
77157716

7717+
public void testAdjustLimit() {
7718+
var plan = physicalPlan("""
7719+
FROM test
7720+
| LIMIT 100
7721+
""");
7722+
Holder<Integer> collectedRows = new Holder<>(0);
7723+
Tuple<PhysicalPlan, PhysicalPlan> plans = PlannerUtils.breakPlanBetweenCoordinatorAndDataNode(plan, config);
7724+
PhysicalPlan dataNode = plans.v2();
7725+
var fragmentOptimizer = new QueryProgressFragmentOptimizer(collectedRows::get);
7726+
assertSame(dataNode, fragmentOptimizer.optimizeFragment(dataNode));
7727+
collectedRows.set(30);
7728+
PhysicalPlan p30 = fragmentOptimizer.optimizeFragment(dataNode);
7729+
assertNotNull(p30);
7730+
LimitExec limit70 = as(PlannerUtils.reductionPlan(p30), LimitExec.class);
7731+
assertThat(limit70.limit().fold(FoldContext.small()), equalTo(70));
7732+
7733+
collectedRows.set(60);
7734+
PhysicalPlan p60 = fragmentOptimizer.optimizeFragment(dataNode);
7735+
assertNotNull(p60);
7736+
LimitExec limit40 = as(PlannerUtils.reductionPlan(p60), LimitExec.class);
7737+
assertThat(limit40.limit().fold(FoldContext.small()), equalTo(40));
7738+
7739+
collectedRows.set(between(100, 120));
7740+
PhysicalPlan p100 = fragmentOptimizer.optimizeFragment(dataNode);
7741+
assertNull(p100);
7742+
}
7743+
77167744
@SuppressWarnings("SameParameterValue")
77177745
private static void assertFilterCondition(
77187746
Filter filter,

0 commit comments

Comments
 (0)