Skip to content

Commit a87871e

Browse files
committed
Make Analyzer::execute an async function.
1 parent 9058064 commit a87871e

File tree

19 files changed

+135
-95
lines changed

19 files changed

+135
-95
lines changed

benchmarks/src/main/java/org/elasticsearch/benchmark/_nightly/esql/QueryPlanningBenchmark.java

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99

1010
package org.elasticsearch.benchmark._nightly.esql;
1111

12+
import org.elasticsearch.action.ActionListener;
1213
import org.elasticsearch.common.logging.LogConfigurator;
1314
import org.elasticsearch.common.settings.Settings;
1415
import org.elasticsearch.index.IndexMode;
@@ -115,15 +116,18 @@ public void setup() {
115116
defaultOptimizer = new LogicalPlanOptimizer(new LogicalOptimizerContext(config, FoldContext.small()));
116117
}
117118

118-
private LogicalPlan plan(EsqlParser parser, Analyzer analyzer, LogicalPlanOptimizer optimizer, String query) {
119+
private void plan(EsqlParser parser, Analyzer analyzer, LogicalPlanOptimizer optimizer, String query, ActionListener<LogicalPlan> listener) {
119120
var parsed = parser.createStatement(query, new QueryParams(), telemetry);
120-
var analyzed = analyzer.analyze(parsed);
121-
var optimized = optimizer.optimize(analyzed);
122-
return optimized;
121+
analyzer.analyze(parsed, listener.map(optimizer::optimize));
123122
}
124123

125124
@Benchmark
126125
public void manyFields(Blackhole blackhole) {
127-
blackhole.consume(plan(defaultParser, manyFieldsAnalyzer, defaultOptimizer, "FROM test | LIMIT 10"));
126+
plan(defaultParser, manyFieldsAnalyzer, defaultOptimizer, "FROM test | LIMIT 10", ActionListener.wrap(
127+
p -> blackhole.consume(p),
128+
e -> {
129+
throw new RuntimeException("Unexpected exception", e);
130+
}
131+
));
128132
}
129133
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/Analyzer.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
package org.elasticsearch.xpack.esql.analysis;
99

10+
import org.elasticsearch.action.ActionListener;
1011
import org.elasticsearch.common.logging.HeaderWarning;
1112
import org.elasticsearch.common.logging.LoggerMessageFormat;
1213
import org.elasticsearch.common.lucene.BytesRefs;
@@ -194,9 +195,11 @@ public Analyzer(AnalyzerContext context, Verifier verifier) {
194195
this.verifier = verifier;
195196
}
196197

197-
public LogicalPlan analyze(LogicalPlan plan) {
198-
BitSet partialMetrics = new BitSet(FeatureMetric.values().length);
199-
return verify(execute(plan), gatherPreAnalysisMetrics(plan, partialMetrics));
198+
public void analyze(LogicalPlan plan, ActionListener<LogicalPlan> listener) {
199+
execute(plan, listener.delegateFailureAndWrap((l, p) -> {
200+
BitSet partialMetrics = new BitSet(FeatureMetric.values().length);
201+
l.onResponse(verify(p, gatherPreAnalysisMetrics(p, partialMetrics)));
202+
}));
200203
}
201204

202205
public LogicalPlan verify(LogicalPlan plan, BitSet partialMetrics) {

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java

Lines changed: 46 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,7 @@
108108
import java.util.List;
109109
import java.util.Map;
110110
import java.util.Set;
111-
import java.util.function.Function;
111+
import java.util.function.BiConsumer;
112112
import java.util.stream.Collectors;
113113

114114
import static org.elasticsearch.index.query.QueryBuilders.boolQuery;
@@ -342,14 +342,22 @@ public void analyzedPlan(
342342
return;
343343
}
344344

345-
Function<PreAnalysisResult, LogicalPlan> analyzeAction = (l) -> {
345+
BiConsumer<PreAnalysisResult, ActionListener<LogicalPlan>> analyzeAction = (preanalysisResult, l) -> {
346346
Analyzer analyzer = new Analyzer(
347-
new AnalyzerContext(configuration, functionRegistry, l.indices, l.lookupIndices, l.enrichResolution, l.inferenceResolution),
347+
new AnalyzerContext(
348+
configuration,
349+
functionRegistry,
350+
preanalysisResult.indices,
351+
preanalysisResult.lookupIndices,
352+
preanalysisResult.enrichResolution,
353+
preanalysisResult.inferenceResolution
354+
),
348355
verifier
349356
);
350-
LogicalPlan plan = analyzer.analyze(parsed);
351-
plan.setAnalyzed();
352-
return plan;
357+
analyzer.analyze(parsed, l.delegateFailureAndWrap((analyzedPlanListener, plan) -> {
358+
plan.setAnalyzed();
359+
analyzedPlanListener.onResponse(plan);
360+
}));
353361
};
354362
// Capture configured remotes list to ensure consistency throughout the session
355363
configuredClusters = Set.copyOf(indicesExpressionGrouper.getConfiguredClusters());
@@ -398,19 +406,19 @@ public void analyzedPlan(
398406
}).<LogicalPlan>andThen((l, result) -> {
399407
assert requestFilter != null : "The second analysis shouldn't take place when there is no index filter in the request";
400408
LOGGER.debug("Analyzing the plan (second attempt, without filter)");
401-
LogicalPlan plan;
402409
try {
403410
// the order here is tricky - if the cluster has been filtered and later became unavailable,
404411
// do we want to declare it successful or skipped? For now, unavailability takes precedence.
405412
EsqlCCSUtils.updateExecutionInfoWithUnavailableClusters(executionInfo, result.indices.unavailableClusters());
406413
EsqlCCSUtils.updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, result.indices, null);
407-
plan = analyzeAction.apply(result);
414+
analyzeAction.accept(result, l.delegateFailureAndWrap((analyzedPlanListener, plan) -> {
415+
LOGGER.debug("Analyzed plan (second attempt, without filter):\n{}", plan);
416+
analyzedPlanListener.onResponse(plan);
417+
}));
418+
408419
} catch (Exception e) {
409420
l.onFailure(e);
410-
return;
411421
}
412-
LOGGER.debug("Analyzed plan (second attempt, without filter):\n{}", plan);
413-
l.onResponse(plan);
414422
}).addListener(logicalPlanListener);
415423
}
416424

@@ -528,14 +536,13 @@ private boolean allCCSClustersSkipped(
528536
}
529537

530538
private static void analyzeAndMaybeRetry(
531-
Function<PreAnalysisResult, LogicalPlan> analyzeAction,
539+
BiConsumer<PreAnalysisResult, ActionListener<LogicalPlan>> analyzeAction,
532540
QueryBuilder requestFilter,
533541
PreAnalysisResult result,
534542
EsqlExecutionInfo executionInfo,
535543
ActionListener<LogicalPlan> logicalPlanListener,
536544
ActionListener<PreAnalysisResult> l
537545
) {
538-
LogicalPlan plan = null;
539546
var filterPresentMessage = requestFilter == null ? "without" : "with";
540547
var attemptMessage = requestFilter == null ? "the only" : "first";
541548
LOGGER.debug("Analyzing the plan ({} attempt, {} filter)", attemptMessage, filterPresentMessage);
@@ -546,32 +553,35 @@ private static void analyzeAndMaybeRetry(
546553
// when the resolution result is not valid for a different reason.
547554
EsqlCCSUtils.updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, result.indices, requestFilter);
548555
}
549-
plan = analyzeAction.apply(result);
550-
} catch (Exception e) {
551-
if (e instanceof VerificationException ve) {
552-
LOGGER.debug(
553-
"Analyzing the plan ({} attempt, {} filter) failed with {}",
554-
attemptMessage,
555-
filterPresentMessage,
556-
ve.getDetailedMessage()
557-
);
558-
if (requestFilter == null) {
559-
// if the initial request didn't have a filter, then just pass the exception back to the user
560-
logicalPlanListener.onFailure(ve);
556+
analyzeAction.accept(result, ActionListener.wrap(plan -> {
557+
LOGGER.debug("Analyzed plan ({} attempt, {} filter):\n{}", attemptMessage, filterPresentMessage, plan);
558+
// the analysis succeeded from the first attempt, irrespective if it had a filter or not, just continue with the planning
559+
logicalPlanListener.onResponse(plan);
560+
}, e -> {
561+
if (e instanceof VerificationException ve) {
562+
LOGGER.debug(
563+
"Analyzing the plan ({} attempt, {} filter) failed with {}",
564+
attemptMessage,
565+
filterPresentMessage,
566+
ve.getDetailedMessage()
567+
);
568+
if (requestFilter == null) {
569+
// if the initial request didn't have a filter, then just pass the exception back to the user
570+
logicalPlanListener.onFailure(ve);
571+
} else {
572+
// interested only in a VerificationException, but this time we are taking out the index filter
573+
// to try and make the index resolution work without any index filtering. In the next step... to be continued
574+
l.onResponse(result);
575+
}
561576
} else {
562-
// interested only in a VerificationException, but this time we are taking out the index filter
563-
// to try and make the index resolution work without any index filtering. In the next step... to be continued
564-
l.onResponse(result);
577+
// if the query failed with any other type of exception, then just pass the exception back to the user
578+
logicalPlanListener.onFailure(e);
565579
}
566-
} else {
567-
// if the query failed with any other type of exception, then just pass the exception back to the user
568-
logicalPlanListener.onFailure(e);
569-
}
570-
return;
580+
}));
581+
} catch (Exception e) {
582+
// if failing with any other type of exception, then just pass the exception back to the user
583+
logicalPlanListener.onFailure(e);
571584
}
572-
LOGGER.debug("Analyzed plan ({} attempt, {} filter):\n{}", attemptMessage, filterPresentMessage, plan);
573-
// the analysis succeeded from the first attempt, irrespective if it had a filter or not, just continue with the planning
574-
logicalPlanListener.onResponse(plan);
575585
}
576586

