Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -180,19 +180,31 @@ public void execute(EsqlQueryRequest request, EsqlExecutionInfo executionInfo, P
analyzedPlan(parsed, executionInfo, request.filter(), new EsqlCCSUtils.CssPartialErrorsActionListener(executionInfo, listener) {
@Override
public void onResponse(LogicalPlan analyzedPlan) {
SubscribableListener.<LogicalPlan>newForked(l -> preOptimizedPlan(analyzedPlan, l))
.<LogicalPlan>andThen((l, p) -> preMapper.preMapper(optimizedPlan(p), l))
.<Result>andThen((l, p) -> executeOptimizedPlan(request, executionInfo, planRunner, p, l))
.addListener(listener);
optimizeAndExecute(request, executionInfo, planRunner, analyzedPlan, listener);
}
});
}

// visible for testing in CsvTests
public void optimizeAndExecute(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wish this could be protected or default visibility, however CsvTests are in different package.

I wonder if in future it would be possible to extend EsqlSession in CsvTests, override analyzedPlan with custom test resolution and reduce visibility of this method.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The tests can make a TestEsqlSession inside of the same package. Right?

EsqlQueryRequest request,
EsqlExecutionInfo executionInfo,
PlanRunner planRunner,
LogicalPlan analyzedPlan,
ActionListener<Result> listener
) {
SubscribableListener.<LogicalPlan>newForked(l -> logicalPlanPreOptimizer.preOptimize(analyzedPlan, l))
.andThenApply(this::optimizedPlan)
.<LogicalPlan>andThen((l, p) -> preMapper.preMapper(p, l))
.<Result>andThen((l, p) -> executeOptimizedPlan(request, executionInfo, planRunner, p, l))
.addListener(listener);
}

/**
* Execute an analyzed plan. Most code should prefer calling {@link #execute} but
* this is public for testing.
*/
public void executeOptimizedPlan(
private void executeOptimizedPlan(
EsqlQueryRequest request,
EsqlExecutionInfo executionInfo,
PlanRunner planRunner,
Expand Down Expand Up @@ -792,7 +804,7 @@ private PhysicalPlan logicalPlanToPhysicalPlan(LogicalPlan optimizedPlan, EsqlQu
return EstimatesRowSize.estimateRowSize(0, physicalPlan);
}

public LogicalPlan optimizedPlan(LogicalPlan logicalPlan) {
private LogicalPlan optimizedPlan(LogicalPlan logicalPlan) {
if (logicalPlan.preOptimized() == false) {
throw new IllegalStateException("Expected pre-optimized plan");
}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(here and in physicalPlan(..))

This is now private and we control the order of invocations in EsqlSession.
Should we convert those exceptions into assertions?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm fine keeping these as hard if statements unless we see a problem on the benchmarks. It'd be bad to mess up the ordering. Not likely, but whatever.

Expand All @@ -801,11 +813,7 @@ public LogicalPlan optimizedPlan(LogicalPlan logicalPlan) {
return plan;
}

public void preOptimizedPlan(LogicalPlan logicalPlan, ActionListener<LogicalPlan> listener) {
logicalPlanPreOptimizer.preOptimize(logicalPlan, listener);
}

public PhysicalPlan physicalPlan(LogicalPlan optimizedPlan) {
private PhysicalPlan physicalPlan(LogicalPlan optimizedPlan) {
if (optimizedPlan.optimized() == false) {
throw new IllegalStateException("Expected optimized plan");
}
Expand All @@ -815,7 +823,7 @@ public PhysicalPlan physicalPlan(LogicalPlan optimizedPlan) {
return plan;
}

public PhysicalPlan optimizedPhysicalPlan(LogicalPlan optimizedPlan) {
private PhysicalPlan optimizedPhysicalPlan(LogicalPlan optimizedPlan) {
var plan = physicalPlanOptimizer.optimize(physicalPlan(optimizedPlan));
LOGGER.debug("Optimized physical plan:\n{}", plan);
return plan;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -596,28 +596,21 @@ private ActualResults executePlan(BigArrays bigArrays) throws Exception {
TestPhysicalOperationProviders physicalOperationProviders = testOperationProviders(foldCtx, testDatasets);

PlainActionFuture<ActualResults> listener = new PlainActionFuture<>();

session.preOptimizedPlan(analyzed, listener.delegateFailureAndWrap((l, preOptimized) -> {
session.executeOptimizedPlan(
new EsqlQueryRequest(),
new EsqlExecutionInfo(randomBoolean()),
planRunner(bigArrays, foldCtx, physicalOperationProviders),
session.optimizedPlan(preOptimized),
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is same as org.elasticsearch.xpack.esql.session.EsqlSession#optimizeAndExecute except it did not call preMapper.preMapper.

Lets test production code instead of its test copy.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The preMapper part wasn't an issue with the CsvTests so far, since full text function tests aren't run part of this suite. (Still, great to have this part fixed!)

listener.delegateFailureAndWrap(
// Wrap so we can capture the warnings in the calling thread
Comment on lines -606 to -607
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are we not using delegateFailureAndWrap anymore? (Naive question)

(next, result) -> next.onResponse(
new ActualResults(
result.schema().stream().map(Attribute::name).toList(),
result.schema().stream().map(a -> Type.asType(a.dataType().nameUpper())).toList(),
result.schema().stream().map(Attribute::dataType).toList(),
result.pages(),
threadPool.getThreadContext().getResponseHeaders()
)
)
session.optimizeAndExecute(
new EsqlQueryRequest(),
new EsqlExecutionInfo(randomBoolean()),
planRunner(bigArrays, foldCtx, physicalOperationProviders),
analyzed,
listener.map(
result -> new ActualResults(
result.schema().stream().map(Attribute::name).toList(),
result.schema().stream().map(a -> Type.asType(a.dataType().nameUpper())).toList(),
result.schema().stream().map(Attribute::dataType).toList(),
result.pages(),
threadPool.getThreadContext().getResponseHeaders()
)
);
}));

)
);
return listener.get();
}

Expand Down