-
Notifications
You must be signed in to change notification settings - Fork 25.7k
Reuse prod code and reduce EsqlSession public surface #132728
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -180,19 +180,31 @@ public void execute(EsqlQueryRequest request, EsqlExecutionInfo executionInfo, P | |
| analyzedPlan(parsed, executionInfo, request.filter(), new EsqlCCSUtils.CssPartialErrorsActionListener(executionInfo, listener) { | ||
| @Override | ||
| public void onResponse(LogicalPlan analyzedPlan) { | ||
| SubscribableListener.<LogicalPlan>newForked(l -> preOptimizedPlan(analyzedPlan, l)) | ||
| .<LogicalPlan>andThen((l, p) -> preMapper.preMapper(optimizedPlan(p), l)) | ||
| .<Result>andThen((l, p) -> executeOptimizedPlan(request, executionInfo, planRunner, p, l)) | ||
| .addListener(listener); | ||
| optimizeAndExecute(request, executionInfo, planRunner, analyzedPlan, listener); | ||
| } | ||
| }); | ||
| } | ||
|
|
||
| // visible for testing in CsvTests | ||
| public void optimizeAndExecute( | ||
| EsqlQueryRequest request, | ||
| EsqlExecutionInfo executionInfo, | ||
| PlanRunner planRunner, | ||
| LogicalPlan analyzedPlan, | ||
| ActionListener<Result> listener | ||
| ) { | ||
| SubscribableListener.<LogicalPlan>newForked(l -> logicalPlanPreOptimizer.preOptimize(analyzedPlan, l)) | ||
| .andThenApply(this::optimizedPlan) | ||
| .<LogicalPlan>andThen((l, p) -> preMapper.preMapper(p, l)) | ||
| .<Result>andThen((l, p) -> executeOptimizedPlan(request, executionInfo, planRunner, p, l)) | ||
| .addListener(listener); | ||
| } | ||
|
|
||
| /** | ||
| * Execute an analyzed plan. Most code should prefer calling {@link #execute} but | ||
| * this is public for testing. | ||
| */ | ||
| public void executeOptimizedPlan( | ||
| private void executeOptimizedPlan( | ||
| EsqlQueryRequest request, | ||
| EsqlExecutionInfo executionInfo, | ||
| PlanRunner planRunner, | ||
|
|
@@ -792,7 +804,7 @@ private PhysicalPlan logicalPlanToPhysicalPlan(LogicalPlan optimizedPlan, EsqlQu | |
| return EstimatesRowSize.estimateRowSize(0, physicalPlan); | ||
| } | ||
|
|
||
| public LogicalPlan optimizedPlan(LogicalPlan logicalPlan) { | ||
| private LogicalPlan optimizedPlan(LogicalPlan logicalPlan) { | ||
| if (logicalPlan.preOptimized() == false) { | ||
| throw new IllegalStateException("Expected pre-optimized plan"); | ||
| } | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. (here and in This is now private and we control the order of invocations in EsqlSession.
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm fine keeping these as hard |
||
|
|
@@ -801,11 +813,7 @@ public LogicalPlan optimizedPlan(LogicalPlan logicalPlan) { | |
| return plan; | ||
| } | ||
|
|
||
| public void preOptimizedPlan(LogicalPlan logicalPlan, ActionListener<LogicalPlan> listener) { | ||
| logicalPlanPreOptimizer.preOptimize(logicalPlan, listener); | ||
| } | ||
|
|
||
| public PhysicalPlan physicalPlan(LogicalPlan optimizedPlan) { | ||
| private PhysicalPlan physicalPlan(LogicalPlan optimizedPlan) { | ||
| if (optimizedPlan.optimized() == false) { | ||
| throw new IllegalStateException("Expected optimized plan"); | ||
| } | ||
|
|
@@ -815,7 +823,7 @@ public PhysicalPlan physicalPlan(LogicalPlan optimizedPlan) { | |
| return plan; | ||
| } | ||
|
|
||
| public PhysicalPlan optimizedPhysicalPlan(LogicalPlan optimizedPlan) { | ||
| private PhysicalPlan optimizedPhysicalPlan(LogicalPlan optimizedPlan) { | ||
| var plan = physicalPlanOptimizer.optimize(physicalPlan(optimizedPlan)); | ||
| LOGGER.debug("Optimized physical plan:\n{}", plan); | ||
| return plan; | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -596,28 +596,21 @@ private ActualResults executePlan(BigArrays bigArrays) throws Exception { | |
| TestPhysicalOperationProviders physicalOperationProviders = testOperationProviders(foldCtx, testDatasets); | ||
|
|
||
| PlainActionFuture<ActualResults> listener = new PlainActionFuture<>(); | ||
|
|
||
| session.preOptimizedPlan(analyzed, listener.delegateFailureAndWrap((l, preOptimized) -> { | ||
| session.executeOptimizedPlan( | ||
| new EsqlQueryRequest(), | ||
| new EsqlExecutionInfo(randomBoolean()), | ||
| planRunner(bigArrays, foldCtx, physicalOperationProviders), | ||
| session.optimizedPlan(preOptimized), | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is same as org.elasticsearch.xpack.esql.session.EsqlSession#optimizeAndExecute except it did not call Lets test production code instead of its test copy.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The preMapper part wasn't an issue with the CsvTests so far, since full text function tests aren't run part of this suite. (Still, great to have this part fixed!) |
||
| listener.delegateFailureAndWrap( | ||
| // Wrap so we can capture the warnings in the calling thread | ||
|
Comment on lines
-606
to
-607
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why are we not using |
||
| (next, result) -> next.onResponse( | ||
| new ActualResults( | ||
| result.schema().stream().map(Attribute::name).toList(), | ||
| result.schema().stream().map(a -> Type.asType(a.dataType().nameUpper())).toList(), | ||
| result.schema().stream().map(Attribute::dataType).toList(), | ||
| result.pages(), | ||
| threadPool.getThreadContext().getResponseHeaders() | ||
| ) | ||
| ) | ||
| session.optimizeAndExecute( | ||
| new EsqlQueryRequest(), | ||
| new EsqlExecutionInfo(randomBoolean()), | ||
| planRunner(bigArrays, foldCtx, physicalOperationProviders), | ||
| analyzed, | ||
| listener.map( | ||
| result -> new ActualResults( | ||
| result.schema().stream().map(Attribute::name).toList(), | ||
| result.schema().stream().map(a -> Type.asType(a.dataType().nameUpper())).toList(), | ||
| result.schema().stream().map(Attribute::dataType).toList(), | ||
| result.pages(), | ||
| threadPool.getThreadContext().getResponseHeaders() | ||
| ) | ||
| ); | ||
| })); | ||
|
|
||
| ) | ||
| ); | ||
| return listener.get(); | ||
| } | ||
|
|
||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wish this could be protected or default visibility, however CsvTests are in different package.
I wonder if in future it would be possible to extend EsqlSession in CsvTests, override
analyzedPlanwith custom test resolution and reduce visibility of this method.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The tests can make a
TestEsqlSessioninside of the same package. Right?