Skip to content

Commit 5851dc3

Browse files
committed
Make LogicalPlanOptimizer::optimize async.
1 parent a7794b7 commit 5851dc3

File tree

14 files changed

+150
-101
lines changed

14 files changed

+150
-101
lines changed

benchmarks/src/main/java/org/elasticsearch/benchmark/_nightly/esql/QueryPlanningBenchmark.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,8 @@ private void plan(
128128
) {
129129
var parsed = parser.createStatement(query, new QueryParams(), telemetry);
130130
SubscribableListener.<LogicalPlan>newForked(analyzedPlanListener -> analyzer.analyze(parsed, analyzedPlanListener))
131-
.addListener(listener.map(optimizer::optimize));
131+
.<LogicalPlan>andThen((optimizedPlanListener, analyzedPlan) -> optimizer.optimize(analyzedPlan, optimizedPlanListener))
132+
.addListener(listener);
132133
}
133134

134135
@Benchmark

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizer.java

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
package org.elasticsearch.xpack.esql.optimizer;
99

10+
import org.elasticsearch.action.ActionListener;
1011
import org.elasticsearch.xpack.esql.VerificationException;
1112
import org.elasticsearch.xpack.esql.common.Failures;
1213
import org.elasticsearch.xpack.esql.core.type.DataType;
@@ -109,15 +110,16 @@ public LogicalPlanOptimizer(LogicalOptimizerContext optimizerContext) {
109110
super(optimizerContext);
110111
}
111112

