Skip to content

Commit f8b2b3b

Browse files
authored
Reuse prod code and reduce EsqlSession public surface (#132728)
1 parent 32e6815 commit f8b2b3b

File tree

2 files changed

+34
-33
lines changed

2 files changed

+34
-33
lines changed

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java

Lines changed: 20 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -180,19 +180,31 @@ public void execute(EsqlQueryRequest request, EsqlExecutionInfo executionInfo, P
180180
analyzedPlan(parsed, executionInfo, request.filter(), new EsqlCCSUtils.CssPartialErrorsActionListener(executionInfo, listener) {
181181
@Override
182182
public void onResponse(LogicalPlan analyzedPlan) {
183-
SubscribableListener.<LogicalPlan>newForked(l -> preOptimizedPlan(analyzedPlan, l))
184-
.<LogicalPlan>andThen((l, p) -> preMapper.preMapper(optimizedPlan(p), l))
185-
.<Result>andThen((l, p) -> executeOptimizedPlan(request, executionInfo, planRunner, p, l))
186-
.addListener(listener);
183+
optimizeAndExecute(request, executionInfo, planRunner, analyzedPlan, listener);
187184
}
188185
});
189186
}
190187

188+
// visible for testing in CsvTests
189+
public void optimizeAndExecute(
190+
EsqlQueryRequest request,
191+
EsqlExecutionInfo executionInfo,
192+
PlanRunner planRunner,
193+
LogicalPlan analyzedPlan,
194+
ActionListener<Result> listener
195+
) {
196+
SubscribableListener.<LogicalPlan>newForked(l -> logicalPlanPreOptimizer.preOptimize(analyzedPlan, l))
197+
.andThenApply(this::optimizedPlan)
198+
.<LogicalPlan>andThen((l, p) -> preMapper.preMapper(p, l))
199+
.<Result>andThen((l, p) -> executeOptimizedPlan(request, executionInfo, planRunner, p, l))
200+
.addListener(listener);
201+
}
202+
191203
/**
192204
* Execute an analyzed plan. Most code should prefer calling {@link #execute} but
193205
* this is public for testing.
194206
*/
195-
public void executeOptimizedPlan(
207+
private void executeOptimizedPlan(
196208
EsqlQueryRequest request,
197209
EsqlExecutionInfo executionInfo,
198210
PlanRunner planRunner,
@@ -792,7 +804,7 @@ private PhysicalPlan logicalPlanToPhysicalPlan(LogicalPlan optimizedPlan, EsqlQu
792804
return EstimatesRowSize.estimateRowSize(0, physicalPlan);
793805
}
794806

795-
public LogicalPlan optimizedPlan(LogicalPlan logicalPlan) {
807+
private LogicalPlan optimizedPlan(LogicalPlan logicalPlan) {
796808
if (logicalPlan.preOptimized() == false) {
797809
throw new IllegalStateException("Expected pre-optimized plan");
798810
}
@@ -801,11 +813,7 @@ public LogicalPlan optimizedPlan(LogicalPlan logicalPlan) {
801813
return plan;
802814
}
803815

804-
public void preOptimizedPlan(LogicalPlan logicalPlan, ActionListener<LogicalPlan> listener) {
805-
logicalPlanPreOptimizer.preOptimize(logicalPlan, listener);
806-
}
807-
808-
public PhysicalPlan physicalPlan(LogicalPlan optimizedPlan) {
816+
private PhysicalPlan physicalPlan(LogicalPlan optimizedPlan) {
809817
if (optimizedPlan.optimized() == false) {
810818
throw new IllegalStateException("Expected optimized plan");
811819
}
@@ -815,7 +823,7 @@ public PhysicalPlan physicalPlan(LogicalPlan optimizedPlan) {
815823
return plan;
816824
}
817825

818-
public PhysicalPlan optimizedPhysicalPlan(LogicalPlan optimizedPlan) {
826+
private PhysicalPlan optimizedPhysicalPlan(LogicalPlan optimizedPlan) {
819827
var plan = physicalPlanOptimizer.optimize(physicalPlan(optimizedPlan));
820828
LOGGER.debug("Optimized physical plan:\n{}", plan);
821829
return plan;

x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/CsvTests.java

Lines changed: 14 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -596,28 +596,21 @@ private ActualResults executePlan(BigArrays bigArrays) throws Exception {
596596
TestPhysicalOperationProviders physicalOperationProviders = testOperationProviders(foldCtx, testDatasets);
597597

598598
PlainActionFuture<ActualResults> listener = new PlainActionFuture<>();
599-
600-
session.preOptimizedPlan(analyzed, listener.delegateFailureAndWrap((l, preOptimized) -> {
601-
session.executeOptimizedPlan(
602-
new EsqlQueryRequest(),
603-
new EsqlExecutionInfo(randomBoolean()),
604-
planRunner(bigArrays, foldCtx, physicalOperationProviders),
605-
session.optimizedPlan(preOptimized),
606-
listener.delegateFailureAndWrap(
607-
// Wrap so we can capture the warnings in the calling thread
608-
(next, result) -> next.onResponse(
609-
new ActualResults(
610-
result.schema().stream().map(Attribute::name).toList(),
611-
result.schema().stream().map(a -> Type.asType(a.dataType().nameUpper())).toList(),
612-
result.schema().stream().map(Attribute::dataType).toList(),
613-
result.pages(),
614-
threadPool.getThreadContext().getResponseHeaders()
615-
)
616-
)
599+
session.optimizeAndExecute(
600+
new EsqlQueryRequest(),
601+
new EsqlExecutionInfo(randomBoolean()),
602+
planRunner(bigArrays, foldCtx, physicalOperationProviders),
603+
analyzed,
604+
listener.map(
605+
result -> new ActualResults(
606+
result.schema().stream().map(Attribute::name).toList(),
607+
result.schema().stream().map(a -> Type.asType(a.dataType().nameUpper())).toList(),
608+
result.schema().stream().map(Attribute::dataType).toList(),
609+
result.pages(),
610+
threadPool.getThreadContext().getResponseHeaders()
617611
)
618-
);
619-
}));
620-
612+
)
613+
);
621614
return listener.get();
622615
}
623616

0 commit comments

Comments
 (0)