Skip to content

Commit c666679

Browse files
authored
ESQL: Add asynchronous pre-optimization step for logical plan (#131440)
1 parent 9db4361 commit c666679

File tree

7 files changed

+251
-23
lines changed

7 files changed

+251
-23
lines changed

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/execution/PlanExecutor.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@
2121
import org.elasticsearch.xpack.esql.expression.function.EsqlFunctionRegistry;
2222
import org.elasticsearch.xpack.esql.optimizer.LogicalOptimizerContext;
2323
import org.elasticsearch.xpack.esql.optimizer.LogicalPlanOptimizer;
24+
import org.elasticsearch.xpack.esql.optimizer.LogicalPlanPreOptimizer;
25+
import org.elasticsearch.xpack.esql.optimizer.LogicalPreOptimizerContext;
2426
import org.elasticsearch.xpack.esql.planner.mapper.Mapper;
2527
import org.elasticsearch.xpack.esql.plugin.TransportActionServices;
2628
import org.elasticsearch.xpack.esql.querylog.EsqlQueryLog;
@@ -85,6 +87,7 @@ public void esql(
8587
indexResolver,
8688
enrichPolicyResolver,
8789
preAnalyzer,
90+
new LogicalPlanPreOptimizer(new LogicalPreOptimizerContext(foldContext)),
8891
functionRegistry,
8992
new LogicalPlanOptimizer(new LogicalOptimizerContext(cfg, foldContext)),
9093
mapper,
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.esql.optimizer;
9+
10+
import org.elasticsearch.action.ActionListener;
11+
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
12+
13+
/**
14+
* The class is responsible for invoking any steps that need to be applied to the logical plan,
15+
* before this is being optimized.
16+
* <p>
17+
* This is useful, especially if you need to execute some async tasks before the plan is optimized.
18+
* </p>
19+
*/
20+
public class LogicalPlanPreOptimizer {
21+
22+
private final LogicalPreOptimizerContext preOptimizerContext;
23+
24+
public LogicalPlanPreOptimizer(LogicalPreOptimizerContext preOptimizerContext) {
25+
this.preOptimizerContext = preOptimizerContext;
26+
}
27+
28+
/**
29+
* Pre-optimize a logical plan.
30+
*
31+
* @param plan the analyzed logical plan to pre-optimize
32+
* @param listener the listener returning the pre-optimized plan when pre-optimization is complete
33+
*/
34+
public void preOptimize(LogicalPlan plan, ActionListener<LogicalPlan> listener) {
35+
if (plan.analyzed() == false) {
36+
listener.onFailure(new IllegalStateException("Expected analyzed plan"));
37+
return;
38+
}
39+
40+
doPreOptimize(plan, listener.delegateFailureAndWrap((l, preOptimized) -> {
41+
preOptimized.setPreOptimized();
42+
listener.onResponse(preOptimized);
43+
}));
44+
}
45+
46+
private void doPreOptimize(LogicalPlan plan, ActionListener<LogicalPlan> listener) {
47+
// this is where we will be executing async tasks
48+
listener.onResponse(plan);
49+
}
50+
}
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.esql.optimizer;
9+
10+
import org.elasticsearch.xpack.esql.core.expression.FoldContext;
11+
12+
import java.util.Objects;
13+
14+
public class LogicalPreOptimizerContext {
15+
16+
private final FoldContext foldCtx;
17+
18+
public LogicalPreOptimizerContext(FoldContext foldCtx) {
19+
this.foldCtx = foldCtx;
20+
}
21+
22+
public FoldContext foldCtx() {
23+
return foldCtx;
24+
}
25+
26+
@Override
27+
public boolean equals(Object obj) {
28+
if (obj == this) return true;
29+
if (obj == null || obj.getClass() != this.getClass()) return false;
30+
var that = (LogicalPreOptimizerContext) obj;
31+
return this.foldCtx.equals(that.foldCtx);
32+
}
33+
34+
@Override
35+
public int hashCode() {
36+
return Objects.hash(foldCtx);
37+
}
38+
39+
@Override
40+
public String toString() {
41+
return "LogicalPreOptimizerContext[foldCtx=" + foldCtx + ']';
42+
}
43+
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/LogicalPlan.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ public enum Stage {
2525
PARSED,
2626
PRE_ANALYZED,
2727
ANALYZED,
28+
PRE_OPTIMIZED,
2829
OPTIMIZED;
2930
}
3031

@@ -52,6 +53,14 @@ public void setAnalyzed() {
5253
stage = Stage.ANALYZED;
5354
}
5455

56+
public boolean preOptimized() {
57+
return stage.ordinal() >= Stage.PRE_OPTIMIZED.ordinal();
58+
}
59+
60+
public void setPreOptimized() {
61+
stage = Stage.PRE_OPTIMIZED;
62+
}
63+
5564
public boolean optimized() {
5665
return stage.ordinal() >= Stage.OPTIMIZED.ordinal();
5766
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@
7171
import org.elasticsearch.xpack.esql.inference.InferenceResolution;
7272
import org.elasticsearch.xpack.esql.inference.InferenceRunner;
7373
import org.elasticsearch.xpack.esql.optimizer.LogicalPlanOptimizer;
74+
import org.elasticsearch.xpack.esql.optimizer.LogicalPlanPreOptimizer;
7475
import org.elasticsearch.xpack.esql.optimizer.PhysicalOptimizerContext;
7576
import org.elasticsearch.xpack.esql.optimizer.PhysicalPlanOptimizer;
7677
import org.elasticsearch.xpack.esql.parser.EsqlParser;
@@ -147,6 +148,7 @@ public interface PlanRunner {
147148
private final PreAnalyzer preAnalyzer;
148149
private final Verifier verifier;
149150
private final EsqlFunctionRegistry functionRegistry;
151+
private final LogicalPlanPreOptimizer logicalPlanPreOptimizer;
150152
private final LogicalPlanOptimizer logicalPlanOptimizer;
151153
private final PreMapper preMapper;
152154

@@ -168,6 +170,7 @@ public EsqlSession(
168170
IndexResolver indexResolver,
169171
EnrichPolicyResolver enrichPolicyResolver,
170172
PreAnalyzer preAnalyzer,
173+
LogicalPlanPreOptimizer logicalPlanPreOptimizer,
171174
EsqlFunctionRegistry functionRegistry,
172175
LogicalPlanOptimizer logicalPlanOptimizer,
173176
Mapper mapper,
@@ -181,6 +184,7 @@ public EsqlSession(
181184
this.indexResolver = indexResolver;
182185
this.enrichPolicyResolver = enrichPolicyResolver;
183186
this.preAnalyzer = preAnalyzer;
187+
this.logicalPlanPreOptimizer = logicalPlanPreOptimizer;
184188
this.verifier = verifier;
185189
this.functionRegistry = functionRegistry;
186190
this.mapper = mapper;
@@ -212,11 +216,10 @@ public void execute(EsqlQueryRequest request, EsqlExecutionInfo executionInfo, P
212216
analyzedPlan(parsed, executionInfo, request.filter(), new EsqlCCSUtils.CssPartialErrorsActionListener(executionInfo, listener) {
213217
@Override
214218
public void onResponse(LogicalPlan analyzedPlan) {
215-
LogicalPlan optimizedPlan = optimizedPlan(analyzedPlan);
216-
preMapper.preMapper(
217-
optimizedPlan,
218-
listener.delegateFailureAndWrap((l, p) -> executeOptimizedPlan(request, executionInfo, planRunner, p, l))
219-
);
219+
SubscribableListener.<LogicalPlan>newForked(l -> preOptimizedPlan(analyzedPlan, l))
220+
.<LogicalPlan>andThen((l, p) -> preMapper.preMapper(optimizedPlan(p), l))
221+
.<Result>andThen((l, p) -> executeOptimizedPlan(request, executionInfo, planRunner, p, l))
222+
.addListener(listener);
220223
}
221224
});
222225
}
@@ -1043,14 +1046,18 @@ private PhysicalPlan logicalPlanToPhysicalPlan(LogicalPlan optimizedPlan, EsqlQu
10431046
}
10441047

10451048
public LogicalPlan optimizedPlan(LogicalPlan logicalPlan) {
1046-
if (logicalPlan.analyzed() == false) {
1047-
throw new IllegalStateException("Expected analyzed plan");
1049+
if (logicalPlan.preOptimized() == false) {
1050+
throw new IllegalStateException("Expected pre-optimized plan");
10481051
}
10491052
var plan = logicalPlanOptimizer.optimize(logicalPlan);
10501053
LOGGER.debug("Optimized logicalPlan plan:\n{}", plan);
10511054
return plan;
10521055
}
10531056

1057+
public void preOptimizedPlan(LogicalPlan logicalPlan, ActionListener<LogicalPlan> listener) {
1058+
logicalPlanPreOptimizer.preOptimize(logicalPlan, listener);
1059+
}
1060+
10541061
public PhysicalPlan physicalPlan(LogicalPlan optimizedPlan) {
10551062
if (optimizedPlan.optimized() == false) {
10561063
throw new IllegalStateException("Expected optimized plan");

x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/CsvTests.java

Lines changed: 22 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,8 @@
7373
import org.elasticsearch.xpack.esql.optimizer.LocalPhysicalOptimizerContext;
7474
import org.elasticsearch.xpack.esql.optimizer.LogicalOptimizerContext;
7575
import org.elasticsearch.xpack.esql.optimizer.LogicalPlanOptimizer;
76+
import org.elasticsearch.xpack.esql.optimizer.LogicalPlanPreOptimizer;
77+
import org.elasticsearch.xpack.esql.optimizer.LogicalPreOptimizerContext;
7678
import org.elasticsearch.xpack.esql.optimizer.TestLocalPhysicalPlanOptimizer;
7779
import org.elasticsearch.xpack.esql.parser.EsqlParser;
7880
import org.elasticsearch.xpack.esql.plan.logical.Enrich;
@@ -582,6 +584,7 @@ private ActualResults executePlan(BigArrays bigArrays) throws Exception {
582584
null,
583585
null,
584586
null,
587+
new LogicalPlanPreOptimizer(new LogicalPreOptimizerContext(foldCtx)),
585588
functionRegistry,
586589
new LogicalPlanOptimizer(new LogicalOptimizerContext(configuration, foldCtx)),
587590
mapper,
@@ -594,24 +597,27 @@ private ActualResults executePlan(BigArrays bigArrays) throws Exception {
594597

595598
PlainActionFuture<ActualResults> listener = new PlainActionFuture<>();
596599

597-
session.executeOptimizedPlan(
598-
new EsqlQueryRequest(),
599-
new EsqlExecutionInfo(randomBoolean()),
600-
planRunner(bigArrays, foldCtx, physicalOperationProviders),
601-
session.optimizedPlan(analyzed),
602-
listener.delegateFailureAndWrap(
603-
// Wrap so we can capture the warnings in the calling thread
604-
(next, result) -> next.onResponse(
605-
new ActualResults(
606-
result.schema().stream().map(Attribute::name).toList(),
607-
result.schema().stream().map(a -> Type.asType(a.dataType().nameUpper())).toList(),
608-
result.schema().stream().map(Attribute::dataType).toList(),
609-
result.pages(),
610-
threadPool.getThreadContext().getResponseHeaders()
600+
session.preOptimizedPlan(analyzed, listener.delegateFailureAndWrap((l, preOptimized) -> {
601+
session.executeOptimizedPlan(
602+
new EsqlQueryRequest(),
603+
new EsqlExecutionInfo(randomBoolean()),
604+
planRunner(bigArrays, foldCtx, physicalOperationProviders),
605+
session.optimizedPlan(preOptimized),
606+
listener.delegateFailureAndWrap(
607+
// Wrap so we can capture the warnings in the calling thread
608+
(next, result) -> next.onResponse(
609+
new ActualResults(
610+
result.schema().stream().map(Attribute::name).toList(),
611+
result.schema().stream().map(a -> Type.asType(a.dataType().nameUpper())).toList(),
612+
result.schema().stream().map(Attribute::dataType).toList(),
613+
result.pages(),
614+
threadPool.getThreadContext().getResponseHeaders()
615+
)
611616
)
612617
)
613-
)
614-
);
618+
);
619+
}));
620+
615621
return listener.get();
616622
}
617623

Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.esql.optimizer;
9+
10+
import org.apache.lucene.util.SetOnce;
11+
import org.elasticsearch.action.ActionListener;
12+
import org.elasticsearch.test.ESTestCase;
13+
import org.elasticsearch.xpack.esql.EsqlTestUtils;
14+
import org.elasticsearch.xpack.esql.core.expression.Alias;
15+
import org.elasticsearch.xpack.esql.core.expression.Expression;
16+
import org.elasticsearch.xpack.esql.core.expression.FoldContext;
17+
import org.elasticsearch.xpack.esql.core.tree.Source;
18+
import org.elasticsearch.xpack.esql.expression.function.scalar.string.Concat;
19+
import org.elasticsearch.xpack.esql.expression.predicate.operator.arithmetic.Add;
20+
import org.elasticsearch.xpack.esql.plan.logical.Eval;
21+
import org.elasticsearch.xpack.esql.plan.logical.Filter;
22+
import org.elasticsearch.xpack.esql.plan.logical.Limit;
23+
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
24+
import org.elasticsearch.xpack.esql.plan.logical.Project;
25+
26+
import java.util.List;
27+
28+
import static org.elasticsearch.xpack.esql.EsqlTestUtils.as;
29+
import static org.elasticsearch.xpack.esql.EsqlTestUtils.fieldAttribute;
30+
import static org.elasticsearch.xpack.esql.EsqlTestUtils.of;
31+
import static org.hamcrest.Matchers.equalTo;
32+
import static org.hamcrest.Matchers.notNullValue;
33+
34+
public class LogicalPlanPreOptimizerTests extends ESTestCase {
35+
36+
public void testPlanIsMarkedAsPreOptimized() throws Exception {
37+
for (int round = 0; round < 100; round++) {
38+
// We want to make sure that the pre-optimizer woks for a wide range of plans
39+
preOptimizedPlan(randomPlan());
40+
}
41+
}
42+
43+
public void testPreOptimizeFailsIfPlanIsNotAnalyzed() throws Exception {
44+
LogicalPlan plan = EsqlTestUtils.relation();
45+
SetOnce<Exception> exceptionHolder = new SetOnce<>();
46+
47+
preOptimizer().preOptimize(plan, ActionListener.wrap(r -> fail("Should have failed"), exceptionHolder::set));
48+
assertBusy(() -> {
49+
assertThat(exceptionHolder.get(), notNullValue());
50+
IllegalStateException e = as(exceptionHolder.get(), IllegalStateException.class);
51+
assertThat(e.getMessage(), equalTo("Expected analyzed plan"));
52+
});
53+
}
54+
55+
public LogicalPlan preOptimizedPlan(LogicalPlan plan) throws Exception {
56+
// set plan as analyzed
57+
plan.setPreOptimized();
58+
59+
SetOnce<LogicalPlan> resultHolder = new SetOnce<>();
60+
SetOnce<Exception> exceptionHolder = new SetOnce<>();
61+
62+
preOptimizer().preOptimize(plan, ActionListener.wrap(resultHolder::set, exceptionHolder::set));
63+
64+
if (exceptionHolder.get() != null) {
65+
throw exceptionHolder.get();
66+
}
67+
68+
assertThat(resultHolder.get(), notNullValue());
69+
assertThat(resultHolder.get().preOptimized(), equalTo(true));
70+
71+
return resultHolder.get();
72+
}
73+
74+
private LogicalPlanPreOptimizer preOptimizer() {
75+
LogicalPreOptimizerContext preOptimizerContext = new LogicalPreOptimizerContext(FoldContext.small());
76+
return new LogicalPlanPreOptimizer(preOptimizerContext);
77+
}
78+
79+
private LogicalPlan randomPlan() {
80+
LogicalPlan plan = EsqlTestUtils.relation();
81+
int numCommands = between(0, 100);
82+
83+
for (int i = 0; i < numCommands; i++) {
84+
plan = switch (randomInt(3)) {
85+
case 0 -> new Eval(Source.EMPTY, plan, List.of(new Alias(Source.EMPTY, randomIdentifier(), randomExpression())));
86+
case 1 -> new Limit(Source.EMPTY, of(randomInt()), plan);
87+
case 2 -> new Filter(Source.EMPTY, plan, randomCondition());
88+
default -> new Project(Source.EMPTY, plan, List.of(new Alias(Source.EMPTY, randomIdentifier(), fieldAttribute())));
89+
};
90+
}
91+
return plan;
92+
}
93+
94+
private Expression randomExpression() {
95+
return switch (randomInt(3)) {
96+
case 0 -> of(randomInt());
97+
case 1 -> of(randomIdentifier());
98+
case 2 -> new Add(Source.EMPTY, of(randomInt()), of(randomDouble()));
99+
default -> new Concat(Source.EMPTY, of(randomIdentifier()), randomList(1, 10, () -> of(randomIdentifier())));
100+
};
101+
}
102+
103+
private Expression randomCondition() {
104+
if (randomBoolean()) {
105+
return EsqlTestUtils.equalsOf(randomExpression(), randomExpression());
106+
}
107+
108+
return EsqlTestUtils.greaterThanOf(randomExpression(), randomExpression());
109+
}
110+
}

0 commit comments

Comments
 (0)