Skip to content

Commit df24e4f

Browse files
authored
ESQL: Plumb a way to run phased plans (#110445)
INLINESTATS is going to run two ESQL commands - one to get the STATS and one to join the stats results to the output. This plumbs a way for `EsqlSession#execute` to run multiple dips into the compute engine using a `BiConsumer<PhysicalPlan, ActionListener<Result>> runPhase`. For now, we just plug that right into the output to keep things working as they are now. But soon, so soon, we'll plug in a second phase.
1 parent 5c8c76e commit df24e4f

File tree

6 files changed

+73
-36
lines changed

6 files changed

+73
-36
lines changed

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/execution/PlanExecutor.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,12 @@
2020
import org.elasticsearch.xpack.esql.session.EsqlConfiguration;
2121
import org.elasticsearch.xpack.esql.session.EsqlSession;
2222
import org.elasticsearch.xpack.esql.session.IndexResolver;
23+
import org.elasticsearch.xpack.esql.session.Result;
2324
import org.elasticsearch.xpack.esql.stats.Metrics;
2425
import org.elasticsearch.xpack.esql.stats.QueryMetric;
2526

27+
import java.util.function.BiConsumer;
28+
2629
import static org.elasticsearch.action.ActionListener.wrap;
2730

2831
public class PlanExecutor {
@@ -48,7 +51,8 @@ public void esql(
4851
String sessionId,
4952
EsqlConfiguration cfg,
5053
EnrichPolicyResolver enrichPolicyResolver,
51-
ActionListener<PhysicalPlan> listener
54+
BiConsumer<PhysicalPlan, ActionListener<Result>> runPhase,
55+
ActionListener<Result> listener
5256
) {
5357
final var session = new EsqlSession(
5458
sessionId,
@@ -63,7 +67,7 @@ public void esql(
6367
);
6468
QueryMetric clientId = QueryMetric.fromString("rest");
6569
metrics.total(clientId);
66-
session.execute(request, wrap(listener::onResponse, ex -> {
70+
session.execute(request, runPhase, wrap(listener::onResponse, ex -> {
6771
// TODO when we decide if we will differentiate Kibana from REST, this String value will likely come from the request
6872
metrics.failed(clientId);
6973
listener.onFailure(ex);

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@
7272
import org.elasticsearch.xpack.esql.planner.LocalExecutionPlanner;
7373
import org.elasticsearch.xpack.esql.planner.PlannerUtils;
7474
import org.elasticsearch.xpack.esql.session.EsqlConfiguration;
75+
import org.elasticsearch.xpack.esql.session.Result;
7576

7677
import java.util.ArrayList;
7778
import java.util.Collections;
@@ -89,8 +90,6 @@
8990
* Computes the result of a {@link PhysicalPlan}.
9091
*/
9192
public class ComputeService {
92-
public record Result(List<Page> pages, List<DriverProfile> profiles) {}
93-
9493
private static final Logger LOGGER = LogManager.getLogger(ComputeService.class);
9594
private final SearchService searchService;
9695
private final BigArrays bigArrays;
@@ -176,7 +175,7 @@ public void execute(
176175
rootTask,
177176
computeContext,
178177
coordinatorPlan,
179-
listener.map(driverProfiles -> new Result(collectedPages, driverProfiles))
178+
listener.map(driverProfiles -> new Result(physicalPlan.output(), collectedPages, driverProfiles))
180179
);
181180
return;
182181
} else {
@@ -201,7 +200,9 @@ public void execute(
201200
);
202201
try (
203202
Releasable ignored = exchangeSource.addEmptySink();
204-
RefCountingListener refs = new RefCountingListener(listener.map(unused -> new Result(collectedPages, collectedProfiles)))
203+
RefCountingListener refs = new RefCountingListener(
204+
listener.map(unused -> new Result(physicalPlan.output(), collectedPages, collectedProfiles))
205+
)
205206
) {
206207
// run compute on the coordinator
207208
exchangeSource.addCompletionListener(refs.acquire());

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java

Lines changed: 26 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -37,14 +37,17 @@
3737
import org.elasticsearch.xpack.esql.enrich.EnrichLookupService;
3838
import org.elasticsearch.xpack.esql.enrich.EnrichPolicyResolver;
3939
import org.elasticsearch.xpack.esql.execution.PlanExecutor;
40+
import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
4041
import org.elasticsearch.xpack.esql.session.EsqlConfiguration;
42+
import org.elasticsearch.xpack.esql.session.Result;
4143

4244
import java.io.IOException;
4345
import java.time.ZoneOffset;
4446
import java.util.List;
4547
import java.util.Locale;
4648
import java.util.Map;
4749
import java.util.concurrent.Executor;
50+
import java.util.function.BiConsumer;
4851

4952
import static org.elasticsearch.xpack.core.ClientHelper.ASYNC_SEARCH_ORIGIN;
5053

@@ -157,37 +160,37 @@ private void innerExecute(Task task, EsqlQueryRequest request, ActionListener<Es
157160
request.tables()
158161
);
159162
String sessionId = sessionID(task);
163+
BiConsumer<PhysicalPlan, ActionListener<Result>> runPhase = (physicalPlan, resultListener) -> computeService.execute(
164+
sessionId,
165+
(CancellableTask) task,
166+
physicalPlan,
167+
configuration,
168+
resultListener
169+
);
170+
160171
planExecutor.esql(
161172
request,
162173
sessionId,
163174
configuration,
164175
enrichPolicyResolver,
165-
listener.delegateFailureAndWrap(
166-
(delegate, physicalPlan) -> computeService.execute(
167-
sessionId,
168-
(CancellableTask) task,
169-
physicalPlan,
170-
configuration,
171-
delegate.map(result -> {
172-
List<ColumnInfoImpl> columns = physicalPlan.output()
173-
.stream()
174-
.map(c -> new ColumnInfoImpl(c.qualifiedName(), c.dataType().outputType()))
175-
.toList();
176-
EsqlQueryResponse.Profile profile = configuration.profile()
177-
? new EsqlQueryResponse.Profile(result.profiles())
178-
: null;
179-
if (task instanceof EsqlQueryTask asyncTask && request.keepOnCompletion()) {
180-
String id = asyncTask.getExecutionId().getEncoded();
181-
return new EsqlQueryResponse(columns, result.pages(), profile, request.columnar(), id, false, request.async());
182-
} else {
183-
return new EsqlQueryResponse(columns, result.pages(), profile, request.columnar(), request.async());
184-
}
185-
})
186-
)
187-
)
176+
runPhase,
177+
listener.map(result -> toResponse(task, request, configuration, result))
188178
);
189179
}
190180

181+
private EsqlQueryResponse toResponse(Task task, EsqlQueryRequest request, EsqlConfiguration configuration, Result result) {
182+
List<ColumnInfoImpl> columns = result.schema()
183+
.stream()
184+
.map(c -> new ColumnInfoImpl(c.qualifiedName(), c.dataType().outputType()))
185+
.toList();
186+
EsqlQueryResponse.Profile profile = configuration.profile() ? new EsqlQueryResponse.Profile(result.profiles()) : null;
187+
if (task instanceof EsqlQueryTask asyncTask && request.keepOnCompletion()) {
188+
String id = asyncTask.getExecutionId().getEncoded();
189+
return new EsqlQueryResponse(columns, result.pages(), profile, request.columnar(), id, false, request.async());
190+
}
191+
return new EsqlQueryResponse(columns, result.pages(), profile, request.columnar(), request.async());
192+
}
193+
191194
/**
192195
* Returns the ID for this compute session. The ID is unique within the cluster, and is used
193196
* to identify the compute-session across nodes. The ID is just the TaskID of the task that

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@
5858
import java.util.List;
5959
import java.util.Map;
6060
import java.util.Set;
61+
import java.util.function.BiConsumer;
6162
import java.util.function.BiFunction;
6263
import java.util.function.Predicate;
6364
import java.util.stream.Collectors;
@@ -110,10 +111,19 @@ public String sessionId() {
110111
return sessionId;
111112
}
112113

113-
public void execute(EsqlQueryRequest request, ActionListener<PhysicalPlan> listener) {
114+
public void execute(
115+
EsqlQueryRequest request,
116+
BiConsumer<PhysicalPlan, ActionListener<Result>> runPhase,
117+
ActionListener<Result> listener
118+
) {
114119
LOGGER.debug("ESQL query:\n{}", request.query());
120+
LogicalPlan logicalPlan = parse(request.query(), request.params());
121+
logicalPlanToPhysicalPlan(logicalPlan, request, listener.delegateFailureAndWrap((l, r) -> runPhase.accept(r, l)));
122+
}
123+
124+
private void logicalPlanToPhysicalPlan(LogicalPlan logicalPlan, EsqlQueryRequest request, ActionListener<PhysicalPlan> listener) {
115125
optimizedPhysicalPlan(
116-
parse(request.query(), request.params()),
126+
logicalPlan,
117127
listener.map(plan -> EstimatesRowSize.estimateRowSize(0, plan.transformUp(FragmentExec.class, f -> {
118128
QueryBuilder filter = request.filter();
119129
if (filter != null) {

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/Result.java

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,23 @@
77

88
package org.elasticsearch.xpack.esql.session;
99

10+
import org.elasticsearch.compute.data.Block;
11+
import org.elasticsearch.compute.data.Page;
12+
import org.elasticsearch.compute.operator.DriverProfile;
1013
import org.elasticsearch.xpack.esql.core.expression.Attribute;
14+
import org.elasticsearch.xpack.esql.core.plan.logical.LogicalPlan;
1115

1216
import java.util.List;
1317

14-
public record Result(List<Attribute> columns, List<List<Object>> values) {}
18+
/**
19+
* Results from running a chunk of ESQL.
20+
* @param schema "Schema" of the {@link Attribute}s that are produced by the {@link LogicalPlan}
21+
* that was run. Each {@link Page} contains a {@link Block} of values for each
22+
* attribute in this list.
23+
* @param pages Actual values produced by running the ESQL.
24+
* @param profiles {@link DriverProfile}s from all drivers that ran to produce the output. These
25+
* are quite cheap to build, so we build them for all ESQL runs, regardless of if
26+
* users have asked for them. But we only include them in the results if users ask
27+
* for them.
28+
*/
29+
public record Result(List<Attribute> schema, List<Page> pages, List<DriverProfile> profiles) {}

x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/stats/PlanExecutorMetricsTests.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.elasticsearch.xpack.esql.execution.PlanExecutor;
2525
import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
2626
import org.elasticsearch.xpack.esql.session.IndexResolver;
27+
import org.elasticsearch.xpack.esql.session.Result;
2728
import org.elasticsearch.xpack.esql.type.EsqlDataTypeRegistry;
2829
import org.junit.After;
2930
import org.junit.Before;
@@ -33,6 +34,7 @@
3334
import java.util.HashMap;
3435
import java.util.List;
3536
import java.util.Map;
37+
import java.util.function.BiConsumer;
3638

3739
import static org.elasticsearch.xpack.esql.EsqlTestUtils.withDefaultLimitWarning;
3840
import static org.hamcrest.Matchers.instanceOf;
@@ -100,9 +102,10 @@ public void testFailedMetric() {
100102
var request = new EsqlQueryRequest();
101103
// test a failed query: xyz field doesn't exist
102104
request.query("from test | stats m = max(xyz)");
103-
planExecutor.esql(request, randomAlphaOfLength(10), EsqlTestUtils.TEST_CFG, enrichResolver, new ActionListener<>() {
105+
BiConsumer<PhysicalPlan, ActionListener<Result>> runPhase = (p, r) -> fail("this shouldn't happen");
106+
planExecutor.esql(request, randomAlphaOfLength(10), EsqlTestUtils.TEST_CFG, enrichResolver, runPhase, new ActionListener<>() {
104107
@Override
105-
public void onResponse(PhysicalPlan physicalPlan) {
108+
public void onResponse(Result result) {
106109
fail("this shouldn't happen");
107110
}
108111

@@ -119,9 +122,10 @@ public void onFailure(Exception e) {
119122

120123
// fix the failing query: foo field does exist
121124
request.query("from test | stats m = max(foo)");
122-
planExecutor.esql(request, randomAlphaOfLength(10), EsqlTestUtils.TEST_CFG, enrichResolver, new ActionListener<>() {
125+
runPhase = (p, r) -> r.onResponse(null);
126+
planExecutor.esql(request, randomAlphaOfLength(10), EsqlTestUtils.TEST_CFG, enrichResolver, runPhase, new ActionListener<>() {
123127
@Override
124-
public void onResponse(PhysicalPlan physicalPlan) {}
128+
public void onResponse(Result result) {}
125129

126130
@Override
127131
public void onFailure(Exception e) {

0 commit comments

Comments
 (0)