Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -180,31 +180,19 @@ public void execute(EsqlQueryRequest request, EsqlExecutionInfo executionInfo, P
analyzedPlan(parsed, executionInfo, request.filter(), new EsqlCCSUtils.CssPartialErrorsActionListener(executionInfo, listener) {
@Override
public void onResponse(LogicalPlan analyzedPlan) {
optimizeAndExecute(request, executionInfo, planRunner, analyzedPlan, listener);
SubscribableListener.<LogicalPlan>newForked(l -> preOptimizedPlan(analyzedPlan, l))
.<LogicalPlan>andThen((l, p) -> preMapper.preMapper(optimizedPlan(p), l))
.<Result>andThen((l, p) -> executeOptimizedPlan(request, executionInfo, planRunner, p, l))
.addListener(listener);
}
});
}

// visible for testing in CsvTests
public void optimizeAndExecute(
EsqlQueryRequest request,
EsqlExecutionInfo executionInfo,
PlanRunner planRunner,
LogicalPlan analyzedPlan,
ActionListener<Result> listener
) {
SubscribableListener.<LogicalPlan>newForked(l -> logicalPlanPreOptimizer.preOptimize(analyzedPlan, l))
.andThenApply(this::optimizedPlan)
.<LogicalPlan>andThen((l, p) -> preMapper.preMapper(p, l))
.<Result>andThen((l, p) -> executeOptimizedPlan(request, executionInfo, planRunner, p, l))
.addListener(listener);
}

/**
* Execute an analyzed plan. Most code should prefer calling {@link #execute} but
* this is public for testing.
*/
private void executeOptimizedPlan(
public void executeOptimizedPlan(
EsqlQueryRequest request,
EsqlExecutionInfo executionInfo,
PlanRunner planRunner,
Expand Down Expand Up @@ -804,7 +792,7 @@ private PhysicalPlan logicalPlanToPhysicalPlan(LogicalPlan optimizedPlan, EsqlQu
return EstimatesRowSize.estimateRowSize(0, physicalPlan);
}

private LogicalPlan optimizedPlan(LogicalPlan logicalPlan) {
public LogicalPlan optimizedPlan(LogicalPlan logicalPlan) {
if (logicalPlan.preOptimized() == false) {
throw new IllegalStateException("Expected pre-optimized plan");
}
Expand All @@ -813,7 +801,11 @@ private LogicalPlan optimizedPlan(LogicalPlan logicalPlan) {
return plan;
}

private PhysicalPlan physicalPlan(LogicalPlan optimizedPlan) {
public void preOptimizedPlan(LogicalPlan logicalPlan, ActionListener<LogicalPlan> listener) {
logicalPlanPreOptimizer.preOptimize(logicalPlan, listener);
}

public PhysicalPlan physicalPlan(LogicalPlan optimizedPlan) {
if (optimizedPlan.optimized() == false) {
throw new IllegalStateException("Expected optimized plan");
}
Expand All @@ -823,7 +815,7 @@ private PhysicalPlan physicalPlan(LogicalPlan optimizedPlan) {
return plan;
}

private PhysicalPlan optimizedPhysicalPlan(LogicalPlan optimizedPlan) {
public PhysicalPlan optimizedPhysicalPlan(LogicalPlan optimizedPlan) {
var plan = physicalPlanOptimizer.optimize(physicalPlan(optimizedPlan));
LOGGER.debug("Optimized physical plan:\n{}", plan);
return plan;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -596,21 +596,28 @@ private ActualResults executePlan(BigArrays bigArrays) throws Exception {
TestPhysicalOperationProviders physicalOperationProviders = testOperationProviders(foldCtx, testDatasets);

PlainActionFuture<ActualResults> listener = new PlainActionFuture<>();
session.optimizeAndExecute(
new EsqlQueryRequest(),
new EsqlExecutionInfo(randomBoolean()),
planRunner(bigArrays, foldCtx, physicalOperationProviders),
analyzed,
listener.map(
result -> new ActualResults(
result.schema().stream().map(Attribute::name).toList(),
result.schema().stream().map(a -> Type.asType(a.dataType().nameUpper())).toList(),
result.schema().stream().map(Attribute::dataType).toList(),
result.pages(),
threadPool.getThreadContext().getResponseHeaders()

session.preOptimizedPlan(analyzed, listener.delegateFailureAndWrap((l, preOptimized) -> {
session.executeOptimizedPlan(
new EsqlQueryRequest(),
new EsqlExecutionInfo(randomBoolean()),
planRunner(bigArrays, foldCtx, physicalOperationProviders),
session.optimizedPlan(preOptimized),
listener.delegateFailureAndWrap(
// Wrap so we can capture the warnings in the calling thread
(next, result) -> next.onResponse(
new ActualResults(
result.schema().stream().map(Attribute::name).toList(),
result.schema().stream().map(a -> Type.asType(a.dataType().nameUpper())).toList(),
result.schema().stream().map(Attribute::dataType).toList(),
result.pages(),
threadPool.getThreadContext().getResponseHeaders()
)
)
)
)
);
);
}));

return listener.get();
}

Expand Down