From a3062d3ef906376b1e04cae5a479271d1c0435eb Mon Sep 17 00:00:00 2001 From: "ievgen.degtiarenko" Date: Tue, 12 Aug 2025 15:15:35 +0200 Subject: [PATCH] Reuse prod code and reduce EsqlSession public surface --- .../xpack/esql/session/EsqlSession.java | 32 ++++++++++------- .../elasticsearch/xpack/esql/CsvTests.java | 35 ++++++++----------- 2 files changed, 34 insertions(+), 33 deletions(-) diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java index a98b0f3c52735..307be48de1a9e 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java @@ -180,19 +180,31 @@ public void execute(EsqlQueryRequest request, EsqlExecutionInfo executionInfo, P analyzedPlan(parsed, executionInfo, request.filter(), new EsqlCCSUtils.CssPartialErrorsActionListener(executionInfo, listener) { @Override public void onResponse(LogicalPlan analyzedPlan) { - SubscribableListener.newForked(l -> preOptimizedPlan(analyzedPlan, l)) - .andThen((l, p) -> preMapper.preMapper(optimizedPlan(p), l)) - .andThen((l, p) -> executeOptimizedPlan(request, executionInfo, planRunner, p, l)) - .addListener(listener); + optimizeAndExecute(request, executionInfo, planRunner, analyzedPlan, listener); } }); } + // visible for testing in CsvTests + public void optimizeAndExecute( + EsqlQueryRequest request, + EsqlExecutionInfo executionInfo, + PlanRunner planRunner, + LogicalPlan analyzedPlan, + ActionListener listener + ) { + SubscribableListener.newForked(l -> logicalPlanPreOptimizer.preOptimize(analyzedPlan, l)) + .andThenApply(this::optimizedPlan) + .andThen((l, p) -> preMapper.preMapper(p, l)) + .andThen((l, p) -> executeOptimizedPlan(request, executionInfo, planRunner, p, l)) + .addListener(listener); + } + /** * Execute an analyzed plan. Most code should prefer calling {@link #execute} but * this is public for testing. */ - public void executeOptimizedPlan( + private void executeOptimizedPlan( EsqlQueryRequest request, EsqlExecutionInfo executionInfo, PlanRunner planRunner, @@ -792,7 +804,7 @@ private PhysicalPlan logicalPlanToPhysicalPlan(LogicalPlan optimizedPlan, EsqlQu return EstimatesRowSize.estimateRowSize(0, physicalPlan); } - public LogicalPlan optimizedPlan(LogicalPlan logicalPlan) { + private LogicalPlan optimizedPlan(LogicalPlan logicalPlan) { if (logicalPlan.preOptimized() == false) { throw new IllegalStateException("Expected pre-optimized plan"); } @@ -801,11 +813,7 @@ public LogicalPlan optimizedPlan(LogicalPlan logicalPlan) { return plan; } - public void preOptimizedPlan(LogicalPlan logicalPlan, ActionListener listener) { - logicalPlanPreOptimizer.preOptimize(logicalPlan, listener); - } - - public PhysicalPlan physicalPlan(LogicalPlan optimizedPlan) { + private PhysicalPlan physicalPlan(LogicalPlan optimizedPlan) { if (optimizedPlan.optimized() == false) { throw new IllegalStateException("Expected optimized plan"); } @@ -815,7 +823,7 @@ public PhysicalPlan physicalPlan(LogicalPlan optimizedPlan) { return plan; } - public PhysicalPlan optimizedPhysicalPlan(LogicalPlan optimizedPlan) { + private PhysicalPlan optimizedPhysicalPlan(LogicalPlan optimizedPlan) { var plan = physicalPlanOptimizer.optimize(physicalPlan(optimizedPlan)); LOGGER.debug("Optimized physical plan:\n{}", plan); return plan; diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/CsvTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/CsvTests.java index d149fb012a14b..957b235d16323 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/CsvTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/CsvTests.java @@ -596,28 +596,21 @@ private ActualResults executePlan(BigArrays bigArrays) throws Exception { TestPhysicalOperationProviders physicalOperationProviders = testOperationProviders(foldCtx, testDatasets); PlainActionFuture listener = new PlainActionFuture<>(); - - session.preOptimizedPlan(analyzed, listener.delegateFailureAndWrap((l, preOptimized) -> { - session.executeOptimizedPlan( - new EsqlQueryRequest(), - new EsqlExecutionInfo(randomBoolean()), - planRunner(bigArrays, foldCtx, physicalOperationProviders), - session.optimizedPlan(preOptimized), - listener.delegateFailureAndWrap( - // Wrap so we can capture the warnings in the calling thread - (next, result) -> next.onResponse( - new ActualResults( - result.schema().stream().map(Attribute::name).toList(), - result.schema().stream().map(a -> Type.asType(a.dataType().nameUpper())).toList(), - result.schema().stream().map(Attribute::dataType).toList(), - result.pages(), - threadPool.getThreadContext().getResponseHeaders() - ) - ) + session.optimizeAndExecute( + new EsqlQueryRequest(), + new EsqlExecutionInfo(randomBoolean()), + planRunner(bigArrays, foldCtx, physicalOperationProviders), + analyzed, + listener.map( + result -> new ActualResults( + result.schema().stream().map(Attribute::name).toList(), + result.schema().stream().map(a -> Type.asType(a.dataType().nameUpper())).toList(), + result.schema().stream().map(Attribute::dataType).toList(), + result.pages(), + threadPool.getThreadContext().getResponseHeaders() ) - ); - })); - + ) + ); return listener.get(); }