diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/execution/PlanExecutor.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/execution/PlanExecutor.java index 620d86650abf4..414e1f372ea3f 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/execution/PlanExecutor.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/execution/PlanExecutor.java @@ -21,6 +21,8 @@ import org.elasticsearch.xpack.esql.expression.function.EsqlFunctionRegistry; import org.elasticsearch.xpack.esql.optimizer.LogicalOptimizerContext; import org.elasticsearch.xpack.esql.optimizer.LogicalPlanOptimizer; +import org.elasticsearch.xpack.esql.optimizer.LogicalPlanPreOptimizer; +import org.elasticsearch.xpack.esql.optimizer.LogicalPreOptimizerContext; import org.elasticsearch.xpack.esql.planner.mapper.Mapper; import org.elasticsearch.xpack.esql.plugin.TransportActionServices; import org.elasticsearch.xpack.esql.querylog.EsqlQueryLog; @@ -85,6 +87,7 @@ public void esql( indexResolver, enrichPolicyResolver, preAnalyzer, + new LogicalPlanPreOptimizer(new LogicalPreOptimizerContext(foldContext)), functionRegistry, new LogicalPlanOptimizer(new LogicalOptimizerContext(cfg, foldContext)), mapper, diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanPreOptimizer.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanPreOptimizer.java new file mode 100644 index 0000000000000..fdd8e1318f636 --- /dev/null +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanPreOptimizer.java @@ -0,0 +1,50 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.esql.optimizer; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan; + +/** + * The class is responsible for invoking any steps that need to be applied to the logical plan, + * before this is being optimized. + *

+ * This is useful, especially if you need to execute some async tasks before the plan is optimized. + *

+ */ +public class LogicalPlanPreOptimizer { + + private final LogicalPreOptimizerContext preOptimizerContext; + + public LogicalPlanPreOptimizer(LogicalPreOptimizerContext preOptimizerContext) { + this.preOptimizerContext = preOptimizerContext; + } + + /** + * Pre-optimize a logical plan. + * + * @param plan the analyzed logical plan to pre-optimize + * @param listener the listener returning the pre-optimized plan when pre-optimization is complete + */ + public void preOptimize(LogicalPlan plan, ActionListener listener) { + if (plan.analyzed() == false) { + listener.onFailure(new IllegalStateException("Expected analyzed plan")); + return; + } + + doPreOptimize(plan, listener.delegateFailureAndWrap((l, preOptimized) -> { + preOptimized.setPreOptimized(); + listener.onResponse(preOptimized); + })); + } + + private void doPreOptimize(LogicalPlan plan, ActionListener listener) { + // this is where we will be executing async tasks + listener.onResponse(plan); + } +} diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LogicalPreOptimizerContext.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LogicalPreOptimizerContext.java new file mode 100644 index 0000000000000..d082bd56fc46d --- /dev/null +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LogicalPreOptimizerContext.java @@ -0,0 +1,43 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.esql.optimizer; + +import org.elasticsearch.xpack.esql.core.expression.FoldContext; + +import java.util.Objects; + +public class LogicalPreOptimizerContext { + + private final FoldContext foldCtx; + + public LogicalPreOptimizerContext(FoldContext foldCtx) { + this.foldCtx = foldCtx; + } + + public FoldContext foldCtx() { + return foldCtx; + } + + @Override + public boolean equals(Object obj) { + if (obj == this) return true; + if (obj == null || obj.getClass() != this.getClass()) return false; + var that = (LogicalPreOptimizerContext) obj; + return this.foldCtx.equals(that.foldCtx); + } + + @Override + public int hashCode() { + return Objects.hash(foldCtx); + } + + @Override + public String toString() { + return "LogicalPreOptimizerContext[foldCtx=" + foldCtx + ']'; + } +} diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/LogicalPlan.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/LogicalPlan.java index ac4baea8bc853..762b22389ae24 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/LogicalPlan.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/LogicalPlan.java @@ -25,6 +25,7 @@ public enum Stage { PARSED, PRE_ANALYZED, ANALYZED, + PRE_OPTIMIZED, OPTIMIZED; } @@ -52,6 +53,14 @@ public void setAnalyzed() { stage = Stage.ANALYZED; } + public boolean preOptimized() { + return stage.ordinal() >= Stage.PRE_OPTIMIZED.ordinal(); + } + + public void setPreOptimized() { + stage = Stage.PRE_OPTIMIZED; + } + public boolean optimized() { return stage.ordinal() >= Stage.OPTIMIZED.ordinal(); } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java index df18051bcf721..4590cb8c6d3bd 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java @@ -71,6 +71,7 @@ import org.elasticsearch.xpack.esql.inference.InferenceResolution; import org.elasticsearch.xpack.esql.inference.InferenceRunner; import org.elasticsearch.xpack.esql.optimizer.LogicalPlanOptimizer; +import org.elasticsearch.xpack.esql.optimizer.LogicalPlanPreOptimizer; import org.elasticsearch.xpack.esql.optimizer.PhysicalOptimizerContext; import org.elasticsearch.xpack.esql.optimizer.PhysicalPlanOptimizer; import org.elasticsearch.xpack.esql.parser.EsqlParser; @@ -147,6 +148,7 @@ public interface PlanRunner { private final PreAnalyzer preAnalyzer; private final Verifier verifier; private final EsqlFunctionRegistry functionRegistry; + private final LogicalPlanPreOptimizer logicalPlanPreOptimizer; private final LogicalPlanOptimizer logicalPlanOptimizer; private final PreMapper preMapper; @@ -168,6 +170,7 @@ public EsqlSession( IndexResolver indexResolver, EnrichPolicyResolver enrichPolicyResolver, PreAnalyzer preAnalyzer, + LogicalPlanPreOptimizer logicalPlanPreOptimizer, EsqlFunctionRegistry functionRegistry, LogicalPlanOptimizer logicalPlanOptimizer, Mapper mapper, @@ -181,6 +184,7 @@ public EsqlSession( this.indexResolver = indexResolver; this.enrichPolicyResolver = enrichPolicyResolver; this.preAnalyzer = preAnalyzer; + this.logicalPlanPreOptimizer = logicalPlanPreOptimizer; this.verifier = verifier; this.functionRegistry = functionRegistry; this.mapper = mapper; @@ -212,11 +216,10 @@ public void execute(EsqlQueryRequest request, EsqlExecutionInfo executionInfo, P analyzedPlan(parsed, executionInfo, request.filter(), new EsqlCCSUtils.CssPartialErrorsActionListener(executionInfo, listener) { @Override public void onResponse(LogicalPlan analyzedPlan) { - LogicalPlan optimizedPlan = optimizedPlan(analyzedPlan); - preMapper.preMapper( - optimizedPlan, - listener.delegateFailureAndWrap((l, p) -> executeOptimizedPlan(request, executionInfo, planRunner, p, l)) - ); + SubscribableListener.newForked(l -> preOptimizedPlan(analyzedPlan, l)) + .andThen((l, p) -> preMapper.preMapper(optimizedPlan(p), l)) + .andThen((l, p) -> executeOptimizedPlan(request, executionInfo, planRunner, p, l)) + .addListener(listener); } }); } @@ -1043,14 +1046,18 @@ private PhysicalPlan logicalPlanToPhysicalPlan(LogicalPlan optimizedPlan, EsqlQu } public LogicalPlan optimizedPlan(LogicalPlan logicalPlan) { - if (logicalPlan.analyzed() == false) { - throw new IllegalStateException("Expected analyzed plan"); + if (logicalPlan.preOptimized() == false) { + throw new IllegalStateException("Expected pre-optimized plan"); } var plan = logicalPlanOptimizer.optimize(logicalPlan); LOGGER.debug("Optimized logicalPlan plan:\n{}", plan); return plan; } + public void preOptimizedPlan(LogicalPlan logicalPlan, ActionListener listener) { + logicalPlanPreOptimizer.preOptimize(logicalPlan, listener); + } + public PhysicalPlan physicalPlan(LogicalPlan optimizedPlan) { if (optimizedPlan.optimized() == false) { throw new IllegalStateException("Expected optimized plan"); diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/CsvTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/CsvTests.java index 62280a38ba608..b38b3089823d5 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/CsvTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/CsvTests.java @@ -73,6 +73,8 @@ import org.elasticsearch.xpack.esql.optimizer.LocalPhysicalOptimizerContext; import org.elasticsearch.xpack.esql.optimizer.LogicalOptimizerContext; import org.elasticsearch.xpack.esql.optimizer.LogicalPlanOptimizer; +import org.elasticsearch.xpack.esql.optimizer.LogicalPlanPreOptimizer; +import org.elasticsearch.xpack.esql.optimizer.LogicalPreOptimizerContext; import org.elasticsearch.xpack.esql.optimizer.TestLocalPhysicalPlanOptimizer; import org.elasticsearch.xpack.esql.parser.EsqlParser; import org.elasticsearch.xpack.esql.plan.logical.Enrich; @@ -582,6 +584,7 @@ private ActualResults executePlan(BigArrays bigArrays) throws Exception { null, null, null, + new LogicalPlanPreOptimizer(new LogicalPreOptimizerContext(foldCtx)), functionRegistry, new LogicalPlanOptimizer(new LogicalOptimizerContext(configuration, foldCtx)), mapper, @@ -594,24 +597,27 @@ private ActualResults executePlan(BigArrays bigArrays) throws Exception { PlainActionFuture listener = new PlainActionFuture<>(); - session.executeOptimizedPlan( - new EsqlQueryRequest(), - new EsqlExecutionInfo(randomBoolean()), - planRunner(bigArrays, foldCtx, physicalOperationProviders), - session.optimizedPlan(analyzed), - listener.delegateFailureAndWrap( - // Wrap so we can capture the warnings in the calling thread - (next, result) -> next.onResponse( - new ActualResults( - result.schema().stream().map(Attribute::name).toList(), - result.schema().stream().map(a -> Type.asType(a.dataType().nameUpper())).toList(), - result.schema().stream().map(Attribute::dataType).toList(), - result.pages(), - threadPool.getThreadContext().getResponseHeaders() + session.preOptimizedPlan(analyzed, listener.delegateFailureAndWrap((l, preOptimized) -> { + session.executeOptimizedPlan( + new EsqlQueryRequest(), + new EsqlExecutionInfo(randomBoolean()), + planRunner(bigArrays, foldCtx, physicalOperationProviders), + session.optimizedPlan(preOptimized), + listener.delegateFailureAndWrap( + // Wrap so we can capture the warnings in the calling thread + (next, result) -> next.onResponse( + new ActualResults( + result.schema().stream().map(Attribute::name).toList(), + result.schema().stream().map(a -> Type.asType(a.dataType().nameUpper())).toList(), + result.schema().stream().map(Attribute::dataType).toList(), + result.pages(), + threadPool.getThreadContext().getResponseHeaders() + ) ) ) - ) - ); + ); + })); + return listener.get(); } diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanPreOptimizerTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanPreOptimizerTests.java new file mode 100644 index 0000000000000..8e573dd1cf3c9 --- /dev/null +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanPreOptimizerTests.java @@ -0,0 +1,110 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.esql.optimizer; + +import org.apache.lucene.util.SetOnce; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.xpack.esql.EsqlTestUtils; +import org.elasticsearch.xpack.esql.core.expression.Alias; +import org.elasticsearch.xpack.esql.core.expression.Expression; +import org.elasticsearch.xpack.esql.core.expression.FoldContext; +import org.elasticsearch.xpack.esql.core.tree.Source; +import org.elasticsearch.xpack.esql.expression.function.scalar.string.Concat; +import org.elasticsearch.xpack.esql.expression.predicate.operator.arithmetic.Add; +import org.elasticsearch.xpack.esql.plan.logical.Eval; +import org.elasticsearch.xpack.esql.plan.logical.Filter; +import org.elasticsearch.xpack.esql.plan.logical.Limit; +import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan; +import org.elasticsearch.xpack.esql.plan.logical.Project; + +import java.util.List; + +import static org.elasticsearch.xpack.esql.EsqlTestUtils.as; +import static org.elasticsearch.xpack.esql.EsqlTestUtils.fieldAttribute; +import static org.elasticsearch.xpack.esql.EsqlTestUtils.of; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.notNullValue; + +public class LogicalPlanPreOptimizerTests extends ESTestCase { + + public void testPlanIsMarkedAsPreOptimized() throws Exception { + for (int round = 0; round < 100; round++) { + // We want to make sure that the pre-optimizer woks for a wide range of plans + preOptimizedPlan(randomPlan()); + } + } + + public void testPreOptimizeFailsIfPlanIsNotAnalyzed() throws Exception { + LogicalPlan plan = EsqlTestUtils.relation(); + SetOnce exceptionHolder = new SetOnce<>(); + + preOptimizer().preOptimize(plan, ActionListener.wrap(r -> fail("Should have failed"), exceptionHolder::set)); + assertBusy(() -> { + assertThat(exceptionHolder.get(), notNullValue()); + IllegalStateException e = as(exceptionHolder.get(), IllegalStateException.class); + assertThat(e.getMessage(), equalTo("Expected analyzed plan")); + }); + } + + public LogicalPlan preOptimizedPlan(LogicalPlan plan) throws Exception { + // set plan as analyzed + plan.setPreOptimized(); + + SetOnce resultHolder = new SetOnce<>(); + SetOnce exceptionHolder = new SetOnce<>(); + + preOptimizer().preOptimize(plan, ActionListener.wrap(resultHolder::set, exceptionHolder::set)); + + if (exceptionHolder.get() != null) { + throw exceptionHolder.get(); + } + + assertThat(resultHolder.get(), notNullValue()); + assertThat(resultHolder.get().preOptimized(), equalTo(true)); + + return resultHolder.get(); + } + + private LogicalPlanPreOptimizer preOptimizer() { + LogicalPreOptimizerContext preOptimizerContext = new LogicalPreOptimizerContext(FoldContext.small()); + return new LogicalPlanPreOptimizer(preOptimizerContext); + } + + private LogicalPlan randomPlan() { + LogicalPlan plan = EsqlTestUtils.relation(); + int numCommands = between(0, 100); + + for (int i = 0; i < numCommands; i++) { + plan = switch (randomInt(3)) { + case 0 -> new Eval(Source.EMPTY, plan, List.of(new Alias(Source.EMPTY, randomIdentifier(), randomExpression()))); + case 1 -> new Limit(Source.EMPTY, of(randomInt()), plan); + case 2 -> new Filter(Source.EMPTY, plan, randomCondition()); + default -> new Project(Source.EMPTY, plan, List.of(new Alias(Source.EMPTY, randomIdentifier(), fieldAttribute()))); + }; + } + return plan; + } + + private Expression randomExpression() { + return switch (randomInt(3)) { + case 0 -> of(randomInt()); + case 1 -> of(randomIdentifier()); + case 2 -> new Add(Source.EMPTY, of(randomInt()), of(randomDouble())); + default -> new Concat(Source.EMPTY, of(randomIdentifier()), randomList(1, 10, () -> of(randomIdentifier()))); + }; + } + + private Expression randomCondition() { + if (randomBoolean()) { + return EsqlTestUtils.equalsOf(randomExpression(), randomExpression()); + } + + return EsqlTestUtils.greaterThanOf(randomExpression(), randomExpression()); + } +}