577587
private static void resolveFieldNames(LogicalPlan parsed, EnrichResolution enrichResolution, ActionListener<PreAnalysisResult> l) {

x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/CsvTests.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import org.elasticsearch.Build;
1313
import org.elasticsearch.Version;
1414
import org.elasticsearch.action.ActionListener;
15+
import org.elasticsearch.action.support.ListenableActionFuture;
1516
import org.elasticsearch.action.support.PlainActionFuture;
1617
import org.elasticsearch.common.Randomness;
1718
import org.elasticsearch.common.breaker.CircuitBreaker;
@@ -509,7 +510,9 @@ private LogicalPlan analyzedPlan(LogicalPlan parsed, CsvTestsDataLoader.MultiInd
509510
new AnalyzerContext(configuration, functionRegistry, indexResolution, enrichPolicies, emptyInferenceResolution()),
510511
TEST_VERIFIER
511512
);
512-
LogicalPlan plan = analyzer.analyze(parsed);
513+
ListenableActionFuture<LogicalPlan> planFuture = new ListenableActionFuture<>();
514+
analyzer.analyze(parsed, planFuture);
515+
LogicalPlan plan = planFuture.actionResult();
513516
plan.setAnalyzed();
514517
LOGGER.debug("Analyzed plan:\n{}", plan);
515518
return plan;

x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/AnalyzerTestUtils.java

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
package org.elasticsearch.xpack.esql.analysis;
99

10+
import org.elasticsearch.action.support.ListenableActionFuture;
1011
import org.elasticsearch.index.IndexMode;
1112
import org.elasticsearch.inference.TaskType;
1213
import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
@@ -130,16 +131,19 @@ public static LogicalPlan analyze(String query, String index, String mapping) {
130131

131132
public static LogicalPlan analyze(String query, Analyzer analyzer) {
132133
var plan = new EsqlParser().createStatement(query);
133-
// System.out.println(plan);
134-
var analyzed = analyzer.analyze(plan);
135-
// System.out.println(analyzed);
136-
return analyzed;
134+
return analyze(analyzer, plan);
137135
}
138136

139137
public static LogicalPlan analyze(String query, String mapping, QueryParams params) {
140138
var plan = new EsqlParser().createStatement(query, params);
141139
var analyzer = analyzer(loadMapping(mapping, "test"), TEST_VERIFIER, configuration(query));
142-
return analyzer.analyze(plan);
140+
return analyze(analyzer, plan);
141+
}
142+
143+
public static LogicalPlan analyze(Analyzer analyzer, LogicalPlan plan) {
144+
ListenableActionFuture<LogicalPlan> planFuture = new ListenableActionFuture<>();
145+
analyzer.analyze(plan, planFuture);
146+
return planFuture.actionResult();
143147
}
144148

145149
public static IndexResolution loadMapping(String resource, String indexName, IndexMode indexMode) {

x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/AnalyzerTests.java

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -159,7 +159,7 @@ public class AnalyzerTests extends ESTestCase {
159159
public void testIndexResolution() {
160160
EsIndex idx = new EsIndex("idx", Map.of());
161161
Analyzer analyzer = analyzer(IndexResolution.valid(idx));
162-
var plan = analyzer.analyze(UNRESOLVED_RELATION);
162+
var plan = analyze(analyzer, UNRESOLVED_RELATION);
163163
var limit = as(plan, Limit.class);
164164

165165
assertEquals(new EsRelation(EMPTY, idx.name(), IndexMode.STANDARD, idx.indexNameWithModes(), NO_FIELDS), limit.child());
@@ -168,7 +168,7 @@ public void testIndexResolution() {
168168
public void testFailOnUnresolvedIndex() {
169169
Analyzer analyzer = analyzer(IndexResolution.invalid("Unknown index [idx]"));
170170

171-
VerificationException e = expectThrows(VerificationException.class, () -> analyzer.analyze(UNRESOLVED_RELATION));
171+
VerificationException e = expectThrows(VerificationException.class, () -> analyze(analyzer, UNRESOLVED_RELATION));
172172

173173
assertThat(e.getMessage(), containsString("Unknown index [idx]"));
174174
}
@@ -177,7 +177,7 @@ public void testIndexWithClusterResolution() {
177177
EsIndex idx = new EsIndex("cluster:idx", Map.of());
178178
Analyzer analyzer = analyzer(IndexResolution.valid(idx));
179179

180-
var plan = analyzer.analyze(UNRESOLVED_RELATION);
180+
var plan = analyze(analyzer, UNRESOLVED_RELATION);
181181
var limit = as(plan, Limit.class);
182182

183183
assertEquals(new EsRelation(EMPTY, idx.name(), IndexMode.STANDARD, idx.indexNameWithModes(), NO_FIELDS), limit.child());
@@ -187,7 +187,8 @@ public void testAttributeResolution() {
187187
EsIndex idx = new EsIndex("idx", LoadMapping.loadMapping("mapping-one-field.json"));
188188
Analyzer analyzer = analyzer(IndexResolution.valid(idx));
189189

190-
var plan = analyzer.analyze(
190+
var plan = analyze(
191+
analyzer,
191192
new Eval(EMPTY, UNRESOLVED_RELATION, List.of(new Alias(EMPTY, "e", new UnresolvedAttribute(EMPTY, "emp_no"))))
192193
);
193194

@@ -208,7 +209,8 @@ public void testAttributeResolution() {
208209
public void testAttributeResolutionOfChainedReferences() {
209210
Analyzer analyzer = analyzer(loadMapping("mapping-one-field.json", "idx"));
210211

211-
var plan = analyzer.analyze(
212+
var plan = analyze(
213+
analyzer,
212214
new Eval(
213215
EMPTY,
214216
new Eval(EMPTY, UNRESOLVED_RELATION, List.of(new Alias(EMPTY, "e", new UnresolvedAttribute(EMPTY, "emp_no")))),
@@ -240,7 +242,8 @@ public void testRowAttributeResolution() {
240242
EsIndex idx = new EsIndex("idx", Map.of());
241243
Analyzer analyzer = analyzer(IndexResolution.valid(idx));
242244

243-
var plan = analyzer.analyze(
245+
var plan = analyze(
246+
analyzer,
244247
new Eval(
245248
EMPTY,
246249
new Row(EMPTY, List.of(new Alias(EMPTY, "emp_no", new Literal(EMPTY, 1, DataType.INTEGER)))),
@@ -271,7 +274,8 @@ public void testUnresolvableAttribute() {
271274

272275
VerificationException ve = expectThrows(
273276
VerificationException.class,
274-
() -> analyzer.analyze(
277+
() -> analyze(
278+
analyzer,
275279
new Eval(EMPTY, UNRESOLVED_RELATION, List.of(new Alias(EMPTY, "e", new UnresolvedAttribute(EMPTY, "emp_nos"))))
276280
)
277281
);

x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/ParsingTests.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import static org.elasticsearch.xpack.esql.EsqlTestUtils.as;
3737
import static org.elasticsearch.xpack.esql.EsqlTestUtils.emptyInferenceResolution;
3838
import static org.elasticsearch.xpack.esql.EsqlTestUtils.emptyPolicyResolution;
39+
import static org.elasticsearch.xpack.esql.analysis.AnalyzerTestUtils.analyze;
3940
import static org.hamcrest.Matchers.equalTo;
4041
import static org.hamcrest.Matchers.hasSize;
4142

@@ -182,7 +183,7 @@ public void testInvalidSample() {
182183
}
183184

184185
private String error(String query) {
185-
ParsingException e = expectThrows(ParsingException.class, () -> defaultAnalyzer.analyze(parser.createStatement(query)));
186+
ParsingException e = expectThrows(ParsingException.class, () -> analyze(defaultAnalyzer, (parser.createStatement(query))));
186187
String message = e.getMessage();
187188
assertTrue(message.startsWith("line "));
188189
return message.substring("line ".length());

x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/VerifierTests.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939

4040
import static org.elasticsearch.xpack.esql.EsqlTestUtils.paramAsConstant;
4141
import static org.elasticsearch.xpack.esql.EsqlTestUtils.withDefaultLimitWarning;
42+
import static org.elasticsearch.xpack.esql.analysis.AnalyzerTestUtils.analyze;
4243
import static org.elasticsearch.xpack.esql.analysis.AnalyzerTestUtils.loadMapping;
4344
import static org.elasticsearch.xpack.esql.core.type.DataType.BOOLEAN;
4445
import static org.elasticsearch.xpack.esql.core.type.DataType.CARTESIAN_POINT;
@@ -2241,7 +2242,7 @@ private void query(String query) {
22412242
}
22422243

22432244
private void query(String query, Analyzer analyzer) {
2244-
analyzer.analyze(parser.createStatement(query));
2245+
analyze(analyzer, parser.createStatement(query));
22452246
}
22462247

22472248
private String error(String query) {
@@ -2272,7 +2273,7 @@ private String error(String query, Analyzer analyzer, Class<? extends Exception>
22722273
Throwable e = expectThrows(
22732274
exception,
22742275
"Expected error for query [" + query + "] but no error was raised",
2275-
() -> analyzer.analyze(parser.createStatement(query, new QueryParams(parameters)))
2276+
() -> analyze(analyzer, parser.createStatement(query, new QueryParams(parameters)))
22762277
);
22772278
assertThat(e, instanceOf(exception));
22782279

x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/CheckLicenseTests.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import org.elasticsearch.xpack.esql.VerificationException;
2020
import org.elasticsearch.xpack.esql.analysis.Analyzer;
2121
import org.elasticsearch.xpack.esql.analysis.AnalyzerContext;
22+
import org.elasticsearch.xpack.esql.analysis.AnalyzerTestUtils;
2223
import org.elasticsearch.xpack.esql.analysis.Verifier;
2324
import org.elasticsearch.xpack.esql.core.expression.Expression;
2425
import org.elasticsearch.xpack.esql.core.expression.FoldContext;
@@ -86,7 +87,7 @@ public EsqlFunctionRegistry snapshotRegistry() {
8687
? new LicensedLimit(l.source(), l.limit(), l.child(), functionLicenseFeature)
8788
: l
8889
);
89-
return analyzer(registry, operationMode).analyze(plan);
90+
return AnalyzerTestUtils.analyze(analyzer(registry, operationMode), plan);
9091
}
9192

9293
private static Analyzer analyzer(EsqlFunctionRegistry registry, License.OperationMode operationMode) {

0 commit comments

Comments
 (0)