Commit 56f9897

Make analyzer async.

1 parent 36466fa commit 56f9897

3 files changed: +59 -33 lines

benchmarks/src/main/java/org/elasticsearch/benchmark/_nightly/esql/QueryPlanningBenchmark.java

Lines changed: 17 additions & 6 deletions

@@ -9,6 +9,9 @@
 
 package org.elasticsearch.benchmark._nightly.esql;
 
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.support.PlainActionFuture;
+import org.elasticsearch.action.support.SubscribableListener;
 import org.elasticsearch.common.logging.LogConfigurator;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.index.IndexMode;
@@ -49,6 +52,7 @@
 import java.util.LinkedHashMap;
 import java.util.Locale;
 import java.util.Map;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 
 import static java.util.Collections.emptyMap;
@@ -115,15 +119,22 @@ public void setup() {
         defaultOptimizer = new LogicalPlanOptimizer(new LogicalOptimizerContext(config, FoldContext.small()));
     }
 
-    private LogicalPlan plan(EsqlParser parser, Analyzer analyzer, LogicalPlanOptimizer optimizer, String query) {
+    private void plan(
+        EsqlParser parser,
+        Analyzer analyzer,
+        LogicalPlanOptimizer optimizer,
+        String query,
+        ActionListener<LogicalPlan> listener
+    ) {
         var parsed = parser.createStatement(query, new QueryParams(), telemetry);
-        var analyzed = analyzer.analyze(parsed);
-        var optimized = optimizer.optimize(analyzed);
-        return optimized;
+        SubscribableListener.<LogicalPlan>newForked(analyzedPlanListener -> analyzer.analyze(parsed, analyzedPlanListener))
+            .addListener(listener.map(optimizer::optimize));
     }
 
     @Benchmark
-    public void manyFields(Blackhole blackhole) {
-        blackhole.consume(plan(defaultParser, manyFieldsAnalyzer, defaultOptimizer, "FROM test | LIMIT 10"));
+    public void manyFields(Blackhole blackhole) throws ExecutionException, InterruptedException {
+        PlainActionFuture<LogicalPlan> optimizedPlanFuture = new PlainActionFuture<>();
+        plan(defaultParser, manyFieldsAnalyzer, defaultOptimizer, "FROM test | LIMIT 10", optimizedPlanFuture);
+        blackhole.consume(optimizedPlanFuture.get());
    }
 }
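Note on the benchmark change: plan() now hands the parsed query to the asynchronous analyzer through a listener, chains the optimizer with listener.map, and manyFields() bridges back to synchronous JMH code by blocking on a PlainActionFuture. The sketch below mirrors that shape with plain JDK futures and hypothetical names; it is not the Elasticsearch ActionListener API, just the same sync-to-async bridging pattern.

// Dependency-free sketch of the pattern in the diff above (hypothetical names):
// an async "analyze" step is chained with a synchronous "optimize" step, and a
// blocking caller (like the JMH benchmark) waits on a future for the result.
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class AsyncPlanSketch {
    // Stand-in for analyzer.analyze(parsed, listener): completes off-thread.
    static CompletableFuture<String> analyzeAsync(String parsed) {
        return CompletableFuture.supplyAsync(() -> parsed + " [analyzed]");
    }

    // Stand-in for optimizer::optimize: a plain synchronous mapping step.
    static String optimize(String analyzed) {
        return analyzed + " [optimized]";
    }

    // Mirrors the new plan(...): start analysis, map the result through optimize.
    static CompletableFuture<String> plan(String query) {
        return analyzeAsync(query).thenApply(AsyncPlanSketch::optimize);
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // Mirrors the benchmark: block on the future so a synchronous caller can consume the result.
        String optimizedPlan = plan("FROM test | LIMIT 10").get();
        System.out.println(optimizedPlan);
    }
}

In the actual change, PlainActionFuture plays the role of the future: it implements the listener interface, so the benchmark can pass it where a listener is expected and then block on get().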

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/Analyzer.java

Lines changed: 6 additions & 3 deletions

@@ -7,6 +7,7 @@
 
 package org.elasticsearch.xpack.esql.analysis;
 
+import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.common.logging.HeaderWarning;
 import org.elasticsearch.common.logging.LoggerMessageFormat;
 import org.elasticsearch.common.lucene.BytesRefs;
@@ -194,9 +195,11 @@ public Analyzer(AnalyzerContext context, Verifier verifier) {
         this.verifier = verifier;
     }
 
-    public LogicalPlan analyze(LogicalPlan plan) {
-        BitSet partialMetrics = new BitSet(FeatureMetric.values().length);
-        return verify(execute(plan), gatherPreAnalysisMetrics(plan, partialMetrics));
+    public void analyze(LogicalPlan plan, ActionListener<LogicalPlan> listener) {
+        execute(plan, listener.map(analzedPlan -> {
+            BitSet partialMetrics = new BitSet(FeatureMetric.values().length);
+            return this.verify(analzedPlan, gatherPreAnalysisMetrics(plan, partialMetrics));
+        }));
     }
 
     public LogicalPlan verify(LogicalPlan plan, BitSet partialMetrics) {
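Note on the Analyzer change: the synchronous analyze(LogicalPlan) that returned a plan becomes analyze(LogicalPlan, ActionListener<LogicalPlan>), with verification and metric gathering moved into the listener that execute(...) completes. Below is a minimal, dependency-free sketch of that continuation-passing conversion; the Listener interface and helper names are simplified stand-ins for Elasticsearch's ActionListener, not the real API.

// Sketch: converting "return verify(execute(plan))" into a listener-based flow.
import java.util.function.Function;

public class ListenerSketch {
    interface Listener<T> {
        void onResponse(T result);

        void onFailure(Exception e);

        // Like ActionListener.map: adapt this Listener<T> into a Listener<R> by applying fn to each response.
        default <R> Listener<R> map(Function<R, T> fn) {
            Listener<T> outer = this;
            return new Listener<R>() {
                @Override
                public void onResponse(R result) {
                    try {
                        outer.onResponse(fn.apply(result));
                    } catch (Exception e) {
                        outer.onFailure(e);
                    }
                }

                @Override
                public void onFailure(Exception e) {
                    outer.onFailure(e);
                }
            };
        }
    }

    // Before: String analyze(String plan) { return verify(execute(plan)); }
    // After: execute takes a listener, and verification runs in the mapped listener.
    static void analyze(String plan, Listener<String> listener) {
        execute(plan, listener.map(ListenerSketch::verify));
    }

    static void execute(String plan, Listener<String> listener) {
        listener.onResponse(plan + " [executed]");
    }

    static String verify(String executedPlan) {
        return executedPlan + " [verified]";
    }

    public static void main(String[] args) {
        analyze("my plan", new Listener<String>() {
            @Override
            public void onResponse(String result) {
                System.out.println(result);
            }

            @Override
            public void onFailure(Exception e) {
                e.printStackTrace();
            }
        });
    }
}

The design point is that verification still runs exactly once per analysis, but now inside the listener chain rather than on the caller's return path.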

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java

Lines changed: 36 additions & 24 deletions

@@ -108,7 +108,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.function.Function;
+import java.util.function.BiConsumer;
 import java.util.stream.Collectors;
 
 import static org.elasticsearch.index.query.QueryBuilders.boolQuery;
@@ -342,14 +342,22 @@ public void analyzedPlan(
             return;
         }
 
-        Function<PreAnalysisResult, LogicalPlan> analyzeAction = (l) -> {
+        BiConsumer<PreAnalysisResult, ActionListener<LogicalPlan>> analyzeAction = (preAnalysisResult, listener) -> {
             Analyzer analyzer = new Analyzer(
-                new AnalyzerContext(configuration, functionRegistry, l.indices, l.lookupIndices, l.enrichResolution, l.inferenceResolution),
+                new AnalyzerContext(
+                    configuration,
+                    functionRegistry,
+                    preAnalysisResult.indices,
+                    preAnalysisResult.lookupIndices,
+                    preAnalysisResult.enrichResolution,
+                    preAnalysisResult.inferenceResolution
+                ),
                 verifier
             );
-            LogicalPlan plan = analyzer.analyze(parsed);
-            plan.setAnalyzed();
-            return plan;
+            analyzer.analyze(parsed, listener.map(analyzedPlan -> {
+                analyzedPlan.setAnalyzed();
+                return analyzedPlan;
+            }));
         };
         // Capture configured remotes list to ensure consistency throughout the session
         configuredClusters = Set.copyOf(indicesExpressionGrouper.getConfiguredClusters());
@@ -404,13 +412,14 @@ public void analyzedPlan(
                 // do we want to declare it successful or skipped? For now, unavailability takes precedence.
                 EsqlCCSUtils.updateExecutionInfoWithUnavailableClusters(executionInfo, result.indices.unavailableClusters());
                 EsqlCCSUtils.updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, result.indices, null);
-                plan = analyzeAction.apply(result);
+                analyzeAction.accept(result, l.delegateFailureAndWrap((analyzedPlanListener, analyzedPlan) -> {
+                    LOGGER.debug("Analyzed plan (second attempt, without filter):\n{}", analyzedPlan);
+                    analyzedPlanListener.onResponse(analyzedPlan);
+                }));
             } catch (Exception e) {
                 l.onFailure(e);
                 return;
             }
-            LOGGER.debug("Analyzed plan (second attempt, without filter):\n{}", plan);
-            l.onResponse(plan);
         }).addListener(logicalPlanListener);
     }
 
@@ -528,7 +537,7 @@ private boolean allCCSClustersSkipped(
     }
 
     private static void analyzeAndMaybeRetry(
-        Function<PreAnalysisResult, LogicalPlan> analyzeAction,
+        BiConsumer<PreAnalysisResult, ActionListener<LogicalPlan>> analyzeAction,
         QueryBuilder requestFilter,
         PreAnalysisResult result,
         EsqlExecutionInfo executionInfo,
@@ -540,14 +549,7 @@ private static void analyzeAndMaybeRetry(
         var attemptMessage = requestFilter == null ? "the only" : "first";
         LOGGER.debug("Analyzing the plan ({} attempt, {} filter)", attemptMessage, filterPresentMessage);
 
-        try {
-            if (result.indices.isValid() || requestFilter != null) {
-                // We won't run this check with no filter and no valid indices since this may lead to false positive - missing index report
-                // when the resolution result is not valid for a different reason.
-                EsqlCCSUtils.updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, result.indices, requestFilter);
-            }
-            plan = analyzeAction.apply(result);
-        } catch (Exception e) {
+        logicalPlanListener = logicalPlanListener.delegateResponse((errorListener, e) -> {
             if (e instanceof VerificationException ve) {
                 LOGGER.debug(
                     "Analyzing the plan ({} attempt, {} filter) failed with {}",
@@ -557,21 +559,31 @@ private static void analyzeAndMaybeRetry(
                 );
                 if (requestFilter == null) {
                     // if the initial request didn't have a filter, then just pass the exception back to the user
-                    logicalPlanListener.onFailure(ve);
+                    errorListener.onFailure(ve);
                 } else {
                     // interested only in a VerificationException, but this time we are taking out the index filter
                     // to try and make the index resolution work without any index filtering. In the next step... to be continued
                     l.onResponse(result);
                 }
             } else {
                 // if the query failed with any other type of exception, then just pass the exception back to the user
-                logicalPlanListener.onFailure(e);
+                errorListener.onFailure(e);
             }
-            return;
+        });
+
+        try {
+            if (result.indices.isValid() || requestFilter != null) {
+                // We won't run this check with no filter and no valid indices since this may lead to false positive - missing index report
+                // when the resolution result is not valid for a different reason.
+                EsqlCCSUtils.updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, result.indices, requestFilter);
+            }
+            analyzeAction.accept(result, ActionListener.runBefore(logicalPlanListener, () -> {
+                LOGGER.debug("Analyzed plan ({} attempt, {} filter):\n{}", attemptMessage, filterPresentMessage, plan);
+                // the analysis succeeded from the first attempt, irrespective if it had a filter or not, just continue with the planning
+            }));
+        } catch (Exception e) {
+            l.onFailure(e);
        }
-        LOGGER.debug("Analyzed plan ({} attempt, {} filter):\n{}", attemptMessage, filterPresentMessage, plan);
-        // the analysis succeeded from the first attempt, irrespective if it had a filter or not, just continue with the planning
-        logicalPlanListener.onResponse(plan);
     }
 
     private static void resolveFieldNames(LogicalPlan parsed, EnrichResolution enrichResolution, ActionListener<PreAnalysisResult> l) {
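Note on the EsqlSession change: analyzeAction becomes a BiConsumer that feeds the analyzed plan to a listener, and analyzeAndMaybeRetry rewires its error handling with delegateResponse so that a VerificationException from a filtered first attempt flows into the second, filter-free attempt instead of failing outright. The sketch below shows that retry-on-specific-failure shape with plain JDK futures and hypothetical names; it stands in for, but is not, the ActionListener-based wiring in the diff above.

// Sketch: first attempt with the request filter, fall back to a second attempt
// without it only when a specific failure type signals a retryable problem.
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

public class RetrySketch {
    // Stand-in for the VerificationException that makes a filtered attempt retryable.
    static class VerificationException extends RuntimeException {
        VerificationException(String message) {
            super(message);
        }
    }

    // Stand-in for analyzeAction: fails when a filter is present, succeeds without one.
    static CompletableFuture<String> analyze(String query, String filter) {
        if (filter != null) {
            return CompletableFuture.failedFuture(new VerificationException("unknown index under filter " + filter));
        }
        return CompletableFuture.completedFuture(query + " [analyzed without filter]");
    }

    // Mirrors the delegateResponse wiring: only a VerificationException on a filtered
    // first attempt falls through to the second, filter-free attempt.
    static CompletableFuture<String> analyzeAndMaybeRetry(String query, String filter) {
        return analyze(query, filter).exceptionallyCompose(e -> {
            Throwable cause = e instanceof CompletionException ? e.getCause() : e;
            if (cause instanceof VerificationException && filter != null) {
                return analyze(query, null);
            }
            return CompletableFuture.<String>failedFuture(cause);
        });
    }

    public static void main(String[] args) {
        System.out.println(analyzeAndMaybeRetry("FROM test", "myfilter").join());
    }
}

In the real code the second attempt is driven by the surrounding listener chain in analyzedPlan (the "second attempt, without filter" branch above); the sketch collapses that into a single method for readability.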