112-
public LogicalPlan optimize(LogicalPlan verified) {
113-
var optimized = execute(verified);
114-
115-
Failures failures = verifier.verify(optimized);
116-
if (failures.hasFailures()) {
117-
throw new VerificationException(failures);
118-
}
119-
optimized.setOptimized();
120-
return optimized;
113+
public void optimize(LogicalPlan verified, ActionListener<LogicalPlan> listener) {
114+
execute(verified, listener.delegateFailureAndWrap((l, optimized) -> {
115+
Failures failures = verifier.verify(optimized);
116+
if (failures.hasFailures()) {
117+
l.onFailure(new VerificationException(failures));
118+
return;
119+
}
120+
optimized.setOptimized();
121+
l.onResponse(optimized);
122+
}));
121123
}
122124

123125
@Override

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -198,10 +198,10 @@ public void execute(EsqlQueryRequest request, EsqlExecutionInfo executionInfo, P
198198
analyzedPlan(parsed, executionInfo, request.filter(), new EsqlCCSUtils.CssPartialErrorsActionListener(executionInfo, listener) {
199199
@Override
200200
public void onResponse(LogicalPlan analyzedPlan) {
201-
preMapper.preMapper(
202-
analyzedPlan,
203-
listener.delegateFailureAndWrap((l, p) -> executeOptimizedPlan(request, executionInfo, planRunner, optimizedPlan(p), l))
204-
);
201+
SubscribableListener.<LogicalPlan>newForked(l -> preMapper.preMapper(analyzedPlan, l))
202+
.<LogicalPlan>andThen((l, p) -> optimizedPlan(p, l))
203+
.<Result>andThen((l, p) -> executeOptimizedPlan(request, executionInfo, planRunner, p, l))
204+
.addListener(listener);
205205
}
206206
});
207207
}
@@ -827,13 +827,15 @@ private PhysicalPlan logicalPlanToPhysicalPlan(LogicalPlan optimizedPlan, EsqlQu
827827
return EstimatesRowSize.estimateRowSize(0, physicalPlan);
828828
}
829829

830-
public LogicalPlan optimizedPlan(LogicalPlan logicalPlan) {
830+
public void optimizedPlan(LogicalPlan logicalPlan, ActionListener<LogicalPlan> listener) {
831831
if (logicalPlan.analyzed() == false) {
832-
throw new IllegalStateException("Expected analyzed plan");
832+
listener.onFailure(new IllegalStateException("Expected analyzed plan"));
833+
return;
833834
}
834-
var plan = logicalPlanOptimizer.optimize(logicalPlan);
835-
LOGGER.debug("Optimized logicalPlan plan:\n{}", plan);
836-
return plan;
835+
logicalPlanOptimizer.optimize(logicalPlan, listener.map(optimizedPlan -> {
836+
LOGGER.debug("Optimized logicalPlan plan:\n{}", optimizedPlan);
837+
return optimizedPlan;
838+
}));
837839
}
838840

839841
public PhysicalPlan physicalPlan(LogicalPlan optimizedPlan) {

x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/CsvTests.java

Lines changed: 28 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -502,19 +502,18 @@ private static EnrichPolicy loadEnrichPolicyMapping(String policyFileName) {
502502
}
503503
}
504504

505-
private LogicalPlan analyzedPlan(LogicalPlan parsed, CsvTestsDataLoader.MultiIndexTestDataset datasets) {
505+
private void analyzedPlan(LogicalPlan parsed, CsvTestsDataLoader.MultiIndexTestDataset datasets, ActionListener<LogicalPlan> listener) {
506506
var indexResolution = loadIndexResolution(datasets);
507507
var enrichPolicies = loadEnrichPolicies();
508508
var analyzer = new Analyzer(
509509
new AnalyzerContext(configuration, functionRegistry, indexResolution, enrichPolicies, emptyInferenceResolution()),
510510
TEST_VERIFIER
511511
);
512-
PlainActionFuture<LogicalPlan> analyzedPlanFuture = new PlainActionFuture<>();
513-
analyzer.analyze(parsed, analyzedPlanFuture);
514-
LogicalPlan plan = analyzedPlanFuture.actionGet();
515-
plan.setAnalyzed();
516-
LOGGER.debug("Analyzed plan:\n{}", plan);
517-
return plan;
512+
analyzer.analyze(parsed, listener.map(analyzedPlan -> {
513+
analyzedPlan.setAnalyzed();
514+
LOGGER.debug("Analyzed plan:\n{}", analyzedPlan);
515+
return analyzedPlan;
516+
}));
518517
}
519518

520519
private static CsvTestsDataLoader.MultiIndexTestDataset testDatasets(LogicalPlan parsed) {
@@ -570,7 +569,6 @@ private static TestPhysicalOperationProviders testOperationProviders(
570569
private ActualResults executePlan(BigArrays bigArrays) throws Exception {
571570
LogicalPlan parsed = parser.createStatement(testCase.query);
572571
var testDatasets = testDatasets(parsed);
573-
LogicalPlan analyzed = analyzedPlan(parsed, testDatasets);
574572

575573
FoldContext foldCtx = FoldContext.small();
576574
EsqlSession session = new EsqlSession(
@@ -591,24 +589,29 @@ private ActualResults executePlan(BigArrays bigArrays) throws Exception {
591589

592590
PlainActionFuture<ActualResults> listener = new PlainActionFuture<>();
593591

594-
session.executeOptimizedPlan(
595-
new EsqlQueryRequest(),
596-
new EsqlExecutionInfo(randomBoolean()),
597-
planRunner(bigArrays, foldCtx, physicalOperationProviders),
598-
session.optimizedPlan(analyzed),
599-
listener.delegateFailureAndWrap(
600-
// Wrap so we can capture the warnings in the calling thread
601-
(next, result) -> next.onResponse(
602-
new ActualResults(
603-
result.schema().stream().map(Attribute::name).toList(),
604-
result.schema().stream().map(a -> Type.asType(a.dataType().nameUpper())).toList(),
605-
result.schema().stream().map(Attribute::dataType).toList(),
606-
result.pages(),
607-
threadPool.getThreadContext().getResponseHeaders()
592+
analyzedPlan(parsed, testDatasets, listener.delegateFailureAndWrap((analyzedPlanListener, analyzedPlan) -> {
593+
session.optimizedPlan(analyzedPlan, listener.delegateFailureAndWrap((optimizedPlanListener, optimizedPlan) -> {
594+
session.executeOptimizedPlan(
595+
new EsqlQueryRequest(),
596+
new EsqlExecutionInfo(randomBoolean()),
597+
planRunner(bigArrays, foldCtx, physicalOperationProviders),
598+
optimizedPlan,
599+
listener.delegateFailureAndWrap(
600+
// Wrap so we can capture the warnings in the calling thread
601+
(next, result) -> next.onResponse(
602+
new ActualResults(
603+
result.schema().stream().map(Attribute::name).toList(),
604+
result.schema().stream().map(a -> Type.asType(a.dataType().nameUpper())).toList(),
605+
result.schema().stream().map(Attribute::dataType).toList(),
606+
result.pages(),
607+
threadPool.getThreadContext().getResponseHeaders()
608+
)
609+
)
608610
)
609-
)
610-
)
611-
);
611+
);
612+
}));
613+
}));
614+
612615
return listener.get();
613616
}
614617

x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/AbstractLogicalPlanOptimizerTests.java

Lines changed: 16 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
package org.elasticsearch.xpack.esql.optimizer;
99

10+
import org.elasticsearch.action.support.PlainActionFuture;
1011
import org.elasticsearch.index.IndexMode;
1112
import org.elasticsearch.test.ESTestCase;
1213
import org.elasticsearch.xpack.esql.EsqlTestUtils;
@@ -181,35 +182,33 @@ protected LogicalPlan plan(String query) {
181182
}
182183

183184
protected LogicalPlan plan(String query, LogicalPlanOptimizer optimizer) {
184-
var analyzed = analyze(analyzer, parser.createStatement(query));
185-
// System.out.println(analyzed);
186-
var optimized = optimizer.optimize(analyzed);
187-
// System.out.println(optimized);
188-
return optimized;
185+
return optimizedPlan(analyze(analyzer, parser.createStatement(query)), optimizer);
189186
}
190187

191188
protected LogicalPlan planAirports(String query) {
192-
var analyzed = analyze(analyzerAirports, parser.createStatement(query));
193-
// System.out.println(analyzed);
194-
var optimized = logicalOptimizer.optimize(analyzed);
195-
// System.out.println(optimized);
196-
return optimized;
189+
return optimizedPlan(analyze(analyzerAirports, parser.createStatement(query)));
197190
}
198191

199192
protected LogicalPlan planExtra(String query) {
200-
var analyzed = analyze(analyzerExtra, parser.createStatement(query));
201-
// System.out.println(analyzed);
202-
var optimized = logicalOptimizer.optimize(analyzed);
203-
// System.out.println(optimized);
204-
return optimized;
193+
return optimizedPlan(analyze(analyzerExtra, parser.createStatement(query)));
205194
}
206195

207196
protected LogicalPlan planTypes(String query) {
208-
return logicalOptimizer.optimize(analyze(analyzerTypes, parser.createStatement(query)));
197+
return optimizedPlan(analyze(analyzerTypes, parser.createStatement(query)));
209198
}
210199

211200
protected LogicalPlan planMultiIndex(String query) {
212-
return logicalOptimizer.optimize(analyze(multiIndexAnalyzer, parser.createStatement(query)));
201+
return optimizedPlan(analyze(multiIndexAnalyzer, parser.createStatement(query)));
202+
}
203+
204+
protected LogicalPlan optimizedPlan(LogicalPlan logicalPlan) {
205+
return optimizedPlan(logicalPlan, logicalOptimizer);
206+
}
207+
208+
protected LogicalPlan optimizedPlan(LogicalPlan logicalPlan, LogicalPlanOptimizer optimizer) {
209+
PlainActionFuture<LogicalPlan> optimizedPlanFuture = new PlainActionFuture<>();
210+
optimizer.optimize(logicalPlan, optimizedPlanFuture);
211+
return optimizedPlanFuture.actionGet();
213212
}
214213

215214
@Override

x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LocalLogicalPlanOptimizerTests.java

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
package org.elasticsearch.xpack.esql.optimizer;
99

1010
import org.apache.lucene.util.BytesRef;
11+
import org.elasticsearch.action.support.PlainActionFuture;
1112
import org.elasticsearch.common.io.stream.StreamOutput;
1213
import org.elasticsearch.common.util.Maps;
1314
import org.elasticsearch.index.IndexMode;
@@ -251,7 +252,7 @@ public void testMissingFieldInSort() {
251252
* EsqlProject[[first_name{f}#7, last_name{r}#17]]
252253
* \_Limit[1000[INTEGER],true]
253254
* \_MvExpand[last_name{f}#10,last_name{r}#17]
254-
* \_Project[[_meta_field{f}#12, emp_no{f}#6, first_name{f}#7, gender{f}#8, hire_date{f}#13, job{f}#14, job.raw{f}#15, lang
255+
* \_Project[[_meta_field{f}#12, emp_no{f}#6, first_name{f}#7, gender{f}#8, hire_date{f}#13, job{f}#14, job.raw{f}#15, langu
255256
* uages{f}#9, last_name{r}#10, long_noidx{f}#16, salary{f}#11]]
256257
* \_Eval[[null[KEYWORD] AS last_name]]
257258
* \_Limit[1000[INTEGER],false]
@@ -510,7 +511,7 @@ public void testSparseDocument() throws Exception {
510511
);
511512

512513
var analyzed = analyze(analyzer, parser.createStatement(query));
513-
var optimized = logicalOptimizer.optimize(analyzed);
514+
var optimized = optimizedPlan(analyzed);
514515
var localContext = new LocalLogicalOptimizerContext(EsqlTestUtils.TEST_CFG, FoldContext.small(), searchStats);
515516
var plan = new LocalLogicalPlanOptimizer(localContext).localOptimize(optimized);
516517

@@ -786,17 +787,24 @@ private LocalRelation asEmptyRelation(Object o) {
786787
}
787788

788789
private LogicalPlan plan(String query, Analyzer analyzer) {
789-
var analyzed = analyze(analyzer, parser.createStatement(query));
790-
// System.out.println(analyzed);
791-
var optimized = logicalOptimizer.optimize(analyzed);
792-
// System.out.println(optimized);
793-
return optimized;
790+
PlainActionFuture<LogicalPlan> optimizedPlanFuture = new PlainActionFuture<>();
791+
analyzer.analyze(
792+
parser.createStatement(query),
793+
optimizedPlanFuture.delegateFailureAndWrap((l, analyzedPlan) -> logicalOptimizer.optimize(analyzedPlan, l))
794+
);
795+
return optimizedPlanFuture.actionGet();
794796
}
795797

796798
private LogicalPlan plan(String query) {
797799
return plan(query, analyzer);
798800
}
799801

802+
private LogicalPlan optimizedPlan(LogicalPlan logicalPlan) {
803+
PlainActionFuture<LogicalPlan> optimizedPlanFuture = new PlainActionFuture<>();
804+
logicalOptimizer.optimize(logicalPlan, optimizedPlanFuture);
805+
return optimizedPlanFuture.actionGet();
806+
}
807+
800808
private LogicalPlan localPlan(LogicalPlan plan, SearchStats searchStats) {
801809
var localContext = new LocalLogicalOptimizerContext(EsqlTestUtils.TEST_CFG, FoldContext.small(), searchStats);
802810
// System.out.println(plan);

0 commit comments

Comments
 (0)